Skip to main content

datum/stream/
rate.rs

1//! Rate-control operators (`buffer`, `conflate`, `batch`, `expand`, `detach`,
2//! `aggregate_with_boundary`) on `Flow` and `Source`, plus `OverflowStrategy` and
3//! `AggregateTimer`.
4//!
5//! Coalescing only happens across a real producer/consumer speed gap — a fully
6//! fused synchronous chain passes elements through one at a time.
7
8use super::*;
9
/// Policy the `buffer` stage applies when an element arrives while the buffer
/// already holds `capacity` elements.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OverflowStrategy {
    /// Evict the oldest buffered element (queue front), then enqueue the new one.
    DropHead,
    /// Evict the most recently buffered element (queue back), then enqueue the
    /// new one.
    DropTail,
    /// Discard every buffered element, then enqueue the new one.
    DropBuffer,
    /// Discard the incoming element and leave the buffer untouched.
    DropNew,
    /// Block the producer until the consumer frees a slot.
    Backpressure,
    /// Discard the buffered elements and fail the stream with a
    /// buffer-overflow error.
    Fail,
}
19
/// Timer configuration for `aggregate_with_boundary`.
///
/// Every `interval` the stage evaluates `predicate` against the aggregate
/// currently being built; a `true` result marks that aggregate as ready to be
/// emitted downstream.
pub struct AggregateTimer<Agg> {
    /// Decides whether the in-flight aggregate should be emitted on a tick.
    predicate: Arc<dyn Fn(&Agg) -> bool + Send + Sync>,
    /// Delay before the first tick and between subsequent ticks.
    interval: Duration,
}

// Written by hand so cloning only bumps the `Arc` and does not require
// `Agg: Clone`, which a derive would demand.
impl<Agg> Clone for AggregateTimer<Agg> {
    fn clone(&self) -> Self {
        let Self {
            predicate,
            interval,
        } = self;
        Self {
            predicate: Arc::clone(predicate),
            interval: *interval,
        }
    }
}

impl<Agg> AggregateTimer<Agg> {
    /// Builds a timer that checks `predicate` every `interval`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is zero.
    #[must_use]
    pub fn new<F>(predicate: F, interval: Duration) -> Self
    where
        F: Fn(&Agg) -> bool + Send + Sync + 'static,
    {
        assert!(
            !interval.is_zero(),
            "aggregate_with_boundary timer interval must be greater than zero"
        );
        let predicate: Arc<dyn Fn(&Agg) -> bool + Send + Sync> = Arc::new(predicate);
        Self {
            predicate,
            interval,
        }
    }
}
50
/// How the producer side of a rate-control stage ended. Stored in the shared
/// state and translated by the consumer-side iterators into end-of-stream or
/// an error item.
#[derive(Clone)]
enum TerminalSignal {
    /// The stage finished normally; consumer iterators return `None`.
    Complete,
    /// The stage failed; consumer iterators yield `Err` carrying this error.
    Error(StreamError),
}
56
57struct QueueShared<T> {
58    state: Mutex<QueueState<T>>,
59    available: Condvar,
60    cancelled: Arc<AtomicBool>,
61    capacity: usize,
62}
63
64struct QueueState<T> {
65    queue: VecDeque<T>,
66    terminal: Option<TerminalSignal>,
67}
68
69impl<T> QueueShared<T> {
70    fn new(capacity: usize) -> Arc<Self> {
71        Arc::new(Self {
72            state: Mutex::new(QueueState {
73                queue: VecDeque::with_capacity(capacity),
74                terminal: None,
75            }),
76            available: Condvar::new(),
77            cancelled: Arc::new(AtomicBool::new(false)),
78            capacity,
79        })
80    }
81}
82
83struct QueueStream<T> {
84    shared: Arc<QueueShared<T>>,
85    completion: Option<StreamCompletion<NotUsed>>,
86}
87
88impl<T> Iterator for QueueStream<T> {
89    type Item = StreamResult<T>;
90
91    fn next(&mut self) -> Option<Self::Item> {
92        let mut state = self
93            .shared
94            .state
95            .lock()
96            .unwrap_or_else(|poison| poison.into_inner());
97        loop {
98            if let Some(item) = state.queue.pop_front() {
99                self.shared.available.notify_all();
100                return Some(Ok(item));
101            }
102            if let Some(terminal) = state.terminal.clone() {
103                return match terminal {
104                    TerminalSignal::Complete => None,
105                    TerminalSignal::Error(error) => Some(Err(error)),
106                };
107            }
108            state = self
109                .shared
110                .available
111                .wait(state)
112                .unwrap_or_else(|poison| poison.into_inner());
113        }
114    }
115}
116
117impl<T> Drop for QueueStream<T> {
118    fn drop(&mut self) {
119        self.shared.cancelled.store(true, Ordering::SeqCst);
120        self.shared.available.notify_all();
121        let _ = self.completion.take();
122    }
123}
124
125struct SlotShared<T, Extra> {
126    state: Mutex<SlotState<T, Extra>>,
127    available: Condvar,
128    cancelled: Arc<AtomicBool>,
129}
130
131struct SlotState<T, Extra> {
132    slot: Option<T>,
133    terminal: Option<TerminalSignal>,
134    extra: Extra,
135}
136
137impl<T, Extra> SlotShared<T, Extra> {
138    fn new(extra: Extra) -> Arc<Self> {
139        Arc::new(Self {
140            state: Mutex::new(SlotState {
141                slot: None,
142                terminal: None,
143                extra,
144            }),
145            available: Condvar::new(),
146            cancelled: Arc::new(AtomicBool::new(false)),
147        })
148    }
149}
150
151struct SlotStream<T, Extra> {
152    shared: Arc<SlotShared<T, Extra>>,
153    completion: Option<StreamCompletion<NotUsed>>,
154}
155
156impl<T, Extra> Iterator for SlotStream<T, Extra> {
157    type Item = StreamResult<T>;
158
159    fn next(&mut self) -> Option<Self::Item> {
160        let mut state = self
161            .shared
162            .state
163            .lock()
164            .unwrap_or_else(|poison| poison.into_inner());
165        loop {
166            if let Some(item) = state.slot.take() {
167                self.shared.available.notify_all();
168                return Some(Ok(item));
169            }
170            if let Some(terminal) = state.terminal.clone() {
171                return match terminal {
172                    TerminalSignal::Complete => None,
173                    TerminalSignal::Error(error) => Some(Err(error)),
174                };
175            }
176            state = self
177                .shared
178                .available
179                .wait(state)
180                .unwrap_or_else(|poison| poison.into_inner());
181        }
182    }
183}
184
185impl<T, Extra> Drop for SlotStream<T, Extra> {
186    fn drop(&mut self) {
187        self.shared.cancelled.store(true, Ordering::SeqCst);
188        self.shared.available.notify_all();
189        let _ = self.completion.take();
190    }
191}
192
193fn finish_queue<T>(shared: &QueueShared<T>, terminal: TerminalSignal) {
194    let mut state = shared
195        .state
196        .lock()
197        .unwrap_or_else(|poison| poison.into_inner());
198    if state.terminal.is_none() {
199        state.terminal = Some(terminal);
200    }
201    drop(state);
202    shared.available.notify_all();
203}
204
205fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
206    let mut state = shared
207        .state
208        .lock()
209        .unwrap_or_else(|poison| poison.into_inner());
210    if state.terminal.is_none() {
211        state.terminal = Some(terminal);
212    }
213    drop(state);
214    shared.available.notify_all();
215}
216
217struct ProducerPanicGuard<T> {
218    shared: Arc<QueueShared<T>>,
219    armed: bool,
220}
221
222impl<T> ProducerPanicGuard<T> {
223    fn new(shared: Arc<QueueShared<T>>) -> Self {
224        Self {
225            shared,
226            armed: true,
227        }
228    }
229
230    fn disarm(&mut self) {
231        self.armed = false;
232    }
233}
234
235impl<T> Drop for ProducerPanicGuard<T> {
236    fn drop(&mut self) {
237        if self.armed {
238            finish_queue(
239                &self.shared,
240                TerminalSignal::Error(StreamError::AbruptTermination),
241            );
242        }
243    }
244}
245
246struct SlotProducerPanicGuard<T, Extra> {
247    shared: Arc<SlotShared<T, Extra>>,
248    armed: bool,
249}
250
251impl<T, Extra> SlotProducerPanicGuard<T, Extra> {
252    fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
253        Self {
254            shared,
255            armed: true,
256        }
257    }
258
259    fn disarm(&mut self) {
260        self.armed = false;
261    }
262}
263
264impl<T, Extra> Drop for SlotProducerPanicGuard<T, Extra> {
265    fn drop(&mut self) {
266        if self.armed {
267            finish_slot(
268                &self.shared,
269                TerminalSignal::Error(StreamError::AbruptTermination),
270            );
271        }
272    }
273}
274
/// Empty `Extra` payload for slot-based stages that need no additional shared
/// state (used by `expand_stage`).
#[derive(Default)]
struct NoExtra;
277
/// Extra shared state for `batch_stage`.
struct BatchExtra<In> {
    /// Cost budget left for the aggregate currently in the slot. Signed and
    /// wider than `u64` so `limit - cost` stays representable even when one
    /// element costs more than the whole limit.
    remaining: i128,
    /// Element that did not fit into the current aggregate, parked here while
    /// the producer waits for the consumer to take the slot.
    pending: Option<In>,
}

impl<In> BatchExtra<In> {
    /// Starts with the full `limit` as budget and nothing parked.
    fn new(limit: u64) -> Self {
        let remaining: i128 = limit.into();
        Self {
            remaining,
            pending: None,
        }
    }
}
291
/// Extra shared state for `aggregate_with_boundary_stage`.
struct BoundaryExtra {
    /// `true` when the aggregate in the slot should be emitted downstream; set
    /// by the producer from the `aggregate` result or by the timer callback,
    /// cleared by the consumer when it takes the aggregate.
    ready: bool,
}
295
296struct BoundaryStream<Agg, Emit> {
297    shared: Arc<SlotShared<Agg, BoundaryExtra>>,
298    completion: Option<StreamCompletion<NotUsed>>,
299    timer: Option<Cancellable>,
300    harvest: Arc<dyn Fn(Agg) -> Emit + Send + Sync>,
301}
302
303impl<Agg, Emit> Iterator for BoundaryStream<Agg, Emit> {
304    type Item = StreamResult<Emit>;
305
306    fn next(&mut self) -> Option<Self::Item> {
307        loop {
308            let (slot, terminal) = {
309                let mut state = self
310                    .shared
311                    .state
312                    .lock()
313                    .unwrap_or_else(|poison| poison.into_inner());
314                loop {
315                    if state.extra.ready {
316                        let slot = state.slot.take();
317                        state.extra.ready = false;
318                        self.shared.available.notify_all();
319                        break (slot, None);
320                    }
321                    if state.terminal.is_some() {
322                        if let Some(slot) = state.slot.take() {
323                            self.shared.available.notify_all();
324                            break (Some(slot), None);
325                        }
326                        break (None, state.terminal.clone());
327                    }
328                    state = self
329                        .shared
330                        .available
331                        .wait(state)
332                        .unwrap_or_else(|poison| poison.into_inner());
333                }
334            };
335
336            if let Some(agg) = slot {
337                return Some(Ok((self.harvest)(agg)));
338            }
339            if let Some(terminal) = terminal {
340                return match terminal {
341                    TerminalSignal::Complete => None,
342                    TerminalSignal::Error(error) => Some(Err(error)),
343                };
344            }
345        }
346    }
347}
348
349impl<Agg, Emit> Drop for BoundaryStream<Agg, Emit> {
350    fn drop(&mut self) {
351        self.shared.cancelled.store(true, Ordering::SeqCst);
352        self.shared.available.notify_all();
353        if let Some(timer) = self.timer.take() {
354            timer.cancel();
355        }
356        let _ = self.completion.take();
357    }
358}
359
/// Decouples `input` from its consumer with a bounded queue.
///
/// A producer closure is spawned through `materializer.spawn_stream`; it pulls
/// from `input` and pushes into a queue of at most `capacity` elements,
/// applying `strategy` whenever an element arrives while the queue is full.
/// The returned stream is the consumer side (`QueueStream`); dropping it sets
/// the shared `cancelled` flag, which makes the producer return.
///
/// Upstream completion and upstream errors are forwarded as terminal signals;
/// `OverflowStrategy::Fail` turns an overflow into a `StreamError::Failed`.
fn buffer_stage<T: Send + 'static>(
    input: BoxStream<T>,
    capacity: usize,
    strategy: OverflowStrategy,
    materializer: &Materializer,
) -> StreamResult<BoxStream<T>> {
    let shared = QueueShared::new(capacity);
    let producer_shared = Arc::clone(&shared);
    let cancelled = Arc::clone(&shared.cancelled);
    let state = Arc::clone(&materializer.inner.state);
    let completion = materializer.spawn_stream(move |_| {
        // Reports `AbruptTermination` to the consumer if this closure exits
        // without reaching one of the `disarm()` calls below.
        let mut panic_guard = ProducerPanicGuard::new(Arc::clone(&producer_shared));
        // NOTE(review): `runtime_checked_stream` is defined elsewhere; it
        // presumably ties `input` to the materializer state and the cancel
        // flag — confirm against its definition.
        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
        loop {
            if cancelled.load(Ordering::SeqCst) {
                panic_guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    let mut guard = producer_shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    match strategy {
                        OverflowStrategy::Backpressure => {
                            // Park until the consumer frees a slot or goes away.
                            while guard.queue.len() == producer_shared.capacity
                                && !cancelled.load(Ordering::SeqCst)
                            {
                                guard = producer_shared
                                    .available
                                    .wait(guard)
                                    .unwrap_or_else(|poison| poison.into_inner());
                            }
                            if cancelled.load(Ordering::SeqCst) {
                                panic_guard.disarm();
                                return Ok(NotUsed);
                            }
                            guard.queue.push_back(item);
                        }
                        OverflowStrategy::DropHead => {
                            // Full: evict the oldest element to make room.
                            if guard.queue.len() == producer_shared.capacity {
                                let _ = guard.queue.pop_front();
                            }
                            guard.queue.push_back(item);
                        }
                        OverflowStrategy::DropTail => {
                            // Full: evict the newest buffered element to make room.
                            if guard.queue.len() == producer_shared.capacity {
                                let _ = guard.queue.pop_back();
                            }
                            guard.queue.push_back(item);
                        }
                        OverflowStrategy::DropBuffer => {
                            // Full: throw away everything buffered so far.
                            if guard.queue.len() == producer_shared.capacity {
                                guard.queue.clear();
                            }
                            guard.queue.push_back(item);
                        }
                        OverflowStrategy::DropNew => {
                            // Full: the incoming element is silently discarded.
                            if guard.queue.len() < producer_shared.capacity {
                                guard.queue.push_back(item);
                            }
                        }
                        OverflowStrategy::Fail => {
                            if guard.queue.len() == producer_shared.capacity {
                                // Overflow is fatal: buffered elements are
                                // discarded and the consumer sees the error
                                // on its next pull.
                                guard.queue.clear();
                                // Release the lock first: `finish_queue`
                                // locks the same mutex.
                                drop(guard);
                                panic_guard.disarm();
                                finish_queue(
                                    &producer_shared,
                                    TerminalSignal::Error(StreamError::Failed(format!(
                                        "Buffer overflow (max capacity was: {capacity})!"
                                    ))),
                                );
                                return Ok(NotUsed);
                            }
                            guard.queue.push_back(item);
                        }
                    }
                    // Unlock before notifying so a woken consumer can take
                    // the lock immediately.
                    drop(guard);
                    producer_shared.available.notify_all();
                }
                Some(Err(error)) => {
                    panic_guard.disarm();
                    finish_queue(&producer_shared, TerminalSignal::Error(error));
                    return Ok(NotUsed);
                }
                None => {
                    panic_guard.disarm();
                    finish_queue(&producer_shared, TerminalSignal::Complete);
                    return Ok(NotUsed);
                }
            }
        }
    });

    Ok(Box::new(QueueStream {
        shared,
        completion: Some(completion),
    }))
}
462
/// Folds upstream elements into a single-slot aggregate while the consumer is
/// not pulling, bounded by a total cost of `limit` per aggregate.
///
/// A producer closure is spawned through `materializer.spawn_stream`. For each
/// element it computes `cost_fn(&item)` and then:
/// - if the slot is empty, starts a new aggregate with `seed(item)`;
/// - if the element still fits the remaining budget, merges it with
///   `aggregate(current, item)`;
/// - otherwise parks the element and blocks until the consumer takes the
///   current aggregate, then starts over with the parked element.
///
/// `seed` and `aggregate` run with the lock released. The returned stream is
/// the consumer side (`SlotStream`); dropping it cancels the producer.
fn batch_stage<In, Agg, Cost, Seed, Aggregate>(
    input: BoxStream<In>,
    limit: u64,
    cost_fn: Arc<Cost>,
    seed: Arc<Seed>,
    aggregate: Arc<Aggregate>,
    materializer: &Materializer,
) -> StreamResult<BoxStream<Agg>>
where
    In: Send + 'static,
    Agg: Send + 'static,
    Cost: Fn(&In) -> u64 + Send + Sync + 'static,
    Seed: Fn(In) -> Agg + Send + Sync + 'static,
    Aggregate: Fn(Agg, In) -> Agg + Send + Sync + 'static,
{
    let shared = SlotShared::new(BatchExtra::new(limit));
    let producer_shared = Arc::clone(&shared);
    let cancelled = Arc::clone(&shared.cancelled);
    let state = Arc::clone(&materializer.inner.state);
    let completion = materializer.spawn_stream(move |_| {
        // Reports `AbruptTermination` to the consumer if this closure exits
        // without reaching one of the `disarm()` calls below.
        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
        // NOTE(review): `runtime_checked_stream` is defined elsewhere; it
        // presumably ties `input` to the materializer state and the cancel
        // flag — confirm against its definition.
        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
        // Element that overflowed the previous aggregate; it is processed
        // before anything new is pulled from `input`.
        let mut carry = None::<In>;

        loop {
            if cancelled.load(Ordering::SeqCst) {
                panic_guard.disarm();
                return Ok(NotUsed);
            }

            let next = if let Some(item) = carry.take() {
                Some(Ok(item))
            } else {
                input.next()
            };

            match next {
                Some(Ok(item)) => {
                    // Note: a carried element has its cost computed a second
                    // time on the iteration that finally places it.
                    let cost = i128::from(cost_fn(&item));
                    let current = {
                        let mut guard = producer_shared
                            .state
                            .lock()
                            .unwrap_or_else(|poison| poison.into_inner());

                        if guard.slot.is_none() {
                            // Nothing in flight: this element seeds a new aggregate.
                            None
                        } else if guard.extra.remaining < cost {
                            // Does not fit: park the element and block until
                            // the consumer takes the current aggregate.
                            guard.extra.pending = Some(item);
                            while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
                                guard = producer_shared
                                    .available
                                    .wait(guard)
                                    .unwrap_or_else(|poison| poison.into_inner());
                            }
                            if cancelled.load(Ordering::SeqCst) {
                                panic_guard.disarm();
                                return Ok(NotUsed);
                            }
                            // Slot is free again: retry the parked element
                            // with a fresh budget.
                            carry = guard.extra.pending.take();
                            guard.extra.remaining = i128::from(limit);
                            continue;
                        } else {
                            // Fits: take the aggregate out so `aggregate` can
                            // run without the lock. While it is out the
                            // consumer sees an empty slot and waits.
                            let current = guard.slot.take();
                            guard.extra.remaining -= cost;
                            current
                        }
                    };

                    match current {
                        None => {
                            let next_agg = seed(item);
                            let mut guard = producer_shared
                                .state
                                .lock()
                                .unwrap_or_else(|poison| poison.into_inner());
                            // May go negative when a single element costs
                            // more than `limit`; the next element then waits.
                            guard.extra.remaining = i128::from(limit) - cost;
                            guard.slot = Some(next_agg);
                            drop(guard);
                            producer_shared.available.notify_all();
                        }
                        Some(current) => {
                            let next_agg = aggregate(current, item);
                            let mut guard = producer_shared
                                .state
                                .lock()
                                .unwrap_or_else(|poison| poison.into_inner());
                            guard.slot = Some(next_agg);
                            drop(guard);
                            producer_shared.available.notify_all();
                        }
                    }
                }
                Some(Err(error)) => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Error(error));
                    return Ok(NotUsed);
                }
                None => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Complete);
                    return Ok(NotUsed);
                }
            }
        }
    });

    Ok(Box::new(SlotStream {
        shared,
        completion: Some(completion),
    }))
}
575
/// Builds the `expand` stage: upstream elements are handed over one at a time
/// through a single slot, and the consumer side (`ExpandStream`) turns each
/// into an iterator via `expand`, which it keeps draining while no newer
/// element is available.
///
/// The producer closure spawned through `materializer.spawn_stream` blocks
/// while the slot is occupied, so upstream is pulled at most one element
/// ahead of the consumer. `initial`, when present, is installed as the
/// consumer's current iterator before any upstream element has arrived.
fn expand_stage<In, Out, Expand, Iter>(
    input: BoxStream<In>,
    expand: Arc<Expand>,
    initial: Option<Box<dyn Iterator<Item = Out> + Send>>,
    materializer: &Materializer,
) -> StreamResult<BoxStream<Out>>
where
    In: Send + 'static,
    Out: Send + 'static,
    Expand: Fn(In) -> Iter + Send + Sync + 'static,
    Iter: Iterator<Item = Out> + Send + 'static,
{
    let shared = SlotShared::new(NoExtra);
    let producer_shared = Arc::clone(&shared);
    let cancelled = Arc::clone(&shared.cancelled);
    let state = Arc::clone(&materializer.inner.state);
    let completion = materializer.spawn_stream(move |_| {
        // Reports `AbruptTermination` to the consumer if this closure exits
        // without reaching one of the `disarm()` calls below.
        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
        // NOTE(review): `runtime_checked_stream` is defined elsewhere; it
        // presumably ties `input` to the materializer state and the cancel
        // flag — confirm against its definition.
        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
        loop {
            if cancelled.load(Ordering::SeqCst) {
                panic_guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    let mut guard = producer_shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    // Wait for the consumer to take the previous element (or
                    // to go away) before overwriting the slot.
                    while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
                        guard = producer_shared
                            .available
                            .wait(guard)
                            .unwrap_or_else(|poison| poison.into_inner());
                    }
                    if cancelled.load(Ordering::SeqCst) {
                        panic_guard.disarm();
                        return Ok(NotUsed);
                    }
                    guard.slot = Some(item);
                    drop(guard);
                    producer_shared.available.notify_all();
                }
                Some(Err(error)) => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Error(error));
                    return Ok(NotUsed);
                }
                None => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Complete);
                    return Ok(NotUsed);
                }
            }
        }
    });

    Ok(Box::new(ExpandStream {
        shared,
        completion: Some(completion),
        current: initial,
        expanded_once: false,
        seeded_from_upstream: false,
        expand,
    }))
}
644
/// Consumer side of the `expand` stage.
///
/// Holds the iterator produced from the most recent upstream element (or the
/// `initial` seed) and keeps pulling from it while the shared slot is empty.
struct ExpandStream<In, Out, Expand, Iter>
where
    Expand: Fn(In) -> Iter + Send + Sync + 'static,
    Iter: Iterator<Item = Out> + Send + 'static,
{
    /// Single-slot hand-off shared with the producer task.
    shared: Arc<SlotShared<In, NoExtra>>,
    /// Handle of the spawned producer task; released when the stream drops.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Iterator currently being drained, if any.
    current: Option<Box<dyn Iterator<Item = Out> + Send>>,
    /// Whether `current` has already yielded its first item.
    expanded_once: bool,
    /// Whether `current` was built from a real upstream element (vs the
    /// synthetic `initial` extrapolation seed). Only a real element's first
    /// emission is guaranteed; the seed yields to a slot that filled first.
    seeded_from_upstream: bool,
    /// User function turning an upstream element into an iterator.
    expand: Arc<Expand>,
}

impl<In, Out, Expand, Iter> Iterator for ExpandStream<In, Out, Expand, Iter>
where
    In: Send + 'static,
    Out: Send + 'static,
    Expand: Fn(In) -> Iter + Send + Sync + 'static,
    Iter: Iterator<Item = Out> + Send + 'static,
{
    type Item = StreamResult<Out>;

    /// Yields the next expanded item.
    ///
    /// Priority per call: (1) the not-yet-emitted head of an iterator built
    /// from a real upstream element, (2) a new upstream element waiting in the
    /// slot, (3) the terminal signal, (4) further items from the current
    /// iterator. Blocks only when there is no current iterator, the slot is
    /// empty and no terminal signal is set.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Guaranteed first emission for an iterator built from a real
            // upstream element: deliver its head before consulting the slot
            // or terminal again. The synthetic `initial` seed does NOT get
            // this priority — a real element that arrived first wins (see the
            // emit-initial decision below).
            if let Some(current) = &mut self.current
                && !self.expanded_once
                && self.seeded_from_upstream
            {
                if let Some(item) = current.next() {
                    self.expanded_once = true;
                    return Some(Ok(item));
                }
                // The expansion was empty: discard it and fall through to
                // look for the next element.
                self.current = None;
                self.expanded_once = false;
            }

            // Decide the next action under the lock, but never run user code
            // (the `expand` closure or the extrapolation iterator) while
            // holding it: a slow or blocking user iterator must not stall the
            // producer's slot handoff.
            enum Decision<In> {
                NewElement(In),
                Extrapolate,
                EmitInitial,
                Terminal(TerminalSignal),
            }

            let decision = {
                let mut state = self
                    .shared
                    .state
                    .lock()
                    .unwrap_or_else(|poison| poison.into_inner());
                loop {
                    if let Some(item) = state.slot.take() {
                        // Slot freed: wake the producer waiting to refill it.
                        self.shared.available.notify_all();
                        break Decision::NewElement(item);
                    }

                    // Checked before extrapolating, so the stream ends as
                    // soon as upstream has ended and the slot is drained.
                    if let Some(terminal) = state.terminal.clone() {
                        break Decision::Terminal(terminal);
                    }

                    if self.current.is_some() {
                        break if self.expanded_once {
                            Decision::Extrapolate
                        } else {
                            // Un-emitted `initial` seed and the slot is empty:
                            // downstream pulled before upstream produced.
                            Decision::EmitInitial
                        };
                    }

                    // Nothing to emit yet: wait for the producer.
                    state = self
                        .shared
                        .available
                        .wait(state)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
            };

            match decision {
                Decision::NewElement(item) => {
                    // Replace the current iterator; its head is emitted by
                    // the guaranteed-first-emission branch on the next turn.
                    self.current = Some(Box::new((self.expand)(item)));
                    self.expanded_once = false;
                    self.seeded_from_upstream = true;
                }
                Decision::EmitInitial => {
                    if let Some(current) = &mut self.current
                        && let Some(item) = current.next()
                    {
                        self.expanded_once = true;
                        return Some(Ok(item));
                    }
                    // The seed was empty: drop it and wait for upstream.
                    self.current = None;
                    self.expanded_once = false;
                }
                Decision::Extrapolate => {
                    if let Some(current) = &mut self.current
                        && let Some(item) = current.next()
                    {
                        return Some(Ok(item));
                    }
                    // The iterator ran dry: drop it and wait for upstream.
                    self.current = None;
                    self.expanded_once = false;
                }
                Decision::Terminal(terminal) => {
                    return match terminal {
                        TerminalSignal::Complete => None,
                        TerminalSignal::Error(error) => Some(Err(error)),
                    };
                }
            }
        }
    }
}
769
770impl<In, Out, Expand, Iter> Drop for ExpandStream<In, Out, Expand, Iter>
771where
772    Expand: Fn(In) -> Iter + Send + Sync + 'static,
773    Iter: Iterator<Item = Out> + Send + 'static,
774{
775    fn drop(&mut self) {
776        self.shared.cancelled.store(true, Ordering::SeqCst);
777        self.shared.available.notify_all();
778        let _ = self.completion.take();
779    }
780}
781
782fn aggregate_with_boundary_stage<In, Agg, Emit, Allocate, Aggregate, Harvest>(
783    input: BoxStream<In>,
784    allocate: Arc<Allocate>,
785    aggregate: Arc<Aggregate>,
786    harvest: Arc<Harvest>,
787    emit_on_timer: Option<AggregateTimer<Agg>>,
788    materializer: &Materializer,
789) -> StreamResult<BoxStream<Emit>>
790where
791    In: Send + 'static,
792    Agg: Send + 'static,
793    Emit: Send + 'static,
794    Allocate: Fn() -> Agg + Send + Sync + 'static,
795    Aggregate: Fn(Agg, In) -> (Agg, bool) + Send + Sync + 'static,
796    Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
797{
798    let shared = SlotShared::new(BoundaryExtra { ready: false });
799    let producer_shared = Arc::clone(&shared);
800    let cancelled = Arc::clone(&shared.cancelled);
801    let state = Arc::clone(&materializer.inner.state);
802    let completion = materializer.spawn_stream(move |_| {
803        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
804        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
805        loop {
806            if cancelled.load(Ordering::SeqCst) {
807                panic_guard.disarm();
808                return Ok(NotUsed);
809            }
810
811            match input.next() {
812                Some(Ok(item)) => {
813                    let current = {
814                        let mut guard = producer_shared
815                            .state
816                            .lock()
817                            .unwrap_or_else(|poison| poison.into_inner());
818                        while guard.extra.ready && !cancelled.load(Ordering::SeqCst) {
819                            guard = producer_shared
820                                .available
821                                .wait(guard)
822                                .unwrap_or_else(|poison| poison.into_inner());
823                        }
824                        if cancelled.load(Ordering::SeqCst) {
825                            panic_guard.disarm();
826                            return Ok(NotUsed);
827                        }
828                        guard.slot.take()
829                    };
830
831                    let current = current.unwrap_or_else(|| allocate());
832                    let (updated, ready) = aggregate(current, item);
833                    let mut guard = producer_shared
834                        .state
835                        .lock()
836                        .unwrap_or_else(|poison| poison.into_inner());
837                    guard.slot = Some(updated);
838                    guard.extra.ready = ready;
839                    drop(guard);
840                    producer_shared.available.notify_all();
841                }
842                Some(Err(error)) => {
843                    panic_guard.disarm();
844                    finish_slot(&producer_shared, TerminalSignal::Error(error));
845                    return Ok(NotUsed);
846                }
847                None => {
848                    panic_guard.disarm();
849                    finish_slot(&producer_shared, TerminalSignal::Complete);
850                    return Ok(NotUsed);
851                }
852            }
853        }
854    });
855
856    let timer = emit_on_timer.map(|timer| {
857        let shared = Arc::clone(&shared);
858        let cancelled = Arc::clone(&shared.cancelled);
859        materializer.schedule_with_fixed_delay(timer.interval, timer.interval, move || {
860            if cancelled.load(Ordering::SeqCst) {
861                return;
862            }
863            let should_emit = {
864                let state = shared
865                    .state
866                    .lock()
867                    .unwrap_or_else(|poison| poison.into_inner());
868                if state.slot.is_none() || state.extra.ready || state.terminal.is_some() {
869                    None
870                } else {
871                    Some(std::panic::catch_unwind(std::panic::AssertUnwindSafe(
872                        || (timer.predicate)(state.slot.as_ref().expect("aggregate present")),
873                    )))
874                }
875            };
876            match should_emit {
877                Some(Ok(true)) => {
878                    let mut state = shared
879                        .state
880                        .lock()
881                        .unwrap_or_else(|poison| poison.into_inner());
882                    if state.slot.is_some() && !state.extra.ready && state.terminal.is_none() {
883                        state.extra.ready = true;
884                    }
885                    drop(state);
886                    shared.available.notify_all();
887                }
888                Some(Ok(false)) | None => {}
889                Some(Err(_)) => {
890                    finish_slot(
891                        &shared,
892                        TerminalSignal::Error(StreamError::AbruptTermination),
893                    );
894                }
895            }
896        })
897    });
898
899    Ok(Box::new(BoundaryStream {
900        shared,
901        completion: Some(completion),
902        timer,
903        harvest,
904    }))
905}
906
907impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
908    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Flow<In, Out, Mat> {
909        assert!(size > 0, "buffer size must be greater than zero");
910        self.via(Flow::from_runtime_transform(move |input, materializer| {
911            buffer_stage(input, size, strategy, materializer)
912        }))
913    }
914
915    pub fn conflate_with_seed<Agg, Seed, Aggregate>(
916        self,
917        seed: Seed,
918        aggregate: Aggregate,
919    ) -> Flow<In, Agg, Mat>
920    where
921        Agg: Send + 'static,
922        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
923        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
924    {
925        let seed = Arc::new(seed);
926        let aggregate = Arc::new(aggregate);
927        self.via(Flow::from_runtime_transform(move |input, materializer| {
928            batch_stage(
929                input,
930                1,
931                Arc::new(|_: &Out| 0),
932                Arc::clone(&seed),
933                Arc::clone(&aggregate),
934                materializer,
935            )
936        }))
937    }
938
939    pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
940        self.conflate_with_seed(|item| item, aggregate)
941    }
942
943    pub fn batch<Agg, Seed, Aggregate>(
944        self,
945        max: u64,
946        seed: Seed,
947        aggregate: Aggregate,
948    ) -> Flow<In, Agg, Mat>
949    where
950        Agg: Send + 'static,
951        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
952        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
953    {
954        assert!(max > 0, "batch max must be greater than zero");
955        let seed = Arc::new(seed);
956        let aggregate = Arc::new(aggregate);
957        self.via(Flow::from_runtime_transform(move |input, materializer| {
958            batch_stage(
959                input,
960                max,
961                Arc::new(|_: &Out| 1),
962                Arc::clone(&seed),
963                Arc::clone(&aggregate),
964                materializer,
965            )
966        }))
967    }
968
969    pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
970        self,
971        max: u64,
972        cost_fn: Cost,
973        seed: Seed,
974        aggregate: Aggregate,
975    ) -> Flow<In, Agg, Mat>
976    where
977        Agg: Send + 'static,
978        Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
979        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
980        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
981    {
982        assert!(max > 0, "batch_weighted max must be greater than zero");
983        let cost_fn = Arc::new(cost_fn);
984        let seed = Arc::new(seed);
985        let aggregate = Arc::new(aggregate);
986        self.via(Flow::from_runtime_transform(move |input, materializer| {
987            batch_stage(
988                input,
989                max,
990                Arc::clone(&cost_fn),
991                Arc::clone(&seed),
992                Arc::clone(&aggregate),
993                materializer,
994            )
995        }))
996    }
997
998    pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Flow<In, Next, Mat>
999    where
1000        Next: Send + 'static,
1001        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1002        Iter: Iterator<Item = Next> + Send + 'static,
1003    {
1004        let expand = Arc::new(expand);
1005        self.via(Flow::from_runtime_transform(move |input, materializer| {
1006            expand_stage(input, Arc::clone(&expand), None, materializer)
1007        }))
1008    }
1009
1010    pub fn extrapolate<Expand, Iter>(
1011        self,
1012        extrapolator: Expand,
1013        initial: Option<Out>,
1014    ) -> Flow<In, Out, Mat>
1015    where
1016        Out: Clone + Sync,
1017        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1018        Iter: Iterator<Item = Out> + Send + 'static,
1019    {
1020        let extrapolator = Arc::new(extrapolator);
1021        self.via(Flow::from_runtime_transform(move |input, materializer| {
1022            let extrapolator = Arc::clone(&extrapolator);
1023            let initial = initial.clone().map(|item| {
1024                Box::new(std::iter::once(item)) as Box<dyn Iterator<Item = Out> + Send>
1025            });
1026            expand_stage(
1027                input,
1028                Arc::new(move |item: Out| {
1029                    std::iter::once(item.clone()).chain((extrapolator)(item))
1030                }),
1031                initial,
1032                materializer,
1033            )
1034        }))
1035    }
1036
1037    pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
1038        self,
1039        allocate: Allocate,
1040        aggregate: Aggregate,
1041        harvest: Harvest,
1042        emit_on_timer: Option<AggregateTimer<Agg>>,
1043    ) -> Flow<In, Emit, Mat>
1044    where
1045        Agg: Send + 'static,
1046        Emit: Send + 'static,
1047        Allocate: Fn() -> Agg + Send + Sync + 'static,
1048        Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
1049        Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
1050    {
1051        let allocate = Arc::new(allocate);
1052        let aggregate = Arc::new(aggregate);
1053        let harvest = Arc::new(harvest);
1054        self.via(Flow::from_runtime_transform(move |input, materializer| {
1055            aggregate_with_boundary_stage(
1056                input,
1057                Arc::clone(&allocate),
1058                Arc::clone(&aggregate),
1059                Arc::clone(&harvest),
1060                emit_on_timer.clone(),
1061                materializer,
1062            )
1063        }))
1064    }
1065
1066    pub fn detach(self) -> Flow<In, Out, Mat> {
1067        self.buffer(1, OverflowStrategy::Backpressure)
1068    }
1069}
1070
1071impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
1072    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Self {
1073        self.via(Flow::identity().buffer(size, strategy))
1074    }
1075
1076    pub fn conflate_with_seed<Agg, Seed, Aggregate>(
1077        self,
1078        seed: Seed,
1079        aggregate: Aggregate,
1080    ) -> Source<Agg, Mat>
1081    where
1082        Agg: Send + 'static,
1083        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1084        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1085    {
1086        self.via(Flow::identity().conflate_with_seed(seed, aggregate))
1087    }
1088
1089    pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
1090        self.via(Flow::identity().conflate(aggregate))
1091    }
1092
1093    pub fn batch<Agg, Seed, Aggregate>(
1094        self,
1095        max: u64,
1096        seed: Seed,
1097        aggregate: Aggregate,
1098    ) -> Source<Agg, Mat>
1099    where
1100        Agg: Send + 'static,
1101        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1102        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1103    {
1104        self.via(Flow::identity().batch(max, seed, aggregate))
1105    }
1106
1107    pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
1108        self,
1109        max: u64,
1110        cost_fn: Cost,
1111        seed: Seed,
1112        aggregate: Aggregate,
1113    ) -> Source<Agg, Mat>
1114    where
1115        Agg: Send + 'static,
1116        Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
1117        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1118        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1119    {
1120        self.via(Flow::identity().batch_weighted(max, cost_fn, seed, aggregate))
1121    }
1122
1123    pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Source<Next, Mat>
1124    where
1125        Next: Send + 'static,
1126        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1127        Iter: Iterator<Item = Next> + Send + 'static,
1128    {
1129        self.via(Flow::identity().expand(expand))
1130    }
1131
1132    pub fn extrapolate<Expand, Iter>(self, extrapolator: Expand, initial: Option<Out>) -> Self
1133    where
1134        Out: Clone + Sync,
1135        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1136        Iter: Iterator<Item = Out> + Send + 'static,
1137    {
1138        self.via(Flow::identity().extrapolate(extrapolator, initial))
1139    }
1140
1141    pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
1142        self,
1143        allocate: Allocate,
1144        aggregate: Aggregate,
1145        harvest: Harvest,
1146        emit_on_timer: Option<AggregateTimer<Agg>>,
1147    ) -> Source<Emit, Mat>
1148    where
1149        Agg: Send + 'static,
1150        Emit: Send + 'static,
1151        Allocate: Fn() -> Agg + Send + Sync + 'static,
1152        Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
1153        Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
1154    {
1155        self.via(Flow::identity().aggregate_with_boundary(
1156            allocate,
1157            aggregate,
1158            harvest,
1159            emit_on_timer,
1160        ))
1161    }
1162
1163    pub fn detach(self) -> Self {
1164        self.via(Flow::identity().detach())
1165    }
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170    use super::*;
1171    use crate::testkit::{TestSink, TestSource};
1172    use std::sync::mpsc;
1173    use std::time::Duration;
1174
1175    fn materialize_buffered_stream<T: Send + 'static>(
1176        source: Source<T>,
1177    ) -> (BoxStream<T>, Materializer) {
1178        let materializer = Materializer::new();
1179        let (stream, _) = Arc::clone(&source.factory)
1180            .create(&materializer)
1181            .expect("buffer source materializes");
1182        (stream, materializer)
1183    }
1184
1185    fn buffered_probe(
1186        strategy: OverflowStrategy,
1187    ) -> (
1188        crate::testkit::TestPublisherProbe<i32>,
1189        crate::testkit::TestSubscriberProbe<i32>,
1190    ) {
1191        TestSource::probe::<i32>()
1192            .buffer(2, strategy)
1193            .to_mat(TestSink::probe(), Keep::both)
1194            .run()
1195            .expect("buffer probe materializes")
1196    }
1197
1198    fn rate_probe<T, U>(
1199        flow: impl FnOnce(
1200            Source<T, crate::testkit::TestPublisherProbe<T>>,
1201        ) -> Source<U, crate::testkit::TestPublisherProbe<T>>,
1202    ) -> (
1203        crate::testkit::TestPublisherProbe<T>,
1204        crate::testkit::TestSubscriberProbe<U>,
1205    )
1206    where
1207        T: Send + 'static,
1208        U: Send + 'static,
1209    {
1210        flow(TestSource::probe::<T>())
1211            .to_mat(TestSink::probe(), Keep::both)
1212            .run()
1213            .expect("rate probe materializes")
1214    }
1215
1216    #[test]
1217    fn buffer_drop_head_drops_oldest_buffered_elements() {
1218        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropHead);
1219        publisher.expect_request();
1220        publisher.send_next(1);
1221        publisher.expect_request();
1222        publisher.send_next(2);
1223        publisher.expect_request();
1224        publisher.send_next(3);
1225        publisher.expect_request();
1226        publisher.send_next(4);
1227        publisher.expect_request();
1228        publisher.send_complete();
1229
1230        subscriber.request(3);
1231        assert_eq!(subscriber.expect_next(), 3);
1232        assert_eq!(subscriber.expect_next(), 4);
1233        subscriber.expect_complete();
1234    }
1235
1236    #[test]
1237    fn buffer_pulls_eagerly_before_downstream_demand() {
1238        let (mut publisher, mut subscriber) = buffered_probe(OverflowStrategy::Backpressure);
1239        publisher.set_timeout(Duration::from_millis(250));
1240        subscriber.set_timeout(Duration::from_millis(250));
1241
1242        publisher.expect_request();
1243        publisher.send_next(1);
1244        publisher.expect_request();
1245        publisher.send_next(2);
1246        subscriber.expect_no_message(Duration::from_millis(250));
1247
1248        subscriber.request(2);
1249        assert_eq!(subscriber.expect_next(), 1);
1250        assert_eq!(subscriber.expect_next(), 2);
1251    }
1252
1253    #[test]
1254    fn buffer_drop_tail_drops_newest_buffered_element() {
1255        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropTail);
1256        publisher.expect_request();
1257        publisher.send_next(1);
1258        publisher.expect_request();
1259        publisher.send_next(2);
1260        publisher.expect_request();
1261        publisher.send_next(3);
1262        publisher.expect_request();
1263        publisher.send_next(4);
1264        publisher.expect_request();
1265        publisher.send_complete();
1266
1267        subscriber.request(3);
1268        assert_eq!(subscriber.expect_next(), 1);
1269        assert_eq!(subscriber.expect_next(), 4);
1270        subscriber.expect_complete();
1271    }
1272
1273    #[test]
1274    fn buffer_drop_buffer_drops_all_buffered_elements() {
1275        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropBuffer);
1276        publisher.expect_request();
1277        publisher.send_next(1);
1278        publisher.expect_request();
1279        publisher.send_next(2);
1280        publisher.expect_request();
1281        publisher.send_next(3);
1282        publisher.expect_request();
1283        publisher.send_next(4);
1284        publisher.expect_request();
1285        publisher.send_complete();
1286
1287        subscriber.request(3);
1288        assert_eq!(subscriber.expect_next(), 3);
1289        assert_eq!(subscriber.expect_next(), 4);
1290        subscriber.expect_complete();
1291    }
1292
1293    #[test]
1294    fn buffer_drop_new_drops_incoming_elements() {
1295        let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropNew);
1296        publisher.expect_request();
1297        publisher.send_next(1);
1298        publisher.expect_request();
1299        publisher.send_next(2);
1300        publisher.expect_request();
1301        publisher.send_next(3);
1302        publisher.expect_request();
1303        publisher.send_next(4);
1304        publisher.expect_request();
1305        publisher.send_complete();
1306
1307        subscriber.request(3);
1308        assert_eq!(subscriber.expect_next(), 1);
1309        assert_eq!(subscriber.expect_next(), 2);
1310        subscriber.expect_complete();
1311    }
1312
1313    #[test]
1314    fn buffer_backpressure_stops_pulling_when_full() {
1315        let (mut publisher, mut subscriber) = buffered_probe(OverflowStrategy::Backpressure);
1316        publisher.expect_request();
1317        publisher.send_next(1);
1318        publisher.expect_request();
1319        publisher.send_next(2);
1320        publisher.set_timeout(Duration::from_millis(250));
1321        subscriber.set_timeout(Duration::from_millis(250));
1322        subscriber.expect_no_message(Duration::from_millis(250));
1323
1324        subscriber.request(1);
1325        assert_eq!(subscriber.expect_next(), 1);
1326        publisher.expect_request();
1327        publisher.send_next(3);
1328        publisher.send_complete();
1329
1330        subscriber.request(3);
1331        assert_eq!(subscriber.expect_next(), 2);
1332        assert_eq!(subscriber.expect_next(), 3);
1333        subscriber.expect_complete();
1334    }
1335
1336    #[test]
1337    fn buffer_fail_surfaces_overflow_error() {
1338        let (publisher, subscriber) = buffered_probe(OverflowStrategy::Fail);
1339        publisher.expect_request();
1340        publisher.send_next(1);
1341        publisher.expect_request();
1342        publisher.send_next(2);
1343        publisher.expect_request();
1344        publisher.send_next(3);
1345        publisher.expect_cancellation();
1346
1347        subscriber.request(1);
1348        assert_eq!(
1349            subscriber.expect_error(),
1350            StreamError::Failed("Buffer overflow (max capacity was: 2)!".to_owned())
1351        );
1352    }
1353
1354    #[test]
1355    fn buffer_terminal_completion_is_sticky_across_repolls() {
1356        let (mut stream, _materializer) = materialize_buffered_stream(
1357            Source::from_iter([1, 2]).buffer(2, OverflowStrategy::Backpressure),
1358        );
1359
1360        assert_eq!(stream.next(), Some(Ok(1)));
1361        assert_eq!(stream.next(), Some(Ok(2)));
1362        assert_eq!(stream.next(), None);
1363        assert_eq!(stream.next(), None);
1364    }
1365
1366    #[test]
1367    fn buffer_terminal_failure_is_sticky_across_repolls() {
1368        let (mut stream, _materializer) = materialize_buffered_stream(
1369            Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
1370                .buffer(2, OverflowStrategy::Backpressure),
1371        );
1372
1373        assert_eq!(
1374            stream.next(),
1375            Some(Err(StreamError::Failed("boom".to_owned())))
1376        );
1377        assert_eq!(
1378            stream.next(),
1379            Some(Err(StreamError::Failed("boom".to_owned())))
1380        );
1381    }
1382
1383    #[test]
1384    fn buffer_surfaces_producer_panics_as_stream_failure() {
1385        let (sender, receiver) = mpsc::channel();
1386
1387        std::thread::spawn(move || {
1388            let (mut stream, _materializer) = materialize_buffered_stream(
1389                Source::from_iter([1, 2, 3])
1390                    .map(|item| {
1391                        if item == 2 {
1392                            panic!("boom");
1393                        }
1394                        item
1395                    })
1396                    .buffer(2, OverflowStrategy::Backpressure),
1397            );
1398
1399            sender
1400                .send((stream.next(), stream.next(), stream.next()))
1401                .expect("test thread sends buffered panic results");
1402        });
1403
1404        let (first, second, third) = receiver
1405            .recv_timeout(Duration::from_secs(1))
1406            .expect("buffer panic path should not hang");
1407        assert_eq!(first, Some(Ok(1)));
1408        assert_eq!(second, Some(Err(StreamError::AbruptTermination)));
1409        assert_eq!(third, Some(Err(StreamError::AbruptTermination)));
1410    }
1411
1412    #[test]
1413    fn conflate_passes_through_without_rate_difference() {
1414        let (publisher, subscriber) =
1415            rate_probe(|source| source.conflate(|left, right| left + right));
1416
1417        for value in 1..=4 {
1418            subscriber.request(1);
1419            publisher.expect_request();
1420            publisher.send_next(value);
1421            assert_eq!(subscriber.expect_next(), value);
1422        }
1423    }
1424
1425    #[test]
1426    fn conflate_aggregates_while_downstream_is_silent() {
1427        let (publisher, subscriber) = rate_probe(|source| {
1428            source.conflate_with_seed(
1429                |item| vec![item],
1430                |mut items, item| {
1431                    items.push(item);
1432                    items
1433                },
1434            )
1435        });
1436
1437        for value in 1..=4 {
1438            publisher.expect_request();
1439            publisher.send_next(value);
1440        }
1441        publisher.expect_request();
1442        publisher.send_complete();
1443
1444        subscriber.request(2);
1445        assert_eq!(subscriber.expect_next(), vec![1, 2, 3, 4]);
1446        subscriber.expect_complete();
1447    }
1448
1449    #[test]
1450    fn batch_passes_through_without_rate_difference() {
1451        let (publisher, subscriber) =
1452            rate_probe(|source| source.batch(2, |item| item, |left, right| left + right));
1453
1454        for value in 1..=4 {
1455            subscriber.request(1);
1456            publisher.expect_request();
1457            publisher.send_next(value);
1458            assert_eq!(subscriber.expect_next(), value);
1459        }
1460    }
1461
1462    #[test]
1463    fn batch_aggregates_while_downstream_is_silent() {
1464        let (publisher, subscriber) = rate_probe(|source| {
1465            source.batch(
1466                u64::MAX,
1467                |item| vec![item],
1468                |mut items, item| {
1469                    items.insert(0, item);
1470                    items
1471                },
1472            )
1473        });
1474
1475        for value in 1..=10 {
1476            publisher.expect_request();
1477            publisher.send_next(value);
1478        }
1479        publisher.expect_request();
1480        publisher.send_complete();
1481
1482        subscriber.request(1);
1483        assert_eq!(
1484            subscriber.expect_next(),
1485            vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
1486        );
1487    }
1488
1489    #[test]
1490    fn batch_weighted_keeps_heavy_elements_separate() {
1491        let (publisher, subscriber) = rate_probe(|source| {
1492            source.batch_weighted(3, |_| 4, |item| item, |left, right| left + right)
1493        });
1494
1495        publisher.expect_request();
1496        publisher.send_next(1);
1497        publisher.expect_request();
1498        publisher.send_next(2);
1499
1500        subscriber.request(1);
1501        assert_eq!(subscriber.expect_next(), 1);
1502
1503        publisher.send_next(3);
1504        publisher.send_complete();
1505
1506        subscriber.request(2);
1507        assert_eq!(subscriber.expect_next(), 2);
1508        assert_eq!(subscriber.expect_next(), 3);
1509    }
1510
1511    #[test]
1512    fn batch_backpressures_at_max_aggregate() {
1513        let (publisher, subscriber) =
1514            rate_probe(|source| source.batch(2, |item| item, |left, right| left + right));
1515
1516        publisher.expect_request();
1517        publisher.send_next(1);
1518        publisher.expect_request();
1519        publisher.send_next(2);
1520
1521        publisher.expect_request();
1522        publisher.send_next(3);
1523
1524        // Backpressure: nothing flows downstream while the aggregate is full.
1525        subscriber.expect_no_message(Duration::from_millis(200));
1526
1527        // Consume the first batch — releases backpressure.
1528        subscriber.request(1);
1529        let first = subscriber.expect_next();
1530
1531        publisher.expect_request();
1532        publisher.send_next(4);
1533        publisher.send_complete();
1534
1535        // Under load the consumer may pull before the producer aggregates,
1536        // legitimately splitting batches. Verify all elements accounted for.
1537        let mut all_values = vec![first];
1538        all_values.extend(subscriber.drain_until_complete());
1539        let total: i32 = all_values.iter().sum();
1540        assert_eq!(total, 10, "all elements should be accounted for");
1541        for &v in &all_values {
1542            assert!((1..=7).contains(&v), "batch value {v} out of range");
1543        }
1544    }
1545
1546    #[test]
1547    fn expand_passes_through_without_rate_difference() {
1548        let (publisher, subscriber) = rate_probe(|source| source.expand(std::iter::once::<i32>));
1549
1550        for value in 1..=4 {
1551            publisher.expect_request();
1552            publisher.send_next(value);
1553            subscriber.request(1);
1554            assert_eq!(subscriber.expect_next(), value);
1555        }
1556    }
1557
1558    #[test]
1559    fn expand_elements_while_upstream_is_silent() {
1560        let (publisher, mut subscriber) = rate_probe(|source| source.expand(std::iter::repeat));
1561        subscriber.set_timeout(Duration::from_millis(250));
1562
1563        publisher.expect_request();
1564        publisher.send_next(42);
1565
1566        subscriber.request(4);
1567        assert_eq!(subscriber.expect_next(), 42);
1568        assert_eq!(subscriber.expect_next(), 42);
1569        assert_eq!(subscriber.expect_next(), 42);
1570        assert_eq!(subscriber.expect_next(), 42);
1571
1572        publisher.expect_request();
1573        publisher.send_next(-42);
1574        subscriber.expect_no_message(Duration::from_millis(250));
1575        subscriber.request(1);
1576        assert_eq!(subscriber.expect_next(), -42);
1577    }
1578
1579    #[test]
1580    fn expand_does_not_drop_last_element() {
1581        let (mut stream, _materializer) =
1582            materialize_buffered_stream(Source::from_iter([1, 2]).expand(std::iter::once::<i32>));
1583
1584        assert_eq!(stream.next(), Some(Ok(1)));
1585        assert_eq!(stream.next(), Some(Ok(2)));
1586        assert_eq!(stream.next(), None);
1587    }
1588
1589    #[test]
1590    fn expand_handles_finite_extrapolations() {
1591        let (mut stream, _materializer) = materialize_buffered_stream(
1592            Source::from_iter([1, 2]).expand(|item| (0..3).map(move |index| (item, index))),
1593        );
1594
1595        let mut output = Vec::new();
1596        for item in stream.by_ref() {
1597            output.push(item.expect("expand should not fail"));
1598        }
1599
1600        assert!(
1601            !output.is_empty(),
1602            "expand must emit at least the first real element"
1603        );
1604        assert_eq!(
1605            output
1606                .iter()
1607                .filter_map(|(item, index)| (*index == 0).then_some(*item))
1608                .collect::<Vec<_>>(),
1609            vec![1, 2],
1610            "each real upstream element must emit its first expanded value exactly once"
1611        );
1612
1613        let mut current_item = output[0].0;
1614        let mut expected_index = 0;
1615        for &(item, index) in &output {
1616            assert!(
1617                item == current_item || item == current_item + 1,
1618                "expanded output must stay on the current item or advance to the next real item"
1619            );
1620            if item != current_item {
1621                assert_eq!(
1622                    item,
1623                    current_item + 1,
1624                    "real items must remain in upstream order"
1625                );
1626                assert_eq!(
1627                    index, 0,
1628                    "a new real item must begin with its first expanded value"
1629                );
1630                current_item = item;
1631                expected_index = 0;
1632            }
1633            assert_eq!(
1634                index, expected_index,
1635                "each real item may be followed only by its own ordered extrapolations"
1636            );
1637            expected_index += 1;
1638        }
1639
1640        assert_eq!(stream.next(), None);
1641        assert_eq!(stream.next(), None);
1642    }
1643
1644    #[test]
1645    fn expand_emits_first_value_even_if_terminal_is_already_visible() {
1646        let shared = SlotShared::new(NoExtra);
1647        {
1648            let mut state = shared
1649                .state
1650                .lock()
1651                .unwrap_or_else(|poison| poison.into_inner());
1652            state.slot = Some(42);
1653            state.terminal = Some(TerminalSignal::Complete);
1654        }
1655
1656        let mut stream = ExpandStream {
1657            shared,
1658            completion: None,
1659            current: None,
1660            expanded_once: false,
1661            seeded_from_upstream: false,
1662            expand: Arc::new(|item| (0..3).map(move |index| (item, index))),
1663        };
1664
1665        assert_eq!(stream.next(), Some(Ok((42, 0))));
1666        assert_eq!(stream.next(), None);
1667    }
1668
1669    #[test]
1670    fn extrapolate_initial_yields_to_real_element_already_in_slot() {
1671        let shared = SlotShared::new(NoExtra);
1672        {
1673            let mut state = shared
1674                .state
1675                .lock()
1676                .unwrap_or_else(|poison| poison.into_inner());
1677            state.slot = Some(7);
1678        }
1679
1680        let mut stream = ExpandStream {
1681            shared: Arc::clone(&shared),
1682            completion: None,
1683            // The synthetic `initial` extrapolation seed, installed exactly
1684            // the way `extrapolate(..., Some(initial))` does.
1685            current: Some(Box::new(std::iter::repeat(99))),
1686            expanded_once: false,
1687            seeded_from_upstream: false,
1688            expand: Arc::new(std::iter::repeat),
1689        };
1690
1691        // A real element that arrived before the first pull wins over the
1692        // synthetic initial seed; 99 must never be emitted.
1693        assert_eq!(stream.next(), Some(Ok(7)));
1694
1695        finish_slot(&shared, TerminalSignal::Complete);
1696        assert_eq!(stream.next(), None);
1697        assert_eq!(stream.next(), None);
1698    }
1699
1700    #[test]
1701    fn extrapolate_preserves_original_before_filling_gaps() {
1702        let (publisher, subscriber) =
1703            rate_probe(|source| source.extrapolate(|item| std::iter::once(item + 100), None));
1704
1705        publisher.expect_request();
1706        publisher.send_next(1);
1707
1708        subscriber.request(2);
1709        assert_eq!(subscriber.expect_next(), 1);
1710        assert_eq!(subscriber.expect_next(), 101);
1711    }
1712
1713    #[test]
1714    fn extrapolate_emits_initial_element_before_upstream_arrives() {
1715        let (publisher, subscriber) =
1716            rate_probe(|source| source.extrapolate(std::iter::repeat, Some(0)));
1717
1718        subscriber.request(1);
1719        assert_eq!(subscriber.expect_next(), 0);
1720
1721        publisher.expect_request();
1722        publisher.send_next(42);
1723        subscriber.request(3);
1724        assert_eq!(subscriber.expect_next(), 42);
1725        assert_eq!(subscriber.expect_next(), 42);
1726        assert_eq!(subscriber.expect_next(), 42);
1727    }
1728
1729    #[test]
1730    fn aggregate_with_boundary_splits_by_size() {
1731        let result = Source::from_iter(1..=7)
1732            .aggregate_with_boundary(
1733                Vec::<i32>::new,
1734                |mut buffer, item| {
1735                    buffer.push(item);
1736                    let ready = buffer.len() >= 3;
1737                    (buffer, ready)
1738                },
1739                |buffer| buffer,
1740                None,
1741            )
1742            .run_collect()
1743            .unwrap();
1744
1745        assert_eq!(result, vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]]);
1746    }
1747
1748    #[test]
1749    fn aggregate_with_boundary_honors_timer_trigger() {
1750        let (publisher, mut subscriber) = rate_probe(|source| {
1751            source.aggregate_with_boundary(
1752                Vec::<i32>::new,
1753                |mut buffer, item| {
1754                    buffer.push(item);
1755                    (buffer, false)
1756                },
1757                |buffer| buffer,
1758                Some(AggregateTimer::new(
1759                    |buffer: &Vec<i32>| !buffer.is_empty(),
1760                    Duration::from_millis(10),
1761                )),
1762            )
1763        });
1764        subscriber.set_timeout(Duration::from_millis(200));
1765
1766        publisher.expect_request();
1767        publisher.send_next(1);
1768        publisher.expect_request();
1769        publisher.send_next(2);
1770
1771        subscriber.request(1);
1772        assert_eq!(subscriber.expect_next(), vec![1, 2]);
1773    }
1774
1775    #[test]
1776    fn detach_passes_through_all_elements() {
1777        assert_eq!(
1778            Source::from_iter(1..=100).detach().run_collect().unwrap(),
1779            (1..=100).collect::<Vec<_>>()
1780        );
1781    }
1782
1783    #[test]
1784    fn detach_passes_through_failure() {
1785        let result = Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
1786            .detach()
1787            .run_collect();
1788        assert_eq!(result, Err(StreamError::Failed("boom".to_owned())));
1789    }
1790
1791    #[test]
1792    fn detach_emits_last_element_when_completed_without_demand() {
1793        let (mut stream, _materializer) = materialize_buffered_stream(Source::single(42).detach());
1794
1795        assert_eq!(stream.next(), Some(Ok(42)));
1796        assert_eq!(stream.next(), None);
1797    }
1798}