
signet_cold/task/runner.rs

//! Cold storage task runner.
//!
//! The [`ColdStorageTask`] processes requests from channels and dispatches
//! them to the storage backend. Reads and writes use separate channels:
//!
//! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks
//! - **Writes**: Processed sequentially (inline await) to maintain ordering
//!
//! Transaction, receipt, and header lookups are served from an LRU cache,
//! avoiding repeated backend reads for frequently queried items.
//!
//! # Log Streaming
//!
//! The task owns the streaming configuration (max deadline, concurrency
//! limit) and delegates the streaming loop to the backend via
//! [`ColdStorage::produce_log_stream`]. Callers supply a per-request
//! deadline that is clamped to the task's configured maximum.
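//!
//! # Example
//!
//! A minimal wiring sketch. `MyBackend` is a stand-in for any
//! [`ColdStorage`] implementation, not a type provided by this crate:
//!
//! ```ignore
//! use tokio_util::sync::CancellationToken;
//!
//! let cancel = CancellationToken::new();
//! // `spawn` starts the runner on the current Tokio runtime and returns
//! // the handle used to submit read and write requests.
//! let handle = ColdStorageTask::spawn(MyBackend::default(), cancel.clone());
//!
//! // ... submit requests through `handle` ...
//!
//! // Trigger graceful shutdown; in-flight read handlers are awaited.
//! cancel.cancel();
//! ```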

use super::cache::ColdCache;
use crate::{
    ColdReadRequest, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageHandle,
    ColdWriteRequest, Confirmed, HeaderSpecifier, LogStream, ReceiptSpecifier,
    TransactionSpecifier,
};
use signet_storage_types::{RecoveredTx, SealedHeader};
use std::{sync::Arc, time::Duration};
use tokio::sync::{Mutex, Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{debug, instrument};

/// Default maximum deadline for streaming operations.
const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60);

/// Channel size for cold storage read requests.
const READ_CHANNEL_SIZE: usize = 256;

/// Channel size for cold storage write requests.
const WRITE_CHANNEL_SIZE: usize = 256;

/// Maximum concurrent read request handlers.
const MAX_CONCURRENT_READERS: usize = 64;

/// Maximum concurrent streaming operations.
const MAX_CONCURRENT_STREAMS: usize = 8;

/// Channel buffer size for streaming operations.
const STREAM_CHANNEL_BUFFER: usize = 256;

/// Shared state for the cold storage task, holding the backend and cache.
///
/// This is wrapped in an `Arc` so that spawned read handlers can access
/// the backend and cache without moving ownership.
struct ColdStorageTaskInner<B> {
    backend: B,
    cache: Mutex<ColdCache>,
    max_stream_deadline: Duration,
    stream_semaphore: Arc<Semaphore>,
}

impl<B: ColdStorage> ColdStorageTaskInner<B> {
    /// Fetch a header from the backend and cache the result.
    async fn fetch_and_cache_header(
        &self,
        spec: HeaderSpecifier,
    ) -> ColdResult<Option<SealedHeader>> {
        let r = self.backend.get_header(spec).await;
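        // Write-through on success: cache the header keyed by block number.
        // The transaction and receipt fetchers below follow the same pattern
        // with (block, index) keys.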
        if let Ok(Some(ref h)) = r {
            self.cache.lock().await.put_header(h.number, h.clone());
        }
        r
    }

    /// Fetch a transaction from the backend and cache the result.
    async fn fetch_and_cache_tx(
        &self,
        spec: TransactionSpecifier,
    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
        let r = self.backend.get_transaction(spec).await;
        if let Ok(Some(ref c)) = r {
            let meta = c.meta();
            self.cache
                .lock()
                .await
                .put_tx((meta.block_number(), meta.transaction_index()), c.clone());
        }
        r
    }

    /// Fetch a receipt from the backend and cache the result.
    async fn fetch_and_cache_receipt(
        &self,
        spec: ReceiptSpecifier,
    ) -> ColdResult<Option<ColdReceipt>> {
        let r = self.backend.get_receipt(spec).await;
        if let Ok(Some(ref c)) = r {
            self.cache.lock().await.put_receipt((c.block_number, c.transaction_index), c.clone());
        }
        r
    }

    /// Handle a read request, checking the cache first where applicable.
    async fn handle_read(self: &Arc<Self>, req: ColdReadRequest) {
        match req {
            ColdReadRequest::GetHeader { spec, resp } => {
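                // Only number-keyed lookups can be answered from the cache;
                // any other specifier goes straight to the backend.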
                let result = if let HeaderSpecifier::Number(n) = &spec {
                    if let Some(hit) = self.cache.lock().await.get_header(n) {
                        Ok(Some(hit))
                    } else {
                        self.fetch_and_cache_header(spec).await
                    }
                } else {
                    self.fetch_and_cache_header(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetHeaders { specs, resp } => {
                let _ = resp.send(self.backend.get_headers(specs).await);
            }
            ColdReadRequest::GetTransaction { spec, resp } => {
                let result = if let TransactionSpecifier::BlockAndIndex { block, index } = &spec {
                    if let Some(hit) = self.cache.lock().await.get_tx(&(*block, *index)) {
                        Ok(Some(hit))
                    } else {
                        self.fetch_and_cache_tx(spec).await
                    }
                } else {
                    self.fetch_and_cache_tx(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetTransactionsInBlock { block, resp } => {
                let _ = resp.send(self.backend.get_transactions_in_block(block).await);
            }
            ColdReadRequest::GetTransactionCount { block, resp } => {
                let _ = resp.send(self.backend.get_transaction_count(block).await);
            }
            ColdReadRequest::GetReceipt { spec, resp } => {
                let result = if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec {
                    if let Some(hit) = self.cache.lock().await.get_receipt(&(*block, *index)) {
                        Ok(Some(hit))
                    } else {
                        self.fetch_and_cache_receipt(spec).await
                    }
                } else {
                    self.fetch_and_cache_receipt(spec).await
                };
                let _ = resp.send(result);
            }
            ColdReadRequest::GetReceiptsInBlock { block, resp } => {
                let _ = resp.send(self.backend.get_receipts_in_block(block).await);
            }
            ColdReadRequest::GetSignetEvents { spec, resp } => {
                let _ = resp.send(self.backend.get_signet_events(spec).await);
            }
            ColdReadRequest::GetZenithHeader { spec, resp } => {
                let _ = resp.send(self.backend.get_zenith_header(spec).await);
            }
            ColdReadRequest::GetZenithHeaders { spec, resp } => {
                let _ = resp.send(self.backend.get_zenith_headers(spec).await);
            }
            ColdReadRequest::GetLogs { filter, max_logs, resp } => {
                let _ = resp.send(self.backend.get_logs(&filter, max_logs).await);
            }
            ColdReadRequest::StreamLogs { filter, max_logs, deadline, resp } => {
                let _ = resp.send(self.handle_stream_logs(*filter, max_logs, deadline).await);
            }
            ColdReadRequest::GetLatestBlock { resp } => {
                let _ = resp.send(self.backend.get_latest_block().await);
            }
        }
    }

    /// Stream logs matching a filter.
    ///
    /// Acquires a concurrency permit, resolves the block range, then
    /// spawns a producer task that delegates to
    /// [`ColdStorage::produce_log_stream`].
    async fn handle_stream_logs(
        self: &Arc<Self>,
        filter: crate::Filter,
        max_logs: usize,
        deadline: Duration,
    ) -> ColdResult<LogStream> {
        let permit = self
            .stream_semaphore
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| ColdStorageError::Cancelled)?;

        let from = filter.get_from_block().unwrap_or(0);
        let to = match filter.get_to_block() {
            Some(to) => to,
            None => {
                let Some(latest) = self.backend.get_latest_block().await? else {
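                    // Backend is empty: drop the sender immediately so the
                    // returned stream ends without yielding any logs.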
                    let (_tx, rx) = mpsc::channel(1);
                    return Ok(ReceiverStream::new(rx));
                };
                latest
            }
        };

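        // Clamp the caller-supplied deadline to the task's configured maximum.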
        let effective = deadline.min(self.max_stream_deadline);
        let deadline_instant = tokio::time::Instant::now() + effective;
        let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
        let inner = Arc::clone(self);

        tokio::spawn(async move {
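            // Hold the semaphore permit for the lifetime of the producer so
            // the stream counts against the concurrency limit until it ends.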
            let _permit = permit;
            let params =
                crate::StreamParams { from, to, max_logs, sender, deadline: deadline_instant };
            inner.backend.produce_log_stream(&filter, params).await;
        });

        Ok(ReceiverStream::new(rx))
    }

    /// Handle a write request, invalidating the cache on truncation.
    async fn handle_write(&self, req: ColdWriteRequest) {
        match req {
            ColdWriteRequest::AppendBlock(boxed) => {
                let result = self.backend.append_block(boxed.data).await;
                let _ = boxed.resp.send(result);
            }
            ColdWriteRequest::AppendBlocks { data, resp } => {
                let result = self.backend.append_blocks(data).await;
                let _ = resp.send(result);
            }
            ColdWriteRequest::TruncateAbove { block, resp } => {
                let result = self.backend.truncate_above(block).await;
                if result.is_ok() {
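                    // Invalidate cached entries above the truncation point so
                    // reads cannot serve stale pre-reorg data.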
                    self.cache.lock().await.invalidate_above(block);
                }
                let _ = resp.send(result);
            }
        }
    }
}

/// The cold storage task that processes requests.
///
/// This task receives requests over separate read and write channels and
/// dispatches them to the storage backend. It supports graceful shutdown
/// via a cancellation token.
///
/// # Processing Model
///
/// - **Reads**: Spawned as concurrent tasks (up to 64 in flight).
///   Multiple reads can execute in parallel.
/// - **Writes**: Processed inline (sequential). Each write completes before
///   the next is started, ensuring ordering.
///
/// This design prioritizes write ordering for correctness while allowing
/// read throughput to scale with concurrency.
///
/// # Log Streaming
///
/// The task owns the streaming configuration (max deadline, concurrency
/// limit) and delegates the streaming loop to the backend via
/// [`ColdStorage::produce_log_stream`]. Callers supply a per-request
/// deadline that is clamped to the task's configured maximum.
///
/// # Caching
///
/// Transaction, receipt, and header lookups are served from an LRU cache
/// when possible. Cache entries are invalidated on
/// [`truncate_above`](crate::ColdStorage::truncate_above) to handle reorgs.
pub struct ColdStorageTask<B: ColdStorage> {
    inner: Arc<ColdStorageTaskInner<B>>,
    read_receiver: mpsc::Receiver<ColdReadRequest>,
    write_receiver: mpsc::Receiver<ColdWriteRequest>,
    cancel_token: CancellationToken,
    /// Task tracker for concurrent read handlers only.
    task_tracker: TaskTracker,
}

impl<B: ColdStorage> std::fmt::Debug for ColdStorageTask<B> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ColdStorageTask").finish_non_exhaustive()
    }
}

impl<B: ColdStorage> ColdStorageTask<B> {
    /// Create a new cold storage task and return its handle.
    pub fn new(backend: B, cancel_token: CancellationToken) -> (Self, ColdStorageHandle) {
        let (read_sender, read_receiver) = mpsc::channel(READ_CHANNEL_SIZE);
        let (write_sender, write_receiver) = mpsc::channel(WRITE_CHANNEL_SIZE);
        let task = Self {
            inner: Arc::new(ColdStorageTaskInner {
                backend,
                cache: Mutex::new(ColdCache::new()),
                max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE,
                stream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)),
            }),
            read_receiver,
            write_receiver,
            cancel_token,
            task_tracker: TaskTracker::new(),
        };
        let handle = ColdStorageHandle::new(read_sender, write_sender);
        (task, handle)
    }

    /// Spawn the task and return the handle.
    ///
    /// The task will run until the cancellation token is triggered or the
    /// channels are closed.
    pub fn spawn(backend: B, cancel_token: CancellationToken) -> ColdStorageHandle {
        let (task, handle) = Self::new(backend, cancel_token);
        tokio::spawn(task.run());
        handle
    }

    /// Run the task, processing requests until shutdown.
    #[instrument(skip(self), name = "cold_storage_task")]
    pub async fn run(mut self) {
        debug!("Cold storage task started");

        loop {
            tokio::select! {
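                // `biased` polls arms in declaration order: cancellation is
                // observed first, and pending writes win over pending reads.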
                biased;

                _ = self.cancel_token.cancelled() => {
                    debug!("Cold storage task received cancellation signal");
                    break;
                }

                maybe_write = self.write_receiver.recv() => {
                    let Some(req) = maybe_write else {
                        debug!("Cold storage write channel closed");
                        break;
                    };
                    self.inner.handle_write(req).await;
                }

                maybe_read = self.read_receiver.recv() => {
                    let Some(req) = maybe_read else {
                        debug!("Cold storage read channel closed");
                        break;
                    };

                    // Apply backpressure: if we've hit the concurrent reader
                    // limit, drain the tracker before accepting more reads.
                    // `TaskTracker::wait` only resolves once the tracker is
                    // closed, so close it first and reopen it afterwards;
                    // waiting on an open tracker would never complete.
                    if self.task_tracker.len() >= MAX_CONCURRENT_READERS {
                        self.task_tracker.close();
                        tokio::select! {
                            _ = self.cancel_token.cancelled() => {
                                debug!("Cancellation while waiting for read task slot");
                                break;
                            }
                            _ = self.task_tracker.wait() => {}
                        }
                        self.task_tracker.reopen();
                    }

                    let inner = Arc::clone(&self.inner);
                    self.task_tracker.spawn(async move {
                        inner.handle_read(req).await;
                    });
                }
            }
        }

        // Graceful shutdown: wait for in-progress read tasks to complete
        debug!("Waiting for in-progress read handlers to complete");
        self.task_tracker.close();
        self.task_tracker.wait().await;
        debug!("Cold storage task shut down gracefully");
    }
}