Skip to main content

signet_cold/
traits.rs

1//! Core trait definitions for cold storage backends.
2//!
3//! The cold storage interface is split into three traits:
4//!
5//! - [`ColdStorageRead`] — read-only access (`&self`, `Clone`)
6//! - [`ColdStorageWrite`] — write access (`&mut self`)
7//! - [`ColdStorage`] — supertrait combining both, with `drain_above`
8
9use crate::{
10    ColdReceipt, ColdResult, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
11    SignetEventsSpecifier, StreamParams, TransactionSpecifier, ZenithHeaderSpecifier,
12};
13use alloy::primitives::BlockNumber;
14use signet_storage_types::{
15    DbSignetEvent, DbZenithHeader, ExecutedBlock, Receipt, RecoveredTx, SealedHeader,
16};
17use std::future::Future;
18use tokio_stream::wrappers::ReceiverStream;
19
/// A stream of log results backed by a bounded channel.
///
/// This is a type alias for [`ReceiverStream`] carrying
/// `ColdResult<RpcLog>` items.
///
/// Each item is a `ColdResult<RpcLog>`. The stream produces `Ok(log)` items
/// until complete, or yields a final `Err(e)` on failure. The stream ends
/// (`None`) when all matching logs have been delivered or after an error.
///
/// # Partial Delivery
///
/// One or more `Ok(log)` items may be delivered before a terminal
/// `Err(...)`. Consumers must be prepared for partial results — for
/// example, a reorg or deadline expiry can interrupt a stream that has
/// already yielded some logs.
///
/// # Resource Management
///
/// The stream holds a backend concurrency permit. Dropping the stream
/// releases the permit. Drop early if results are no longer needed.
pub type LogStream = ReceiverStream<ColdResult<RpcLog>>;
38
/// Data for appending a complete block to cold storage.
///
/// Bundles everything stored for one block: the header, the block's
/// transactions and receipts, its signet events, and an optional zenith
/// header. This is the payload type taken by
/// `ColdStorageWrite::append_block` / `append_blocks`, and it can be
/// built with [`BlockData::new`] or converted from an [`ExecutedBlock`].
#[derive(Debug, Clone)]
pub struct BlockData {
    /// The sealed block header (contains cached hash).
    ///
    /// The block number reported by [`BlockData::block_number`] is read
    /// from this header.
    pub header: SealedHeader,
    /// The transactions in the block, with recovered senders.
    pub transactions: Vec<RecoveredTx>,
    /// The receipts for the transactions.
    ///
    /// NOTE(review): presumably one receipt per entry in `transactions`,
    /// in the same order — nothing in this type enforces that; confirm
    /// against the producers of `BlockData`.
    pub receipts: Vec<Receipt>,
    /// The signet events in the block.
    pub signet_events: Vec<DbSignetEvent>,
    /// The zenith header for the block, if present.
    pub zenith_header: Option<DbZenithHeader>,
}
53
54impl BlockData {
55    /// Create new block data.
56    pub const fn new(
57        header: SealedHeader,
58        transactions: Vec<RecoveredTx>,
59        receipts: Vec<Receipt>,
60        signet_events: Vec<DbSignetEvent>,
61        zenith_header: Option<DbZenithHeader>,
62    ) -> Self {
63        Self { header, transactions, receipts, signet_events, zenith_header }
64    }
65
66    /// Get the block number of the block.
67    pub fn block_number(&self) -> BlockNumber {
68        self.header.number
69    }
70}
71
72impl From<ExecutedBlock> for BlockData {
73    fn from(block: ExecutedBlock) -> Self {
74        Self::new(
75            block.header,
76            block.transactions,
77            block.receipts,
78            block.signet_events,
79            block.zenith_header,
80        )
81    }
82}
83
/// Read-only cold storage backend trait.
///
/// All methods take `&self` and return `Send` futures. Implementations
/// must be `Clone + Send + Sync + 'static` so that read backends can be
/// shared across tasks (e.g. via `Arc` or cheap cloning).
///
/// Every lookup method reports failure through [`ColdResult`]. The one
/// exception is
/// [`produce_log_stream`](ColdStorageRead::produce_log_stream), whose
/// future resolves to `()` and which reports errors through the stream
/// instead.
///
/// # Implementation Guide
///
/// Implementers must ensure:
///
/// - **Consistent reads**: Read operations should return consistent
///   snapshots. A read started before a write completes should not see
///   partial data from that write.
pub trait ColdStorageRead: Clone + Send + Sync + 'static {
    // --- Headers ---

    /// Get a header by specifier.
    ///
    /// NOTE(review): presumably resolves to `Ok(None)` when no stored
    /// header matches `spec` — confirm against backend implementations.
    fn get_header(
        &self,
        spec: HeaderSpecifier,
    ) -> impl Future<Output = ColdResult<Option<SealedHeader>>> + Send;

    /// Get multiple headers by specifiers.
    ///
    /// NOTE(review): presumably the result holds one entry per element of
    /// `specs`, in the same order, with `None` for misses — confirm
    /// against backend implementations.
    fn get_headers(
        &self,
        specs: Vec<HeaderSpecifier>,
    ) -> impl Future<Output = ColdResult<Vec<Option<SealedHeader>>>> + Send;

    // --- Transactions ---

    /// Get a transaction by specifier, with block confirmation metadata.
    ///
    /// The transaction is returned wrapped in [`Confirmed`], which carries
    /// the confirmation metadata alongside the recovered transaction.
    fn get_transaction(
        &self,
        spec: TransactionSpecifier,
    ) -> impl Future<Output = ColdResult<Option<Confirmed<RecoveredTx>>>> + Send;

    /// Get all transactions in a block.
    ///
    /// The block is identified by number only.
    fn get_transactions_in_block(
        &self,
        block: BlockNumber,
    ) -> impl Future<Output = ColdResult<Vec<RecoveredTx>>> + Send;

    /// Get the number of transactions in a block.
    ///
    /// The block is identified by number only.
    fn get_transaction_count(
        &self,
        block: BlockNumber,
    ) -> impl Future<Output = ColdResult<u64>> + Send;

    // --- Receipts ---

    /// Get a receipt by specifier.
    ///
    /// NOTE(review): presumably resolves to `Ok(None)` when no stored
    /// receipt matches `spec` — confirm against backend implementations.
    fn get_receipt(
        &self,
        spec: ReceiptSpecifier,
    ) -> impl Future<Output = ColdResult<Option<ColdReceipt>>> + Send;

    /// Get all receipts in a block.
    ///
    /// The default `ColdStorage::drain_above` calls this once per block
    /// it drains.
    fn get_receipts_in_block(
        &self,
        block: BlockNumber,
    ) -> impl Future<Output = ColdResult<Vec<ColdReceipt>>> + Send;

    // --- SignetEvents ---

    /// Get signet events by specifier.
    fn get_signet_events(
        &self,
        spec: SignetEventsSpecifier,
    ) -> impl Future<Output = ColdResult<Vec<DbSignetEvent>>> + Send;

    // --- ZenithHeaders ---

    /// Get a zenith header by specifier.
    fn get_zenith_header(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> impl Future<Output = ColdResult<Option<DbZenithHeader>>> + Send;

    /// Get multiple zenith headers by specifier.
    ///
    /// Takes a single specifier and returns a `Vec`, so one specifier may
    /// yield several headers (presumably via a range-like specifier form
    /// — confirm against `ZenithHeaderSpecifier`).
    fn get_zenith_headers(
        &self,
        spec: ZenithHeaderSpecifier,
    ) -> impl Future<Output = ColdResult<Vec<DbZenithHeader>>> + Send;

    // --- Metadata ---

    /// Get the latest block number in storage.
    ///
    /// NOTE(review): presumably resolves to `Ok(None)` when storage holds
    /// no blocks — confirm against backend implementations.
    fn get_latest_block(&self) -> impl Future<Output = ColdResult<Option<BlockNumber>>> + Send;

    // --- Logs ---

    /// Filter logs by block range, address, and topics.
    ///
    /// Follows `eth_getLogs` semantics: returns all logs matching the
    /// filter criteria, ordered by `(block_number, tx_index, log_index)`.
    ///
    /// # Errors
    ///
    /// Returns [`ColdStorageError::TooManyLogs`] if the query would produce
    /// more than `max_logs` results. No partial results are returned — the
    /// caller must narrow the filter or increase the limit.
    ///
    /// [`ColdStorageError::TooManyLogs`]: crate::ColdStorageError::TooManyLogs
    fn get_logs(
        &self,
        filter: &Filter,
        max_logs: usize,
    ) -> impl Future<Output = ColdResult<Vec<RpcLog>>> + Send;

    // --- Streaming ---

    /// Produce a log stream by iterating blocks and sending matching logs.
    ///
    /// # Concurrency
    ///
    /// Stream producers run concurrently with writes (`append_block`,
    /// `truncate_above`, `drain_above`). They are NOT serialized by the
    /// task runner's read/write barrier. Implementations MUST hold a
    /// consistent read snapshot for the duration of the stream.
    ///
    /// Backends with snapshot semantics (MDBX read transactions,
    /// PostgreSQL `REPEATABLE READ`) naturally satisfy this requirement.
    ///
    /// Backends without snapshot semantics can delegate to
    /// [`produce_log_stream_default`], which uses per-block
    /// [`get_header`] / [`get_logs`] calls with anchor-hash reorg
    /// detection. This provides best-effort consistency but is not
    /// immune to partial reads during concurrent writes.
    ///
    /// All errors are sent through `sender`. When this method returns,
    /// the sender is dropped, closing the stream.
    ///
    /// NOTE(review): `sender` is not a parameter of this method;
    /// presumably it is carried inside `params` — confirm against
    /// [`StreamParams`].
    ///
    /// [`get_header`]: ColdStorageRead::get_header
    /// [`get_logs`]: ColdStorageRead::get_logs
    /// [`produce_log_stream_default`]: crate::produce_log_stream_default
    fn produce_log_stream(
        &self,
        filter: &Filter,
        params: StreamParams,
    ) -> impl Future<Output = ()> + Send;
}
225
/// Write-only cold storage backend trait.
///
/// All methods take `&mut self` and return `Send` futures. The write
/// backend is exclusively owned by the task runner — no synchronization
/// is needed.
///
/// # Implementation Guide
///
/// Implementers must ensure:
///
/// - **Append-only ordering**: `append_block` must enforce monotonically
///   increasing block numbers. Attempting to append a block with a number <=
///   the current latest should return an error.
///
/// - **Atomic truncation**: `truncate_above` must remove all data for blocks
///   N+1 and higher atomically, where N is the block number passed to
///   `truncate_above`. Partial truncation is not acceptable.
///
/// - **Index maintenance**: Hash-based lookups (e.g., header by hash,
///   transaction by hash) require the implementation to maintain appropriate
///   indexes. These indexes must be updated during `append_block` and cleaned
///   during `truncate_above`.
pub trait ColdStorageWrite: Send + 'static {
    /// Append a single block to cold storage.
    ///
    /// Per the trait-level ordering requirement, implementations should
    /// return an error when `data`'s block number is not greater than the
    /// current latest block.
    fn append_block(&mut self, data: BlockData) -> impl Future<Output = ColdResult<()>> + Send;

    /// Append multiple blocks to cold storage.
    ///
    /// NOTE(review): presumably `data` must be in ascending block order
    /// and is subject to the same ordering rule as `append_block`; whether
    /// the batch is applied atomically is not specified here — confirm
    /// against backend implementations.
    fn append_blocks(
        &mut self,
        data: Vec<BlockData>,
    ) -> impl Future<Output = ColdResult<()>> + Send;

    /// Truncate all data above the given block number (exclusive).
    ///
    /// This removes block N+1 and higher from all tables, where N is
    /// `block`; block N itself is kept. Used for reorg handling.
    fn truncate_above(&mut self, block: BlockNumber)
    -> impl Future<Output = ColdResult<()>> + Send;
}
263
264/// Combined read and write cold storage backend trait.
265///
266/// Combines [`ColdStorageRead`] and [`ColdStorageWrite`] and provides
267/// [`drain_above`](ColdStorage::drain_above), which reads receipts then
268/// truncates. The default implementation is correct but not atomic;
269/// backends should override with an atomic version when possible.
270pub trait ColdStorage: ColdStorageRead + ColdStorageWrite {
271    /// Read and remove all blocks above the given block number.
272    ///
273    /// Returns receipts for each block above `block` in ascending order,
274    /// then truncates. Index 0 = block+1, index 1 = block+2, etc.
275    /// Blocks with no receipts have empty vecs.
276    ///
277    /// The default implementation composes `get_latest_block` +
278    /// `get_receipts_in_block` + `truncate_above`. It is correct but
279    /// not atomic. Backends should override with an atomic version
280    /// when possible.
281    fn drain_above(
282        &mut self,
283        block: BlockNumber,
284    ) -> impl Future<Output = ColdResult<Vec<Vec<ColdReceipt>>>> + Send {
285        async move {
286            let mut all_receipts = Vec::new();
287            if let Some(latest) = self.get_latest_block().await? {
288                for n in (block + 1)..=latest {
289                    all_receipts.push(self.get_receipts_in_block(n).await?);
290                }
291            }
292            self.truncate_above(block).await?;
293            Ok(all_receipts)
294        }
295    }
296}