Skip to main content

signet_cold/
stream.rs

1//! Log-streaming helper for backends without snapshot semantics.
2
3use crate::{ColdResult, ColdStorage, ColdStorageError, Filter, HeaderSpecifier, RpcLog};
4use alloy::{primitives::BlockNumber, rpc::types::FilterBlockOption};
5use tokio::sync::mpsc;
6
7/// Parameters for a log-streaming request.
8///
9/// Bundles the block range, limits, channel, and deadline that every
10/// [`ColdStorage::produce_log_stream`] implementation needs.
11#[derive(Debug)]
12pub struct StreamParams {
13    /// First block in range (inclusive).
14    pub from: BlockNumber,
15    /// Last block in range (inclusive).
16    pub to: BlockNumber,
17    /// Maximum number of matching logs before aborting with
18    /// [`ColdStorageError::TooManyLogs`].
19    pub max_logs: usize,
20    /// Channel for sending log results.
21    pub sender: mpsc::Sender<ColdResult<RpcLog>>,
22    /// Deadline after which the stream is aborted with
23    /// [`ColdStorageError::StreamDeadlineExceeded`].
24    pub deadline: tokio::time::Instant,
25}
26
27/// Log-streaming implementation for backends without snapshot semantics.
28///
29/// Captures an anchor hash from the `to` block at the start and
30/// re-checks it before each block to detect reorgs. Uses
31/// [`ColdStorage::get_header`] for anchor checks and
32/// [`ColdStorage::get_logs`] with single-block filters per block.
33///
34/// Backends that hold a consistent read snapshot (MDBX, PostgreSQL
35/// with REPEATABLE READ) should provide their own
36/// [`ColdStorage::produce_log_stream`] implementation instead.
37pub async fn produce_log_stream_default<B: ColdStorage + ?Sized>(
38    backend: &B,
39    filter: &Filter,
40    params: StreamParams,
41) {
42    let StreamParams { from, to, max_logs, sender, deadline } = params;
43
44    // Capture the hash of the `to` block before we start iterating.
45    // Without snapshot isolation we have no guarantee that the
46    // underlying data stays consistent, so we re-check this hash
47    // before each block to detect reorgs.
48    let anchor_hash = match backend.get_header(HeaderSpecifier::Number(to)).await {
49        Ok(Some(h)) => h.hash(),
50        Ok(None) => return, // block doesn't exist; empty stream
51        Err(e) => {
52            let _ = sender.send(Err(e)).await;
53            return;
54        }
55    };
56
57    let mut total = 0usize;
58
59    // Clone the filter once; we reuse it across blocks by mutating
60    // only the block range, avoiding per-block clones of the address
61    // and topic arrays.
62    let mut block_filter = filter.clone();
63
64    // Walk through blocks one at a time, fetching matching logs from
65    // each block and sending them over the channel individually.
66    for block_num in from..=to {
67        // Check the deadline before starting each block so we
68        // don't begin a new query after the caller's timeout.
69        if tokio::time::Instant::now() > deadline {
70            let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
71            return;
72        }
73
74        // Re-check the anchor hash to detect reorgs that may have
75        // occurred since we started streaming.
76        match backend.get_header(HeaderSpecifier::Number(to)).await {
77            Ok(Some(h)) if h.hash() == anchor_hash => {}
78            Ok(_) => {
79                let _ = sender.send(Err(ColdStorageError::ReorgDetected)).await;
80                return;
81            }
82            Err(e) => {
83                let _ = sender.send(Err(e)).await;
84                return;
85            }
86        }
87
88        // Fetch all matching logs for this single block. The
89        // remaining budget shrinks as we accumulate results so
90        // `get_logs` can reject early if a single block overflows.
91        let remaining = max_logs.saturating_sub(total);
92        block_filter.block_option = FilterBlockOption::Range {
93            from_block: Some(block_num.into()),
94            to_block: Some(block_num.into()),
95        };
96        let block_logs = match backend.get_logs(&block_filter, remaining).await {
97            Ok(logs) => logs,
98            Err(ColdStorageError::TooManyLogs { .. }) => {
99                let _ = sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
100                return;
101            }
102            Err(e) => {
103                let _ = sender.send(Err(e)).await;
104                return;
105            }
106        };
107
108        total += block_logs.len();
109
110        // Send each log individually over the channel. The timeout
111        // ensures we stop if the deadline passes while back-pressured
112        // by a slow receiver.
113        for log in block_logs {
114            match tokio::time::timeout_at(deadline, sender.send(Ok(log))).await {
115                Ok(Ok(())) => {}
116                Ok(Err(_)) => return, // receiver dropped
117                Err(_) => {
118                    let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
119                    return;
120                }
121            }
122        }
123
124        // Early exit if we've already hit the limit — no need to
125        // query the next block.
126        if total >= max_logs {
127            return;
128        }
129    }
130}