Skip to main content

signet_cold/
handle.rs

1//! Unified cold-storage handle.
2//!
3//! A single [`ColdStorage<B>`] type wraps an `Arc<Inner<B>>` and provides all
4//! read, write, and streaming operations. Concurrency is enforced via
5//! semaphores rather than dedicated reader/writer tasks.
6//!
7//! # Concurrency
8//!
9//! - Reads are gated by a `read_sem` (up to 64 in flight).
10//! - Writes are gated by a `write_sem` (serialized: 1 in flight) and
11//!   additionally acquire all `read_sem` permits as a drain barrier, so no
12//!   read is in flight while a write commits.
13//! - Log streams are gated by a `stream_sem` (up to 8 in flight) and do not
14//!   participate in the drain barrier.
15
16use crate::{
17    BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageError, Confirmed,
18    ErasedBackend, Filter, HeaderSpecifier, LogStream, ReceiptSpecifier, RpcLog,
19    SignetEventsSpecifier, StreamParams, TransactionSpecifier, ZenithHeaderSpecifier,
20    cache::ColdCache, metrics,
21};
22use alloy::primitives::{B256, BlockNumber};
23use parking_lot::Mutex;
24use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHeader};
25use std::{
26    sync::{Arc, Weak},
27    time::Duration,
28};
29use tokio::{
30    sync::{Semaphore, mpsc},
31    time::Instant,
32};
33use tokio_stream::wrappers::ReceiverStream;
34use tokio_util::{
35    sync::{CancellationToken, DropGuard},
36    task::TaskTracker,
37};
38use tracing::Instrument;
39
/// Default maximum deadline for streaming operations.
///
/// `ColdStorage::new` stores this value as the handle's
/// `max_stream_deadline`; `stream_logs` clamps every caller-supplied
/// deadline to that maximum.
const DEFAULT_MAX_STREAM_DEADLINE: Duration = Duration::from_secs(60);
42
/// Default fallback for the stream-setup `get_latest_block` deadline
/// when the backend does not advertise a [`read_timeout`]. Picked to
/// match the SQL/MDBX defaults so behaviour is predictable.
///
/// In this module it is consulted by `stream_logs`, and only when the
/// filter carries no explicit upper block bound.
///
/// [`read_timeout`]: crate::ColdStorageBackend::read_timeout
const DEFAULT_STREAM_SETUP_TIMEOUT: Duration = Duration::from_millis(500);
49
50/// Emit an advisory WARN if a successful write exceeded its end-to-end
51/// SLO target. Only fires on `Ok`: a failed write already surfaces an
52/// error to the caller, and a noisy overrun WARN on top would poison
53/// alerting built on this signal.
54fn warn_on_write_overrun(
55    op: &'static str,
56    elapsed: Duration,
57    threshold: Option<Duration>,
58    is_ok: bool,
59) {
60    let Some(threshold) = threshold else { return };
61    if is_ok && elapsed > threshold {
62        tracing::warn!(
63            op,
64            elapsed_ms = elapsed.as_millis() as u64,
65            threshold_ms = threshold.as_millis() as u64,
66            "cold write exceeded end-to-end write timeout (queue + drain + commit)",
67        );
68    }
69}
70
71/// Log a `JoinError` from a tracked spawn before mapping to
72/// [`ColdStorageError::TaskTerminated`]. A panic inside the spawned body
73/// is otherwise indistinguishable from graceful shutdown for the
74/// caller, which is a poor on-call signal.
75fn log_join_error(op: &'static str, e: &tokio::task::JoinError) {
76    if e.is_panic() {
77        tracing::error!(op, error = %e, "cold storage spawned task panicked");
78    } else if e.is_cancelled() {
79        tracing::debug!(op, "cold storage spawned task cancelled");
80    }
81}
82
/// Maximum concurrent read operations.
///
/// Initial permit count of `read_sem`. A write acquires this many read
/// permits at once to drain all in-flight readers before it runs.
const MAX_CONCURRENT_READERS: usize = 64;

/// Maximum concurrent write operations.
///
/// Initial permit count of `write_sem`; a value of 1 serializes writers.
const MAX_CONCURRENT_WRITES: usize = 1;

/// Maximum concurrent streaming operations.
///
/// Initial permit count of `stream_sem`, held by each log-stream producer
/// task for its whole lifetime.
const MAX_CONCURRENT_STREAMS: usize = 8;

/// Channel buffer size for streaming operations.
///
/// Capacity of the `mpsc` channel created by `stream_logs` between the
/// backend producer task and the returned stream.
const STREAM_CHANNEL_BUFFER: usize = 256;
94
/// Shared inner state for [`ColdStorage`].
///
/// One instance is created per `ColdStorage::new` call and shared, behind an
/// `Arc`, by every clone of the handle and by every task the handle spawns.
pub(crate) struct Inner<B> {
    /// The storage backend all operations are forwarded to.
    pub(crate) backend: B,
    /// Lookup cache consulted by the single-item header / transaction /
    /// receipt getters before they go to the backend.
    pub(crate) cache: Mutex<ColdCache>,
    /// Upper bound applied to the caller-supplied deadline in `stream_logs`.
    pub(crate) max_stream_deadline: Duration,
    /// Gates read tasks; also fully drained by each write.
    pub(crate) read_sem: Arc<Semaphore>,
    /// Gates write tasks.
    pub(crate) write_sem: Arc<Semaphore>,
    /// Gates log-stream producer tasks.
    pub(crate) stream_sem: Arc<Semaphore>,
    /// Tracks every read, write, and stream task spawned by the handle so
    /// `wait_shutdown` can wait for them to finish.
    pub(crate) tracker: TaskTracker,
    /// Fires `shutdown` (a child of the user's cancel token) when `Inner`
    /// drops, waking the coordinator task so it exits without waiting on
    /// user-side cancel.
    _shutdown_guard: DropGuard,
}
109
/// Unified handle for interacting with a cold storage backend.
///
/// `ColdStorage<B>` is cheap to [`Clone`] — it is just an `Arc` around the
/// shared inner state. All operations dispatch through semaphore-gated
/// [`TaskTracker`]-spawned tasks.
///
/// The backend type parameter defaults to [`ErasedBackend`], so a bare
/// `ColdStorage` names the type-erased handle.
pub struct ColdStorage<B: ColdStorageBackend = ErasedBackend> {
    /// Shared state: backend, cache, semaphores, and task tracker.
    inner: Arc<Inner<B>>,
}
118
119impl<B: ColdStorageBackend> Clone for ColdStorage<B> {
120    fn clone(&self) -> Self {
121        Self { inner: Arc::clone(&self.inner) }
122    }
123}
124
125impl<B: ColdStorageBackend> std::fmt::Debug for ColdStorage<B> {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        f.debug_struct("ColdStorage").finish_non_exhaustive()
128    }
129}
130
131impl<B: ColdStorageBackend> ColdStorage<B> {
132    /// Create a new cold storage handle wrapping `backend`.
133    ///
134    /// A hidden coordinator task watches `cancel` and, on fire, closes the
135    /// read/write/stream semaphores and the task tracker. After cancel,
136    /// all handle methods fail fast with [`ColdStorageError::TaskTerminated`]
137    /// on permit acquisition; in-flight spawned tasks drain to completion
138    /// bounded by backend timeouts.
139    pub fn new(backend: B, cancel: CancellationToken) -> Self {
140        // `shutdown` fires on user-side cancel (via the parent) OR when
141        // `Inner` drops (via the DropGuard). The coordinator holds only a
142        // `Weak<Inner>` so it never pins the backend.
143        let shutdown = cancel.child_token();
144        let shutdown_guard = shutdown.clone().drop_guard();
145        let inner = Arc::new(Inner {
146            backend,
147            cache: Mutex::new(ColdCache::new()),
148            max_stream_deadline: DEFAULT_MAX_STREAM_DEADLINE,
149            read_sem: Arc::new(Semaphore::new(MAX_CONCURRENT_READERS)),
150            write_sem: Arc::new(Semaphore::new(MAX_CONCURRENT_WRITES)),
151            stream_sem: Arc::new(Semaphore::new(MAX_CONCURRENT_STREAMS)),
152            tracker: TaskTracker::new(),
153            _shutdown_guard: shutdown_guard,
154        });
155        let weak: Weak<Inner<B>> = Arc::downgrade(&inner);
156        // Shutdown coordinator: must NOT be tracked by `tracker`, otherwise
157        // `tracker.wait()` would deadlock waiting on this task. Holds only a
158        // `Weak` ref so `Inner` (and the backend) can drop when all handles
159        // and in-flight tasks are gone.
160        tokio::spawn(async move {
161            shutdown.cancelled().await;
162            let Some(inner) = weak.upgrade() else { return };
163            inner.read_sem.close();
164            inner.write_sem.close();
165            inner.stream_sem.close();
166            inner.tracker.close();
167        });
168        Self { inner }
169    }
170
171    /// Close the task tracker and wait for all in-flight tasks to finish.
172    ///
173    /// Idempotent with the shutdown coordinator: safe to call whether or not
174    /// the cancel token has fired.
175    pub async fn wait_shutdown(&self) {
176        self.inner.tracker.close();
177        self.inner.tracker.wait().await;
178    }
179
180    /// Spawn a read task under the `read_sem` permit.
181    async fn spawn_read<T, F, Fut>(&self, op: &'static str, f: F) -> ColdResult<T>
182    where
183        T: Send + 'static,
184        F: FnOnce(Arc<Inner<B>>) -> Fut + Send + 'static,
185        Fut: std::future::Future<Output = ColdResult<T>> + Send,
186    {
187        let wait = Instant::now();
188        let permit = self
189            .inner
190            .read_sem
191            .clone()
192            .acquire_owned()
193            .await
194            .map_err(|_| ColdStorageError::TaskTerminated)?;
195        metrics::record_permit_wait("read", wait.elapsed());
196        let inner = Arc::clone(&self.inner);
197        self.inner
198            .tracker
199            .spawn(
200                async move {
201                    let _p = permit;
202                    let _guard = metrics::InFlightGuard::new("read");
203                    let start = Instant::now();
204                    let result = f(inner).await;
205                    metrics::record_op_duration(op, start.elapsed());
206                    if let Err(ref e) = result {
207                        metrics::record_op_error(op, e.kind());
208                    }
209                    result
210                }
211                .in_current_span(),
212            )
213            .await
214            .map_err(|e| {
215                log_join_error(op, &e);
216                ColdStorageError::TaskTerminated
217            })?
218    }
219
220    /// Spawn a write task under the `write_sem` permit, holding a full drain
221    /// on `read_sem` for the duration of the write.
222    ///
223    /// Acquisition order is: `write_sem` first, then all `MAX_CONCURRENT_READERS`
224    /// read permits. This ensures concurrent writers queue on `write_sem` so
225    /// only one writer at a time waits on the drain, preventing starvation of
226    /// the read pool.
227    ///
228    /// Drop order inside the spawned task releases the drain before the write
229    /// permit, so readers regain access immediately once the write completes.
230    async fn spawn_write<T, F, Fut>(&self, op: &'static str, f: F) -> ColdResult<T>
231    where
232        T: Send + 'static,
233        F: FnOnce(Arc<Inner<B>>) -> Fut + Send + 'static,
234        Fut: std::future::Future<Output = ColdResult<T>> + Send,
235    {
236        // End-to-end SLO start: capture before permit acquisition so the
237        // measurement covers `write_sem` queueing and the read drain in
238        // addition to the backend commit. This is the failure shape the
239        // PR targets — a slow drain followed by a fast commit must surface
240        // as an SLO violation, not as a sub-threshold backend timing.
241        let e2e_start = Instant::now();
242        let threshold = self.inner.backend.write_timeout();
243
244        let write_permit = self
245            .inner
246            .write_sem
247            .clone()
248            .acquire_owned()
249            .await
250            .map_err(|_| ColdStorageError::TaskTerminated)?;
251        metrics::record_permit_wait("write", e2e_start.elapsed());
252
253        let drain_wait = Instant::now();
254        let drain = self
255            .inner
256            .read_sem
257            .clone()
258            .acquire_many_owned(MAX_CONCURRENT_READERS as u32)
259            .await
260            .map_err(|_| ColdStorageError::TaskTerminated)?;
261        metrics::record_permit_wait("drain", drain_wait.elapsed());
262
263        let inner = Arc::clone(&self.inner);
264        self.inner
265            .tracker
266            .spawn(
267                async move {
268                    let _w = write_permit;
269                    let _d = drain;
270                    let _guard = metrics::InFlightGuard::new("write");
271                    let start = Instant::now();
272                    let result = f(inner).await;
273                    metrics::record_op_duration(op, start.elapsed());
274                    if let Err(ref e) = result {
275                        metrics::record_op_error(op, e.kind());
276                    }
277                    warn_on_write_overrun(op, e2e_start.elapsed(), threshold, result.is_ok());
278                    result
279                }
280                .in_current_span(),
281            )
282            .await
283            .map_err(|e| {
284                log_join_error(op, &e);
285                ColdStorageError::TaskTerminated
286            })?
287    }
288
289    // ==========================================================================
290    // Headers
291    // ==========================================================================
292
293    /// Get a header by specifier.
294    #[tracing::instrument(skip(self, spec), fields(op = "get_header"))]
295    pub async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
296        let op_start = Instant::now();
297        if let HeaderSpecifier::Number(n) = &spec
298            && let Some(hit) = self.inner.cache.lock().get_header(n)
299        {
300            metrics::record_op_duration("get_header", op_start.elapsed());
301            return Ok(Some(hit));
302        }
303        self.spawn_read("get_header", move |inner| async move {
304            let result = inner.backend.get_header(spec).await;
305            if let Ok(Some(ref h)) = result {
306                inner.cache.lock().put_header(h.number, h.clone());
307            }
308            result
309        })
310        .await
311    }
312
313    /// Get a header by block number.
314    pub async fn get_header_by_number(
315        &self,
316        block: BlockNumber,
317    ) -> ColdResult<Option<SealedHeader>> {
318        self.get_header(HeaderSpecifier::Number(block)).await
319    }
320
321    /// Get a header by block hash.
322    pub async fn get_header_by_hash(&self, hash: B256) -> ColdResult<Option<SealedHeader>> {
323        self.get_header(HeaderSpecifier::Hash(hash)).await
324    }
325
326    /// Get multiple headers by specifiers.
327    #[tracing::instrument(skip(self, specs), fields(op = "get_headers"))]
328    pub async fn get_headers(
329        &self,
330        specs: Vec<HeaderSpecifier>,
331    ) -> ColdResult<Vec<Option<SealedHeader>>> {
332        self.spawn_read("get_headers", move |inner| async move {
333            inner.backend.get_headers(specs).await
334        })
335        .await
336    }
337
338    // ==========================================================================
339    // Transactions
340    // ==========================================================================
341
342    /// Get a transaction by specifier, with block confirmation metadata.
343    #[tracing::instrument(skip(self, spec), fields(op = "get_transaction"))]
344    pub async fn get_transaction(
345        &self,
346        spec: TransactionSpecifier,
347    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
348        let op_start = Instant::now();
349        if let TransactionSpecifier::BlockAndIndex { block, index } = &spec
350            && let Some(hit) = self.inner.cache.lock().get_tx(&(*block, *index))
351        {
352            metrics::record_op_duration("get_transaction", op_start.elapsed());
353            return Ok(Some(hit));
354        }
355        self.spawn_read("get_transaction", move |inner| async move {
356            let result = inner.backend.get_transaction(spec).await;
357            if let Ok(Some(ref c)) = result {
358                let meta = c.meta();
359                inner
360                    .cache
361                    .lock()
362                    .put_tx((meta.block_number(), meta.transaction_index()), c.clone());
363            }
364            result
365        })
366        .await
367    }
368
369    /// Get a transaction by hash.
370    pub async fn get_tx_by_hash(&self, hash: B256) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
371        self.get_transaction(TransactionSpecifier::Hash(hash)).await
372    }
373
374    /// Get a transaction by block number and index.
375    pub async fn get_tx_by_block_and_index(
376        &self,
377        block: BlockNumber,
378        index: u64,
379    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
380        self.get_transaction(TransactionSpecifier::BlockAndIndex { block, index }).await
381    }
382
383    /// Get a transaction by block hash and index.
384    pub async fn get_tx_by_block_hash_and_index(
385        &self,
386        block_hash: B256,
387        index: u64,
388    ) -> ColdResult<Option<Confirmed<RecoveredTx>>> {
389        self.get_transaction(TransactionSpecifier::BlockHashAndIndex { block_hash, index }).await
390    }
391
392    /// Get all transactions in a block.
393    #[tracing::instrument(skip(self), fields(op = "get_transactions_in_block"))]
394    pub async fn get_transactions_in_block(
395        &self,
396        block: BlockNumber,
397    ) -> ColdResult<Vec<RecoveredTx>> {
398        self.spawn_read("get_transactions_in_block", move |inner| async move {
399            inner.backend.get_transactions_in_block(block).await
400        })
401        .await
402    }
403
404    /// Get the transaction count for a block.
405    #[tracing::instrument(skip(self), fields(op = "get_transaction_count"))]
406    pub async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult<u64> {
407        self.spawn_read("get_transaction_count", move |inner| async move {
408            inner.backend.get_transaction_count(block).await
409        })
410        .await
411    }
412
413    // ==========================================================================
414    // Receipts
415    // ==========================================================================
416
417    /// Get a receipt by specifier.
418    #[tracing::instrument(skip(self, spec), fields(op = "get_receipt"))]
419    pub async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult<Option<ColdReceipt>> {
420        let op_start = Instant::now();
421        if let ReceiptSpecifier::BlockAndIndex { block, index } = &spec
422            && let Some(hit) = self.inner.cache.lock().get_receipt(&(*block, *index))
423        {
424            metrics::record_op_duration("get_receipt", op_start.elapsed());
425            return Ok(Some(hit));
426        }
427        self.spawn_read("get_receipt", move |inner| async move {
428            let result = inner.backend.get_receipt(spec).await;
429            if let Ok(Some(ref c)) = result {
430                inner.cache.lock().put_receipt((c.block_number, c.transaction_index), c.clone());
431            }
432            result
433        })
434        .await
435    }
436
437    /// Get a receipt by transaction hash.
438    pub async fn get_receipt_by_tx_hash(&self, hash: B256) -> ColdResult<Option<ColdReceipt>> {
439        self.get_receipt(ReceiptSpecifier::TxHash(hash)).await
440    }
441
442    /// Get a receipt by block number and index.
443    pub async fn get_receipt_by_block_and_index(
444        &self,
445        block: BlockNumber,
446        index: u64,
447    ) -> ColdResult<Option<ColdReceipt>> {
448        self.get_receipt(ReceiptSpecifier::BlockAndIndex { block, index }).await
449    }
450
451    /// Get all receipts in a block.
452    #[tracing::instrument(skip(self), fields(op = "get_receipts_in_block"))]
453    pub async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult<Vec<ColdReceipt>> {
454        self.spawn_read("get_receipts_in_block", move |inner| async move {
455            inner.backend.get_receipts_in_block(block).await
456        })
457        .await
458    }
459
460    // ==========================================================================
461    // SignetEvents
462    // ==========================================================================
463
464    /// Get signet events by specifier.
465    #[tracing::instrument(skip(self, spec), fields(op = "get_signet_events"))]
466    pub async fn get_signet_events(
467        &self,
468        spec: SignetEventsSpecifier,
469    ) -> ColdResult<Vec<DbSignetEvent>> {
470        self.spawn_read("get_signet_events", move |inner| async move {
471            inner.backend.get_signet_events(spec).await
472        })
473        .await
474    }
475
476    /// Get signet events in a block.
477    pub async fn get_signet_events_in_block(
478        &self,
479        block: BlockNumber,
480    ) -> ColdResult<Vec<DbSignetEvent>> {
481        self.get_signet_events(SignetEventsSpecifier::Block(block)).await
482    }
483
484    /// Get signet events in a range of blocks.
485    pub async fn get_signet_events_in_range(
486        &self,
487        start: BlockNumber,
488        end: BlockNumber,
489    ) -> ColdResult<Vec<DbSignetEvent>> {
490        self.get_signet_events(SignetEventsSpecifier::BlockRange { start, end }).await
491    }
492
493    // ==========================================================================
494    // ZenithHeaders
495    // ==========================================================================
496
497    /// Get a zenith header by block number.
498    pub async fn get_zenith_header(
499        &self,
500        block: BlockNumber,
501    ) -> ColdResult<Option<DbZenithHeader>> {
502        self.get_zenith_header_by_spec(ZenithHeaderSpecifier::Number(block)).await
503    }
504
505    /// Get a zenith header by specifier.
506    #[tracing::instrument(skip(self, spec), fields(op = "get_zenith_header_by_spec"))]
507    async fn get_zenith_header_by_spec(
508        &self,
509        spec: ZenithHeaderSpecifier,
510    ) -> ColdResult<Option<DbZenithHeader>> {
511        self.spawn_read("get_zenith_header_by_spec", move |inner| async move {
512            inner.backend.get_zenith_header(spec).await
513        })
514        .await
515    }
516
517    /// Get zenith headers by specifier.
518    #[tracing::instrument(skip(self, spec), fields(op = "get_zenith_headers"))]
519    pub async fn get_zenith_headers(
520        &self,
521        spec: ZenithHeaderSpecifier,
522    ) -> ColdResult<Vec<DbZenithHeader>> {
523        self.spawn_read("get_zenith_headers", move |inner| async move {
524            inner.backend.get_zenith_headers(spec).await
525        })
526        .await
527    }
528
529    /// Get zenith headers in a range of blocks.
530    pub async fn get_zenith_headers_in_range(
531        &self,
532        start: BlockNumber,
533        end: BlockNumber,
534    ) -> ColdResult<Vec<DbZenithHeader>> {
535        self.get_zenith_headers(ZenithHeaderSpecifier::Range { start, end }).await
536    }
537
538    // ==========================================================================
539    // Logs
540    // ==========================================================================
541
542    /// Filter logs by block range, address, and topics.
543    ///
544    /// Follows `eth_getLogs` semantics. Returns matching logs ordered by
545    /// `(block_number, tx_index, log_index)`.
546    #[tracing::instrument(skip(self, filter), fields(op = "get_logs"))]
547    pub async fn get_logs(&self, filter: Filter, max_logs: usize) -> ColdResult<Vec<RpcLog>> {
548        self.spawn_read("get_logs", move |inner| async move {
549            inner.backend.get_logs(&filter, max_logs).await
550        })
551        .await
552    }
553
554    /// Stream logs matching a filter.
555    ///
556    /// Returns a [`LogStream`] that yields matching logs in order.
557    /// Consume with `StreamExt::next()` until `None`. If the last item is
558    /// `Err(...)`, an error occurred (deadline, too many logs, reorg).
559    ///
560    /// The `deadline` is clamped to the handle's configured maximum.
561    #[tracing::instrument(skip(self, filter), fields(op = "stream_logs"))]
562    pub async fn stream_logs(
563        &self,
564        filter: Filter,
565        max_logs: usize,
566        deadline: Duration,
567    ) -> ColdResult<LogStream> {
568        let from = filter.get_from_block().unwrap_or(0);
569        // Resolve `to` BEFORE acquiring `stream_sem`. Holding the permit
570        // across setup I/O (especially when falling back to
571        // `get_latest_block`) lets a stuck backend pin all 8 permits and
572        // prevent any new stream from starting. Setup reads intentionally
573        // bypass `read_sem` and the drain barrier: a stream asking for
574        // "latest" should observe latest at setup time even alongside an
575        // in-flight write.
576        //
577        // Wrap the setup read in a wall-clock timeout so a stuck backend
578        // (cold MDBX page, saturated PG pool) cannot stall N concurrent
579        // setup callers indefinitely. The future drops on timeout but the
580        // backend work continues — same trade-off the rest of the design
581        // accepts.
582        let to = match filter.get_to_block() {
583            Some(to) => to,
584            None => {
585                let setup_to =
586                    self.inner.backend.read_timeout().unwrap_or(DEFAULT_STREAM_SETUP_TIMEOUT);
587                let latest = tokio::time::timeout(setup_to, self.inner.backend.get_latest_block())
588                    .await
589                    .map_err(|_| ColdStorageError::DeadlineExceeded(setup_to))??;
590                match latest {
591                    Some(latest) => latest,
592                    None => {
593                        let (_tx, rx) = mpsc::channel(1);
594                        return Ok(ReceiverStream::new(rx));
595                    }
596                }
597            }
598        };
599
600        let wait = Instant::now();
601        let permit = self
602            .inner
603            .stream_sem
604            .clone()
605            .acquire_owned()
606            .await
607            .map_err(|_| ColdStorageError::TaskTerminated)?;
608        metrics::record_permit_wait("stream", wait.elapsed());
609
610        let effective = deadline.min(self.inner.max_stream_deadline);
611        let deadline_instant = Instant::now() + effective;
612        let (sender, rx) = mpsc::channel(STREAM_CHANNEL_BUFFER);
613        let inner = Arc::clone(&self.inner);
614        let started = Instant::now();
615        self.inner.tracker.spawn(
616            async move {
617                let _p = permit;
618                let _guard = metrics::InFlightGuard::new("stream");
619                let params =
620                    StreamParams { from, to, max_logs, sender, deadline: deadline_instant };
621                inner.backend.produce_log_stream(&filter, params).await;
622                metrics::record_stream_lifetime(started.elapsed());
623            }
624            .in_current_span(),
625        );
626        Ok(ReceiverStream::new(rx))
627    }
628
629    // ==========================================================================
630    // Metadata
631    // ==========================================================================
632
633    /// Get the latest block number in storage.
634    #[tracing::instrument(skip(self), fields(op = "get_latest_block"))]
635    pub async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
636        self.spawn_read("get_latest_block", move |inner| async move {
637            inner.backend.get_latest_block().await
638        })
639        .await
640    }
641
642    // ==========================================================================
643    // Writes
644    // ==========================================================================
645
646    /// Append a single block to cold storage.
647    #[tracing::instrument(skip(self, data), fields(op = "append_block"))]
648    pub async fn append_block(&self, data: BlockData) -> ColdResult<()> {
649        self.spawn_write("append_block", move |inner| async move {
650            inner.backend.append_block(data).await
651        })
652        .await
653    }
654
655    /// Append multiple blocks to cold storage.
656    #[tracing::instrument(skip(self, data), fields(op = "append_blocks"))]
657    pub async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
658        self.spawn_write("append_blocks", move |inner| async move {
659            inner.backend.append_blocks(data).await
660        })
661        .await
662    }
663
664    /// Truncate all data above the given block number.
665    ///
666    /// This removes block N+1 and higher from all tables and invalidates any
667    /// cached lookups above `block`.
668    #[tracing::instrument(skip(self), fields(op = "truncate_above"))]
669    pub async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
670        self.spawn_write("truncate_above", move |inner| async move {
671            let result = inner.backend.truncate_above(block).await;
672            if result.is_ok() {
673                inner.cache.lock().invalidate_above(block);
674            }
675            result
676        })
677        .await
678    }
679
680    /// Read and remove all blocks above the given block number.
681    ///
682    /// Returns receipts for each block above `block` in ascending order,
683    /// then truncates. Index 0 = block+1, index 1 = block+2, etc.
684    #[tracing::instrument(skip(self), fields(op = "drain_above"))]
685    pub async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
686        self.spawn_write("drain_above", move |inner| async move {
687            let result = inner.backend.drain_above(block).await;
688            if result.is_ok() {
689                inner.cache.lock().invalidate_above(block);
690            }
691            result
692        })
693        .await
694    }
695}
696
697impl ColdStorage<ErasedBackend> {
698    /// Construct a type-erased cold storage handle.
699    ///
700    /// Wraps `backend` in [`ErasedBackend`] so the resulting handle
701    /// has no `B` type parameter to propagate through downstream
702    /// signatures. Equivalent to
703    /// `ColdStorage::new(ErasedBackend::new(backend), cancel)`.
704    ///
705    /// Choose this constructor when you want runtime swappability of
706    /// the backend; use [`new`](Self::new) directly for fully
707    /// monomorphized call sites.
708    pub fn new_erased<B: ColdStorageBackend>(backend: B, cancel: CancellationToken) -> Self {
709        Self::new(ErasedBackend::new(backend), cancel)
710    }
711}