Skip to main content

net_sdk/
stream.rs

1//! Async stream-based event consumption.
2
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9use futures::Stream;
10use futures::StreamExt;
11
12use net::consumer::Ordering;
13use net::{ConsumeRequest, EventBus, Filter, StoredEvent};
14
15use crate::error::{Result, SdkError};
16
17/// Options for subscribing to events.
18#[derive(Clone, Debug)]
19pub struct SubscribeOpts {
20    pub(crate) limit: usize,
21    pub(crate) filter: Option<Filter>,
22    pub(crate) ordering: Ordering,
23    pub(crate) poll_interval: Duration,
24    pub(crate) max_backoff: Duration,
25}
26
27impl Default for SubscribeOpts {
28    fn default() -> Self {
29        Self {
30            limit: 100,
31            filter: None,
32            ordering: Ordering::None,
33            poll_interval: Duration::from_millis(1),
34            max_backoff: Duration::from_millis(100),
35        }
36    }
37}
38
39impl SubscribeOpts {
40    /// Set the maximum number of events per poll.
41    pub fn limit(mut self, limit: usize) -> Self {
42        self.limit = limit;
43        self
44    }
45
46    /// Set an event filter.
47    pub fn filter(mut self, filter: Filter) -> Self {
48        self.filter = Some(filter);
49        self
50    }
51
52    /// Set the event ordering.
53    pub fn ordering(mut self, ordering: Ordering) -> Self {
54        self.ordering = ordering;
55        self
56    }
57
58    /// Set the base poll interval.
59    ///
60    /// Pre-fix, `Duration::ZERO` was accepted verbatim,
61    /// and combined with a zero `max_backoff` the doubling loop
62    /// at `current_interval = (current_interval * 2).min(max_backoff)`
63    /// resolved to zero forever. The poll-then-zero-sleep-then-
64    /// wake_by_ref path then ran at 100 % CPU on an idle stream.
65    /// Clamp to a minimum of 1 ns so even pathological inputs stay
66    /// out of the spin path; production callers wanting a tight
67    /// poll should set `Duration::from_millis(1)` or similar.
68    pub fn poll_interval(mut self, interval: Duration) -> Self {
69        self.poll_interval = interval.max(MIN_BACKOFF_INTERVAL);
70        self
71    }
72
73    /// Set the maximum backoff interval.
74    ///
75    /// See `poll_interval` — the same zero-collapse hazard
76    /// applies. Clamped to a minimum of 1 ns so the doubling loop
77    /// always parks the task on a real timer rather than spinning.
78    pub fn max_backoff(mut self, max: Duration) -> Self {
79        self.max_backoff = max.max(MIN_BACKOFF_INTERVAL);
80        self
81    }
82}
83
/// Floor that `SubscribeOpts::poll_interval` and
/// `SubscribeOpts::max_backoff` raise their argument to.
///
/// The only value that actually breaks the idle backoff in `poll_next` is
/// `Duration::ZERO`: doubling zero stays zero, so the loop would never park
/// on a real timer. One nanosecond is the smallest non-zero `Duration`. It
/// is far below any realistic timer resolution (tokio's timer works at
/// millisecond granularity), so the floor only rules out zero.
const MIN_BACKOFF_INTERVAL: Duration = Duration::new(0, 1);
90
91type PollFuture = Pin<
92    Box<
93        dyn Future<Output = std::result::Result<net::ConsumeResponse, net::error::ConsumerError>>
94            + Send,
95    >,
96>;
97
98/// An async stream of events from the event bus.
99///
100/// Internally polls the bus with adaptive backoff — polls tightly when
101/// events are flowing, backs off when idle.
102pub struct EventStream {
103    bus: Arc<EventBus>,
104    opts: SubscribeOpts,
105    cursor: Option<String>,
106    buffer: Vec<StoredEvent>,
107    buffer_idx: usize,
108    current_interval: Duration,
109    sleep: Option<Pin<Box<tokio::time::Sleep>>>,
110    inflight: Option<PollFuture>,
111}
112
113impl EventStream {
114    pub(crate) fn new(bus: Arc<EventBus>, opts: SubscribeOpts) -> Self {
115        let interval = opts.poll_interval;
116        Self {
117            bus,
118            opts,
119            cursor: None,
120            buffer: Vec::new(),
121            buffer_idx: 0,
122            current_interval: interval,
123            sleep: None,
124            inflight: None,
125        }
126    }
127}
128
129impl Stream for EventStream {
130    type Item = Result<StoredEvent>;
131
132    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133        let this = self.get_mut();
134
135        // Return buffered events first.
136        if this.buffer_idx < this.buffer.len() {
137            let event = this.buffer[this.buffer_idx].clone();
138            this.buffer_idx += 1;
139            return Poll::Ready(Some(Ok(event)));
140        }
141
142        // If we have a sleep pending, wait for it.
143        if let Some(sleep) = &mut this.sleep {
144            match Pin::new(sleep).poll(cx) {
145                Poll::Pending => return Poll::Pending,
146                Poll::Ready(()) => {
147                    this.sleep = None;
148                }
149            }
150        }
151
152        // If we have an in-flight poll, resume it.
153        if this.inflight.is_none() {
154            let mut request = ConsumeRequest::new(this.opts.limit);
155            if let Some(cursor) = &this.cursor {
156                request = request.from(cursor);
157            }
158            if let Some(filter) = &this.opts.filter {
159                request = request.filter(filter.clone());
160            }
161            request = request.ordering(this.opts.ordering);
162
163            let bus = this.bus.clone();
164            this.inflight = Some(Box::pin(async move { bus.poll(request).await }));
165        }
166
167        let fut = this.inflight.as_mut().unwrap();
168        match fut.as_mut().poll(cx) {
169            Poll::Pending => Poll::Pending,
170            Poll::Ready(Err(e)) => {
171                this.inflight = None;
172                Poll::Ready(Some(Err(SdkError::from(e))))
173            }
174            Poll::Ready(Ok(response)) => {
175                this.inflight = None;
176                if response.events.is_empty() {
177                    // Backoff only when the poll made no forward
178                    // progress. Pre-fix this branch fired on
179                    // `events.is_empty()` regardless of
180                    // `response.has_more` / `next_id`: a poll that
181                    // advanced the cursor past records this shard's
182                    // filter didn't match returned an empty batch
183                    // AND a fresh `next_id`, but the doubling fired
184                    // anyway and the wait grew exponentially even
185                    // though forward progress was happening. The
186                    // cursor's advance is the right "made progress"
187                    // signal; reset backoff when next_id changed.
188                    let progressed = response
189                        .next_id
190                        .as_ref()
191                        .map(|new| Some(new) != this.cursor.as_ref())
192                        .unwrap_or(false);
193                    if progressed {
194                        this.cursor = response.next_id;
195                        this.current_interval = this.opts.poll_interval;
196                        cx.waker().wake_by_ref();
197                        return Poll::Pending;
198                    }
199                    // Backoff: double the interval, up to max.
200                    // `current_interval * 2` panics on
201                    // Duration overflow if a caller passed a
202                    // pathological `poll_interval` (close to
203                    // `Duration::MAX`). `saturating_mul` clamps to
204                    // `Duration::MAX` so the bound is the
205                    // `min(max_backoff)` clamp on the next line.
206                    this.current_interval = this
207                        .current_interval
208                        .saturating_mul(2)
209                        .min(this.opts.max_backoff);
210                    let mut sleep = Box::pin(tokio::time::sleep(this.current_interval));
211                    // Poll the sleep once now so the timer registers
212                    // its waker with the executor. Returning Pending
213                    // here parks the task on the timer directly,
214                    // rather than paying an extra round-trip through
215                    // the scheduler (the old code did
216                    // `cx.waker().wake_by_ref()` immediately after
217                    // creating the sleep, forcing one wasted re-poll
218                    // per idle backoff tick).
219                    //
220                    // If the sleep resolves immediately (zero / already-
221                    // elapsed duration), re-wake the task so the next
222                    // `poll_next` kicks off a fresh poll instead of
223                    // silently parking without a wake (cubic code
224                    // review P2).
225                    match sleep.as_mut().poll(cx) {
226                        Poll::Pending => {
227                            this.sleep = Some(sleep);
228                            Poll::Pending
229                        }
230                        Poll::Ready(()) => {
231                            // Don't stash the fired sleep; let the
232                            // next poll build a fresh one.
233                            cx.waker().wake_by_ref();
234                            Poll::Pending
235                        }
236                    }
237                } else {
238                    // Reset backoff on activity.
239                    this.current_interval = this.opts.poll_interval;
240                    this.cursor = response.next_id;
241                    this.buffer = response.events;
242                    this.buffer_idx = 1;
243                    Poll::Ready(Some(Ok(this.buffer[0].clone())))
244                }
245            }
246        }
247    }
248}
249
250/// A typed async stream that deserializes events into `T`.
251pub struct TypedEventStream<T> {
252    inner: EventStream,
253    _marker: std::marker::PhantomData<T>,
254}
255
256impl<T: serde::de::DeserializeOwned> TypedEventStream<T> {
257    pub(crate) fn new(bus: Arc<EventBus>, opts: SubscribeOpts) -> Self {
258        Self {
259            inner: EventStream::new(bus, opts),
260            _marker: std::marker::PhantomData,
261        }
262    }
263}
264
265impl<T: serde::de::DeserializeOwned + Unpin> Stream for TypedEventStream<T> {
266    type Item = Result<T>;
267
268    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269        match self.inner.poll_next_unpin(cx) {
270            Poll::Pending => Poll::Pending,
271            Poll::Ready(None) => Poll::Ready(None),
272            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
273            Poll::Ready(Some(Ok(event))) => {
274                let parsed =
275                    serde_json::from_slice(event.raw.as_ref()).map_err(SdkError::Serialization);
276                Poll::Ready(Some(parsed))
277            }
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// `SubscribeOpts::default().poll_interval(ZERO)` must not store
    /// `Duration::ZERO`. A zero base interval makes the backoff doubling
    /// stay at zero forever. The sleep would then resolve immediately and
    /// the task would re-wake itself in a tight loop at 100% CPU.
    #[test]
    fn poll_interval_clamps_zero_to_minimum() {
        let opts = SubscribeOpts::default().poll_interval(Duration::ZERO);
        assert!(
            opts.poll_interval > Duration::ZERO,
            "poll_interval(ZERO) must clamp above zero; got {:?}",
            opts.poll_interval
        );
    }

    /// Same clamp on `max_backoff`.
    #[test]
    fn max_backoff_clamps_zero_to_minimum() {
        let opts = SubscribeOpts::default().max_backoff(Duration::ZERO);
        assert!(
            opts.max_backoff > Duration::ZERO,
            "max_backoff(ZERO) must clamp above zero; got {:?}",
            opts.max_backoff
        );
    }

    /// A zero base interval on its own (default cap) is enough to cause the
    /// spin. After clamping, doubling must actually grow the interval.
    #[test]
    fn zero_poll_interval_alone_still_backs_off() {
        let opts = SubscribeOpts::default().poll_interval(Duration::ZERO);
        let doubled = opts.poll_interval.saturating_mul(2).min(opts.max_backoff);
        assert!(
            doubled > opts.poll_interval,
            "doubling the clamped base interval must grow it; got {:?}",
            doubled
        );
    }

    /// A zero cap on its own (default base interval) is also enough to
    /// cause the spin. After clamping, the capped interval must stay
    /// non-zero.
    #[test]
    fn zero_max_backoff_alone_keeps_nonzero_interval() {
        let opts = SubscribeOpts::default().max_backoff(Duration::ZERO);
        let doubled = opts.poll_interval.saturating_mul(2).min(opts.max_backoff);
        assert!(
            doubled > Duration::ZERO,
            "capped interval must be > 0 to avoid spin; got {:?}",
            doubled
        );
    }

    /// Setting both to zero (the worst case) must still produce a non-zero
    /// effective interval.
    #[test]
    fn both_zero_still_has_nonzero_intervals() {
        let opts = SubscribeOpts::default()
            .poll_interval(Duration::ZERO)
            .max_backoff(Duration::ZERO);
        assert!(opts.poll_interval > Duration::ZERO);
        assert!(opts.max_backoff > Duration::ZERO);
        // The doubling loop clamps current_interval to max_backoff each
        // tick. Confirm that the clamped result is still non-zero.
        let doubled = opts.poll_interval.saturating_mul(2).min(opts.max_backoff);
        assert!(
            doubled > Duration::ZERO,
            "post-clamp doubled interval must be > 0 to avoid spin; got {:?}",
            doubled
        );
    }

    /// `current_interval * 2` panics on Duration overflow;
    /// `saturating_mul(2)` clamps to `Duration::MAX`, and the
    /// `min(max_backoff)` cap then applies as usual.
    #[test]
    fn saturating_mul_does_not_panic_on_huge_interval() {
        // `Duration::from_secs(u64::MAX)` is just below `Duration::MAX`, so
        // doubling it overflows: `huge * 2` would panic, while
        // `saturating_mul` must clamp.
        let huge = Duration::from_secs(u64::MAX);
        let doubled = huge.saturating_mul(2);
        assert_eq!(
            doubled,
            Duration::MAX,
            "saturating_mul must clamp to Duration::MAX, not panic"
        );

        // The same path through the public setters: a huge base interval
        // doubles without panicking and is then capped by max_backoff.
        let opts = SubscribeOpts::default().poll_interval(Duration::MAX);
        let capped = opts.poll_interval.saturating_mul(2).min(opts.max_backoff);
        assert_eq!(capped, opts.max_backoff);
    }

    /// Defaults must remain in the safe range and never trigger
    /// the bug — guards against future default-tweaks.
    #[test]
    fn defaults_are_safe() {
        let opts = SubscribeOpts::default();
        assert!(opts.poll_interval > Duration::ZERO);
        assert!(opts.max_backoff > Duration::ZERO);
        assert!(opts.poll_interval <= opts.max_backoff);
    }
}