1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
use crate::rpc::{
LogRange, RpcError,
backfill::backfill,
config::{HttpRpcSettings, ProviderSettings},
heads::{collect_window_hashes, fetch_logs_by_hash_range, last_block, stream_heads_with_logs},
};
use alloy::providers::Provider;
use alloy::rpc::types::{BlockNumberOrTag, Filter};
use async_stream::stream;
use futures_util::{Stream, StreamExt, pin_mut};
use tracing::debug;
/// Settings bundle for a log stream: endpoint lists, event selectors and a block range.
///
/// NOTE(review): nothing in the visible part of this file constructs or reads this
/// struct, so the per-field notes below are inferred from names and types only —
/// confirm them against the callers before relying on them.
pub struct LogStreamConfig {
/// Presumably WebSocket endpoint URLs used for live subscriptions — TODO confirm.
pub wss_endpoints: Vec<String>,
/// Per-endpoint HTTP RPC settings (see `HttpRpcSettings` in `crate::rpc::config`).
pub http_options: Vec<HttpRpcSettings>,
/// Event selectors as strings; the exact format (signature vs. topic hash) is not
/// visible here — TODO confirm.
pub events: Vec<String>,
/// First block of the requested range; presumably inclusive — verify against caller.
pub from_block: u64,
/// Optional last block of the requested range; `None` presumably means "follow the
/// chain head" — verify against caller.
pub to_block: Option<u64>,
}
/// Streams log ranges straight from [`stream_heads_with_logs`].
///
/// The returned stream is a thin pass-through: every item produced by the
/// underlying head stream (successes and errors alike) is forwarded unchanged,
/// and the stream ends when the underlying one ends.
///
/// * `providers` – provider settings handed to `stream_heads_with_logs`.
/// * `filter` – optional log filter handed to `stream_heads_with_logs`.
pub async fn stream_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    stream! {
        // The head stream is created lazily, on the first poll of the returned stream.
        let upstream = stream_heads_with_logs(providers, filter).await;
        pin_mut!(upstream);
        loop {
            match upstream.next().await {
                Some(item) => {
                    yield item;
                }
                None => break,
            }
        }
    }
}
/// Backfills historical logs and, for filters without a `to_block`, keeps
/// streaming live logs afterwards.
///
/// The returned stream runs up to three phases:
///
/// 1. **Backfill by block number** via [`backfill`], starting at the filter's
///    `from_block` (0 when absent) and looping until the cursor has caught up.
///    * Bounded filter (`to_block` set): the whole inclusive range
///      `[from_block, to_block]` is fetched by number, including a
///      single-block range where `from_block == to_block`.
///    * Unbounded filter with `providers.reorg_detect_depth > 0`: the loop stops
///      at the window floor (`tip - depth`, computed from the initial tip).
///    * Unbounded filter with depth `0`: the loop stops once fewer than 30
///      blocks separate the cursor from the tip.
/// 2. **Window fetch by block hash** (unbounded filters with depth `> 0` only):
///    the remaining `[from_block, tip]` range is resolved to `(number, hash)`
///    pairs with [`collect_window_hashes`] and fetched with
///    [`fetch_logs_by_hash_range`].
/// 3. **Live stream** (unbounded filters only) via [`stream_heads_with_logs`],
///    starting at the first block not yet yielded.
///
/// For unbounded filters the tip is the block reported by [`last_block`] minus
/// two, saturating at zero.
///
/// # Errors
///
/// Any error from the phases above is yielded as the final item and the stream
/// then ends. A missing tip block or a failed tip-block lookup is reported as
/// `RpcError::ConnectionError`.
pub async fn backfill_then_watch_logs(
    providers: &ProviderSettings,
    filter: Option<&Filter>,
) -> impl Stream<Item = Result<LogRange, RpcError>> {
    // Distance kept between the head reported by `last_block` and the backfill tip.
    const HEAD_LAG: u64 = 2;
    // Legacy (depth == 0) hand-off: stop backfilling once the gap to the tip is below this.
    const LEGACY_HANDOFF_GAP: u64 = 30;

    let init_from_block = filter.and_then(|f| f.get_from_block()).unwrap_or(0);
    let init_to_block = filter.and_then(|f| f.get_to_block());
    // A bounded (historical) fetch has an explicit upper block; it never enters
    // the by-hash window phase or the live phase.
    let bounded = init_to_block.is_some();
    let depth = providers.reorg_detect_depth;

    stream! {
        let mut from_block = init_from_block;
        let mut tip = match last_block(providers).await {
            // `saturating_sub` instead of `b - 2`: a head below 2 (fresh or dev chain)
            // must not underflow. The subtraction is evaluated even for bounded
            // fetches because it is the `unwrap_or` argument.
            Ok(b) => init_to_block.unwrap_or(b.saturating_sub(HEAD_LAG)),
            Err(e) => {
                yield Err(e);
                return;
            }
        };

        // The reorg-exposed window boundary: blocks at or above this number are
        // left to the by-hash fetch so orphaned data cannot leak through. When
        // depth == 0, window_floor is u64::MAX (no window).
        // NOTE(review): the floor is computed once from the initial tip; after the
        // tip is refreshed below, the by-hash tail can span more than `depth`
        // blocks — confirm this is intended.
        let window_floor = if depth > 0 { tip.saturating_sub(depth) } else { u64::MAX };

        // `<=` rather than `<`: block ranges are inclusive, so a bounded request
        // with from_block == to_block still has one block to fetch. For unbounded
        // fetches the stop condition below breaks immediately at equality, so
        // their behavior is unchanged.
        while from_block <= tip {
            let stop_condition = if bounded {
                // A bounded historical fetch must cover its whole range; stopping at
                // the floor or the legacy gap would silently drop its tail.
                false
            } else if depth > 0 {
                // Reorg detection active: hand over to the by-hash window fetch.
                from_block >= window_floor
            } else {
                // Legacy: stop when close enough to hand off to the live stream.
                // Cannot underflow because from_block <= tip inside the loop.
                tip - from_block < LEGACY_HANDOFF_GAP
            };
            if stop_condition {
                break;
            }

            // Unbounded fetch with a window: clamp below the floor (by-hash tail follows).
            // Everything else: fetch up to the tip.
            let backfill_to = if depth > 0 && !bounded {
                tip.min(window_floor.saturating_sub(1))
            } else {
                tip
            };
            if backfill_to < from_block {
                break;
            }

            debug!("Starting backfill for block range {} to {}", from_block, backfill_to);
            let batch_filter = filter
                .cloned()
                .unwrap_or_else(Filter::new)
                .from_block(from_block)
                .to_block(backfill_to);
            let stream = backfill(providers, &batch_filter).await;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Ok(range) => {
                        // Advance the cursor past the last block of the yielded range.
                        from_block = range.1 + 1;
                        yield Ok(range);
                    }
                    Err(e) => {
                        yield Err(e);
                        return;
                    }
                }
            }

            // Refresh the tip for the next pass (only when not bounded by an explicit to_block).
            if !bounded {
                match last_block(providers).await {
                    Ok(b) => tip = b.saturating_sub(HEAD_LAG),
                    Err(e) => {
                        yield Err(e);
                        return;
                    }
                }
            }
        }

        // If reorg detection is active, fetch the remaining tail [from_block, tip]
        // by hash before the live stream takes over.
        if depth > 0 && from_block <= tip && !bounded {
            debug!(
                "Fetching reorg-exposed window [{}, {}] by canonical block hash",
                from_block, tip
            );

            // Obtain the tip block to get its hash, then walk backward from it.
            let http_provider = providers.connect_http(0);
            let tip_block = match http_provider
                .get_block_by_number(BlockNumberOrTag::Number(tip))
                .await
            {
                Ok(Some(block)) => block,
                Ok(None) => {
                    yield Err(RpcError::ConnectionError(format!(
                        "tip block {} not found during window hash walk",
                        tip
                    )));
                    return;
                }
                Err(e) => {
                    yield Err(RpcError::ConnectionError(format!(
                        "failed to fetch tip block {}: {}",
                        tip, e
                    )));
                    return;
                }
            };
            let tip_hash = tip_block.header.hash;

            // Collect (number, hash) pairs from the tip back to from_block.
            let mut pairs = match collect_window_hashes(
                &http_provider,
                tip_hash,
                tip,
                from_block,
            )
            .await
            {
                Ok(pairs) => pairs,
                Err(e) => {
                    yield Err(e);
                    return;
                }
            };
            // Reverse to ascending order (from_block → tip) for sequential fetch.
            pairs.reverse();

            // Fetch logs per block pinned to the collected hash.
            match fetch_logs_by_hash_range(providers, &pairs, filter).await {
                Ok(ranges) => {
                    for range in ranges {
                        from_block = range.1 + 1;
                        yield Ok(range);
                    }
                }
                Err(e) => {
                    yield Err(e);
                    return;
                }
            }
        }

        // Only unbounded filters continue with the live stream.
        if !bounded {
            debug!("No more backfilling needed, streaming live logs from block {}", from_block);
            let live_filter = filter.cloned().unwrap_or_else(Filter::new).from_block(from_block);
            let heads = stream_heads_with_logs(providers, Some(&live_filter)).await;
            pin_mut!(heads);
            while let Some(result) = heads.next().await {
                yield result;
            }
        }
    }
}
#[cfg(test)]
mod tests {
/// Bounded historical fetch (`init_to_block.is_some()`) with `depth > 0` must
/// not apply the window-floor stop or clamp `backfill_to` below the requested
/// `to_block`. Doing so silently drops `[window_floor, to_block]`.
///
/// This test exercises the predicate logic directly, mirroring `window_boundary`
/// in heads.rs. The full stream I/O is covered by integration tests.
#[test]
fn bounded_fetch_with_depth_no_window_floor_stop() {
let tip: u64 = 200;
let depth: u64 = 10;
let init_to_block: Option<u64> = Some(200);
let window_floor = tip.saturating_sub(depth); // 190
// At window_floor (190), a bounded fetch must NOT stop.
let from_block = window_floor;
let stop_condition = if depth > 0 {
init_to_block.is_none() && from_block >= window_floor
} else {
tip - from_block < 30 && init_to_block.is_none()
};
assert!(
!stop_condition,
"bounded fetch must not stop at the window floor (would drop blocks)"
);
// backfill_to must not be clamped for a bounded fetch — fetch to tip.
let backfill_to = if depth > 0 && init_to_block.is_none() {
tip.min(window_floor.saturating_sub(1))
} else {
tip
};
assert_eq!(
backfill_to, tip,
"bounded fetch must backfill to tip, not window_floor - 1"
);
// Unbounded fetch (init_to_block == None) still stops at the floor.
let unbounded_stop = if depth > 0 {
None::<u64>.is_none() && from_block >= window_floor
} else {
false
};
assert!(unbounded_stop, "unbounded fetch must stop at window floor");
}
}