Skip to main content

signet_cold/task/
handle.rs

1//! Ergonomic handles for interacting with cold storage.
2//!
3//! This module provides two handle types:
4//!
5//! - [`ColdStorageHandle`]: Full access (reads and writes)
6//! - [`ColdStorageReadHandle`]: Read-only access
7//!
8//! Both handles can be cloned and shared across tasks. They use separate
9//! channels for reads and writes, allowing concurrent read processing while
10//! maintaining sequential write ordering.
11
12use crate::{
13    AppendBlockRequest, BlockData, ColdReadRequest, ColdReceipt, ColdResult, ColdStorageError,
14    ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog,
15    SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
16};
17use alloy::primitives::{B256, BlockNumber};
18use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
19use std::time::Duration;
20use tokio::sync::{mpsc, oneshot};
21
22/// Map a [`mpsc::error::TrySendError`] to the appropriate
23/// [`ColdStorageError`] variant.
24fn map_dispatch_error<T>(e: mpsc::error::TrySendError<T>) -> ColdStorageError {
25    match e {
26        mpsc::error::TrySendError::Full(_) => ColdStorageError::Backpressure,
27        mpsc::error::TrySendError::Closed(_) => ColdStorageError::TaskTerminated,
28    }
29}
30
31/// Read-only handle for interacting with the cold storage task.
32///
33/// This handle provides read access only and cannot perform write operations.
34/// It shares the read channel with [`ColdStorageHandle`], allowing multiple
35/// readers to coexist without affecting write throughput.
36///
37/// # Usage
38///
39/// Obtain a read handle via [`ColdStorageHandle::reader`]:
40///
41/// ```ignore
42/// let handle = ColdStorageTask::spawn(backend, cancel);
43/// let reader = handle.reader();
44///
45/// // Use reader for queries
46/// let header = reader.get_header_by_number(100).await?;
47/// ```
48///
49/// # Thread Safety
50///
51/// This handle is `Clone + Send + Sync` and can be shared across tasks.
52/// Multiple readers can query concurrently without blocking writes.
53#[derive(Clone, Debug)]
54pub struct ColdStorageReadHandle {
55    sender: mpsc::Sender<ColdReadRequest>,
56}
57
58impl ColdStorageReadHandle {
59    /// Create a new read-only handle with the given sender.
60    pub(crate) const fn new(sender: mpsc::Sender<ColdReadRequest>) -> Self {
61        Self { sender }
62    }
63
64    /// Send a read request and wait for the response.
65    async fn send<T>(
66        &self,
67        req: ColdReadRequest,
68        rx: oneshot::Receiver<ColdResult<T>>,
69    ) -> ColdResult<T> {
70        self.sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
71        rx.await.map_err(|_| ColdStorageError::Cancelled)?
72    }
73
74    // ==========================================================================
75    // Headers
76    // ==========================================================================
77
78    /// Get a header by specifier.
79    pub async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
80        let (resp, rx) = oneshot::channel();
81        self.send(ColdReadRequest::GetHeader { spec, resp }, rx).await
82    }
83
84    /// Get a header by block number.
85    pub async fn get_header_by_number(
86        &self,
87        block: BlockNumber,
88    ) -> ColdResult<Option<SealedHeader>> {
89        self.get_header(HeaderSpecifier::Number(block)).await
90    }
91
92    /// Get a header by block hash.
93    pub async fn get_header_by_hash(&self, hash: B256) -> ColdResult<Option<SealedHeader>> {
94        self.get_header(HeaderSpecifier::Hash(hash)).await
95    }
96
97    /// Get multiple headers by specifiers.
98    pub async fn get_headers(
99        &self,
100        specs: Vec<HeaderSpecifier>,
101    ) -> ColdResult<Vec<Option<SealedHeader>>> {
102        let (resp, rx) = oneshot::channel();
103        self.send(ColdReadRequest::GetHeaders { specs, resp }, rx).await
104    }
105
106    // ==========================================================================
107    // Transactions
108    // ==========================================================================
109
110    /// Get a transaction by specifier, with block confirmation metadata.
111    pub async fn get_transaction(
112        &self,
113        spec: TransactionSpecifier,
114    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
115        let (resp, rx) = oneshot::channel();
116        self.send(ColdReadRequest::GetTransaction { spec, resp }, rx).await
117    }
118
119    /// Get a transaction by hash.
120    pub async fn get_tx_by_hash(&self, hash: B256) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
121        self.get_transaction(TransactionSpecifier::Hash(hash)).await
122    }
123
124    /// Get a transaction by block number and index.
125    pub async fn get_tx_by_block_and_index(
126        &self,
127        block: BlockNumber,
128        index: u64,
129    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
130        self.get_transaction(TransactionSpecifier::BlockAndIndex { block, index }).await
131    }
132
133    /// Get a transaction by block hash and index.
134    pub async fn get_tx_by_block_hash_and_index(
135        &self,
136        block_hash: B256,
137        index: u64,
138    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
139        self.get_transaction(TransactionSpecifier::BlockHashAndIndex { block_hash, index }).await
140    }
141
142    /// Get all transactions in a block.
143    pub async fn get_transactions_in_block(
144        &self,
145        block: BlockNumber,
146    ) -> ColdResult<Vec<RecoveredTx>> {
147        let (resp, rx) = oneshot::channel();
148        self.send(ColdReadRequest::GetTransactionsInBlock { block, resp }, rx).await
149    }
150
151    /// Get the transaction count for a block.
152    pub async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
153        let (resp, rx) = oneshot::channel();
154        self.send(ColdReadRequest::GetTransactionCount { block, resp }, rx).await
155    }
156
157    // ==========================================================================
158    // Receipts
159    // ==========================================================================
160
161    /// Get a receipt by specifier.
162    pub async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
163        let (resp, rx) = oneshot::channel();
164        self.send(ColdReadRequest::GetReceipt { spec, resp }, rx).await
165    }
166
167    /// Get a receipt by transaction hash.
168    pub async fn get_receipt_by_tx_hash(&self, hash: B256) -> ColdResult<Option<ColdReceipt>> {
169        self.get_receipt(ReceiptSpecifier::TxHash(hash)).await
170    }
171
172    /// Get a receipt by block number and index.
173    pub async fn get_receipt_by_block_and_index(
174        &self,
175        block: BlockNumber,
176        index: u64,
177    ) -> ColdResult<Option<ColdReceipt>> {
178        self.get_receipt(ReceiptSpecifier::BlockAndIndex { block, index }).await
179    }
180
181    /// Get all receipts in a block.
182    pub async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
183        let (resp, rx) = oneshot::channel();
184        self.send(ColdReadRequest::GetReceiptsInBlock { block, resp }, rx).await
185    }
186
187    // ==========================================================================
188    // SignetEvents
189    // ==========================================================================
190
191    /// Get signet events by specifier.
192    pub async fn get_signet_events(
193        &self,
194        spec: SignetEventsSpecifier,
195    ) -> ColdResult<Vec<DbSignetEvent>> {
196        let (resp, rx) = oneshot::channel();
197        self.send(ColdReadRequest::GetSignetEvents { spec, resp }, rx).await
198    }
199
200    /// Get signet events in a block.
201    pub async fn get_signet_events_in_block(
202        &self,
203        block: BlockNumber,
204    ) -> ColdResult<Vec<DbSignetEvent>> {
205        self.get_signet_events(SignetEventsSpecifier::Block(block)).await
206    }
207
208    /// Get signet events in a range of blocks.
209    pub async fn get_signet_events_in_range(
210        &self,
211        start: BlockNumber,
212        end: BlockNumber,
213    ) -> ColdResult<Vec<DbSignetEvent>> {
214        self.get_signet_events(SignetEventsSpecifier::BlockRange { start, end }).await
215    }
216
217    // ==========================================================================
218    // ZenithHeaders
219    // ==========================================================================
220
221    /// Get a zenith header by block number.
222    pub async fn get_zenith_header(
223        &self,
224        block: BlockNumber,
225    ) -> ColdResult<Option<DbZenithHeader>> {
226        let (resp, rx) = oneshot::channel();
227        self.send(
228            ColdReadRequest::GetZenithHeader { spec: ZenithHeaderSpecifier::Number(block), resp },
229            rx,
230        )
231        .await
232    }
233
234    /// Get zenith headers by specifier.
235    pub async fn get_zenith_headers(
236        &self,
237        spec: ZenithHeaderSpecifier,
238    ) -> ColdResult<Vec<DbZenithHeader>> {
239        let (resp, rx) = oneshot::channel();
240        self.send(ColdReadRequest::GetZenithHeaders { spec, resp }, rx).await
241    }
242
243    /// Get zenith headers in a range of blocks.
244    pub async fn get_zenith_headers_in_range(
245        &self,
246        start: BlockNumber,
247        end: BlockNumber,
248    ) -> ColdResult<Vec<DbZenithHeader>> {
249        self.get_zenith_headers(ZenithHeaderSpecifier::Range { start, end }).await
250    }
251
252    // ==========================================================================
253    // Logs
254    // ==========================================================================
255
256    /// Filter logs by block range, address, and topics.
257    ///
258    /// Follows `eth_getLogs` semantics. Returns matching logs ordered by
259    /// `(block_number, tx_index, log_index)`.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`ColdStorageError::TooManyLogs`] if the query would produce
264    /// more than `max_logs` results.
265    pub async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
266        let (resp, rx) = oneshot::channel();
267        self.send(ColdReadRequest::GetLogs { filter: Box::new(filter), max_logs, resp }, rx).await
268    }
269
270    /// Stream logs matching a filter.
271    ///
272    /// Returns a [`LogStream`] that yields matching logs in order.
273    /// Consume with `StreamExt::next()` until `None`. If the last item
274    /// is `Err(...)`, an error occurred (deadline, too many logs, etc.).
275    ///
276    /// The `deadline` is clamped to the task's configured maximum.
277    ///
278    /// # Partial Delivery
279    ///
280    /// One or more `Ok(log)` items may be delivered before a terminal
281    /// `Err(...)`. Consumers must be prepared for partial results — for
282    /// example, a reorg or deadline expiry can interrupt a stream that
283    /// has already yielded some logs.
284    ///
285    /// # Resource Management
286    ///
287    /// The stream holds a backend concurrency permit. Dropping the
288    /// stream releases the permit. Drop early if results are no
289    /// longer needed.
290    pub async fn stream_logs(
291        &self,
292        filter: Filter,
293        max_logs: usize,
294        deadline: Duration,
295    ) -> ColdResult<LogStream> {
296        let (resp, rx) = oneshot::channel();
297        self.send(
298            ColdReadRequest::StreamLogs { filter: Box::new(filter), max_logs, deadline, resp },
299            rx,
300        )
301        .await
302    }
303
304    // ==========================================================================
305    // Metadata
306    // ==========================================================================
307
308    /// Get the latest block number in storage.
309    pub async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
310        let (resp, rx) = oneshot::channel();
311        self.send(ColdReadRequest::GetLatestBlock { resp }, rx).await
312    }
313}
314
315/// Handle for interacting with the cold storage task.
316///
317/// This handle provides full access to both read and write operations.
318/// It can be cloned and shared across tasks.
319///
320/// # Channel Separation
321///
322/// Internally, this handle uses separate channels for reads and writes:
323///
324/// - **Read channel**: Shared with [`ColdStorageReadHandle`]. Reads are
325///   processed concurrently (up to 64 in flight).
326/// - **Write channel**: Exclusive to this handle. Writes are processed
327///   sequentially to maintain ordering.
328///
329/// This design allows read-heavy workloads to proceed without being blocked
330/// by write operations, while ensuring write ordering is preserved.
331///
332/// # Read Access
333///
334/// All read methods from [`ColdStorageReadHandle`] are available on this
335/// handle via [`Deref`](std::ops::Deref).
336///
337/// # Usage
338///
339/// ```ignore
340/// let handle = ColdStorageTask::spawn(backend, cancel);
341///
342/// // Full access: reads and writes
343/// handle.append_block(data).await?;
344/// let header = handle.get_header_by_number(100).await?;
345///
346/// // Get a read-only handle for query-only use cases
347/// let reader = handle.reader();
348/// ```
349///
350/// # Thread Safety
351///
352/// This handle is `Clone + Send + Sync` and can be shared across tasks.
353#[derive(Clone, Debug)]
354pub struct ColdStorageHandle {
355    reader: ColdStorageReadHandle,
356    write_sender: mpsc::Sender<ColdWriteRequest>,
357}
358
359impl std::ops::Deref for ColdStorageHandle {
360    type Target = ColdStorageReadHandle;
361
362    fn deref(&self) -> &Self::Target {
363        &self.reader
364    }
365}
366
367impl ColdStorageHandle {
368    /// Create a new handle with the given senders.
369    pub(crate) const fn new(
370        read_sender: mpsc::Sender<ColdReadRequest>,
371        write_sender: mpsc::Sender<ColdWriteRequest>,
372    ) -> Self {
373        Self { reader: ColdStorageReadHandle::new(read_sender), write_sender }
374    }
375
376    /// Get a read-only handle that shares the read channel.
377    ///
378    /// The returned handle can only perform read operations and cannot
379    /// modify storage. Multiple read handles can coexist and query
380    /// concurrently without affecting write throughput.
381    pub fn reader(&self) -> ColdStorageReadHandle {
382        self.reader.clone()
383    }
384
385    /// Send a write request and wait for the response.
386    async fn send_write<T>(
387        &self,
388        req: ColdWriteRequest,
389        rx: oneshot::Receiver<ColdResult<T>>,
390    ) -> ColdResult<T> {
391        self.write_sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
392        rx.await.map_err(|_| ColdStorageError::Cancelled)?
393    }
394
395    // ==========================================================================
396    // Write Operations
397    // ==========================================================================
398
399    /// Append a single block to cold storage.
400    pub async fn append_block(&self, data: BlockData) -> ColdResult<()> {
401        let (resp, rx) = oneshot::channel();
402        self.send_write(
403            ColdWriteRequest::AppendBlock(Box::new(AppendBlockRequest { data, resp })),
404            rx,
405        )
406        .await
407    }
408
409    /// Append multiple blocks to cold storage.
410    pub async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
411        let (resp, rx) = oneshot::channel();
412        self.send_write(ColdWriteRequest::AppendBlocks { data, resp }, rx).await
413    }
414
415    /// Truncate all data above the given block number.
416    ///
417    /// This removes block N+1 and higher from all tables.
418    pub async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
419        let (resp, rx) = oneshot::channel();
420        self.send_write(ColdWriteRequest::TruncateAbove { block, resp }, rx).await
421    }
422
423    // ==========================================================================
424    // Synchronous Fire-and-Forget Dispatch
425    // ==========================================================================
426
427    /// Dispatch append blocks without waiting for response (non-blocking).
428    ///
429    /// Unlike [`append_blocks`](Self::append_blocks), this method returns
430    /// immediately without waiting for the write to complete. The write
431    /// result is discarded.
432    ///
433    /// # Errors
434    ///
435    /// - [`ColdStorageError::Backpressure`]: Channel is full. The task is alive
436    ///   but cannot keep up. Transient; may retry or accept the gap.
437    /// - [`ColdStorageError::TaskTerminated`]: Channel is closed. The task has
438    ///   stopped and must be restarted.
439    ///
440    /// In both cases, hot storage already contains the data and remains
441    /// authoritative.
442    pub fn dispatch_append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
443        let (resp, _rx) = oneshot::channel();
444        self.write_sender
445            .try_send(ColdWriteRequest::AppendBlocks { data, resp })
446            .map_err(map_dispatch_error)
447    }
448
449    /// Dispatch truncate without waiting for response (non-blocking).
450    ///
451    /// Unlike [`truncate_above`](Self::truncate_above), this method returns
452    /// immediately without waiting for the truncate to complete. The result
453    /// is discarded.
454    ///
455    /// # Errors
456    ///
457    /// Same as [`dispatch_append_blocks`](Self::dispatch_append_blocks). If
458    /// cold storage falls behind during a reorg, it may temporarily contain
459    /// stale data until the truncate is processed or replayed.
460    pub fn dispatch_truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
461        let (resp, _rx) = oneshot::channel();
462        self.write_sender
463            .try_send(ColdWriteRequest::TruncateAbove { block, resp })
464            .map_err(map_dispatch_error)
465    }
466}