// signet_cold/traits.rs
1//! Core trait definition for cold storage backends.
2//!
3//! The [`ColdStorage`] trait defines the interface that all cold storage
4//! backends must implement. Backends are responsible for data organization,
5//! indexing, and keying - the trait is agnostic to these implementation
6//! details.
7
8use crate::{
9    ColdReceipt, ColdResult, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
10    SignetEventsSpecifier, StreamParams, TransactionSpecifier, ZenithHeaderSpecifier,
11};
12use alloy::primitives::BlockNumber;
13use signet_storage_types::{
14    DbSignetEvent, DbZenithHeader, ExecutedBlock, Receipt, RecoveredTx, SealedHeader,
15};
16use std::future::Future;
17use tokio_stream::wrappers::ReceiverStream;
18
/// A stream of log results backed by a bounded channel.
///
/// Each item is a `ColdResult<RpcLog>`. The stream produces `Ok(log)` items
/// until complete, or yields a final `Err(e)` on failure. The stream ends
/// (`None`) when all matching logs have been delivered or after an error.
///
/// Streams of this type are filled by [`ColdStorage::produce_log_stream`],
/// which sends items through the channel and closes it by dropping the
/// sender when finished.
///
/// # Partial Delivery
///
/// One or more `Ok(log)` items may be delivered before a terminal
/// `Err(...)`. Consumers must be prepared for partial results — for
/// example, a reorg or deadline expiry can interrupt a stream that has
/// already yielded some logs.
///
/// # Resource Management
///
/// The stream holds a backend concurrency permit. Dropping the stream
/// releases the permit. Drop early if results are no longer needed.
pub type LogStream = ReceiverStream<ColdResult<RpcLog>>;
37
/// Data for appending a complete block to cold storage.
///
/// Bundles everything [`ColdStorage::append_block`] needs for one block:
/// the sealed header plus the block's transactions, receipts, signet
/// events, and optional zenith header. Convertible from [`ExecutedBlock`]
/// via `From`.
#[derive(Debug, Clone)]
pub struct BlockData {
    /// The sealed block header (contains cached hash).
    pub header: SealedHeader,
    /// The transactions in the block, with recovered senders.
    pub transactions: Vec<RecoveredTx>,
    /// The receipts for the transactions.
    pub receipts: Vec<Receipt>,
    /// The signet events in the block.
    pub signet_events: Vec<DbSignetEvent>,
    /// The zenith header for the block, if present.
    pub zenith_header: Option<DbZenithHeader>,
}
52
53impl BlockData {
54    /// Create new block data.
55    pub const fn new(
56        header: SealedHeader,
57        transactions: Vec<RecoveredTx>,
58        receipts: Vec<Receipt>,
59        signet_events: Vec<DbSignetEvent>,
60        zenith_header: Option<DbZenithHeader>,
61    ) -> Self {
62        Self { header, transactions, receipts, signet_events, zenith_header }
63    }
64
65    /// Get the block number of the block.
66    pub fn block_number(&self) -> BlockNumber {
67        self.header.number
68    }
69}
70
71impl From<ExecutedBlock> for BlockData {
72    fn from(block: ExecutedBlock) -> Self {
73        Self::new(
74            block.header,
75            block.transactions,
76            block.receipts,
77            block.signet_events,
78            block.zenith_header,
79        )
80    }
81}
82
83/// Unified cold storage backend trait.
84///
85/// Backend is responsible for all data organization, indexing, and keying.
86/// The trait is agnostic to how the backend stores or indexes data.
87///
88/// All methods are async and return futures that are `Send`.
89///
90/// # Implementation Guide
91///
92/// Implementers must ensure:
93///
94/// - **Append-only ordering**: `append_block` must enforce monotonically
95///   increasing block numbers. Attempting to append a block with a number <=
96///   the current latest should return an error.
97///
98/// - **Atomic truncation**: `truncate_above` must remove all data for blocks
99///   N+1 and higher atomically. Partial truncation is not acceptable.
100///
101/// - **Index maintenance**: Hash-based lookups (e.g., header by hash,
102///   transaction by hash) require the implementation to maintain appropriate
103///   indexes. These indexes must be updated during `append_block` and cleaned
104///   during `truncate_above`.
105///
106/// - **Consistent reads**: Read operations should return consistent snapshots.
107///   A read started before a write completes should not see partial data from
108///   that write.
109///
110pub trait ColdStorage: Send + Sync + 'static {
111    // --- Headers ---
112
113    /// Get a header by specifier.
114    fn get_header(
115        &self,
116        spec: HeaderSpecifier,
117    ) -> impl Future<Output = ColdResult<Option<SealedHeader>>> + Send;
118
119    /// Get multiple headers by specifiers.
120    fn get_headers(
121        &self,
122        specs: Vec<HeaderSpecifier>,
123    ) -> impl Future<Output = ColdResult<Vec<Option<SealedHeader>>>> + Send;
124
125    // --- Transactions ---
126
127    /// Get a transaction by specifier, with block confirmation metadata.
128    fn get_transaction(
129        &self,
130        spec: TransactionSpecifier,
131    ) -> impl Future<Output = ColdResult<Option<Confirmed<RecoveredTx>>>> + Send;
132
133    /// Get all transactions in a block.
134    fn get_transactions_in_block(
135        &self,
136        block: BlockNumber,
137    ) -> impl Future<Output = ColdResult<Vec<RecoveredTx>>> + Send;
138
139    /// Get the number of transactions in a block.
140    fn get_transaction_count(
141        &self,
142        block: BlockNumber,
143    ) -> impl Future<Output = ColdResult<u64>> + Send;
144
145    // --- Receipts ---
146
147    /// Get a receipt by specifier.
148    fn get_receipt(
149        &self,
150        spec: ReceiptSpecifier,
151    ) -> impl Future<Output = ColdResult<Option<ColdReceipt>>> + Send;
152
153    /// Get all receipts in a block.
154    fn get_receipts_in_block(
155        &self,
156        block: BlockNumber,
157    ) -> impl Future<Output = ColdResult<Vec<ColdReceipt>>> + Send;
158
159    // --- SignetEvents ---
160
161    /// Get signet events by specifier.
162    fn get_signet_events(
163        &self,
164        spec: SignetEventsSpecifier,
165    ) -> impl Future<Output = ColdResult<Vec<DbSignetEvent>>> + Send;
166
167    // --- ZenithHeaders ---
168
169    /// Get a zenith header by specifier.
170    fn get_zenith_header(
171        &self,
172        spec: ZenithHeaderSpecifier,
173    ) -> impl Future<Output = ColdResult<Option<DbZenithHeader>>> + Send;
174
175    /// Get multiple zenith headers by specifier.
176    fn get_zenith_headers(
177        &self,
178        spec: ZenithHeaderSpecifier,
179    ) -> impl Future<Output = ColdResult<Vec<DbZenithHeader>>> + Send;
180
181    // --- Metadata ---
182
183    /// Get the latest block number in storage.
184    fn get_latest_block(&self) -> impl Future<Output = ColdResult<Option<BlockNumber>>> + Send;
185
186    // --- Logs ---
187
188    /// Filter logs by block range, address, and topics.
189    ///
190    /// Follows `eth_getLogs` semantics: returns all logs matching the
191    /// filter criteria, ordered by `(block_number, tx_index, log_index)`.
192    ///
193    /// # Errors
194    ///
195    /// Returns [`ColdStorageError::TooManyLogs`] if the query would produce
196    /// more than `max_logs` results. No partial results are returned — the
197    /// caller must narrow the filter or increase the limit.
198    ///
199    /// [`ColdStorageError::TooManyLogs`]: crate::ColdStorageError::TooManyLogs
200    fn get_logs(
201        &self,
202        filter: &Filter,
203        max_logs: usize,
204    ) -> impl Future<Output = ColdResult<Vec<RpcLog>>> + Send;
205
206    // --- Streaming ---
207
208    /// Produce a log stream by iterating blocks and sending matching logs.
209    ///
210    /// # Concurrency
211    ///
212    /// Stream producers run concurrently with writes (`append_block`,
213    /// `truncate_above`, `drain_above`). They are NOT serialized by the
214    /// task runner's read/write barrier. Implementations MUST hold a
215    /// consistent read snapshot for the duration of the stream.
216    ///
217    /// Backends with snapshot semantics (MDBX read transactions,
218    /// PostgreSQL `REPEATABLE READ`) naturally satisfy this requirement.
219    ///
220    /// Backends without snapshot semantics can delegate to
221    /// [`produce_log_stream_default`], which uses per-block
222    /// [`get_header`] / [`get_logs`] calls with anchor-hash reorg
223    /// detection. This provides best-effort consistency but is not
224    /// immune to partial reads during concurrent writes.
225    ///
226    /// All errors are sent through `sender`. When this method returns,
227    /// the sender is dropped, closing the stream.
228    ///
229    /// [`get_header`]: ColdStorage::get_header
230    /// [`get_logs`]: ColdStorage::get_logs
231    /// [`produce_log_stream_default`]: crate::produce_log_stream_default
232    fn produce_log_stream(
233        &self,
234        filter: &Filter,
235        params: StreamParams,
236    ) -> impl Future<Output = ()> + Send;
237
238    // --- Write operations ---
239
240    /// Append a single block to cold storage.
241    fn append_block(&self, data: BlockData) -> impl Future<Output = ColdResult<()>> + Send;
242
243    /// Append multiple blocks to cold storage.
244    fn append_blocks(&self, data: Vec<BlockData>) -> impl Future<Output = ColdResult<()>> + Send;
245
246    /// Truncate all data above the given block number (exclusive).
247    ///
248    /// This removes block N+1 and higher from all tables. Used for reorg handling.
249    fn truncate_above(&self, block: BlockNumber) -> impl Future<Output = ColdResult<()>> + Send;
250
251    /// Read and remove all blocks above the given block number.
252    ///
253    /// Returns receipts for each block above `block` in ascending order,
254    /// then truncates. Index 0 = block+1, index 1 = block+2, etc.
255    /// Blocks with no receipts have empty vecs.
256    ///
257    /// The default implementation composes `get_latest_block` +
258    /// `get_receipts_in_block` + `truncate_above`. It is correct but
259    /// not atomic. Backends should override with an atomic version
260    /// when possible.
261    fn drain_above(
262        &self,
263        block: BlockNumber,
264    ) -> impl Future<Output = ColdResult<Vec<Vec<ColdReceipt>>>> + Send {
265        async move {
266            let mut all_receipts = Vec::new();
267            if let Some(latest) = self.get_latest_block().await? {
268                for n in (block + 1)..=latest {
269                    all_receipts.push(self.get_receipts_in_block(n).await?);
270                }
271            }
272            self.truncate_above(block).await?;
273            Ok(all_receipts)
274        }
275    }
276}