signet_cold/stream.rs
//! Log-streaming helper for backends without snapshot semantics.

use crate::{ColdResult, ColdStorage, ColdStorageError, Filter, HeaderSpecifier, RpcLog};
use alloy::{primitives::BlockNumber, rpc::types::FilterBlockOption};
use tokio::sync::mpsc;

/// Parameters for a log-streaming request.
///
/// Bundles the block range, limits, channel, and deadline that every
/// [`ColdStorage::produce_log_stream`] implementation needs.
#[derive(Debug)]
pub struct StreamParams {
    /// First block in range (inclusive).
    pub from: BlockNumber,
    /// Last block in range (inclusive).
    pub to: BlockNumber,
    /// Maximum number of matching logs before aborting with
    /// [`ColdStorageError::TooManyLogs`].
    pub max_logs: usize,
    /// Channel for sending log results.
    pub sender: mpsc::Sender<ColdResult<RpcLog>>,
    /// Deadline after which the stream is aborted with
    /// [`ColdStorageError::StreamDeadlineExceeded`].
    pub deadline: tokio::time::Instant,
}

/// Log-streaming implementation for backends without snapshot semantics.
///
/// Captures an anchor hash from the `to` block at the start and
/// re-checks it before each block to detect reorgs. Uses
/// [`ColdStorage::get_header`] for anchor checks and
/// [`ColdStorage::get_logs`] with single-block filters per block.
///
/// Backends that hold a consistent read snapshot (MDBX, PostgreSQL
/// with REPEATABLE READ) should provide their own
/// [`ColdStorage::produce_log_stream`] implementation instead.
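///
/// # Example
///
/// A minimal usage sketch; `backend` and `filter` stand in for a
/// concrete [`ColdStorage`] backend and a log filter, and the channel
/// capacity and deadline are illustrative, not values this crate
/// prescribes:
///
/// ```ignore
/// use tokio::{sync::mpsc, time::{Duration, Instant}};
///
/// let (sender, mut receiver) = mpsc::channel(64);
/// let params = StreamParams {
///     from: 100,
///     to: 200,
///     max_logs: 10_000,
///     sender,
///     deadline: Instant::now() + Duration::from_secs(30),
/// };
///
/// // Produce on one task while the caller drains the channel.
/// tokio::spawn(async move {
///     produce_log_stream_default(&backend, &filter, params).await;
/// });
/// while let Some(result) = receiver.recv().await {
///     let log = result?; // surfaces reorg / deadline / limit errors
///     // ... handle `log` ...
/// }
/// ```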
pub async fn produce_log_stream_default<B: ColdStorage + ?Sized>(
    backend: &B,
    filter: &Filter,
    params: StreamParams,
) {
    let StreamParams { from, to, max_logs, sender, deadline } = params;

    // Capture the hash of the `to` block before we start iterating.
    // Without snapshot isolation we have no guarantee that the
    // underlying data stays consistent, so we re-check this hash
    // before each block to detect reorgs.
    let anchor_hash = match backend.get_header(HeaderSpecifier::Number(to)).await {
        Ok(Some(h)) => h.hash(),
        Ok(None) => return, // block doesn't exist; empty stream
        Err(e) => {
            let _ = sender.send(Err(e)).await;
            return;
        }
    };

    let mut total = 0usize;

    // Clone the filter once; we reuse it across blocks by mutating
    // only the block range, avoiding per-block clones of the address
    // and topic arrays.
    let mut block_filter = filter.clone();

    // Walk through blocks one at a time, fetching matching logs from
    // each block and sending them over the channel individually.
    for block_num in from..=to {
        // Check the deadline before starting each block so we
        // don't begin a new query after the caller's timeout.
        if tokio::time::Instant::now() > deadline {
            let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
            return;
        }

        // Re-check the anchor hash to detect reorgs that may have
        // occurred since we started streaming.
        match backend.get_header(HeaderSpecifier::Number(to)).await {
            Ok(Some(h)) if h.hash() == anchor_hash => {}
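            // A changed hash and `Ok(None)` (the anchor block has
            // vanished) both mean the range we anchored to was reorged.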
            Ok(_) => {
                let _ = sender.send(Err(ColdStorageError::ReorgDetected)).await;
                return;
            }
            Err(e) => {
                let _ = sender.send(Err(e)).await;
                return;
            }
        }

        // Fetch all matching logs for this single block. The
        // remaining budget shrinks as we accumulate results so
        // `get_logs` can reject early if a single block overflows.
        let remaining = max_logs.saturating_sub(total);
        block_filter.block_option = FilterBlockOption::Range {
            from_block: Some(block_num.into()),
            to_block: Some(block_num.into()),
        };
        let block_logs = match backend.get_logs(&block_filter, remaining).await {
            Ok(logs) => logs,
            Err(ColdStorageError::TooManyLogs { .. }) => {
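                // Report the caller's overall limit rather than the
                // shrinking per-block `remaining` budget.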
                let _ = sender.send(Err(ColdStorageError::TooManyLogs { limit: max_logs })).await;
                return;
            }
            Err(e) => {
                let _ = sender.send(Err(e)).await;
                return;
            }
        };

        total += block_logs.len();

        // Send each log individually over the channel. The timeout
        // ensures we stop if the deadline passes while back-pressured
        // by a slow receiver.
        for log in block_logs {
            match tokio::time::timeout_at(deadline, sender.send(Ok(log))).await {
                Ok(Ok(())) => {}
                Ok(Err(_)) => return, // receiver dropped
                Err(_) => {
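                    // Deadline elapsed mid-send; error delivery here is
                    // best-effort and resolves once the receiver drains
                    // or drops.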
                    let _ = sender.send(Err(ColdStorageError::StreamDeadlineExceeded)).await;
                    return;
                }
            }
        }

        // Early exit if we've already hit the limit — no need to
        // query the next block.
        if total >= max_logs {
            return;
        }
    }
}