net_sdk/stream.rs
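//! Async stream-based event consumption.
//!
//! [`EventStream`] polls an [`EventBus`] with adaptive backoff and yields raw
//! [`StoredEvent`]s; [`TypedEventStream`] additionally deserializes each
//! event's JSON payload into a caller-chosen type.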
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use futures::Stream;
10use futures::StreamExt;
11
12use net::consumer::Ordering;
13use net::{ConsumeRequest, EventBus, Filter, StoredEvent};
14
15use crate::error::{Result, SdkError};
16
17/// Options for subscribing to events.
18#[derive(Clone, Debug)]
19pub struct SubscribeOpts {
20 pub(crate) limit: usize,
21 pub(crate) filter: Option<Filter>,
22 pub(crate) ordering: Ordering,
23 pub(crate) poll_interval: Duration,
24 pub(crate) max_backoff: Duration,
25}
26
27impl Default for SubscribeOpts {
28 fn default() -> Self {
29 Self {
30 limit: 100,
31 filter: None,
32 ordering: Ordering::None,
33 poll_interval: Duration::from_millis(1),
34 max_backoff: Duration::from_millis(100),
35 }
36 }
37}
38
39impl SubscribeOpts {
40 /// Set the maximum number of events per poll.
41 pub fn limit(mut self, limit: usize) -> Self {
42 self.limit = limit;
43 self
44 }
45
46 /// Set an event filter.
47 pub fn filter(mut self, filter: Filter) -> Self {
48 self.filter = Some(filter);
49 self
50 }
51
52 /// Set the event ordering.
53 pub fn ordering(mut self, ordering: Ordering) -> Self {
54 self.ordering = ordering;
55 self
56 }
57
58 /// Set the base poll interval.
59 ///
60 /// Pre-fix, `Duration::ZERO` was accepted verbatim,
61 /// and combined with a zero `max_backoff` the doubling loop
62 /// at `current_interval = (current_interval * 2).min(max_backoff)`
63 /// resolved to zero forever. The poll-then-zero-sleep-then-
64 /// wake_by_ref path then ran at 100 % CPU on an idle stream.
65 /// Clamp to a minimum of 1 ns so even pathological inputs stay
66 /// out of the spin path; production callers wanting a tight
67 /// poll should set `Duration::from_millis(1)` or similar.
68 pub fn poll_interval(mut self, interval: Duration) -> Self {
69 self.poll_interval = interval.max(MIN_BACKOFF_INTERVAL);
70 self
71 }
72
73 /// Set the maximum backoff interval.
74 ///
75 /// See `poll_interval` — the same zero-collapse hazard
76 /// applies. Clamped to a minimum of 1 ns so the doubling loop
77 /// always parks the task on a real timer rather than spinning.
78 pub fn max_backoff(mut self, max: Duration) -> Self {
79 self.max_backoff = max.max(MIN_BACKOFF_INTERVAL);
80 self
81 }
82}
83
/// Floor applied to `poll_interval` and `max_backoff` by the `SubscribeOpts`
/// setters.
///
/// The real hazard is `Duration::ZERO`. A zero interval, doubled and capped
/// by a zero maximum, stays zero, so `poll_next` would sleep for nothing and
/// re-wake itself in a busy loop. One nanosecond is finer than any practical
/// timer resolution yet strictly positive, which is all the backoff loop
/// needs.
const MIN_BACKOFF_INTERVAL: Duration = Duration::new(0, 1);
90
91type PollFuture = Pin<
92 Box<
93 dyn Future<Output = std::result::Result<net::ConsumeResponse, net::error::ConsumerError>>
94 + Send,
95 >,
96>;
97
98/// An async stream of events from the event bus.
99///
100/// Internally polls the bus with adaptive backoff — polls tightly when
101/// events are flowing, backs off when idle.
102pub struct EventStream {
103 bus: Arc<EventBus>,
104 opts: SubscribeOpts,
105 cursor: Option<String>,
106 buffer: Vec<StoredEvent>,
107 buffer_idx: usize,
108 current_interval: Duration,
109 sleep: Option<Pin<Box<tokio::time::Sleep>>>,
110 inflight: Option<PollFuture>,
111}
112
113impl EventStream {
114 pub(crate) fn new(bus: Arc<EventBus>, opts: SubscribeOpts) -> Self {
115 let interval = opts.poll_interval;
116 Self {
117 bus,
118 opts,
119 cursor: None,
120 buffer: Vec::new(),
121 buffer_idx: 0,
122 current_interval: interval,
123 sleep: None,
124 inflight: None,
125 }
126 }
127}
128
129impl Stream for EventStream {
130 type Item = Result<StoredEvent>;
131
132 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133 let this = self.get_mut();
134
135 // Return buffered events first.
136 if this.buffer_idx < this.buffer.len() {
137 let event = this.buffer[this.buffer_idx].clone();
138 this.buffer_idx += 1;
139 return Poll::Ready(Some(Ok(event)));
140 }
141
142 // If we have a sleep pending, wait for it.
143 if let Some(sleep) = &mut this.sleep {
144 match Pin::new(sleep).poll(cx) {
145 Poll::Pending => return Poll::Pending,
146 Poll::Ready(()) => {
147 this.sleep = None;
148 }
149 }
150 }
151
152 // If we have an in-flight poll, resume it.
153 if this.inflight.is_none() {
154 let mut request = ConsumeRequest::new(this.opts.limit);
155 if let Some(cursor) = &this.cursor {
156 request = request.from(cursor);
157 }
158 if let Some(filter) = &this.opts.filter {
159 request = request.filter(filter.clone());
160 }
161 request = request.ordering(this.opts.ordering);
162
163 let bus = this.bus.clone();
164 this.inflight = Some(Box::pin(async move { bus.poll(request).await }));
165 }
166
167 let fut = this.inflight.as_mut().unwrap();
168 match fut.as_mut().poll(cx) {
169 Poll::Pending => Poll::Pending,
170 Poll::Ready(Err(e)) => {
171 this.inflight = None;
172 Poll::Ready(Some(Err(SdkError::from(e))))
173 }
174 Poll::Ready(Ok(response)) => {
175 this.inflight = None;
176 if response.events.is_empty() {
177 // Backoff only when the poll made no forward
178 // progress. Pre-fix this branch fired on
179 // `events.is_empty()` regardless of
180 // `response.has_more` / `next_id`: a poll that
181 // advanced the cursor past records this shard's
182 // filter didn't match returned an empty batch
183 // AND a fresh `next_id`, but the doubling fired
184 // anyway and the wait grew exponentially even
185 // though forward progress was happening. The
186 // cursor's advance is the right "made progress"
187 // signal; reset backoff when next_id changed.
188 let progressed = response
189 .next_id
190 .as_ref()
191 .map(|new| Some(new) != this.cursor.as_ref())
192 .unwrap_or(false);
193 if progressed {
194 this.cursor = response.next_id;
195 this.current_interval = this.opts.poll_interval;
196 cx.waker().wake_by_ref();
197 return Poll::Pending;
198 }
199 // Backoff: double the interval, up to max.
200 // `current_interval * 2` panics on
201 // Duration overflow if a caller passed a
202 // pathological `poll_interval` (close to
203 // `Duration::MAX`). `saturating_mul` clamps to
204 // `Duration::MAX` so the bound is the
205 // `min(max_backoff)` clamp on the next line.
206 this.current_interval = this
207 .current_interval
208 .saturating_mul(2)
209 .min(this.opts.max_backoff);
210 let mut sleep = Box::pin(tokio::time::sleep(this.current_interval));
211 // Poll the sleep once now so the timer registers
212 // its waker with the executor. Returning Pending
213 // here parks the task on the timer directly,
214 // rather than paying an extra round-trip through
215 // the scheduler (the old code did
216 // `cx.waker().wake_by_ref()` immediately after
217 // creating the sleep, forcing one wasted re-poll
218 // per idle backoff tick).
219 //
220 // If the sleep resolves immediately (zero / already-
221 // elapsed duration), re-wake the task so the next
222 // `poll_next` kicks off a fresh poll instead of
223 // silently parking without a wake (cubic code
224 // review P2).
225 match sleep.as_mut().poll(cx) {
226 Poll::Pending => {
227 this.sleep = Some(sleep);
228 Poll::Pending
229 }
230 Poll::Ready(()) => {
231 // Don't stash the fired sleep; let the
232 // next poll build a fresh one.
233 cx.waker().wake_by_ref();
234 Poll::Pending
235 }
236 }
237 } else {
238 // Reset backoff on activity.
239 this.current_interval = this.opts.poll_interval;
240 this.cursor = response.next_id;
241 this.buffer = response.events;
242 this.buffer_idx = 1;
243 Poll::Ready(Some(Ok(this.buffer[0].clone())))
244 }
245 }
246 }
247 }
248}
249
250/// A typed async stream that deserializes events into `T`.
251pub struct TypedEventStream<T> {
252 inner: EventStream,
253 _marker: std::marker::PhantomData<T>,
254}
255
256impl<T: serde::de::DeserializeOwned> TypedEventStream<T> {
257 pub(crate) fn new(bus: Arc<EventBus>, opts: SubscribeOpts) -> Self {
258 Self {
259 inner: EventStream::new(bus, opts),
260 _marker: std::marker::PhantomData,
261 }
262 }
263}
264
265impl<T: serde::de::DeserializeOwned + Unpin> Stream for TypedEventStream<T> {
266 type Item = Result<T>;
267
268 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269 match self.inner.poll_next_unpin(cx) {
270 Poll::Pending => Poll::Pending,
271 Poll::Ready(None) => Poll::Ready(None),
272 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
273 Poll::Ready(Some(Ok(event))) => {
274 let parsed =
275 serde_json::from_slice(event.raw.as_ref()).map_err(SdkError::Serialization);
276 Poll::Ready(Some(parsed))
277 }
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// `SubscribeOpts::default().poll_interval(ZERO)` must not store
    /// `Duration::ZERO`. Otherwise the doubling loop would stay at zero
    /// forever, the sleep would resolve immediately, and the task would
    /// re-wake itself in a tight loop at 100% CPU.
    #[test]
    fn poll_interval_clamps_zero_to_minimum() {
        let opts = SubscribeOpts::default().poll_interval(Duration::ZERO);
        assert!(
            opts.poll_interval > Duration::ZERO,
            "poll_interval(ZERO) must clamp above zero; got {:?}",
            opts.poll_interval
        );
    }

    /// Same clamp on `max_backoff`.
    #[test]
    fn max_backoff_clamps_zero_to_minimum() {
        let opts = SubscribeOpts::default().max_backoff(Duration::ZERO);
        assert!(
            opts.max_backoff > Duration::ZERO,
            "max_backoff(ZERO) must clamp above zero; got {:?}",
            opts.max_backoff
        );
    }

    /// Setting both to zero (the worst case) must still produce a non-zero
    /// effective interval.
    #[test]
    fn both_zero_still_has_nonzero_intervals() {
        let opts = SubscribeOpts::default()
            .poll_interval(Duration::ZERO)
            .max_backoff(Duration::ZERO);
        assert!(opts.poll_interval > Duration::ZERO);
        assert!(opts.max_backoff > Duration::ZERO);
        // The doubling step clamps current_interval to max_backoff each
        // tick; post-clamp, the result must still be non-zero.
        let doubled = opts.poll_interval.saturating_mul(2).min(opts.max_backoff);
        assert!(
            doubled > Duration::ZERO,
            "post-clamp doubled interval must be > 0 to avoid spin; got {:?}",
            doubled
        );
    }

    /// Zero is raised to exactly the documented floor, not some other
    /// positive value.
    #[test]
    fn zero_clamps_to_exact_floor() {
        let opts = SubscribeOpts::default()
            .poll_interval(Duration::ZERO)
            .max_backoff(Duration::ZERO);
        assert_eq!(opts.poll_interval, MIN_BACKOFF_INTERVAL);
        assert_eq!(opts.max_backoff, MIN_BACKOFF_INTERVAL);
    }

    /// The clamp is a floor, not a replacement: non-zero inputs must be
    /// stored verbatim. A setter that assigned `MIN_BACKOFF_INTERVAL`
    /// unconditionally would still pass the zero-input tests above.
    #[test]
    fn non_zero_intervals_pass_through_unchanged() {
        let poll = Duration::from_millis(5);
        let max = Duration::from_secs(2);
        let opts = SubscribeOpts::default().poll_interval(poll).max_backoff(max);
        assert_eq!(opts.poll_interval, poll);
        assert_eq!(opts.max_backoff, max);

        let tiny = Duration::from_nanos(1);
        let opts = SubscribeOpts::default().poll_interval(tiny).max_backoff(tiny);
        assert_eq!(opts.poll_interval, tiny);
        assert_eq!(opts.max_backoff, tiny);
    }

    /// `current_interval * 2` panics on Duration overflow;
    /// `saturating_mul(2)` clamps to `Duration::MAX`.
    #[test]
    fn saturating_mul_does_not_panic_on_huge_interval() {
        // Doubling this overflows `*` (panic) but is well-defined under
        // `saturating_mul`.
        let huge = Duration::from_secs(u64::MAX);
        let doubled = huge.saturating_mul(2);
        assert_eq!(
            doubled,
            Duration::MAX,
            "saturating_mul must clamp to Duration::MAX, not panic"
        );
    }

    /// The full backoff step used by `poll_next`
    /// (`saturating_mul(2).min(max_backoff)`) stays bounded by `max_backoff`
    /// even from the largest possible base interval.
    #[test]
    fn backoff_step_from_max_interval_is_capped() {
        let opts = SubscribeOpts::default().poll_interval(Duration::MAX);
        assert_eq!(opts.poll_interval, Duration::MAX);
        let next = opts.poll_interval.saturating_mul(2).min(opts.max_backoff);
        assert_eq!(next, opts.max_backoff);
    }

    /// Defaults must remain in the safe range and never trigger the bug;
    /// guards against future default tweaks.
    #[test]
    fn defaults_are_safe() {
        let opts = SubscribeOpts::default();
        assert!(opts.poll_interval > Duration::ZERO);
        assert!(opts.max_backoff > Duration::ZERO);
        assert!(opts.poll_interval <= opts.max_backoff);
    }
}