signet_cold/task/handle.rs
1//! Ergonomic handles for interacting with cold storage.
2//!
3//! This module provides two handle types:
4//!
5//! - [`ColdStorageHandle`]: Full access (reads and writes)
6//! - [`ColdStorageReadHandle`]: Read-only access
7//!
8//! Both handles can be cloned and shared across tasks. They use separate
9//! channels for reads and writes, allowing concurrent read processing while
10//! maintaining sequential write ordering.
11
12use crate::{
13 AppendBlockRequest, BlockData, ColdReadRequest, ColdReceipt, ColdResult, ColdStorageError,
14 ColdWriteRequest, Confirmed, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog,
15 SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
16};
17use alloy::primitives::{B256, BlockNumber};
18use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
19use std::time::Duration;
20use tokio::sync::{mpsc, oneshot};
21
22/// Map a [`mpsc::error::TrySendError`] to the appropriate
23/// [`ColdStorageError`] variant.
24fn map_dispatch_error<T>(e: mpsc::error::TrySendError<T>) -> ColdStorageError {
25 match e {
26 mpsc::error::TrySendError::Full(_) => ColdStorageError::Backpressure,
27 mpsc::error::TrySendError::Closed(_) => ColdStorageError::TaskTerminated,
28 }
29}
30
31/// Read-only handle for interacting with the cold storage task.
32///
33/// This handle provides read access only and cannot perform write operations.
34/// It shares the read channel with [`ColdStorageHandle`], allowing multiple
35/// readers to coexist without affecting write throughput.
36///
37/// # Usage
38///
39/// Obtain a read handle via [`ColdStorageHandle::reader`]:
40///
41/// ```ignore
42/// let handle = ColdStorageTask::spawn(backend, cancel);
43/// let reader = handle.reader();
44///
45/// // Use reader for queries
46/// let header = reader.get_header_by_number(100).await?;
47/// ```
48///
49/// # Thread Safety
50///
51/// This handle is `Clone + Send + Sync` and can be shared across tasks.
52/// Multiple readers can query concurrently without blocking writes.
53#[derive(Clone, Debug)]
54pub struct ColdStorageReadHandle {
55 sender: mpsc::Sender<ColdReadRequest>,
56}
57
58impl ColdStorageReadHandle {
59 /// Create a new read-only handle with the given sender.
60 pub(crate) const fn new(sender: mpsc::Sender<ColdReadRequest>) -> Self {
61 Self { sender }
62 }
63
64 /// Send a read request and wait for the response.
65 async fn send<T>(
66 &self,
67 req: ColdReadRequest,
68 rx: oneshot::Receiver<ColdResult<T>>,
69 ) -> ColdResult<T> {
70 self.sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
71 rx.await.map_err(|_| ColdStorageError::Cancelled)?
72 }
73
74 // ==========================================================================
75 // Headers
76 // ==========================================================================
77
78 /// Get a header by specifier.
79 pub async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
80 let (resp, rx) = oneshot::channel();
81 self.send(ColdReadRequest::GetHeader { spec, resp }, rx).await
82 }
83
84 /// Get a header by block number.
85 pub async fn get_header_by_number(
86 &self,
87 block: BlockNumber,
88 ) -> ColdResult<Option<SealedHeader>> {
89 self.get_header(HeaderSpecifier::Number(block)).await
90 }
91
92 /// Get a header by block hash.
93 pub async fn get_header_by_hash(&self, hash: B256) -> ColdResult<Option<SealedHeader>> {
94 self.get_header(HeaderSpecifier::Hash(hash)).await
95 }
96
97 /// Get multiple headers by specifiers.
98 pub async fn get_headers(
99 &self,
100 specs: Vec<HeaderSpecifier>,
101 ) -> ColdResult<Vec<Option<SealedHeader>>> {
102 let (resp, rx) = oneshot::channel();
103 self.send(ColdReadRequest::GetHeaders { specs, resp }, rx).await
104 }
105
106 // ==========================================================================
107 // Transactions
108 // ==========================================================================
109
110 /// Get a transaction by specifier, with block confirmation metadata.
111 pub async fn get_transaction(
112 &self,
113 spec: TransactionSpecifier,
114 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
115 let (resp, rx) = oneshot::channel();
116 self.send(ColdReadRequest::GetTransaction { spec, resp }, rx).await
117 }
118
119 /// Get a transaction by hash.
120 pub async fn get_tx_by_hash(&self, hash: B256) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
121 self.get_transaction(TransactionSpecifier::Hash(hash)).await
122 }
123
124 /// Get a transaction by block number and index.
125 pub async fn get_tx_by_block_and_index(
126 &self,
127 block: BlockNumber,
128 index: u64,
129 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
130 self.get_transaction(TransactionSpecifier::BlockAndIndex { block, index }).await
131 }
132
133 /// Get a transaction by block hash and index.
134 pub async fn get_tx_by_block_hash_and_index(
135 &self,
136 block_hash: B256,
137 index: u64,
138 ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
139 self.get_transaction(TransactionSpecifier::BlockHashAndIndex { block_hash, index }).await
140 }
141
142 /// Get all transactions in a block.
143 pub async fn get_transactions_in_block(
144 &self,
145 block: BlockNumber,
146 ) -> ColdResult<Vec<RecoveredTx>> {
147 let (resp, rx) = oneshot::channel();
148 self.send(ColdReadRequest::GetTransactionsInBlock { block, resp }, rx).await
149 }
150
151 /// Get the transaction count for a block.
152 pub async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
153 let (resp, rx) = oneshot::channel();
154 self.send(ColdReadRequest::GetTransactionCount { block, resp }, rx).await
155 }
156
157 // ==========================================================================
158 // Receipts
159 // ==========================================================================
160
161 /// Get a receipt by specifier.
162 pub async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
163 let (resp, rx) = oneshot::channel();
164 self.send(ColdReadRequest::GetReceipt { spec, resp }, rx).await
165 }
166
167 /// Get a receipt by transaction hash.
168 pub async fn get_receipt_by_tx_hash(&self, hash: B256) -> ColdResult<Option<ColdReceipt>> {
169 self.get_receipt(ReceiptSpecifier::TxHash(hash)).await
170 }
171
172 /// Get a receipt by block number and index.
173 pub async fn get_receipt_by_block_and_index(
174 &self,
175 block: BlockNumber,
176 index: u64,
177 ) -> ColdResult<Option<ColdReceipt>> {
178 self.get_receipt(ReceiptSpecifier::BlockAndIndex { block, index }).await
179 }
180
181 /// Get all receipts in a block.
182 pub async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
183 let (resp, rx) = oneshot::channel();
184 self.send(ColdReadRequest::GetReceiptsInBlock { block, resp }, rx).await
185 }
186
187 // ==========================================================================
188 // SignetEvents
189 // ==========================================================================
190
191 /// Get signet events by specifier.
192 pub async fn get_signet_events(
193 &self,
194 spec: SignetEventsSpecifier,
195 ) -> ColdResult<Vec<DbSignetEvent>> {
196 let (resp, rx) = oneshot::channel();
197 self.send(ColdReadRequest::GetSignetEvents { spec, resp }, rx).await
198 }
199
200 /// Get signet events in a block.
201 pub async fn get_signet_events_in_block(
202 &self,
203 block: BlockNumber,
204 ) -> ColdResult<Vec<DbSignetEvent>> {
205 self.get_signet_events(SignetEventsSpecifier::Block(block)).await
206 }
207
208 /// Get signet events in a range of blocks.
209 pub async fn get_signet_events_in_range(
210 &self,
211 start: BlockNumber,
212 end: BlockNumber,
213 ) -> ColdResult<Vec<DbSignetEvent>> {
214 self.get_signet_events(SignetEventsSpecifier::BlockRange { start, end }).await
215 }
216
217 // ==========================================================================
218 // ZenithHeaders
219 // ==========================================================================
220
221 /// Get a zenith header by block number.
222 pub async fn get_zenith_header(
223 &self,
224 block: BlockNumber,
225 ) -> ColdResult<Option<DbZenithHeader>> {
226 let (resp, rx) = oneshot::channel();
227 self.send(
228 ColdReadRequest::GetZenithHeader { spec: ZenithHeaderSpecifier::Number(block), resp },
229 rx,
230 )
231 .await
232 }
233
234 /// Get zenith headers by specifier.
235 pub async fn get_zenith_headers(
236 &self,
237 spec: ZenithHeaderSpecifier,
238 ) -> ColdResult<Vec<DbZenithHeader>> {
239 let (resp, rx) = oneshot::channel();
240 self.send(ColdReadRequest::GetZenithHeaders { spec, resp }, rx).await
241 }
242
243 /// Get zenith headers in a range of blocks.
244 pub async fn get_zenith_headers_in_range(
245 &self,
246 start: BlockNumber,
247 end: BlockNumber,
248 ) -> ColdResult<Vec<DbZenithHeader>> {
249 self.get_zenith_headers(ZenithHeaderSpecifier::Range { start, end }).await
250 }
251
252 // ==========================================================================
253 // Logs
254 // ==========================================================================
255
256 /// Filter logs by block range, address, and topics.
257 ///
258 /// Follows `eth_getLogs` semantics. Returns matching logs ordered by
259 /// `(block_number, tx_index, log_index)`.
260 ///
261 /// # Errors
262 ///
263 /// Returns [`ColdStorageError::TooManyLogs`] if the query would produce
264 /// more than `max_logs` results.
265 pub async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
266 let (resp, rx) = oneshot::channel();
267 self.send(ColdReadRequest::GetLogs { filter: Box::new(filter), max_logs, resp }, rx).await
268 }
269
270 /// Stream logs matching a filter.
271 ///
272 /// Returns a [`LogStream`] that yields matching logs in order.
273 /// Consume with `StreamExt::next()` until `None`. If the last item
274 /// is `Err(...)`, an error occurred (deadline, too many logs, etc.).
275 ///
276 /// The `deadline` is clamped to the task's configured maximum.
277 ///
278 /// # Partial Delivery
279 ///
280 /// One or more `Ok(log)` items may be delivered before a terminal
281 /// `Err(...)`. Consumers must be prepared for partial results — for
282 /// example, a reorg or deadline expiry can interrupt a stream that
283 /// has already yielded some logs.
284 ///
285 /// # Resource Management
286 ///
287 /// The stream holds a backend concurrency permit. Dropping the
288 /// stream releases the permit. Drop early if results are no
289 /// longer needed.
290 pub async fn stream_logs(
291 &self,
292 filter: Filter,
293 max_logs: usize,
294 deadline: Duration,
295 ) -> ColdResult<LogStream> {
296 let (resp, rx) = oneshot::channel();
297 self.send(
298 ColdReadRequest::StreamLogs { filter: Box::new(filter), max_logs, deadline, resp },
299 rx,
300 )
301 .await
302 }
303
304 // ==========================================================================
305 // Metadata
306 // ==========================================================================
307
308 /// Get the latest block number in storage.
309 pub async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
310 let (resp, rx) = oneshot::channel();
311 self.send(ColdReadRequest::GetLatestBlock { resp }, rx).await
312 }
313}
314
315/// Handle for interacting with the cold storage task.
316///
317/// This handle provides full access to both read and write operations.
318/// It can be cloned and shared across tasks.
319///
320/// # Channel Separation
321///
322/// Internally, this handle uses separate channels for reads and writes:
323///
324/// - **Read channel**: Shared with [`ColdStorageReadHandle`]. Reads are
325/// processed concurrently (up to 64 in flight).
326/// - **Write channel**: Exclusive to this handle. Writes are processed
327/// sequentially to maintain ordering.
328///
329/// This design allows read-heavy workloads to proceed without being blocked
330/// by write operations, while ensuring write ordering is preserved.
331///
332/// # Read Access
333///
334/// All read methods from [`ColdStorageReadHandle`] are available on this
335/// handle via [`Deref`](std::ops::Deref).
336///
337/// # Usage
338///
339/// ```ignore
340/// let handle = ColdStorageTask::spawn(backend, cancel);
341///
342/// // Full access: reads and writes
343/// handle.append_block(data).await?;
344/// let header = handle.get_header_by_number(100).await?;
345///
346/// // Get a read-only handle for query-only use cases
347/// let reader = handle.reader();
348/// ```
349///
350/// # Thread Safety
351///
352/// This handle is `Clone + Send + Sync` and can be shared across tasks.
353#[derive(Clone, Debug)]
354pub struct ColdStorageHandle {
355 reader: ColdStorageReadHandle,
356 write_sender: mpsc::Sender<ColdWriteRequest>,
357}
358
359impl std::ops::Deref for ColdStorageHandle {
360 type Target = ColdStorageReadHandle;
361
362 fn deref(&self) -> &Self::Target {
363 &self.reader
364 }
365}
366
367impl ColdStorageHandle {
368 /// Create a new handle with the given senders.
369 pub(crate) const fn new(
370 read_sender: mpsc::Sender<ColdReadRequest>,
371 write_sender: mpsc::Sender<ColdWriteRequest>,
372 ) -> Self {
373 Self { reader: ColdStorageReadHandle::new(read_sender), write_sender }
374 }
375
376 /// Get a read-only handle that shares the read channel.
377 ///
378 /// The returned handle can only perform read operations and cannot
379 /// modify storage. Multiple read handles can coexist and query
380 /// concurrently without affecting write throughput.
381 pub fn reader(&self) -> ColdStorageReadHandle {
382 self.reader.clone()
383 }
384
385 /// Send a write request and wait for the response.
386 async fn send_write<T>(
387 &self,
388 req: ColdWriteRequest,
389 rx: oneshot::Receiver<ColdResult<T>>,
390 ) -> ColdResult<T> {
391 self.write_sender.send(req).await.map_err(|_| ColdStorageError::Cancelled)?;
392 rx.await.map_err(|_| ColdStorageError::Cancelled)?
393 }
394
395 // ==========================================================================
396 // Write Operations
397 // ==========================================================================
398
399 /// Append a single block to cold storage.
400 pub async fn append_block(&self, data: BlockData) -> ColdResult<()> {
401 let (resp, rx) = oneshot::channel();
402 self.send_write(
403 ColdWriteRequest::AppendBlock(Box::new(AppendBlockRequest { data, resp })),
404 rx,
405 )
406 .await
407 }
408
409 /// Append multiple blocks to cold storage.
410 pub async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
411 let (resp, rx) = oneshot::channel();
412 self.send_write(ColdWriteRequest::AppendBlocks { data, resp }, rx).await
413 }
414
415 /// Truncate all data above the given block number.
416 ///
417 /// This removes block N+1 and higher from all tables.
418 pub async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
419 let (resp, rx) = oneshot::channel();
420 self.send_write(ColdWriteRequest::TruncateAbove { block, resp }, rx).await
421 }
422
423 // ==========================================================================
424 // Synchronous Fire-and-Forget Dispatch
425 // ==========================================================================
426
427 /// Dispatch append blocks without waiting for response (non-blocking).
428 ///
429 /// Unlike [`append_blocks`](Self::append_blocks), this method returns
430 /// immediately without waiting for the write to complete. The write
431 /// result is discarded.
432 ///
433 /// # Errors
434 ///
435 /// - [`ColdStorageError::Backpressure`]: Channel is full. The task is alive
436 /// but cannot keep up. Transient; may retry or accept the gap.
437 /// - [`ColdStorageError::TaskTerminated`]: Channel is closed. The task has
438 /// stopped and must be restarted.
439 ///
440 /// In both cases, hot storage already contains the data and remains
441 /// authoritative.
442 pub fn dispatch_append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
443 let (resp, _rx) = oneshot::channel();
444 self.write_sender
445 .try_send(ColdWriteRequest::AppendBlocks { data, resp })
446 .map_err(map_dispatch_error)
447 }
448
449 /// Dispatch truncate without waiting for response (non-blocking).
450 ///
451 /// Unlike [`truncate_above`](Self::truncate_above), this method returns
452 /// immediately without waiting for the truncate to complete. The result
453 /// is discarded.
454 ///
455 /// # Errors
456 ///
457 /// Same as [`dispatch_append_blocks`](Self::dispatch_append_blocks). If
458 /// cold storage falls behind during a reorg, it may temporarily contain
459 /// stale data until the truncate is processed or replayed.
460 pub fn dispatch_truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
461 let (resp, _rx) = oneshot::channel();
462 self.write_sender
463 .try_send(ColdWriteRequest::TruncateAbove { block, resp })
464 .map_err(map_dispatch_error)
465 }
466}