use super::cache::ColdCache;
use crate::{
    ColdReadRequest, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageHandle,
    ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, ReceiptSpecifier,
    TransactionSpecifier,
};
use signet_storage_types::{RecoveredTx, SealedHeader};
use std::{sync::Arc, time::Duration};
use tokio::sync::{Mutex, Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{debug, instrument};

/// Default cap applied to log-stream deadlines.
const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60);

/// Capacity of the read-request channel.
const READ_CHANNEL_SIZE: usize = 256;

/// Capacity of the write-request channel.
const WRITE_CHANNEL_SIZE: usize = 256;

/// Maximum number of read requests serviced concurrently.
const MAX_CONCURRENT_READERS: usize = 64;

/// Maximum number of log streams active at any one time.
const MAX_CONCURRENT_STREAMS: usize = 8;

/// Buffer size of the channel backing each log stream.
const STREAM_CHANNEL_BUFFER: usize = 256;

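/// Inner state shared by the cold storage task and the read handlers it
/// spawns: the backend, an in-memory cache, and log-stream limits.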
struct ColdStorageTaskInner<B> {
    backend: B,
    cache: Mutex<ColdCache>,
    max_stream_deadline: Duration,
    stream_semaphore: Arc<Semaphore>,
}

impl<B: ColdStorage> ColdStorageTaskInner<B> {
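    /// Fetches a header from the backend and, on a hit, caches it by block
    /// number.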
    async fn fetch_and_cache_header(
        &self,
        spec: HeaderSpecifier,
    ) -> ColdResult<Option<SealedHeader>> {
        let r = self.backend.get_header(spec).await;
        if let Ok(Some(ref h)) = r {
            self.cache.lock().await.put_header(h.number, h.clone());
        }
        r
    }

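    /// Fetches a transaction from the backend and, on a hit, caches it keyed
    /// by `(block_number, transaction_index)`.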
    async fn fetch_and_cache_tx(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let r = self.backend.get_transaction(spec).await;
        if let Ok(Some(ref c)) = r {
            let meta = c.meta();
            self.cache
                .lock()
                .await
                .put_tx((meta.block_number(), meta.transaction_index()), c.clone());
        }
        r
    }

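    /// Fetches a receipt from the backend and, on a hit, caches it keyed by
    /// `(block_number, transaction_index)`.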
    async fn fetch_and_cache_receipt(
        &self,
        spec: ReceiptSpecifier,
    ) -> ColdResult<Option<ColdReceipt>> {
        let r = self.backend.get_receipt(spec).await;
        if let Ok(Some(ref c)) = r {
            self.cache.lock().await.put_receipt((c.block_number, c.transaction_index), c.clone());
        }
        r
    }

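    /// Services a single read request, checking the cache for point lookups
    /// by block number and index before falling back to the backend.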
    async fn handle_read(self: &Arc<Self>, req: ColdReadRequest) {
        match req {
            ColdReadRequest::GetHeader { spec, resp } => {
                let result = if let HeaderSpecifier::Number(n) = &spec {
                    if let Some(hit) = self.cache.lock().await.get_header(n) {
                        Ok(Some(hit))
                    } else {
                        self.fetch_and_cache_header(spec).await
                    }
                } else {
                    self.fetch_and_cache_header(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetHeaders { specs, resp } => {
                let _ = resp.send(self.backend.get_headers(specs).await);
            }
            ColdReadRequest::GetTransaction { spec, resp } => {
                let result = if let TransactionSpecifier::BlockAndIndex { block, index } = &spec {
                    if let Some(hit) = self.cache.lock().await.get_tx(&(*block, *index)) {
                        Ok(Some(hit))
                    } else {
                        self.fetch_and_cache_tx(spec).await
                    }
                } else {
                    self.fetch_and_cache_tx(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetTransactionsInBlock { block, resp } => {
                let _ = resp.send(self.backend.get_transactions_in_block(block).await);
            }
            ColdReadRequest::GetTransactionCount { block, resp } => {
                let _ = resp.send(self.backend.get_transaction_count(block).await);
            }
            ColdReadRequest::GetReceipt { spec, resp } => {
                let result = if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec {
                    if let Some(hit) = self.cache.lock().await.get_receipt(&(*block, *index)) {
                        Ok(Some(hit))
                    } else {
                        self.fetch_and_cache_receipt(spec).await
                    }
                } else {
                    self.fetch_and_cache_receipt(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetReceiptsInBlock { block, resp } => {
                let _ = resp.send(self.backend.get_receipts_in_block(block).await);
            }
            ColdReadRequest::GetSignetEvents { spec, resp } => {
                let _ = resp.send(self.backend.get_signet_events(spec).await);
            }
            ColdReadRequest::GetZenithHeader { spec, resp } => {
                let _ = resp.send(self.backend.get_zenith_header(spec).await);
            }
            ColdReadRequest::GetZenithHeaders { spec, resp } => {
                let _ = resp.send(self.backend.get_zenith_headers(spec).await);
            }
            ColdReadRequest::GetLogs { filter, max_logs, resp } => {
                let _ = resp.send(self.backend.get_logs(&filter, max_logs).await);
            }
            ColdReadRequest::StreamLogs { filter, max_logs, deadline, resp } => {
                let _ = resp.send(self.handle_stream_logs(*filter, max_logs, deadline).await);
            }
            ColdReadRequest::GetLatestBlock { resp } => {
                let _ = resp.send(self.backend.get_latest_block().await);
            }
        }
    }

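    /// Sets up a log stream: acquires a stream permit, resolves the block
    /// range from the filter (falling back to the latest block), clamps the
    /// requested deadline to `max_stream_deadline`, and spawns a producer
    /// task that feeds the returned stream.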
    async fn handle_stream_logs(
        self: &Arc<Self>,
        filter: crate::Filter,
        max_logs: usize,
        deadline: Duration,
    ) -> ColdResult<LogStream> {
        let permit = self
            .stream_semaphore
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| ColdStorageError::Cancelled)?;

        let from = filter.get_from_block().unwrap_or(0);
        let to = match filter.get_to_block() {
            Some(to) => to,
            None => {
                let Some(latest) = self.backend.get_latest_block().await? else {
                    let (_tx, rx) = mpsc::channel(1);
                    return Ok(ReceiverStream::new(rx));
                };
                latest
            }
        };

        let effective = deadline.min(self.max_stream_deadline);
        let deadline_instant = tokio::time::Instant::now() + effective;
        let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
        let inner = Arc::clone(self);

        tokio::spawn(async move {
            let _permit = permit;
            let params =
                crate::StreamParams { from, to, max_logs, sender, deadline: deadline_instant };
            inner.backend.produce_log_stream(&filter, params).await;
        });

        Ok(ReceiverStream::new(rx))
    }

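    /// Services a single write request. On a successful `TruncateAbove`, the
    /// cache is invalidated above the truncation block.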
    async fn handle_write(&self, req: ColdWriteRequest) {
        match req {
            ColdWriteRequest::AppendBlock(boxed) => {
                let result = self.backend.append_block(boxed.data).await;
                let _ = boxed.resp.send(result);
            }
            ColdWriteRequest::AppendBlocks { data, resp } => {
                let result = self.backend.append_blocks(data).await;
                let _ = resp.send(result);
            }
            ColdWriteRequest::TruncateAbove { block, resp } => {
                let result = self.backend.truncate_above(block).await;
                if result.is_ok() {
                    self.cache.lock().await.invalidate_above(block);
                }
                let _ = resp.send(result);
            }
        }
    }
}

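/// Task that owns a [`ColdStorage`] backend and services the read and write
/// channels behind a [`ColdStorageHandle`].
///
/// Read requests are fanned out to spawned handlers (bounded by
/// `MAX_CONCURRENT_READERS`), while writes are applied inline on the task
/// loop. A minimal usage sketch, assuming `backend` is some value
/// implementing [`ColdStorage`]:
///
/// ```ignore
/// // `backend` is assumed to be any ColdStorage implementation.
/// let cancel = CancellationToken::new();
/// let handle = ColdStorageTask::spawn(backend, cancel.clone());
/// // Issue reads/writes through `handle`, then cancel to shut the task down.
/// cancel.cancel();
/// ```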
pub struct ColdStorageTask<B: ColdStorage> {
    inner: Arc<ColdStorageTaskInner<B>>,
    read_receiver: mpsc::Receiver<ColdReadRequest>,
    write_receiver: mpsc::Receiver<ColdWriteRequest>,
    cancel_token: CancellationToken,
    task_tracker: TaskTracker,
}

impl<B: ColdStorage> std::fmt::Debug for ColdStorageTask<B> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ColdStorageTask").finish_non_exhaustive()
    }
}

impl<B: ColdStorage> ColdStorageTask<B> {
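    /// Creates the task together with the [`ColdStorageHandle`] used to send
    /// it read and write requests over bounded channels.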
    pub fn new(backend: B, cancel_token: CancellationToken) -> (Self, ColdStorageHandle) {
        let (read_sender, read_receiver) = mpsc::channel(READ_CHANNEL_SIZE);
        let (write_sender, write_receiver) = mpsc::channel(WRITE_CHANNEL_SIZE);
        let task = Self {
            inner: Arc::new(ColdStorageTaskInner {
                backend,
                cache: Mutex::new(ColdCache::new()),
                max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE,
                stream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)),
            }),
            read_receiver,
            write_receiver,
            cancel_token,
            task_tracker: TaskTracker::new(),
        };
        let handle = ColdStorageHandle::new(read_sender, write_sender);
        (task, handle)
    }

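    /// Creates the task, spawns it onto the current tokio runtime, and
    /// returns the handle to it.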
    pub fn spawn(backend: B, cancel_token: CancellationToken) -> ColdStorageHandle {
        let (task, handle) = Self::new(backend, cancel_token);
        tokio::spawn(task.run());
        handle
    }

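    /// Runs the task until the cancellation token fires or both request
    /// channels close, then waits for in-flight read handlers to finish.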
    #[instrument(skip(self), name = "cold_storage_task")]
    pub async fn run(mut self) {
        debug!("Cold storage task started");

        loop {
            tokio::select! {
                biased;

                _ = self.cancel_token.cancelled() => {
                    debug!("Cold storage task received cancellation signal");
                    break;
                }

                maybe_write = self.write_receiver.recv() => {
                    let Some(req) = maybe_write else {
                        debug!("Cold storage write channel closed");
                        break;
                    };
                    self.inner.handle_write(req).await;
                }

                maybe_read = self.read_receiver.recv() => {
                    let Some(req) = maybe_read else {
                        debug!("Cold storage read channel closed");
                        break;
                    };

                    // Wait for a reader slot while staying responsive to cancellation.
                    while self.task_tracker.len() >= MAX_CONCURRENT_READERS {
                        tokio::select! {
                            _ = self.cancel_token.cancelled() => {
                                debug!("Cancellation while waiting for read task slot");
                                break;
                            }
                            _ = self.task_tracker.wait() => {}
                        }
                    }

                    let inner = Arc::clone(&self.inner);
                    self.task_tracker.spawn(async move {
                        inner.handle_read(req).await;
                    });
                }
            }
        }

        // Drain in-flight read handlers before reporting shutdown.
        debug!("Waiting for in-progress read handlers to complete");
        self.task_tracker.close();
        self.task_tracker.wait().await;
        debug!("Cold storage task shut down gracefully");
    }
}