Skip to main content

datum/stream/
rate.rs

1use super::*;
2
3#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4pub enum OverflowStrategy {
5    DropHead,
6    DropTail,
7    DropBuffer,
8    DropNew,
9    Backpressure,
10    Fail,
11}
12
13pub struct AggregateTimer<Agg> {
14    predicate: Arc<dyn Fn(&Agg) -> bool + Send + Sync>,
15    interval: Duration,
16}
17
18impl<Agg> Clone for AggregateTimer<Agg> {
19    fn clone(&self) -> Self {
20        Self {
21            predicate: Arc::clone(&self.predicate),
22            interval: self.interval,
23        }
24    }
25}
26
27impl<Agg> AggregateTimer<Agg> {
28    #[must_use]
29    pub fn new<F>(predicate: F, interval: Duration) -> Self
30    where
31        F: Fn(&Agg) -> bool + Send + Sync + 'static,
32    {
33        assert!(
34            interval > Duration::ZERO,
35            "aggregate_with_boundary timer interval must be greater than zero"
36        );
37        Self {
38            predicate: Arc::new(predicate),
39            interval,
40        }
41    }
42}
43
44#[derive(Clone)]
45enum TerminalSignal {
46    Complete,
47    Error(StreamError),
48}
49
50struct QueueShared<T> {
51    state: Mutex<QueueState<T>>,
52    available: Condvar,
53    cancelled: Arc<AtomicBool>,
54    capacity: usize,
55}
56
57struct QueueState<T> {
58    queue: VecDeque<T>,
59    terminal: Option<TerminalSignal>,
60}
61
62impl<T> QueueShared<T> {
63    fn new(capacity: usize) -> Arc<Self> {
64        Arc::new(Self {
65            state: Mutex::new(QueueState {
66                queue: VecDeque::with_capacity(capacity),
67                terminal: None,
68            }),
69            available: Condvar::new(),
70            cancelled: Arc::new(AtomicBool::new(false)),
71            capacity,
72        })
73    }
74}
75
76struct QueueStream<T> {
77    shared: Arc<QueueShared<T>>,
78    completion: Option<StreamCompletion<NotUsed>>,
79}
80
81impl<T> Iterator for QueueStream<T> {
82    type Item = StreamResult<T>;
83
84    fn next(&mut self) -> Option<Self::Item> {
85        let mut state = self
86            .shared
87            .state
88            .lock()
89            .unwrap_or_else(|poison| poison.into_inner());
90        loop {
91            if let Some(item) = state.queue.pop_front() {
92                self.shared.available.notify_all();
93                return Some(Ok(item));
94            }
95            if let Some(terminal) = state.terminal.clone() {
96                return match terminal {
97                    TerminalSignal::Complete => None,
98                    TerminalSignal::Error(error) => Some(Err(error)),
99                };
100            }
101            state = self
102                .shared
103                .available
104                .wait(state)
105                .unwrap_or_else(|poison| poison.into_inner());
106        }
107    }
108}
109
110impl<T> Drop for QueueStream<T> {
111    fn drop(&mut self) {
112        self.shared.cancelled.store(true, Ordering::SeqCst);
113        self.shared.available.notify_all();
114        let _ = self.completion.take();
115    }
116}
117
118struct SlotShared<T, Extra> {
119    state: Mutex<SlotState<T, Extra>>,
120    available: Condvar,
121    cancelled: Arc<AtomicBool>,
122}
123
124struct SlotState<T, Extra> {
125    slot: Option<T>,
126    terminal: Option<TerminalSignal>,
127    extra: Extra,
128}
129
130impl<T, Extra> SlotShared<T, Extra> {
131    fn new(extra: Extra) -> Arc<Self> {
132        Arc::new(Self {
133            state: Mutex::new(SlotState {
134                slot: None,
135                terminal: None,
136                extra,
137            }),
138            available: Condvar::new(),
139            cancelled: Arc::new(AtomicBool::new(false)),
140        })
141    }
142}
143
144struct SlotStream<T, Extra> {
145    shared: Arc<SlotShared<T, Extra>>,
146    completion: Option<StreamCompletion<NotUsed>>,
147}
148
149impl<T, Extra> Iterator for SlotStream<T, Extra> {
150    type Item = StreamResult<T>;
151
152    fn next(&mut self) -> Option<Self::Item> {
153        let mut state = self
154            .shared
155            .state
156            .lock()
157            .unwrap_or_else(|poison| poison.into_inner());
158        loop {
159            if let Some(item) = state.slot.take() {
160                self.shared.available.notify_all();
161                return Some(Ok(item));
162            }
163            if let Some(terminal) = state.terminal.clone() {
164                return match terminal {
165                    TerminalSignal::Complete => None,
166                    TerminalSignal::Error(error) => Some(Err(error)),
167                };
168            }
169            state = self
170                .shared
171                .available
172                .wait(state)
173                .unwrap_or_else(|poison| poison.into_inner());
174        }
175    }
176}
177
178impl<T, Extra> Drop for SlotStream<T, Extra> {
179    fn drop(&mut self) {
180        self.shared.cancelled.store(true, Ordering::SeqCst);
181        self.shared.available.notify_all();
182        let _ = self.completion.take();
183    }
184}
185
186fn finish_queue<T>(shared: &QueueShared<T>, terminal: TerminalSignal) {
187    let mut state = shared
188        .state
189        .lock()
190        .unwrap_or_else(|poison| poison.into_inner());
191    if state.terminal.is_none() {
192        state.terminal = Some(terminal);
193    }
194    drop(state);
195    shared.available.notify_all();
196}
197
198fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
199    let mut state = shared
200        .state
201        .lock()
202        .unwrap_or_else(|poison| poison.into_inner());
203    if state.terminal.is_none() {
204        state.terminal = Some(terminal);
205    }
206    drop(state);
207    shared.available.notify_all();
208}
209
210struct ProducerPanicGuard<T> {
211    shared: Arc<QueueShared<T>>,
212    armed: bool,
213}
214
215impl<T> ProducerPanicGuard<T> {
216    fn new(shared: Arc<QueueShared<T>>) -> Self {
217        Self {
218            shared,
219            armed: true,
220        }
221    }
222
223    fn disarm(&mut self) {
224        self.armed = false;
225    }
226}
227
228impl<T> Drop for ProducerPanicGuard<T> {
229    fn drop(&mut self) {
230        if self.armed {
231            finish_queue(
232                &self.shared,
233                TerminalSignal::Error(StreamError::AbruptTermination),
234            );
235        }
236    }
237}
238
239struct SlotProducerPanicGuard<T, Extra> {
240    shared: Arc<SlotShared<T, Extra>>,
241    armed: bool,
242}
243
244impl<T, Extra> SlotProducerPanicGuard<T, Extra> {
245    fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
246        Self {
247            shared,
248            armed: true,
249        }
250    }
251
252    fn disarm(&mut self) {
253        self.armed = false;
254    }
255}
256
257impl<T, Extra> Drop for SlotProducerPanicGuard<T, Extra> {
258    fn drop(&mut self) {
259        if self.armed {
260            finish_slot(
261                &self.shared,
262                TerminalSignal::Error(StreamError::AbruptTermination),
263            );
264        }
265    }
266}
267
268#[derive(Default)]
269struct NoExtra;
270
271struct BatchExtra<In> {
272    remaining: i128,
273    pending: Option<In>,
274}
275
276impl<In> BatchExtra<In> {
277    fn new(limit: u64) -> Self {
278        Self {
279            remaining: i128::from(limit),
280            pending: None,
281        }
282    }
283}
284
285struct BoundaryExtra {
286    ready: bool,
287}
288
289struct BoundaryStream<Agg, Emit> {
290    shared: Arc<SlotShared<Agg, BoundaryExtra>>,
291    completion: Option<StreamCompletion<NotUsed>>,
292    timer: Option<Cancellable>,
293    harvest: Arc<dyn Fn(Agg) -> Emit + Send + Sync>,
294}
295
296impl<Agg, Emit> Iterator for BoundaryStream<Agg, Emit> {
297    type Item = StreamResult<Emit>;
298
299    fn next(&mut self) -> Option<Self::Item> {
300        loop {
301            let (slot, terminal) = {
302                let mut state = self
303                    .shared
304                    .state
305                    .lock()
306                    .unwrap_or_else(|poison| poison.into_inner());
307                loop {
308                    if state.extra.ready {
309                        let slot = state.slot.take();
310                        state.extra.ready = false;
311                        self.shared.available.notify_all();
312                        break (slot, None);
313                    }
314                    if state.terminal.is_some() {
315                        if let Some(slot) = state.slot.take() {
316                            self.shared.available.notify_all();
317                            break (Some(slot), None);
318                        }
319                        break (None, state.terminal.clone());
320                    }
321                    state = self
322                        .shared
323                        .available
324                        .wait(state)
325                        .unwrap_or_else(|poison| poison.into_inner());
326                }
327            };
328
329            if let Some(agg) = slot {
330                return Some(Ok((self.harvest)(agg)));
331            }
332            if let Some(terminal) = terminal {
333                return match terminal {
334                    TerminalSignal::Complete => None,
335                    TerminalSignal::Error(error) => Some(Err(error)),
336                };
337            }
338        }
339    }
340}
341
342impl<Agg, Emit> Drop for BoundaryStream<Agg, Emit> {
343    fn drop(&mut self) {
344        self.shared.cancelled.store(true, Ordering::SeqCst);
345        self.shared.available.notify_all();
346        if let Some(timer) = self.timer.take() {
347            timer.cancel();
348        }
349        let _ = self.completion.take();
350    }
351}
352
353fn buffer_stage<T: Send + 'static>(
354    input: BoxStream<T>,
355    capacity: usize,
356    strategy: OverflowStrategy,
357    materializer: &Materializer,
358) -> StreamResult<BoxStream<T>> {
359    let shared = QueueShared::new(capacity);
360    let producer_shared = Arc::clone(&shared);
361    let cancelled = Arc::clone(&shared.cancelled);
362    let state = Arc::clone(&materializer.inner.state);
363    let completion = materializer.spawn_stream(move |_| {
364        let mut panic_guard = ProducerPanicGuard::new(Arc::clone(&producer_shared));
365        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
366        loop {
367            if cancelled.load(Ordering::SeqCst) {
368                panic_guard.disarm();
369                return Ok(NotUsed);
370            }
371
372            match input.next() {
373                Some(Ok(item)) => {
374                    let mut guard = producer_shared
375                        .state
376                        .lock()
377                        .unwrap_or_else(|poison| poison.into_inner());
378                    match strategy {
379                        OverflowStrategy::Backpressure => {
380                            while guard.queue.len() == producer_shared.capacity
381                                && !cancelled.load(Ordering::SeqCst)
382                            {
383                                guard = producer_shared
384                                    .available
385                                    .wait(guard)
386                                    .unwrap_or_else(|poison| poison.into_inner());
387                            }
388                            if cancelled.load(Ordering::SeqCst) {
389                                panic_guard.disarm();
390                                return Ok(NotUsed);
391                            }
392                            guard.queue.push_back(item);
393                        }
394                        OverflowStrategy::DropHead => {
395                            if guard.queue.len() == producer_shared.capacity {
396                                let _ = guard.queue.pop_front();
397                            }
398                            guard.queue.push_back(item);
399                        }
400                        OverflowStrategy::DropTail => {
401                            if guard.queue.len() == producer_shared.capacity {
402                                let _ = guard.queue.pop_back();
403                            }
404                            guard.queue.push_back(item);
405                        }
406                        OverflowStrategy::DropBuffer => {
407                            if guard.queue.len() == producer_shared.capacity {
408                                guard.queue.clear();
409                            }
410                            guard.queue.push_back(item);
411                        }
412                        OverflowStrategy::DropNew => {
413                            if guard.queue.len() < producer_shared.capacity {
414                                guard.queue.push_back(item);
415                            }
416                        }
417                        OverflowStrategy::Fail => {
418                            if guard.queue.len() == producer_shared.capacity {
419                                guard.queue.clear();
420                                drop(guard);
421                                panic_guard.disarm();
422                                finish_queue(
423                                    &producer_shared,
424                                    TerminalSignal::Error(StreamError::Failed(format!(
425                                        "Buffer overflow (max capacity was: {capacity})!"
426                                    ))),
427                                );
428                                return Ok(NotUsed);
429                            }
430                            guard.queue.push_back(item);
431                        }
432                    }
433                    drop(guard);
434                    producer_shared.available.notify_all();
435                }
436                Some(Err(error)) => {
437                    panic_guard.disarm();
438                    finish_queue(&producer_shared, TerminalSignal::Error(error));
439                    return Ok(NotUsed);
440                }
441                None => {
442                    panic_guard.disarm();
443                    finish_queue(&producer_shared, TerminalSignal::Complete);
444                    return Ok(NotUsed);
445                }
446            }
447        }
448    });
449
450    Ok(Box::new(QueueStream {
451        shared,
452        completion: Some(completion),
453    }))
454}
455
456fn batch_stage<In, Agg, Cost, Seed, Aggregate>(
457    input: BoxStream<In>,
458    limit: u64,
459    cost_fn: Arc<Cost>,
460    seed: Arc<Seed>,
461    aggregate: Arc<Aggregate>,
462    materializer: &Materializer,
463) -> StreamResult<BoxStream<Agg>>
464where
465    In: Send + 'static,
466    Agg: Send + 'static,
467    Cost: Fn(&In) -> u64 + Send + Sync + 'static,
468    Seed: Fn(In) -> Agg + Send + Sync + 'static,
469    Aggregate: Fn(Agg, In) -> Agg + Send + Sync + 'static,
470{
471    let shared = SlotShared::new(BatchExtra::new(limit));
472    let producer_shared = Arc::clone(&shared);
473    let cancelled = Arc::clone(&shared.cancelled);
474    let state = Arc::clone(&materializer.inner.state);
475    let completion = materializer.spawn_stream(move |_| {
476        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
477        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
478        let mut carry = None::<In>;
479
480        loop {
481            if cancelled.load(Ordering::SeqCst) {
482                panic_guard.disarm();
483                return Ok(NotUsed);
484            }
485
486            let next = if let Some(item) = carry.take() {
487                Some(Ok(item))
488            } else {
489                input.next()
490            };
491
492            match next {
493                Some(Ok(item)) => {
494                    let cost = i128::from(cost_fn(&item));
495                    let current = {
496                        let mut guard = producer_shared
497                            .state
498                            .lock()
499                            .unwrap_or_else(|poison| poison.into_inner());
500
501                        if guard.slot.is_none() {
502                            None
503                        } else if guard.extra.remaining < cost {
504                            guard.extra.pending = Some(item);
505                            while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
506                                guard = producer_shared
507                                    .available
508                                    .wait(guard)
509                                    .unwrap_or_else(|poison| poison.into_inner());
510                            }
511                            if cancelled.load(Ordering::SeqCst) {
512                                panic_guard.disarm();
513                                return Ok(NotUsed);
514                            }
515                            carry = guard.extra.pending.take();
516                            guard.extra.remaining = i128::from(limit);
517                            continue;
518                        } else {
519                            let current = guard.slot.take();
520                            guard.extra.remaining -= cost;
521                            current
522                        }
523                    };
524
525                    match current {
526                        None => {
527                            let next_agg = seed(item);
528                            let mut guard = producer_shared
529                                .state
530                                .lock()
531                                .unwrap_or_else(|poison| poison.into_inner());
532                            guard.extra.remaining = i128::from(limit) - cost;
533                            guard.slot = Some(next_agg);
534                            drop(guard);
535                            producer_shared.available.notify_all();
536                        }
537                        Some(current) => {
538                            let next_agg = aggregate(current, item);
539                            let mut guard = producer_shared
540                                .state
541                                .lock()
542                                .unwrap_or_else(|poison| poison.into_inner());
543                            guard.slot = Some(next_agg);
544                            drop(guard);
545                            producer_shared.available.notify_all();
546                        }
547                    }
548                }
549                Some(Err(error)) => {
550                    panic_guard.disarm();
551                    finish_slot(&producer_shared, TerminalSignal::Error(error));
552                    return Ok(NotUsed);
553                }
554                None => {
555                    panic_guard.disarm();
556                    finish_slot(&producer_shared, TerminalSignal::Complete);
557                    return Ok(NotUsed);
558                }
559            }
560        }
561    });
562
563    Ok(Box::new(SlotStream {
564        shared,
565        completion: Some(completion),
566    }))
567}
568
569fn expand_stage<In, Out, Expand, Iter>(
570    input: BoxStream<In>,
571    expand: Arc<Expand>,
572    initial: Option<Box<dyn Iterator<Item = Out> + Send>>,
573    materializer: &Materializer,
574) -> StreamResult<BoxStream<Out>>
575where
576    In: Send + 'static,
577    Out: Send + 'static,
578    Expand: Fn(In) -> Iter + Send + Sync + 'static,
579    Iter: Iterator<Item = Out> + Send + 'static,
580{
581    let shared = SlotShared::new(NoExtra);
582    let producer_shared = Arc::clone(&shared);
583    let cancelled = Arc::clone(&shared.cancelled);
584    let state = Arc::clone(&materializer.inner.state);
585    let completion = materializer.spawn_stream(move |_| {
586        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
587        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
588        loop {
589            if cancelled.load(Ordering::SeqCst) {
590                panic_guard.disarm();
591                return Ok(NotUsed);
592            }
593
594            match input.next() {
595                Some(Ok(item)) => {
596                    let mut guard = producer_shared
597                        .state
598                        .lock()
599                        .unwrap_or_else(|poison| poison.into_inner());
600                    while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
601                        guard = producer_shared
602                            .available
603                            .wait(guard)
604                            .unwrap_or_else(|poison| poison.into_inner());
605                    }
606                    if cancelled.load(Ordering::SeqCst) {
607                        panic_guard.disarm();
608                        return Ok(NotUsed);
609                    }
610                    guard.slot = Some(item);
611                    drop(guard);
612                    producer_shared.available.notify_all();
613                }
614                Some(Err(error)) => {
615                    panic_guard.disarm();
616                    finish_slot(&producer_shared, TerminalSignal::Error(error));
617                    return Ok(NotUsed);
618                }
619                None => {
620                    panic_guard.disarm();
621                    finish_slot(&producer_shared, TerminalSignal::Complete);
622                    return Ok(NotUsed);
623                }
624            }
625        }
626    });
627
628    Ok(Box::new(ExpandStream {
629        shared,
630        completion: Some(completion),
631        current: initial,
632        expanded_once: false,
633        seeded_from_upstream: false,
634        expand,
635    }))
636}
637
638struct ExpandStream<In, Out, Expand, Iter>
639where
640    Expand: Fn(In) -> Iter + Send + Sync + 'static,
641    Iter: Iterator<Item = Out> + Send + 'static,
642{
643    shared: Arc<SlotShared<In, NoExtra>>,
644    completion: Option<StreamCompletion<NotUsed>>,
645    current: Option<Box<dyn Iterator<Item = Out> + Send>>,
646    expanded_once: bool,
647    /// Whether `current` was built from a real upstream element (vs the
648    /// synthetic `initial` extrapolation seed). Only a real element's first
649    /// emission is guaranteed; the seed yields to a slot that filled first.
650    seeded_from_upstream: bool,
651    expand: Arc<Expand>,
652}
653
654impl<In, Out, Expand, Iter> Iterator for ExpandStream<In, Out, Expand, Iter>
655where
656    In: Send + 'static,
657    Out: Send + 'static,
658    Expand: Fn(In) -> Iter + Send + Sync + 'static,
659    Iter: Iterator<Item = Out> + Send + 'static,
660{
661    type Item = StreamResult<Out>;
662
663    fn next(&mut self) -> Option<Self::Item> {
664        loop {
665            // Guaranteed first emission for an iterator built from a real
666            // upstream element: deliver its head before consulting the slot
667            // or terminal again. The synthetic `initial` seed does NOT get
668            // this priority — a real element that arrived first wins (see the
669            // emit-initial decision below).
670            if let Some(current) = &mut self.current
671                && !self.expanded_once
672                && self.seeded_from_upstream
673            {
674                if let Some(item) = current.next() {
675                    self.expanded_once = true;
676                    return Some(Ok(item));
677                }
678                self.current = None;
679                self.expanded_once = false;
680            }
681
682            // Decide the next action under the lock, but never run user code
683            // (the `expand` closure or the extrapolation iterator) while
684            // holding it: a slow or blocking user iterator must not stall the
685            // producer's slot handoff.
686            enum Decision<In> {
687                NewElement(In),
688                Extrapolate,
689                EmitInitial,
690                Terminal(TerminalSignal),
691            }
692
693            let decision = {
694                let mut state = self
695                    .shared
696                    .state
697                    .lock()
698                    .unwrap_or_else(|poison| poison.into_inner());
699                loop {
700                    if let Some(item) = state.slot.take() {
701                        self.shared.available.notify_all();
702                        break Decision::NewElement(item);
703                    }
704
705                    if let Some(terminal) = state.terminal.clone() {
706                        break Decision::Terminal(terminal);
707                    }
708
709                    if self.current.is_some() {
710                        break if self.expanded_once {
711                            Decision::Extrapolate
712                        } else {
713                            // Un-emitted `initial` seed and the slot is empty:
714                            // downstream pulled before upstream produced.
715                            Decision::EmitInitial
716                        };
717                    }
718
719                    state = self
720                        .shared
721                        .available
722                        .wait(state)
723                        .unwrap_or_else(|poison| poison.into_inner());
724                }
725            };
726
727            match decision {
728                Decision::NewElement(item) => {
729                    self.current = Some(Box::new((self.expand)(item)));
730                    self.expanded_once = false;
731                    self.seeded_from_upstream = true;
732                }
733                Decision::EmitInitial => {
734                    if let Some(current) = &mut self.current
735                        && let Some(item) = current.next()
736                    {
737                        self.expanded_once = true;
738                        return Some(Ok(item));
739                    }
740                    self.current = None;
741                    self.expanded_once = false;
742                }
743                Decision::Extrapolate => {
744                    if let Some(current) = &mut self.current
745                        && let Some(item) = current.next()
746                    {
747                        return Some(Ok(item));
748                    }
749                    self.current = None;
750                    self.expanded_once = false;
751                }
752                Decision::Terminal(terminal) => {
753                    return match terminal {
754                        TerminalSignal::Complete => None,
755                        TerminalSignal::Error(error) => Some(Err(error)),
756                    };
757                }
758            }
759        }
760    }
761}
762
763impl<In, Out, Expand, Iter> Drop for ExpandStream<In, Out, Expand, Iter>
764where
765    Expand: Fn(In) -> Iter + Send + Sync + 'static,
766    Iter: Iterator<Item = Out> + Send + 'static,
767{
768    fn drop(&mut self) {
769        self.shared.cancelled.store(true, Ordering::SeqCst);
770        self.shared.available.notify_all();
771        let _ = self.completion.take();
772    }
773}
774
775fn aggregate_with_boundary_stage<In, Agg, Emit, Allocate, Aggregate, Harvest>(
776    input: BoxStream<In>,
777    allocate: Arc<Allocate>,
778    aggregate: Arc<Aggregate>,
779    harvest: Arc<Harvest>,
780    emit_on_timer: Option<AggregateTimer<Agg>>,
781    materializer: &Materializer,
782) -> StreamResult<BoxStream<Emit>>
783where
784    In: Send + 'static,
785    Agg: Send + 'static,
786    Emit: Send + 'static,
787    Allocate: Fn() -> Agg + Send + Sync + 'static,
788    Aggregate: Fn(Agg, In) -> (Agg, bool) + Send + Sync + 'static,
789    Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
790{
791    let shared = SlotShared::new(BoundaryExtra { ready: false });
792    let producer_shared = Arc::clone(&shared);
793    let cancelled = Arc::clone(&shared.cancelled);
794    let state = Arc::clone(&materializer.inner.state);
795    let completion = materializer.spawn_stream(move |_| {
796        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
797        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
798        loop {
799            if cancelled.load(Ordering::SeqCst) {
800                panic_guard.disarm();
801                return Ok(NotUsed);
802            }
803
804            match input.next() {
805                Some(Ok(item)) => {
806                    let current = {
807                        let mut guard = producer_shared
808                            .state
809                            .lock()
810                            .unwrap_or_else(|poison| poison.into_inner());
811                        while guard.extra.ready && !cancelled.load(Ordering::SeqCst) {
812                            guard = producer_shared
813                                .available
814                                .wait(guard)
815                                .unwrap_or_else(|poison| poison.into_inner());
816                        }
817                        if cancelled.load(Ordering::SeqCst) {
818                            panic_guard.disarm();
819                            return Ok(NotUsed);
820                        }
821                        guard.slot.take()
822                    };
823
824                    let current = current.unwrap_or_else(|| allocate());
825                    let (updated, ready) = aggregate(current, item);
826                    let mut guard = producer_shared
827                        .state
828                        .lock()
829                        .unwrap_or_else(|poison| poison.into_inner());
830                    guard.slot = Some(updated);
831                    guard.extra.ready = ready;
832                    drop(guard);
833                    producer_shared.available.notify_all();
834                }
835                Some(Err(error)) => {
836                    panic_guard.disarm();
837                    finish_slot(&producer_shared, TerminalSignal::Error(error));
838                    return Ok(NotUsed);
839                }
840                None => {
841                    panic_guard.disarm();
842                    finish_slot(&producer_shared, TerminalSignal::Complete);
843                    return Ok(NotUsed);
844                }
845            }
846        }
847    });
848
849    let timer = emit_on_timer.map(|timer| {
850        let shared = Arc::clone(&shared);
851        let cancelled = Arc::clone(&shared.cancelled);
852        materializer.schedule_with_fixed_delay(timer.interval, timer.interval, move || {
853            if cancelled.load(Ordering::SeqCst) {
854                return;
855            }
856            let should_emit = {
857                let state = shared
858                    .state
859                    .lock()
860                    .unwrap_or_else(|poison| poison.into_inner());
861                if state.slot.is_none() || state.extra.ready || state.terminal.is_some() {
862                    None
863                } else {
864                    Some(std::panic::catch_unwind(std::panic::AssertUnwindSafe(
865                        || (timer.predicate)(state.slot.as_ref().expect("aggregate present")),
866                    )))
867                }
868            };
869            match should_emit {
870                Some(Ok(true)) => {
871                    let mut state = shared
872                        .state
873                        .lock()
874                        .unwrap_or_else(|poison| poison.into_inner());
875                    if state.slot.is_some() && !state.extra.ready && state.terminal.is_none() {
876                        state.extra.ready = true;
877                    }
878                    drop(state);
879                    shared.available.notify_all();
880                }
881                Some(Ok(false)) | None => {}
882                Some(Err(_)) => {
883                    finish_slot(
884                        &shared,
885                        TerminalSignal::Error(StreamError::AbruptTermination),
886                    );
887                }
888            }
889        })
890    });
891
892    Ok(Box::new(BoundaryStream {
893        shared,
894        completion: Some(completion),
895        timer,
896        harvest,
897    }))
898}
899
900impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
901    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Flow<In, Out, Mat> {
902        assert!(size > 0, "buffer size must be greater than zero");
903        self.via(Flow::from_runtime_transform(move |input, materializer| {
904            buffer_stage(input, size, strategy, materializer)
905        }))
906    }
907
908    pub fn conflate_with_seed<Agg, Seed, Aggregate>(
909        self,
910        seed: Seed,
911        aggregate: Aggregate,
912    ) -> Flow<In, Agg, Mat>
913    where
914        Agg: Send + 'static,
915        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
916        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
917    {
918        let seed = Arc::new(seed);
919        let aggregate = Arc::new(aggregate);
920        self.via(Flow::from_runtime_transform(move |input, materializer| {
921            batch_stage(
922                input,
923                1,
924                Arc::new(|_: &Out| 0),
925                Arc::clone(&seed),
926                Arc::clone(&aggregate),
927                materializer,
928            )
929        }))
930    }
931
932    pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
933        self.conflate_with_seed(|item| item, aggregate)
934    }
935
936    pub fn batch<Agg, Seed, Aggregate>(
937        self,
938        max: u64,
939        seed: Seed,
940        aggregate: Aggregate,
941    ) -> Flow<In, Agg, Mat>
942    where
943        Agg: Send + 'static,
944        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
945        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
946    {
947        assert!(max > 0, "batch max must be greater than zero");
948        let seed = Arc::new(seed);
949        let aggregate = Arc::new(aggregate);
950        self.via(Flow::from_runtime_transform(move |input, materializer| {
951            batch_stage(
952                input,
953                max,
954                Arc::new(|_: &Out| 1),
955                Arc::clone(&seed),
956                Arc::clone(&aggregate),
957                materializer,
958            )
959        }))
960    }
961
962    pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
963        self,
964        max: u64,
965        cost_fn: Cost,
966        seed: Seed,
967        aggregate: Aggregate,
968    ) -> Flow<In, Agg, Mat>
969    where
970        Agg: Send + 'static,
971        Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
972        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
973        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
974    {
975        assert!(max > 0, "batch_weighted max must be greater than zero");
976        let cost_fn = Arc::new(cost_fn);
977        let seed = Arc::new(seed);
978        let aggregate = Arc::new(aggregate);
979        self.via(Flow::from_runtime_transform(move |input, materializer| {
980            batch_stage(
981                input,
982                max,
983                Arc::clone(&cost_fn),
984                Arc::clone(&seed),
985                Arc::clone(&aggregate),
986                materializer,
987            )
988        }))
989    }
990
991    pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Flow<In, Next, Mat>
992    where
993        Next: Send + 'static,
994        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
995        Iter: Iterator<Item = Next> + Send + 'static,
996    {
997        let expand = Arc::new(expand);
998        self.via(Flow::from_runtime_transform(move |input, materializer| {
999            expand_stage(input, Arc::clone(&expand), None, materializer)
1000        }))
1001    }
1002
1003    pub fn extrapolate<Expand, Iter>(
1004        self,
1005        extrapolator: Expand,
1006        initial: Option<Out>,
1007    ) -> Flow<In, Out, Mat>
1008    where
1009        Out: Clone + Sync,
1010        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1011        Iter: Iterator<Item = Out> + Send + 'static,
1012    {
1013        let extrapolator = Arc::new(extrapolator);
1014        self.via(Flow::from_runtime_transform(move |input, materializer| {
1015            let extrapolator = Arc::clone(&extrapolator);
1016            let initial = initial.clone().map(|item| {
1017                Box::new(std::iter::once(item)) as Box<dyn Iterator<Item = Out> + Send>
1018            });
1019            expand_stage(
1020                input,
1021                Arc::new(move |item: Out| {
1022                    std::iter::once(item.clone()).chain((extrapolator)(item))
1023                }),
1024                initial,
1025                materializer,
1026            )
1027        }))
1028    }
1029
1030    pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
1031        self,
1032        allocate: Allocate,
1033        aggregate: Aggregate,
1034        harvest: Harvest,
1035        emit_on_timer: Option<AggregateTimer<Agg>>,
1036    ) -> Flow<In, Emit, Mat>
1037    where
1038        Agg: Send + 'static,
1039        Emit: Send + 'static,
1040        Allocate: Fn() -> Agg + Send + Sync + 'static,
1041        Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
1042        Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
1043    {
1044        let allocate = Arc::new(allocate);
1045        let aggregate = Arc::new(aggregate);
1046        let harvest = Arc::new(harvest);
1047        self.via(Flow::from_runtime_transform(move |input, materializer| {
1048            aggregate_with_boundary_stage(
1049                input,
1050                Arc::clone(&allocate),
1051                Arc::clone(&aggregate),
1052                Arc::clone(&harvest),
1053                emit_on_timer.clone(),
1054                materializer,
1055            )
1056        }))
1057    }
1058
1059    pub fn detach(self) -> Flow<In, Out, Mat> {
1060        self.buffer(1, OverflowStrategy::Backpressure)
1061    }
1062}
1063
1064impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
1065    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Self {
1066        self.via(Flow::identity().buffer(size, strategy))
1067    }
1068
1069    pub fn conflate_with_seed<Agg, Seed, Aggregate>(
1070        self,
1071        seed: Seed,
1072        aggregate: Aggregate,
1073    ) -> Source<Agg, Mat>
1074    where
1075        Agg: Send + 'static,
1076        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1077        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1078    {
1079        self.via(Flow::identity().conflate_with_seed(seed, aggregate))
1080    }
1081
1082    pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
1083        self.via(Flow::identity().conflate(aggregate))
1084    }
1085
1086    pub fn batch<Agg, Seed, Aggregate>(
1087        self,
1088        max: u64,
1089        seed: Seed,
1090        aggregate: Aggregate,
1091    ) -> Source<Agg, Mat>
1092    where
1093        Agg: Send + 'static,
1094        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1095        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1096    {
1097        self.via(Flow::identity().batch(max, seed, aggregate))
1098    }
1099
1100    pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
1101        self,
1102        max: u64,
1103        cost_fn: Cost,
1104        seed: Seed,
1105        aggregate: Aggregate,
1106    ) -> Source<Agg, Mat>
1107    where
1108        Agg: Send + 'static,
1109        Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
1110        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1111        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1112    {
1113        self.via(Flow::identity().batch_weighted(max, cost_fn, seed, aggregate))
1114    }
1115
1116    pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Source<Next, Mat>
1117    where
1118        Next: Send + 'static,
1119        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1120        Iter: Iterator<Item = Next> + Send + 'static,
1121    {
1122        self.via(Flow::identity().expand(expand))
1123    }
1124
1125    pub fn extrapolate<Expand, Iter>(self, extrapolator: Expand, initial: Option<Out>) -> Self
1126    where
1127        Out: Clone + Sync,
1128        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1129        Iter: Iterator<Item = Out> + Send + 'static,
1130    {
1131        self.via(Flow::identity().extrapolate(extrapolator, initial))
1132    }
1133
1134    pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
1135        self,
1136        allocate: Allocate,
1137        aggregate: Aggregate,
1138        harvest: Harvest,
1139        emit_on_timer: Option<AggregateTimer<Agg>>,
1140    ) -> Source<Emit, Mat>
1141    where
1142        Agg: Send + 'static,
1143        Emit: Send + 'static,
1144        Allocate: Fn() -> Agg + Send + Sync + 'static,
1145        Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
1146        Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
1147    {
1148        self.via(Flow::identity().aggregate_with_boundary(
1149            allocate,
1150            aggregate,
1151            harvest,
1152            emit_on_timer,
1153        ))
1154    }
1155
1156    pub fn detach(self) -> Self {
1157        self.via(Flow::identity().detach())
1158    }
1159}
1160
1161#[cfg(test)]
1162mod tests {
1163    use super::*;
1164    use crate::testkit::{TestSink, TestSource};
1165    use std::sync::mpsc;
1166    use std::time::Duration;
1167
1168    fn materialize_buffered_stream<T: Send + 'static>(
1169        source: Source<T>,
1170    ) -> (BoxStream<T>, Materializer) {
1171        let materializer = Materializer::new();
1172        let (stream, _) = Arc::clone(&source.factory)
1173            .create(&materializer)
1174            .expect("buffer source materializes");
1175        (stream, materializer)
1176    }
1177
1178    fn buffered_probe(
1179        strategy: OverflowStrategy,
1180    ) -> (
1181        crate::testkit::TestPublisherProbe<i32>,
1182        crate::testkit::TestSubscriberProbe<i32>,
1183    ) {
1184        TestSource::probe::<i32>()
1185            .buffer(2, strategy)
1186            .to_mat(TestSink::probe(), Keep::both)
1187            .run()
1188            .expect("buffer probe materializes")
1189    }
1190
1191    fn rate_probe<T, U>(
1192        flow: impl FnOnce(
1193            Source<T, crate::testkit::TestPublisherProbe<T>>,
1194        ) -> Source<U, crate::testkit::TestPublisherProbe<T>>,
1195    ) -> (
1196        crate::testkit::TestPublisherProbe<T>,
1197        crate::testkit::TestSubscriberProbe<U>,
1198    )
1199    where
1200        T: Send + 'static,
1201        U: Send + 'static,
1202    {
1203        flow(TestSource::probe::<T>())
1204            .to_mat(TestSink::probe(), Keep::both)
1205            .run()
1206            .expect("rate probe materializes")
1207    }
1208
1209    #[test]
1210    fn buffer_drop_head_drops_oldest_buffered_elements() {
1211        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropHead);
1212        publisher.expect_request();
1213        publisher.send_next(1);
1214        publisher.expect_request();
1215        publisher.send_next(2);
1216        publisher.expect_request();
1217        publisher.send_next(3);
1218        publisher.expect_request();
1219        publisher.send_next(4);
1220        publisher.expect_request();
1221        publisher.send_complete();
1222
1223        subscriber.request(3);
1224        assert_eq!(subscriber.expect_next(), 3);
1225        assert_eq!(subscriber.expect_next(), 4);
1226        subscriber.expect_complete();
1227    }
1228
1229    #[test]
1230    fn buffer_pulls_eagerly_before_downstream_demand() {
1231        let (mut publisher, mut subscriber) = buffered_probe(OverflowStrategy::Backpressure);
1232        publisher.set_timeout(Duration::from_millis(250));
1233        subscriber.set_timeout(Duration::from_millis(250));
1234
1235        publisher.expect_request();
1236        publisher.send_next(1);
1237        publisher.expect_request();
1238        publisher.send_next(2);
1239        subscriber.expect_no_message(Duration::from_millis(250));
1240
1241        subscriber.request(2);
1242        assert_eq!(subscriber.expect_next(), 1);
1243        assert_eq!(subscriber.expect_next(), 2);
1244    }
1245
1246    #[test]
1247    fn buffer_drop_tail_drops_newest_buffered_element() {
1248        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropTail);
1249        publisher.expect_request();
1250        publisher.send_next(1);
1251        publisher.expect_request();
1252        publisher.send_next(2);
1253        publisher.expect_request();
1254        publisher.send_next(3);
1255        publisher.expect_request();
1256        publisher.send_next(4);
1257        publisher.expect_request();
1258        publisher.send_complete();
1259
1260        subscriber.request(3);
1261        assert_eq!(subscriber.expect_next(), 1);
1262        assert_eq!(subscriber.expect_next(), 4);
1263        subscriber.expect_complete();
1264    }
1265
1266    #[test]
1267    fn buffer_drop_buffer_drops_all_buffered_elements() {
1268        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropBuffer);
1269        publisher.expect_request();
1270        publisher.send_next(1);
1271        publisher.expect_request();
1272        publisher.send_next(2);
1273        publisher.expect_request();
1274        publisher.send_next(3);
1275        publisher.expect_request();
1276        publisher.send_next(4);
1277        publisher.expect_request();
1278        publisher.send_complete();
1279
1280        subscriber.request(3);
1281        assert_eq!(subscriber.expect_next(), 3);
1282        assert_eq!(subscriber.expect_next(), 4);
1283        subscriber.expect_complete();
1284    }
1285
1286    #[test]
1287    fn buffer_drop_new_drops_incoming_elements() {
1288        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropNew);
1289        publisher.expect_request();
1290        publisher.send_next(1);
1291        publisher.expect_request();
1292        publisher.send_next(2);
1293        publisher.expect_request();
1294        publisher.send_next(3);
1295        publisher.expect_request();
1296        publisher.send_next(4);
1297        publisher.expect_request();
1298        publisher.send_complete();
1299
1300        subscriber.request(3);
1301        assert_eq!(subscriber.expect_next(), 1);
1302        assert_eq!(subscriber.expect_next(), 2);
1303        subscriber.expect_complete();
1304    }
1305
1306    #[test]
1307    fn buffer_backpressure_stops_pulling_when_full() {
1308        let (mut publisher, mut subscriber) = buffered_probe(OverflowStrategy::Backpressure);
1309        publisher.expect_request();
1310        publisher.send_next(1);
1311        publisher.expect_request();
1312        publisher.send_next(2);
1313        publisher.set_timeout(Duration::from_millis(250));
1314        subscriber.set_timeout(Duration::from_millis(250));
1315        subscriber.expect_no_message(Duration::from_millis(250));
1316
1317        subscriber.request(1);
1318        assert_eq!(subscriber.expect_next(), 1);
1319        publisher.expect_request();
1320        publisher.send_next(3);
1321        publisher.send_complete();
1322
1323        subscriber.request(3);
1324        assert_eq!(subscriber.expect_next(), 2);
1325        assert_eq!(subscriber.expect_next(), 3);
1326        subscriber.expect_complete();
1327    }
1328
1329    #[test]
1330    fn buffer_fail_surfaces_overflow_error() {
1331        let (publisher, subscriber) = buffered_probe(OverflowStrategy::Fail);
1332        publisher.expect_request();
1333        publisher.send_next(1);
1334        publisher.expect_request();
1335        publisher.send_next(2);
1336        publisher.expect_request();
1337        publisher.send_next(3);
1338        publisher.expect_cancellation();
1339
1340        subscriber.request(1);
1341        assert_eq!(
1342            subscriber.expect_error(),
1343            StreamError::Failed("Buffer overflow (max capacity was: 2)!".to_owned())
1344        );
1345    }
1346
1347    #[test]
1348    fn buffer_terminal_completion_is_sticky_across_repolls() {
1349        let (mut stream, _materializer) = materialize_buffered_stream(
1350            Source::from_iter([1, 2]).buffer(2, OverflowStrategy::Backpressure),
1351        );
1352
1353        assert_eq!(stream.next(), Some(Ok(1)));
1354        assert_eq!(stream.next(), Some(Ok(2)));
1355        assert_eq!(stream.next(), None);
1356        assert_eq!(stream.next(), None);
1357    }
1358
1359    #[test]
1360    fn buffer_terminal_failure_is_sticky_across_repolls() {
1361        let (mut stream, _materializer) = materialize_buffered_stream(
1362            Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
1363                .buffer(2, OverflowStrategy::Backpressure),
1364        );
1365
1366        assert_eq!(
1367            stream.next(),
1368            Some(Err(StreamError::Failed("boom".to_owned())))
1369        );
1370        assert_eq!(
1371            stream.next(),
1372            Some(Err(StreamError::Failed("boom".to_owned())))
1373        );
1374    }
1375
1376    #[test]
1377    fn buffer_surfaces_producer_panics_as_stream_failure() {
1378        let (sender, receiver) = mpsc::channel();
1379
1380        std::thread::spawn(move || {
1381            let (mut stream, _materializer) = materialize_buffered_stream(
1382                Source::from_iter([1, 2, 3])
1383                    .map(|item| {
1384                        if item == 2 {
1385                            panic!("boom");
1386                        }
1387                        item
1388                    })
1389                    .buffer(2, OverflowStrategy::Backpressure),
1390            );
1391
1392            sender
1393                .send((stream.next(), stream.next(), stream.next()))
1394                .expect("test thread sends buffered panic results");
1395        });
1396
1397        let (first, second, third) = receiver
1398            .recv_timeout(Duration::from_secs(1))
1399            .expect("buffer panic path should not hang");
1400        assert_eq!(first, Some(Ok(1)));
1401        assert_eq!(second, Some(Err(StreamError::AbruptTermination)));
1402        assert_eq!(third, Some(Err(StreamError::AbruptTermination)));
1403    }
1404
1405    #[test]
1406    fn conflate_passes_through_without_rate_difference() {
1407        let (publisher, subscriber) =
1408            rate_probe(|source| source.conflate(|left, right| left + right));
1409
1410        for value in 1..=4 {
1411            subscriber.request(1);
1412            publisher.expect_request();
1413            publisher.send_next(value);
1414            assert_eq!(subscriber.expect_next(), value);
1415        }
1416    }
1417
1418    #[test]
1419    fn conflate_aggregates_while_downstream_is_silent() {
1420        let (publisher, subscriber) = rate_probe(|source| {
1421            source.conflate_with_seed(
1422                |item| vec![item],
1423                |mut items, item| {
1424                    items.push(item);
1425                    items
1426                },
1427            )
1428        });
1429
1430        for value in 1..=4 {
1431            publisher.expect_request();
1432            publisher.send_next(value);
1433        }
1434        publisher.expect_request();
1435        publisher.send_complete();
1436
1437        subscriber.request(2);
1438        assert_eq!(subscriber.expect_next(), vec![1, 2, 3, 4]);
1439        subscriber.expect_complete();
1440    }
1441
1442    #[test]
1443    fn batch_passes_through_without_rate_difference() {
1444        let (publisher, subscriber) =
1445            rate_probe(|source| source.batch(2, |item| item, |left, right| left + right));
1446
1447        for value in 1..=4 {
1448            subscriber.request(1);
1449            publisher.expect_request();
1450            publisher.send_next(value);
1451            assert_eq!(subscriber.expect_next(), value);
1452        }
1453    }
1454
1455    #[test]
1456    fn batch_aggregates_while_downstream_is_silent() {
1457        let (publisher, subscriber) = rate_probe(|source| {
1458            source.batch(
1459                u64::MAX,
1460                |item| vec![item],
1461                |mut items, item| {
1462                    items.insert(0, item);
1463                    items
1464                },
1465            )
1466        });
1467
1468        for value in 1..=10 {
1469            publisher.expect_request();
1470            publisher.send_next(value);
1471        }
1472        publisher.expect_request();
1473        publisher.send_complete();
1474
1475        subscriber.request(1);
1476        assert_eq!(
1477            subscriber.expect_next(),
1478            vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
1479        );
1480    }
1481
1482    #[test]
1483    fn batch_weighted_keeps_heavy_elements_separate() {
1484        let (publisher, subscriber) = rate_probe(|source| {
1485            source.batch_weighted(3, |_| 4, |item| item, |left, right| left + right)
1486        });
1487
1488        publisher.expect_request();
1489        publisher.send_next(1);
1490        publisher.expect_request();
1491        publisher.send_next(2);
1492
1493        subscriber.request(1);
1494        assert_eq!(subscriber.expect_next(), 1);
1495
1496        publisher.send_next(3);
1497        publisher.send_complete();
1498
1499        subscriber.request(2);
1500        assert_eq!(subscriber.expect_next(), 2);
1501        assert_eq!(subscriber.expect_next(), 3);
1502    }
1503
1504    #[test]
1505    fn batch_backpressures_at_max_aggregate() {
1506        let (publisher, subscriber) =
1507            rate_probe(|source| source.batch(2, |item| item, |left, right| left + right));
1508
1509        publisher.expect_request();
1510        publisher.send_next(1);
1511        publisher.expect_request();
1512        publisher.send_next(2);
1513
1514        publisher.expect_request();
1515        publisher.send_next(3);
1516
1517        // Backpressure: nothing flows downstream while the aggregate is full.
1518        subscriber.expect_no_message(Duration::from_millis(200));
1519
1520        // Consume the first batch — releases backpressure.
1521        subscriber.request(1);
1522        let first = subscriber.expect_next();
1523
1524        publisher.expect_request();
1525        publisher.send_next(4);
1526        publisher.send_complete();
1527
1528        // Under load the consumer may pull before the producer aggregates,
1529        // legitimately splitting batches. Verify all elements accounted for.
1530        let mut all_values = vec![first];
1531        all_values.extend(subscriber.drain_until_complete());
1532        let total: i32 = all_values.iter().sum();
1533        assert_eq!(total, 10, "all elements should be accounted for");
1534        for &v in &all_values {
1535            assert!((1..=7).contains(&v), "batch value {v} out of range");
1536        }
1537    }
1538
1539    #[test]
1540    fn expand_passes_through_without_rate_difference() {
1541        let (publisher, subscriber) = rate_probe(|source| source.expand(std::iter::once::<i32>));
1542
1543        for value in 1..=4 {
1544            publisher.expect_request();
1545            publisher.send_next(value);
1546            subscriber.request(1);
1547            assert_eq!(subscriber.expect_next(), value);
1548        }
1549    }
1550
1551    #[test]
1552    fn expand_elements_while_upstream_is_silent() {
1553        let (publisher, mut subscriber) = rate_probe(|source| source.expand(std::iter::repeat));
1554        subscriber.set_timeout(Duration::from_millis(250));
1555
1556        publisher.expect_request();
1557        publisher.send_next(42);
1558
1559        subscriber.request(4);
1560        assert_eq!(subscriber.expect_next(), 42);
1561        assert_eq!(subscriber.expect_next(), 42);
1562        assert_eq!(subscriber.expect_next(), 42);
1563        assert_eq!(subscriber.expect_next(), 42);
1564
1565        publisher.expect_request();
1566        publisher.send_next(-42);
1567        subscriber.expect_no_message(Duration::from_millis(250));
1568        subscriber.request(1);
1569        assert_eq!(subscriber.expect_next(), -42);
1570    }
1571
1572    #[test]
1573    fn expand_does_not_drop_last_element() {
1574        let (mut stream, _materializer) =
1575            materialize_buffered_stream(Source::from_iter([1, 2]).expand(std::iter::once::<i32>));
1576
1577        assert_eq!(stream.next(), Some(Ok(1)));
1578        assert_eq!(stream.next(), Some(Ok(2)));
1579        assert_eq!(stream.next(), None);
1580    }
1581
1582    #[test]
1583    fn expand_handles_finite_extrapolations() {
1584        let (mut stream, _materializer) = materialize_buffered_stream(
1585            Source::from_iter([1, 2]).expand(|item| (0..3).map(move |index| (item, index))),
1586        );
1587
1588        let mut output = Vec::new();
1589        for item in stream.by_ref() {
1590            output.push(item.expect("expand should not fail"));
1591        }
1592
1593        assert!(
1594            !output.is_empty(),
1595            "expand must emit at least the first real element"
1596        );
1597        assert_eq!(
1598            output
1599                .iter()
1600                .filter_map(|(item, index)| (*index == 0).then_some(*item))
1601                .collect::<Vec<_>>(),
1602            vec![1, 2],
1603            "each real upstream element must emit its first expanded value exactly once"
1604        );
1605
1606        let mut current_item = output[0].0;
1607        let mut expected_index = 0;
1608        for &(item, index) in &output {
1609            assert!(
1610                item == current_item || item == current_item + 1,
1611                "expanded output must stay on the current item or advance to the next real item"
1612            );
1613            if item != current_item {
1614                assert_eq!(
1615                    item,
1616                    current_item + 1,
1617                    "real items must remain in upstream order"
1618                );
1619                assert_eq!(
1620                    index, 0,
1621                    "a new real item must begin with its first expanded value"
1622                );
1623                current_item = item;
1624                expected_index = 0;
1625            }
1626            assert_eq!(
1627                index, expected_index,
1628                "each real item may be followed only by its own ordered extrapolations"
1629            );
1630            expected_index += 1;
1631        }
1632
1633        assert_eq!(stream.next(), None);
1634        assert_eq!(stream.next(), None);
1635    }
1636
1637    #[test]
1638    fn expand_emits_first_value_even_if_terminal_is_already_visible() {
1639        let shared = SlotShared::new(NoExtra);
1640        {
1641            let mut state = shared
1642                .state
1643                .lock()
1644                .unwrap_or_else(|poison| poison.into_inner());
1645            state.slot = Some(42);
1646            state.terminal = Some(TerminalSignal::Complete);
1647        }
1648
1649        let mut stream = ExpandStream {
1650            shared,
1651            completion: None,
1652            current: None,
1653            expanded_once: false,
1654            seeded_from_upstream: false,
1655            expand: Arc::new(|item| (0..3).map(move |index| (item, index))),
1656        };
1657
1658        assert_eq!(stream.next(), Some(Ok((42, 0))));
1659        assert_eq!(stream.next(), None);
1660    }
1661
1662    #[test]
1663    fn extrapolate_initial_yields_to_real_element_already_in_slot() {
1664        let shared = SlotShared::new(NoExtra);
1665        {
1666            let mut state = shared
1667                .state
1668                .lock()
1669                .unwrap_or_else(|poison| poison.into_inner());
1670            state.slot = Some(7);
1671        }
1672
1673        let mut stream = ExpandStream {
1674            shared: Arc::clone(&shared),
1675            completion: None,
1676            // The synthetic `initial` extrapolation seed, installed exactly
1677            // the way `extrapolate(..., Some(initial))` does.
1678            current: Some(Box::new(std::iter::repeat(99))),
1679            expanded_once: false,
1680            seeded_from_upstream: false,
1681            expand: Arc::new(std::iter::repeat),
1682        };
1683
1684        // A real element that arrived before the first pull wins over the
1685        // synthetic initial seed; 99 must never be emitted.
1686        assert_eq!(stream.next(), Some(Ok(7)));
1687
1688        finish_slot(&shared, TerminalSignal::Complete);
1689        assert_eq!(stream.next(), None);
1690        assert_eq!(stream.next(), None);
1691    }
1692
1693    #[test]
1694    fn extrapolate_preserves_original_before_filling_gaps() {
1695        let (publisher, subscriber) =
1696            rate_probe(|source| source.extrapolate(|item| std::iter::once(item + 100), None));
1697
1698        publisher.expect_request();
1699        publisher.send_next(1);
1700
1701        subscriber.request(2);
1702        assert_eq!(subscriber.expect_next(), 1);
1703        assert_eq!(subscriber.expect_next(), 101);
1704    }
1705
1706    #[test]
1707    fn extrapolate_emits_initial_element_before_upstream_arrives() {
1708        let (publisher, subscriber) =
1709            rate_probe(|source| source.extrapolate(std::iter::repeat, Some(0)));
1710
1711        subscriber.request(1);
1712        assert_eq!(subscriber.expect_next(), 0);
1713
1714        publisher.expect_request();
1715        publisher.send_next(42);
1716        subscriber.request(3);
1717        assert_eq!(subscriber.expect_next(), 42);
1718        assert_eq!(subscriber.expect_next(), 42);
1719        assert_eq!(subscriber.expect_next(), 42);
1720    }
1721
1722    #[test]
1723    fn aggregate_with_boundary_splits_by_size() {
1724        let result = Source::from_iter(1..=7)
1725            .aggregate_with_boundary(
1726                Vec::<i32>::new,
1727                |mut buffer, item| {
1728                    buffer.push(item);
1729                    let ready = buffer.len() >= 3;
1730                    (buffer, ready)
1731                },
1732                |buffer| buffer,
1733                None,
1734            )
1735            .run_collect()
1736            .unwrap();
1737
1738        assert_eq!(result, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
1739    }
1740
1741    #[test]
1742    fn aggregate_with_boundary_honors_timer_trigger() {
1743        let (publisher, mut subscriber) = rate_probe(|source| {
1744            source.aggregate_with_boundary(
1745                Vec::<i32>::new,
1746                |mut buffer, item| {
1747                    buffer.push(item);
1748                    (buffer, false)
1749                },
1750                |buffer| buffer,
1751                Some(AggregateTimer::new(
1752                    |buffer: &Vec<i32>| !buffer.is_empty(),
1753                    Duration::from_millis(10),
1754                )),
1755            )
1756        });
1757        subscriber.set_timeout(Duration::from_millis(200));
1758
1759        publisher.expect_request();
1760        publisher.send_next(1);
1761        publisher.expect_request();
1762        publisher.send_next(2);
1763
1764        subscriber.request(1);
1765        assert_eq!(subscriber.expect_next(), vec![1, 2]);
1766    }
1767
1768    #[test]
1769    fn detach_passes_through_all_elements() {
1770        assert_eq!(
1771            Source::from_iter(1..=100).detach().run_collect().unwrap(),
1772            (1..=100).collect::<Vec<_>>()
1773        );
1774    }
1775
1776    #[test]
1777    fn detach_passes_through_failure() {
1778        let result = Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
1779            .detach()
1780            .run_collect();
1781        assert_eq!(result, Err(StreamError::Failed("boom".to_owned())));
1782    }
1783
1784    #[test]
1785    fn detach_emits_last_element_when_completed_without_demand() {
1786        let (mut stream, _materializer) = materialize_buffered_stream(Source::single(42).detach());
1787
1788        assert_eq!(stream.next(), Some(Ok(42)));
1789        assert_eq!(stream.next(), None);
1790    }
1791}