1use super::*;
9
/// Policy applied by the `buffer` stage when an element arrives while the
/// buffer already holds its full capacity of elements.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowStrategy {
    /// Evict the oldest buffered element (front of the queue), then enqueue the new one.
    DropHead,
    /// Evict the most recently buffered element (back of the queue), then enqueue the new one.
    DropTail,
    /// Discard every buffered element, then enqueue the new one.
    DropBuffer,
    /// Discard the incoming element and leave the buffer untouched.
    DropNew,
    /// Block the producer until the consumer frees space; nothing is dropped.
    Backpressure,
    /// Clear the buffer and terminate the stream with a "Buffer overflow" failure.
    Fail,
}
19
/// Timer configuration for `aggregate_with_boundary`.
///
/// Pairs a predicate over the in-progress aggregate with the period at which
/// that predicate is evaluated.
pub struct AggregateTimer<Agg> {
    /// Decides, given the current aggregate, whether it should be emitted.
    predicate: Arc<dyn Fn(&Agg) -> bool + Send + Sync>,
    /// Delay between two evaluations of `predicate`; always non-zero.
    interval: Duration,
}

// Written by hand because `#[derive(Clone)]` would demand `Agg: Clone`,
// although only the `Arc` handle and the `Copy` duration are duplicated.
impl<Agg> Clone for AggregateTimer<Agg> {
    fn clone(&self) -> Self {
        let Self {
            predicate,
            interval,
        } = self;
        Self {
            predicate: Arc::clone(predicate),
            interval: *interval,
        }
    }
}

impl<Agg> AggregateTimer<Agg> {
    /// Creates a timer that evaluates `predicate` every `interval`.
    ///
    /// # Panics
    ///
    /// Panics when `interval` is zero.
    #[must_use]
    pub fn new<F>(predicate: F, interval: Duration) -> Self
    where
        F: Fn(&Agg) -> bool + Send + Sync + 'static,
    {
        assert!(
            !interval.is_zero(),
            "aggregate_with_boundary timer interval must be greater than zero"
        );
        Self {
            predicate: Arc::new(predicate),
            interval,
        }
    }
}
50
/// How a producer finished; recorded once in the shared state and then
/// translated by the consumer into the end of iteration or a final error.
#[derive(Clone)]
enum TerminalSignal {
    /// Upstream ended normally; the consumer yields `None`.
    Complete,
    /// Upstream (or the stage itself) failed; the consumer yields `Some(Err(..))`.
    Error(StreamError),
}
56
57struct QueueShared<T> {
58 state: Mutex<QueueState<T>>,
59 available: Condvar,
60 cancelled: Arc<AtomicBool>,
61 capacity: usize,
62}
63
64struct QueueState<T> {
65 queue: VecDeque<T>,
66 terminal: Option<TerminalSignal>,
67}
68
69impl<T> QueueShared<T> {
70 fn new(capacity: usize) -> Arc<Self> {
71 Arc::new(Self {
72 state: Mutex::new(QueueState {
73 queue: VecDeque::with_capacity(capacity),
74 terminal: None,
75 }),
76 available: Condvar::new(),
77 cancelled: Arc::new(AtomicBool::new(false)),
78 capacity,
79 })
80 }
81}
82
83struct QueueStream<T> {
84 shared: Arc<QueueShared<T>>,
85 completion: Option<StreamCompletion<NotUsed>>,
86}
87
88impl<T> Iterator for QueueStream<T> {
89 type Item = StreamResult<T>;
90
91 fn next(&mut self) -> Option<Self::Item> {
92 let mut state = self
93 .shared
94 .state
95 .lock()
96 .unwrap_or_else(|poison| poison.into_inner());
97 loop {
98 if let Some(item) = state.queue.pop_front() {
99 self.shared.available.notify_all();
100 return Some(Ok(item));
101 }
102 if let Some(terminal) = state.terminal.clone() {
103 return match terminal {
104 TerminalSignal::Complete => None,
105 TerminalSignal::Error(error) => Some(Err(error)),
106 };
107 }
108 state = self
109 .shared
110 .available
111 .wait(state)
112 .unwrap_or_else(|poison| poison.into_inner());
113 }
114 }
115}
116
117impl<T> Drop for QueueStream<T> {
118 fn drop(&mut self) {
119 self.shared.cancelled.store(true, Ordering::SeqCst);
120 self.shared.available.notify_all();
121 let _ = self.completion.take();
122 }
123}
124
125struct SlotShared<T, Extra> {
126 state: Mutex<SlotState<T, Extra>>,
127 available: Condvar,
128 cancelled: Arc<AtomicBool>,
129}
130
131struct SlotState<T, Extra> {
132 slot: Option<T>,
133 terminal: Option<TerminalSignal>,
134 extra: Extra,
135}
136
137impl<T, Extra> SlotShared<T, Extra> {
138 fn new(extra: Extra) -> Arc<Self> {
139 Arc::new(Self {
140 state: Mutex::new(SlotState {
141 slot: None,
142 terminal: None,
143 extra,
144 }),
145 available: Condvar::new(),
146 cancelled: Arc::new(AtomicBool::new(false)),
147 })
148 }
149}
150
151struct SlotStream<T, Extra> {
152 shared: Arc<SlotShared<T, Extra>>,
153 completion: Option<StreamCompletion<NotUsed>>,
154}
155
156impl<T, Extra> Iterator for SlotStream<T, Extra> {
157 type Item = StreamResult<T>;
158
159 fn next(&mut self) -> Option<Self::Item> {
160 let mut state = self
161 .shared
162 .state
163 .lock()
164 .unwrap_or_else(|poison| poison.into_inner());
165 loop {
166 if let Some(item) = state.slot.take() {
167 self.shared.available.notify_all();
168 return Some(Ok(item));
169 }
170 if let Some(terminal) = state.terminal.clone() {
171 return match terminal {
172 TerminalSignal::Complete => None,
173 TerminalSignal::Error(error) => Some(Err(error)),
174 };
175 }
176 state = self
177 .shared
178 .available
179 .wait(state)
180 .unwrap_or_else(|poison| poison.into_inner());
181 }
182 }
183}
184
185impl<T, Extra> Drop for SlotStream<T, Extra> {
186 fn drop(&mut self) {
187 self.shared.cancelled.store(true, Ordering::SeqCst);
188 self.shared.available.notify_all();
189 let _ = self.completion.take();
190 }
191}
192
193fn finish_queue<T>(shared: &QueueShared<T>, terminal: TerminalSignal) {
194 let mut state = shared
195 .state
196 .lock()
197 .unwrap_or_else(|poison| poison.into_inner());
198 if state.terminal.is_none() {
199 state.terminal = Some(terminal);
200 }
201 drop(state);
202 shared.available.notify_all();
203}
204
205fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
206 let mut state = shared
207 .state
208 .lock()
209 .unwrap_or_else(|poison| poison.into_inner());
210 if state.terminal.is_none() {
211 state.terminal = Some(terminal);
212 }
213 drop(state);
214 shared.available.notify_all();
215}
216
217struct ProducerPanicGuard<T> {
218 shared: Arc<QueueShared<T>>,
219 armed: bool,
220}
221
222impl<T> ProducerPanicGuard<T> {
223 fn new(shared: Arc<QueueShared<T>>) -> Self {
224 Self {
225 shared,
226 armed: true,
227 }
228 }
229
230 fn disarm(&mut self) {
231 self.armed = false;
232 }
233}
234
235impl<T> Drop for ProducerPanicGuard<T> {
236 fn drop(&mut self) {
237 if self.armed {
238 finish_queue(
239 &self.shared,
240 TerminalSignal::Error(StreamError::AbruptTermination),
241 );
242 }
243 }
244}
245
246struct SlotProducerPanicGuard<T, Extra> {
247 shared: Arc<SlotShared<T, Extra>>,
248 armed: bool,
249}
250
251impl<T, Extra> SlotProducerPanicGuard<T, Extra> {
252 fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
253 Self {
254 shared,
255 armed: true,
256 }
257 }
258
259 fn disarm(&mut self) {
260 self.armed = false;
261 }
262}
263
264impl<T, Extra> Drop for SlotProducerPanicGuard<T, Extra> {
265 fn drop(&mut self) {
266 if self.armed {
267 finish_slot(
268 &self.shared,
269 TerminalSignal::Error(StreamError::AbruptTermination),
270 );
271 }
272 }
273}
274
/// Placeholder `Extra` for slot-based stages that need no additional shared
/// bookkeeping (used by `expand_stage`).
#[derive(Default)]
struct NoExtra;
277
/// Extra shared state for `batch_stage`.
struct BatchExtra<In> {
    /// Cost budget left in the current batch. Signed because one element
    /// costing more than the limit drives it below zero.
    remaining: i128,
    /// Element parked by the producer while it waits for the slot to drain.
    pending: Option<In>,
}

impl<In> BatchExtra<In> {
    /// Starts with the full budget of `limit` and nothing parked.
    fn new(limit: u64) -> Self {
        let remaining: i128 = limit.into();
        Self {
            remaining,
            pending: None,
        }
    }
}
291
/// Extra shared state for `aggregate_with_boundary_stage`.
struct BoundaryExtra {
    /// `true` when the aggregate in the slot should be emitted by the
    /// consumer; set by the producer or the timer, cleared by the consumer.
    ready: bool,
}
295
296struct BoundaryStream<Agg, Emit> {
297 shared: Arc<SlotShared<Agg, BoundaryExtra>>,
298 completion: Option<StreamCompletion<NotUsed>>,
299 timer: Option<Cancellable>,
300 harvest: Arc<dyn Fn(Agg) -> Emit + Send + Sync>,
301}
302
303impl<Agg, Emit> Iterator for BoundaryStream<Agg, Emit> {
304 type Item = StreamResult<Emit>;
305
306 fn next(&mut self) -> Option<Self::Item> {
307 loop {
308 let (slot, terminal) = {
309 let mut state = self
310 .shared
311 .state
312 .lock()
313 .unwrap_or_else(|poison| poison.into_inner());
314 loop {
315 if state.extra.ready {
316 let slot = state.slot.take();
317 state.extra.ready = false;
318 self.shared.available.notify_all();
319 break (slot, None);
320 }
321 if state.terminal.is_some() {
322 if let Some(slot) = state.slot.take() {
323 self.shared.available.notify_all();
324 break (Some(slot), None);
325 }
326 break (None, state.terminal.clone());
327 }
328 state = self
329 .shared
330 .available
331 .wait(state)
332 .unwrap_or_else(|poison| poison.into_inner());
333 }
334 };
335
336 if let Some(agg) = slot {
337 return Some(Ok((self.harvest)(agg)));
338 }
339 if let Some(terminal) = terminal {
340 return match terminal {
341 TerminalSignal::Complete => None,
342 TerminalSignal::Error(error) => Some(Err(error)),
343 };
344 }
345 }
346 }
347}
348
349impl<Agg, Emit> Drop for BoundaryStream<Agg, Emit> {
350 fn drop(&mut self) {
351 self.shared.cancelled.store(true, Ordering::SeqCst);
352 self.shared.available.notify_all();
353 if let Some(timer) = self.timer.take() {
354 timer.cancel();
355 }
356 let _ = self.completion.take();
357 }
358}
359
360fn buffer_stage<T: Send + 'static>(
361 input: BoxStream<T>,
362 capacity: usize,
363 strategy: OverflowStrategy,
364 materializer: &Materializer,
365) -> StreamResult<BoxStream<T>> {
366 let shared = QueueShared::new(capacity);
367 let producer_shared = Arc::clone(&shared);
368 let cancelled = Arc::clone(&shared.cancelled);
369 let state = Arc::clone(&materializer.inner.state);
370 let completion = materializer.spawn_stream(move |_| {
371 let mut panic_guard = ProducerPanicGuard::new(Arc::clone(&producer_shared));
372 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
373 loop {
374 if cancelled.load(Ordering::SeqCst) {
375 panic_guard.disarm();
376 return Ok(NotUsed);
377 }
378
379 match input.next() {
380 Some(Ok(item)) => {
381 let mut guard = producer_shared
382 .state
383 .lock()
384 .unwrap_or_else(|poison| poison.into_inner());
385 match strategy {
386 OverflowStrategy::Backpressure => {
387 while guard.queue.len() == producer_shared.capacity
388 && !cancelled.load(Ordering::SeqCst)
389 {
390 guard = producer_shared
391 .available
392 .wait(guard)
393 .unwrap_or_else(|poison| poison.into_inner());
394 }
395 if cancelled.load(Ordering::SeqCst) {
396 panic_guard.disarm();
397 return Ok(NotUsed);
398 }
399 guard.queue.push_back(item);
400 }
401 OverflowStrategy::DropHead => {
402 if guard.queue.len() == producer_shared.capacity {
403 let _ = guard.queue.pop_front();
404 }
405 guard.queue.push_back(item);
406 }
407 OverflowStrategy::DropTail => {
408 if guard.queue.len() == producer_shared.capacity {
409 let _ = guard.queue.pop_back();
410 }
411 guard.queue.push_back(item);
412 }
413 OverflowStrategy::DropBuffer => {
414 if guard.queue.len() == producer_shared.capacity {
415 guard.queue.clear();
416 }
417 guard.queue.push_back(item);
418 }
419 OverflowStrategy::DropNew => {
420 if guard.queue.len() < producer_shared.capacity {
421 guard.queue.push_back(item);
422 }
423 }
424 OverflowStrategy::Fail => {
425 if guard.queue.len() == producer_shared.capacity {
426 guard.queue.clear();
427 drop(guard);
428 panic_guard.disarm();
429 finish_queue(
430 &producer_shared,
431 TerminalSignal::Error(StreamError::Failed(format!(
432 "Buffer overflow (max capacity was: {capacity})!"
433 ))),
434 );
435 return Ok(NotUsed);
436 }
437 guard.queue.push_back(item);
438 }
439 }
440 drop(guard);
441 producer_shared.available.notify_all();
442 }
443 Some(Err(error)) => {
444 panic_guard.disarm();
445 finish_queue(&producer_shared, TerminalSignal::Error(error));
446 return Ok(NotUsed);
447 }
448 None => {
449 panic_guard.disarm();
450 finish_queue(&producer_shared, TerminalSignal::Complete);
451 return Ok(NotUsed);
452 }
453 }
454 }
455 });
456
457 Ok(Box::new(QueueStream {
458 shared,
459 completion: Some(completion),
460 }))
461}
462
463fn batch_stage<In, Agg, Cost, Seed, Aggregate>(
464 input: BoxStream<In>,
465 limit: u64,
466 cost_fn: Arc<Cost>,
467 seed: Arc<Seed>,
468 aggregate: Arc<Aggregate>,
469 materializer: &Materializer,
470) -> StreamResult<BoxStream<Agg>>
471where
472 In: Send + 'static,
473 Agg: Send + 'static,
474 Cost: Fn(&In) -> u64 + Send + Sync + 'static,
475 Seed: Fn(In) -> Agg + Send + Sync + 'static,
476 Aggregate: Fn(Agg, In) -> Agg + Send + Sync + 'static,
477{
478 let shared = SlotShared::new(BatchExtra::new(limit));
479 let producer_shared = Arc::clone(&shared);
480 let cancelled = Arc::clone(&shared.cancelled);
481 let state = Arc::clone(&materializer.inner.state);
482 let completion = materializer.spawn_stream(move |_| {
483 let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
484 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
485 let mut carry = None::<In>;
486
487 loop {
488 if cancelled.load(Ordering::SeqCst) {
489 panic_guard.disarm();
490 return Ok(NotUsed);
491 }
492
493 let next = if let Some(item) = carry.take() {
494 Some(Ok(item))
495 } else {
496 input.next()
497 };
498
499 match next {
500 Some(Ok(item)) => {
501 let cost = i128::from(cost_fn(&item));
502 let current = {
503 let mut guard = producer_shared
504 .state
505 .lock()
506 .unwrap_or_else(|poison| poison.into_inner());
507
508 if guard.slot.is_none() {
509 None
510 } else if guard.extra.remaining < cost {
511 guard.extra.pending = Some(item);
512 while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
513 guard = producer_shared
514 .available
515 .wait(guard)
516 .unwrap_or_else(|poison| poison.into_inner());
517 }
518 if cancelled.load(Ordering::SeqCst) {
519 panic_guard.disarm();
520 return Ok(NotUsed);
521 }
522 carry = guard.extra.pending.take();
523 guard.extra.remaining = i128::from(limit);
524 continue;
525 } else {
526 let current = guard.slot.take();
527 guard.extra.remaining -= cost;
528 current
529 }
530 };
531
532 match current {
533 None => {
534 let next_agg = seed(item);
535 let mut guard = producer_shared
536 .state
537 .lock()
538 .unwrap_or_else(|poison| poison.into_inner());
539 guard.extra.remaining = i128::from(limit) - cost;
540 guard.slot = Some(next_agg);
541 drop(guard);
542 producer_shared.available.notify_all();
543 }
544 Some(current) => {
545 let next_agg = aggregate(current, item);
546 let mut guard = producer_shared
547 .state
548 .lock()
549 .unwrap_or_else(|poison| poison.into_inner());
550 guard.slot = Some(next_agg);
551 drop(guard);
552 producer_shared.available.notify_all();
553 }
554 }
555 }
556 Some(Err(error)) => {
557 panic_guard.disarm();
558 finish_slot(&producer_shared, TerminalSignal::Error(error));
559 return Ok(NotUsed);
560 }
561 None => {
562 panic_guard.disarm();
563 finish_slot(&producer_shared, TerminalSignal::Complete);
564 return Ok(NotUsed);
565 }
566 }
567 }
568 });
569
570 Ok(Box::new(SlotStream {
571 shared,
572 completion: Some(completion),
573 }))
574}
575
576fn expand_stage<In, Out, Expand, Iter>(
577 input: BoxStream<In>,
578 expand: Arc<Expand>,
579 initial: Option<Box<dyn Iterator<Item = Out> + Send>>,
580 materializer: &Materializer,
581) -> StreamResult<BoxStream<Out>>
582where
583 In: Send + 'static,
584 Out: Send + 'static,
585 Expand: Fn(In) -> Iter + Send + Sync + 'static,
586 Iter: Iterator<Item = Out> + Send + 'static,
587{
588 let shared = SlotShared::new(NoExtra);
589 let producer_shared = Arc::clone(&shared);
590 let cancelled = Arc::clone(&shared.cancelled);
591 let state = Arc::clone(&materializer.inner.state);
592 let completion = materializer.spawn_stream(move |_| {
593 let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
594 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
595 loop {
596 if cancelled.load(Ordering::SeqCst) {
597 panic_guard.disarm();
598 return Ok(NotUsed);
599 }
600
601 match input.next() {
602 Some(Ok(item)) => {
603 let mut guard = producer_shared
604 .state
605 .lock()
606 .unwrap_or_else(|poison| poison.into_inner());
607 while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
608 guard = producer_shared
609 .available
610 .wait(guard)
611 .unwrap_or_else(|poison| poison.into_inner());
612 }
613 if cancelled.load(Ordering::SeqCst) {
614 panic_guard.disarm();
615 return Ok(NotUsed);
616 }
617 guard.slot = Some(item);
618 drop(guard);
619 producer_shared.available.notify_all();
620 }
621 Some(Err(error)) => {
622 panic_guard.disarm();
623 finish_slot(&producer_shared, TerminalSignal::Error(error));
624 return Ok(NotUsed);
625 }
626 None => {
627 panic_guard.disarm();
628 finish_slot(&producer_shared, TerminalSignal::Complete);
629 return Ok(NotUsed);
630 }
631 }
632 }
633 });
634
635 Ok(Box::new(ExpandStream {
636 shared,
637 completion: Some(completion),
638 current: initial,
639 expanded_once: false,
640 seeded_from_upstream: false,
641 expand,
642 }))
643}
644
/// Consumer side of `expand_stage`: turns each upstream element into an
/// iterator via `expand` and keeps drawing from that iterator until a newer
/// upstream element replaces it.
struct ExpandStream<In, Out, Expand, Iter>
where
    Expand: Fn(In) -> Iter + Send + Sync + 'static,
    Iter: Iterator<Item = Out> + Send + 'static,
{
    /// Slot through which the producer hands over upstream elements.
    shared: Arc<SlotShared<In, NoExtra>>,
    /// Handle returned by `spawn_stream` for the producer; released on drop.
    completion: Option<StreamCompletion<NotUsed>>,
    /// Iterator currently being drained, if any.
    current: Option<Box<dyn Iterator<Item = Out> + Send>>,
    /// `true` once at least one item of `current` has been emitted.
    expanded_once: bool,
    /// `true` once `current` has been built from an upstream element rather
    /// than from the initial iterator.
    seeded_from_upstream: bool,
    /// Expansion function applied to every upstream element.
    expand: Arc<Expand>,
}
660
661impl<In, Out, Expand, Iter> Iterator for ExpandStream<In, Out, Expand, Iter>
662where
663 In: Send + 'static,
664 Out: Send + 'static,
665 Expand: Fn(In) -> Iter + Send + Sync + 'static,
666 Iter: Iterator<Item = Out> + Send + 'static,
667{
668 type Item = StreamResult<Out>;
669
670 fn next(&mut self) -> Option<Self::Item> {
671 loop {
672 if let Some(current) = &mut self.current
678 && !self.expanded_once
679 && self.seeded_from_upstream
680 {
681 if let Some(item) = current.next() {
682 self.expanded_once = true;
683 return Some(Ok(item));
684 }
685 self.current = None;
686 self.expanded_once = false;
687 }
688
689 enum Decision<In> {
694 NewElement(In),
695 Extrapolate,
696 EmitInitial,
697 Terminal(TerminalSignal),
698 }
699
700 let decision = {
701 let mut state = self
702 .shared
703 .state
704 .lock()
705 .unwrap_or_else(|poison| poison.into_inner());
706 loop {
707 if let Some(item) = state.slot.take() {
708 self.shared.available.notify_all();
709 break Decision::NewElement(item);
710 }
711
712 if let Some(terminal) = state.terminal.clone() {
713 break Decision::Terminal(terminal);
714 }
715
716 if self.current.is_some() {
717 break if self.expanded_once {
718 Decision::Extrapolate
719 } else {
720 Decision::EmitInitial
723 };
724 }
725
726 state = self
727 .shared
728 .available
729 .wait(state)
730 .unwrap_or_else(|poison| poison.into_inner());
731 }
732 };
733
734 match decision {
735 Decision::NewElement(item) => {
736 self.current = Some(Box::new((self.expand)(item)));
737 self.expanded_once = false;
738 self.seeded_from_upstream = true;
739 }
740 Decision::EmitInitial => {
741 if let Some(current) = &mut self.current
742 && let Some(item) = current.next()
743 {
744 self.expanded_once = true;
745 return Some(Ok(item));
746 }
747 self.current = None;
748 self.expanded_once = false;
749 }
750 Decision::Extrapolate => {
751 if let Some(current) = &mut self.current
752 && let Some(item) = current.next()
753 {
754 return Some(Ok(item));
755 }
756 self.current = None;
757 self.expanded_once = false;
758 }
759 Decision::Terminal(terminal) => {
760 return match terminal {
761 TerminalSignal::Complete => None,
762 TerminalSignal::Error(error) => Some(Err(error)),
763 };
764 }
765 }
766 }
767 }
768}
769
770impl<In, Out, Expand, Iter> Drop for ExpandStream<In, Out, Expand, Iter>
771where
772 Expand: Fn(In) -> Iter + Send + Sync + 'static,
773 Iter: Iterator<Item = Out> + Send + 'static,
774{
775 fn drop(&mut self) {
776 self.shared.cancelled.store(true, Ordering::SeqCst);
777 self.shared.available.notify_all();
778 let _ = self.completion.take();
779 }
780}
781
782fn aggregate_with_boundary_stage<In, Agg, Emit, Allocate, Aggregate, Harvest>(
783 input: BoxStream<In>,
784 allocate: Arc<Allocate>,
785 aggregate: Arc<Aggregate>,
786 harvest: Arc<Harvest>,
787 emit_on_timer: Option<AggregateTimer<Agg>>,
788 materializer: &Materializer,
789) -> StreamResult<BoxStream<Emit>>
790where
791 In: Send + 'static,
792 Agg: Send + 'static,
793 Emit: Send + 'static,
794 Allocate: Fn() -> Agg + Send + Sync + 'static,
795 Aggregate: Fn(Agg, In) -> (Agg, bool) + Send + Sync + 'static,
796 Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
797{
798 let shared = SlotShared::new(BoundaryExtra { ready: false });
799 let producer_shared = Arc::clone(&shared);
800 let cancelled = Arc::clone(&shared.cancelled);
801 let state = Arc::clone(&materializer.inner.state);
802 let completion = materializer.spawn_stream(move |_| {
803 let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
804 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
805 loop {
806 if cancelled.load(Ordering::SeqCst) {
807 panic_guard.disarm();
808 return Ok(NotUsed);
809 }
810
811 match input.next() {
812 Some(Ok(item)) => {
813 let current = {
814 let mut guard = producer_shared
815 .state
816 .lock()
817 .unwrap_or_else(|poison| poison.into_inner());
818 while guard.extra.ready && !cancelled.load(Ordering::SeqCst) {
819 guard = producer_shared
820 .available
821 .wait(guard)
822 .unwrap_or_else(|poison| poison.into_inner());
823 }
824 if cancelled.load(Ordering::SeqCst) {
825 panic_guard.disarm();
826 return Ok(NotUsed);
827 }
828 guard.slot.take()
829 };
830
831 let current = current.unwrap_or_else(|| allocate());
832 let (updated, ready) = aggregate(current, item);
833 let mut guard = producer_shared
834 .state
835 .lock()
836 .unwrap_or_else(|poison| poison.into_inner());
837 guard.slot = Some(updated);
838 guard.extra.ready = ready;
839 drop(guard);
840 producer_shared.available.notify_all();
841 }
842 Some(Err(error)) => {
843 panic_guard.disarm();
844 finish_slot(&producer_shared, TerminalSignal::Error(error));
845 return Ok(NotUsed);
846 }
847 None => {
848 panic_guard.disarm();
849 finish_slot(&producer_shared, TerminalSignal::Complete);
850 return Ok(NotUsed);
851 }
852 }
853 }
854 });
855
856 let timer = emit_on_timer.map(|timer| {
857 let shared = Arc::clone(&shared);
858 let cancelled = Arc::clone(&shared.cancelled);
859 materializer.schedule_with_fixed_delay(timer.interval, timer.interval, move || {
860 if cancelled.load(Ordering::SeqCst) {
861 return;
862 }
863 let should_emit = {
864 let state = shared
865 .state
866 .lock()
867 .unwrap_or_else(|poison| poison.into_inner());
868 if state.slot.is_none() || state.extra.ready || state.terminal.is_some() {
869 None
870 } else {
871 Some(std::panic::catch_unwind(std::panic::AssertUnwindSafe(
872 || (timer.predicate)(state.slot.as_ref().expect("aggregate present")),
873 )))
874 }
875 };
876 match should_emit {
877 Some(Ok(true)) => {
878 let mut state = shared
879 .state
880 .lock()
881 .unwrap_or_else(|poison| poison.into_inner());
882 if state.slot.is_some() && !state.extra.ready && state.terminal.is_none() {
883 state.extra.ready = true;
884 }
885 drop(state);
886 shared.available.notify_all();
887 }
888 Some(Ok(false)) | None => {}
889 Some(Err(_)) => {
890 finish_slot(
891 &shared,
892 TerminalSignal::Error(StreamError::AbruptTermination),
893 );
894 }
895 }
896 })
897 });
898
899 Ok(Box::new(BoundaryStream {
900 shared,
901 completion: Some(completion),
902 timer,
903 harvest,
904 }))
905}
906
907impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
908 pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Flow<In, Out, Mat> {
909 assert!(size > 0, "buffer size must be greater than zero");
910 self.via(Flow::from_runtime_transform(move |input, materializer| {
911 buffer_stage(input, size, strategy, materializer)
912 }))
913 }
914
915 pub fn conflate_with_seed<Agg, Seed, Aggregate>(
916 self,
917 seed: Seed,
918 aggregate: Aggregate,
919 ) -> Flow<In, Agg, Mat>
920 where
921 Agg: Send + 'static,
922 Seed: Fn(Out) -> Agg + Send + Sync + 'static,
923 Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
924 {
925 let seed = Arc::new(seed);
926 let aggregate = Arc::new(aggregate);
927 self.via(Flow::from_runtime_transform(move |input, materializer| {
928 batch_stage(
929 input,
930 1,
931 Arc::new(|_: &Out| 0),
932 Arc::clone(&seed),
933 Arc::clone(&aggregate),
934 materializer,
935 )
936 }))
937 }
938
939 pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
940 self.conflate_with_seed(|item| item, aggregate)
941 }
942
943 pub fn batch<Agg, Seed, Aggregate>(
944 self,
945 max: u64,
946 seed: Seed,
947 aggregate: Aggregate,
948 ) -> Flow<In, Agg, Mat>
949 where
950 Agg: Send + 'static,
951 Seed: Fn(Out) -> Agg + Send + Sync + 'static,
952 Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
953 {
954 assert!(max > 0, "batch max must be greater than zero");
955 let seed = Arc::new(seed);
956 let aggregate = Arc::new(aggregate);
957 self.via(Flow::from_runtime_transform(move |input, materializer| {
958 batch_stage(
959 input,
960 max,
961 Arc::new(|_: &Out| 1),
962 Arc::clone(&seed),
963 Arc::clone(&aggregate),
964 materializer,
965 )
966 }))
967 }
968
969 pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
970 self,
971 max: u64,
972 cost_fn: Cost,
973 seed: Seed,
974 aggregate: Aggregate,
975 ) -> Flow<In, Agg, Mat>
976 where
977 Agg: Send + 'static,
978 Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
979 Seed: Fn(Out) -> Agg + Send + Sync + 'static,
980 Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
981 {
982 assert!(max > 0, "batch_weighted max must be greater than zero");
983 let cost_fn = Arc::new(cost_fn);
984 let seed = Arc::new(seed);
985 let aggregate = Arc::new(aggregate);
986 self.via(Flow::from_runtime_transform(move |input, materializer| {
987 batch_stage(
988 input,
989 max,
990 Arc::clone(&cost_fn),
991 Arc::clone(&seed),
992 Arc::clone(&aggregate),
993 materializer,
994 )
995 }))
996 }
997
998 pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Flow<In, Next, Mat>
999 where
1000 Next: Send + 'static,
1001 Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1002 Iter: Iterator<Item = Next> + Send + 'static,
1003 {
1004 let expand = Arc::new(expand);
1005 self.via(Flow::from_runtime_transform(move |input, materializer| {
1006 expand_stage(input, Arc::clone(&expand), None, materializer)
1007 }))
1008 }
1009
1010 pub fn extrapolate<Expand, Iter>(
1011 self,
1012 extrapolator: Expand,
1013 initial: Option<Out>,
1014 ) -> Flow<In, Out, Mat>
1015 where
1016 Out: Clone + Sync,
1017 Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1018 Iter: Iterator<Item = Out> + Send + 'static,
1019 {
1020 let extrapolator = Arc::new(extrapolator);
1021 self.via(Flow::from_runtime_transform(move |input, materializer| {
1022 let extrapolator = Arc::clone(&extrapolator);
1023 let initial = initial.clone().map(|item| {
1024 Box::new(std::iter::once(item)) as Box<dyn Iterator<Item = Out> + Send>
1025 });
1026 expand_stage(
1027 input,
1028 Arc::new(move |item: Out| {
1029 std::iter::once(item.clone()).chain((extrapolator)(item))
1030 }),
1031 initial,
1032 materializer,
1033 )
1034 }))
1035 }
1036
1037 pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
1038 self,
1039 allocate: Allocate,
1040 aggregate: Aggregate,
1041 harvest: Harvest,
1042 emit_on_timer: Option<AggregateTimer<Agg>>,
1043 ) -> Flow<In, Emit, Mat>
1044 where
1045 Agg: Send + 'static,
1046 Emit: Send + 'static,
1047 Allocate: Fn() -> Agg + Send + Sync + 'static,
1048 Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
1049 Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
1050 {
1051 let allocate = Arc::new(allocate);
1052 let aggregate = Arc::new(aggregate);
1053 let harvest = Arc::new(harvest);
1054 self.via(Flow::from_runtime_transform(move |input, materializer| {
1055 aggregate_with_boundary_stage(
1056 input,
1057 Arc::clone(&allocate),
1058 Arc::clone(&aggregate),
1059 Arc::clone(&harvest),
1060 emit_on_timer.clone(),
1061 materializer,
1062 )
1063 }))
1064 }
1065
1066 pub fn detach(self) -> Flow<In, Out, Mat> {
1067 self.buffer(1, OverflowStrategy::Backpressure)
1068 }
1069}
1070
1071impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
1072 pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Self {
1073 self.via(Flow::identity().buffer(size, strategy))
1074 }
1075
1076 pub fn conflate_with_seed<Agg, Seed, Aggregate>(
1077 self,
1078 seed: Seed,
1079 aggregate: Aggregate,
1080 ) -> Source<Agg, Mat>
1081 where
1082 Agg: Send + 'static,
1083 Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1084 Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1085 {
1086 self.via(Flow::identity().conflate_with_seed(seed, aggregate))
1087 }
1088
1089 pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
1090 self.via(Flow::identity().conflate(aggregate))
1091 }
1092
1093 pub fn batch<Agg, Seed, Aggregate>(
1094 self,
1095 max: u64,
1096 seed: Seed,
1097 aggregate: Aggregate,
1098 ) -> Source<Agg, Mat>
1099 where
1100 Agg: Send + 'static,
1101 Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1102 Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1103 {
1104 self.via(Flow::identity().batch(max, seed, aggregate))
1105 }
1106
1107 pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
1108 self,
1109 max: u64,
1110 cost_fn: Cost,
1111 seed: Seed,
1112 aggregate: Aggregate,
1113 ) -> Source<Agg, Mat>
1114 where
1115 Agg: Send + 'static,
1116 Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
1117 Seed: Fn(Out) -> Agg + Send + Sync + 'static,
1118 Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
1119 {
1120 self.via(Flow::identity().batch_weighted(max, cost_fn, seed, aggregate))
1121 }
1122
1123 pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Source<Next, Mat>
1124 where
1125 Next: Send + 'static,
1126 Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1127 Iter: Iterator<Item = Next> + Send + 'static,
1128 {
1129 self.via(Flow::identity().expand(expand))
1130 }
1131
1132 pub fn extrapolate<Expand, Iter>(self, extrapolator: Expand, initial: Option<Out>) -> Self
1133 where
1134 Out: Clone + Sync,
1135 Expand: Fn(Out) -> Iter + Send + Sync + 'static,
1136 Iter: Iterator<Item = Out> + Send + 'static,
1137 {
1138 self.via(Flow::identity().extrapolate(extrapolator, initial))
1139 }
1140
1141 pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
1142 self,
1143 allocate: Allocate,
1144 aggregate: Aggregate,
1145 harvest: Harvest,
1146 emit_on_timer: Option<AggregateTimer<Agg>>,
1147 ) -> Source<Emit, Mat>
1148 where
1149 Agg: Send + 'static,
1150 Emit: Send + 'static,
1151 Allocate: Fn() -> Agg + Send + Sync + 'static,
1152 Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
1153 Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
1154 {
1155 self.via(Flow::identity().aggregate_with_boundary(
1156 allocate,
1157 aggregate,
1158 harvest,
1159 emit_on_timer,
1160 ))
1161 }
1162
1163 pub fn detach(self) -> Self {
1164 self.via(Flow::identity().detach())
1165 }
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170 use super::*;
1171 use crate::testkit::{TestSink, TestSource};
1172 use std::sync::mpsc;
1173 use std::time::Duration;
1174
1175 fn materialize_buffered_stream<T: Send + 'static>(
1176 source: Source<T>,
1177 ) -> (BoxStream<T>, Materializer) {
1178 let materializer = Materializer::new();
1179 let (stream, _) = Arc::clone(&source.factory)
1180 .create(&materializer)
1181 .expect("buffer source materializes");
1182 (stream, materializer)
1183 }
1184
1185 fn buffered_probe(
1186 strategy: OverflowStrategy,
1187 ) -> (
1188 crate::testkit::TestPublisherProbe<i32>,
1189 crate::testkit::TestSubscriberProbe<i32>,
1190 ) {
1191 TestSource::probe::<i32>()
1192 .buffer(2, strategy)
1193 .to_mat(TestSink::probe(), Keep::both)
1194 .run()
1195 .expect("buffer probe materializes")
1196 }
1197
1198 fn rate_probe<T, U>(
1199 flow: impl FnOnce(
1200 Source<T, crate::testkit::TestPublisherProbe<T>>,
1201 ) -> Source<U, crate::testkit::TestPublisherProbe<T>>,
1202 ) -> (
1203 crate::testkit::TestPublisherProbe<T>,
1204 crate::testkit::TestSubscriberProbe<U>,
1205 )
1206 where
1207 T: Send + 'static,
1208 U: Send + 'static,
1209 {
1210 flow(TestSource::probe::<T>())
1211 .to_mat(TestSink::probe(), Keep::both)
1212 .run()
1213 .expect("rate probe materializes")
1214 }
1215
1216 #[test]
1217 fn buffer_drop_head_drops_oldest_buffered_elements() {
1218 let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropHead);
1219 publisher.expect_request();
1220 publisher.send_next(1);
1221 publisher.expect_request();
1222 publisher.send_next(2);
1223 publisher.expect_request();
1224 publisher.send_next(3);
1225 publisher.expect_request();
1226 publisher.send_next(4);
1227 publisher.expect_request();
1228 publisher.send_complete();
1229
1230 subscriber.request(3);
1231 assert_eq!(subscriber.expect_next(), 3);
1232 assert_eq!(subscriber.expect_next(), 4);
1233 subscriber.expect_complete();
1234 }
1235
/// With `Backpressure`, the publisher probe sees two requests and sends 1 and 2 before the
/// subscriber has requested anything; nothing is delivered until `request(2)` is issued.
#[test]
fn buffer_pulls_eagerly_before_downstream_demand() {
    let quiet_period = Duration::from_millis(250);
    let (mut publisher, mut subscriber) = buffered_probe(OverflowStrategy::Backpressure);
    publisher.set_timeout(quiet_period);
    subscriber.set_timeout(quiet_period);

    for element in 1..=2 {
        publisher.expect_request();
        publisher.send_next(element);
    }
    // No demand has been signalled yet, so the subscriber must stay silent.
    subscriber.expect_no_message(quiet_period);

    subscriber.request(2);
    for buffered in [1, 2] {
        assert_eq!(subscriber.expect_next(), buffered);
    }
}
1252
/// `DropTail`: 1..=4 are sent into `buffered_probe(OverflowStrategy::DropTail)` before any
/// downstream demand; the subscriber must then see 1 and 4 followed by completion.
#[test]
fn buffer_drop_tail_drops_newest_buffered_element() {
    let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropTail);

    for element in 1..=4 {
        publisher.expect_request();
        publisher.send_next(element);
    }
    publisher.expect_request();
    publisher.send_complete();

    subscriber.request(3);
    for survivor in [1, 4] {
        assert_eq!(subscriber.expect_next(), survivor);
    }
    subscriber.expect_complete();
}
1272
/// `DropBuffer`: 1..=4 are sent into `buffered_probe(OverflowStrategy::DropBuffer)` before
/// any downstream demand; the subscriber must then see 3 and 4 followed by completion.
#[test]
fn buffer_drop_buffer_drops_all_buffered_elements() {
    let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropBuffer);

    for element in 1..=4 {
        publisher.expect_request();
        publisher.send_next(element);
    }
    publisher.expect_request();
    publisher.send_complete();

    subscriber.request(3);
    for survivor in [3, 4] {
        assert_eq!(subscriber.expect_next(), survivor);
    }
    subscriber.expect_complete();
}
1292
/// `DropNew`: 1..=4 are sent into `buffered_probe(OverflowStrategy::DropNew)` before any
/// downstream demand; the subscriber must then see 1 and 2 followed by completion.
#[test]
fn buffer_drop_new_drops_incoming_elements() {
    let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropNew);

    for element in 1..=4 {
        publisher.expect_request();
        publisher.send_next(element);
    }
    publisher.expect_request();
    publisher.send_complete();

    subscriber.request(3);
    for survivor in [1, 2] {
        assert_eq!(subscriber.expect_next(), survivor);
    }
    subscriber.expect_complete();
}
1312
/// `Backpressure`: after 1 and 2 are sent, the subscriber stays silent until it requests;
/// once it has taken 1, the publisher probe sees another request, 3 is sent together with
/// completion, and the subscriber drains 2, 3 and the completion signal.
#[test]
fn buffer_backpressure_stops_pulling_when_full() {
    let (mut publisher, mut subscriber) = buffered_probe(OverflowStrategy::Backpressure);

    for element in 1..=2 {
        publisher.expect_request();
        publisher.send_next(element);
    }

    // Timeouts are only tightened after the initial sends have gone through.
    let quiet_period = Duration::from_millis(250);
    publisher.set_timeout(quiet_period);
    subscriber.set_timeout(quiet_period);
    subscriber.expect_no_message(quiet_period);

    subscriber.request(1);
    assert_eq!(subscriber.expect_next(), 1);
    publisher.expect_request();
    publisher.send_next(3);
    publisher.send_complete();

    subscriber.request(3);
    for remaining in [2, 3] {
        assert_eq!(subscriber.expect_next(), remaining);
    }
    subscriber.expect_complete();
}
1335
/// `Fail`: sending a third element into `buffered_probe(OverflowStrategy::Fail)` must cancel
/// the publisher and surface the buffer-overflow `StreamError::Failed` to the subscriber.
#[test]
fn buffer_fail_surfaces_overflow_error() {
    let (publisher, subscriber) = buffered_probe(OverflowStrategy::Fail);

    for element in 1..=3 {
        publisher.expect_request();
        publisher.send_next(element);
    }
    publisher.expect_cancellation();

    subscriber.request(1);
    let observed = subscriber.expect_error();
    let overflow = StreamError::Failed("Buffer overflow (max capacity was: 2)!".to_owned());
    assert_eq!(observed, overflow);
}
1353
/// After a buffered `[1, 2]` source has yielded both elements, every further `next()` call
/// must keep returning `None`.
#[test]
fn buffer_terminal_completion_is_sticky_across_repolls() {
    let buffered = Source::from_iter([1, 2]).buffer(2, OverflowStrategy::Backpressure);
    let (mut stream, _materializer) = materialize_buffered_stream(buffered);

    for element in [1, 2] {
        assert_eq!(stream.next(), Some(Ok(element)));
    }
    // Poll past the end twice to check the terminal state does not change.
    for _ in 0..2 {
        assert_eq!(stream.next(), None);
    }
}
1365
/// A buffered failed source must report the same `StreamError::Failed("boom")` on each of
/// two consecutive `next()` calls.
#[test]
fn buffer_terminal_failure_is_sticky_across_repolls() {
    let failing = Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
        .buffer(2, OverflowStrategy::Backpressure);
    let (mut stream, _materializer) = materialize_buffered_stream(failing);

    for _ in 0..2 {
        assert_eq!(
            stream.next(),
            Some(Err(StreamError::Failed("boom".to_owned())))
        );
    }
}
1382
/// A `map` stage that panics on element 2 sits in front of a buffer; the consumer must get
/// `Ok(1)` and then `StreamError::AbruptTermination` on both following polls.
///
/// The stream is polled on a helper thread and the results come back over a channel, so a
/// hang shows up as a `recv_timeout` failure rather than a stuck test.
#[test]
fn buffer_surfaces_producer_panics_as_stream_failure() {
    let (sender, receiver) = mpsc::channel();

    std::thread::spawn(move || {
        let panicking = Source::from_iter([1, 2, 3])
            .map(|item| match item {
                2 => panic!("boom"),
                other => other,
            })
            .buffer(2, OverflowStrategy::Backpressure);
        let (mut stream, _materializer) = materialize_buffered_stream(panicking);

        let first = stream.next();
        let second = stream.next();
        let third = stream.next();
        sender
            .send((first, second, third))
            .expect("test thread sends buffered panic results");
    });

    let polled = receiver.recv_timeout(Duration::from_secs(1));
    let (first, second, third) = polled.expect("buffer panic path should not hang");
    assert_eq!(first, Some(Ok(1)));
    assert_eq!(second, Some(Err(StreamError::AbruptTermination)));
    assert_eq!(third, Some(Err(StreamError::AbruptTermination)));
}
1411
/// When the subscriber requests one element before each upstream send, a summing `conflate`
/// must hand every value of 1..=4 through unchanged.
#[test]
fn conflate_passes_through_without_rate_difference() {
    let (publisher, subscriber) =
        rate_probe(|source| source.conflate(|left, right| left + right));

    (1..=4).for_each(|value| {
        subscriber.request(1);
        publisher.expect_request();
        publisher.send_next(value);
        assert_eq!(subscriber.expect_next(), value);
    });
}
1424
/// With no downstream demand while 1..=4 are sent, `conflate_with_seed` (collecting into a
/// `Vec`) must deliver the single aggregate `[1, 2, 3, 4]` followed by completion.
#[test]
fn conflate_aggregates_while_downstream_is_silent() {
    let (publisher, subscriber) = rate_probe(|source| {
        source.conflate_with_seed(
            |seed| vec![seed],
            |mut collected, incoming| {
                collected.push(incoming);
                collected
            },
        )
    });

    for element in 1..=4 {
        publisher.expect_request();
        publisher.send_next(element);
    }
    publisher.expect_request();
    publisher.send_complete();

    subscriber.request(2);
    let expected = (1..=4).collect::<Vec<_>>();
    assert_eq!(subscriber.expect_next(), expected);
    subscriber.expect_complete();
}
1448
/// When the subscriber requests one element before each upstream send, a summing
/// `batch(2, ..)` must hand every value of 1..=4 through unchanged.
#[test]
fn batch_passes_through_without_rate_difference() {
    let (publisher, subscriber) = rate_probe(|source| {
        source.batch(2, |seed| seed, |accumulated, incoming| accumulated + incoming)
    });

    for element in 1..5 {
        subscriber.request(1);
        publisher.expect_request();
        publisher.send_next(element);
        assert_eq!(subscriber.expect_next(), element);
    }
}
1461
/// With no downstream demand while 1..=10 are sent, `batch(u64::MAX, ..)` prepending each
/// element must deliver the single aggregate `[10, 9, .., 1]`.
#[test]
fn batch_aggregates_while_downstream_is_silent() {
    let (publisher, subscriber) = rate_probe(|source| {
        source.batch(
            u64::MAX,
            |seed| vec![seed],
            |mut collected, incoming| {
                // Prepending makes the aggregate come out newest-first.
                collected.insert(0, incoming);
                collected
            },
        )
    });

    for element in 1..=10 {
        publisher.expect_request();
        publisher.send_next(element);
    }
    publisher.expect_request();
    publisher.send_complete();

    subscriber.request(1);
    let newest_first = (1..=10).rev().collect::<Vec<_>>();
    assert_eq!(subscriber.expect_next(), newest_first);
}
1488
/// Every element is given weight 4 against a `batch_weighted` limit of 3; the subscriber
/// must receive 1, 2 and 3 as separate elements rather than a sum.
#[test]
fn batch_weighted_keeps_heavy_elements_separate() {
    let (publisher, subscriber) = rate_probe(|source| {
        source.batch_weighted(
            3,
            |_| 4,
            |seed| seed,
            |accumulated, incoming| accumulated + incoming,
        )
    });

    for element in 1..=2 {
        publisher.expect_request();
        publisher.send_next(element);
    }

    subscriber.request(1);
    assert_eq!(subscriber.expect_next(), 1);

    publisher.send_next(3);
    publisher.send_complete();

    subscriber.request(2);
    for remaining in [2, 3] {
        assert_eq!(subscriber.expect_next(), remaining);
    }
}
1510
/// Sends 1..=3 into a summing `batch(2, ..)` with no downstream demand, then one more element
/// after the first emission. The exact grouping is not pinned: the test only requires that
/// the emitted values sum to 10 and that each lies in `1..=7`.
#[test]
fn batch_backpressures_at_max_aggregate() {
    let (publisher, subscriber) = rate_probe(|source| {
        source.batch(2, |seed| seed, |accumulated, incoming| accumulated + incoming)
    });

    for element in 1..=3 {
        publisher.expect_request();
        publisher.send_next(element);
    }

    // Nothing may be emitted before the subscriber signals demand.
    subscriber.expect_no_message(Duration::from_millis(200));

    subscriber.request(1);
    let first = subscriber.expect_next();

    publisher.expect_request();
    publisher.send_next(4);
    publisher.send_complete();

    let mut emitted = vec![first];
    emitted.extend(subscriber.drain_until_complete());

    let total = emitted.iter().sum::<i32>();
    assert_eq!(total, 10, "all elements should be accounted for");
    emitted.iter().for_each(|v| {
        assert!((1..=7).contains(v), "batch value {v} out of range");
    });
}
1545
/// With `expand(std::iter::once)` and one downstream request after each upstream send, every
/// value of 1..=4 must pass through unchanged.
#[test]
fn expand_passes_through_without_rate_difference() {
    let (publisher, subscriber) = rate_probe(|source| source.expand(std::iter::once::<i32>));

    (1..=4).for_each(|value| {
        publisher.expect_request();
        publisher.send_next(value);
        subscriber.request(1);
        assert_eq!(subscriber.expect_next(), value);
    });
}
1557
/// With `expand(std::iter::repeat)`, a single upstream 42 must satisfy four downstream
/// requests; after -42 is sent, nothing arrives without demand and the next request
/// yields -42.
#[test]
fn expand_elements_while_upstream_is_silent() {
    let quiet_period = Duration::from_millis(250);
    let (publisher, mut subscriber) = rate_probe(|source| source.expand(std::iter::repeat));
    subscriber.set_timeout(quiet_period);

    publisher.expect_request();
    publisher.send_next(42);

    subscriber.request(4);
    for _ in 0..4 {
        assert_eq!(subscriber.expect_next(), 42);
    }

    publisher.expect_request();
    publisher.send_next(-42);
    // All earlier demand has been consumed, so the new element must wait.
    subscriber.expect_no_message(quiet_period);
    subscriber.request(1);
    assert_eq!(subscriber.expect_next(), -42);
}
1578
/// `expand(std::iter::once)` over `[1, 2]` must yield both elements and then `None`.
#[test]
fn expand_does_not_drop_last_element() {
    let expanded = Source::from_iter([1, 2]).expand(std::iter::once::<i32>);
    let (mut stream, _materializer) = materialize_buffered_stream(expanded);

    for element in [1, 2] {
        assert_eq!(stream.next(), Some(Ok(element)));
    }
    assert_eq!(stream.next(), None);
}
1588
/// Expands each of `[1, 2]` into `(item, 0..3)` pairs and checks ordering rather than an
/// exact output: every upstream item emits its index-0 pair exactly once, items appear in
/// upstream order, and indices for one item count up from 0 without gaps. The stream must
/// then stay terminated across repeated polls.
#[test]
fn expand_handles_finite_extrapolations() {
    let (mut stream, _materializer) = materialize_buffered_stream(
        Source::from_iter([1, 2]).expand(|item| (0..3).map(move |index| (item, index))),
    );

    let output: Vec<_> = stream
        .by_ref()
        .map(|element| element.expect("expand should not fail"))
        .collect();

    assert!(
        !output.is_empty(),
        "expand must emit at least the first real element"
    );

    let first_expansions: Vec<_> = output
        .iter()
        .filter(|(_, index)| *index == 0)
        .map(|(item, _)| *item)
        .collect();
    assert_eq!(
        first_expansions,
        vec![1, 2],
        "each real upstream element must emit its first expanded value exactly once"
    );

    // Walk the output while tracking the item being expanded and the next index due for it.
    let mut current_item = output[0].0;
    let mut expected_index = 0;
    for (item, index) in output.iter().copied() {
        let same_item = item == current_item;
        assert!(
            same_item || item == current_item + 1,
            "expanded output must stay on the current item or advance to the next real item"
        );
        if !same_item {
            assert_eq!(
                item,
                current_item + 1,
                "real items must remain in upstream order"
            );
            assert_eq!(
                index, 0,
                "a new real item must begin with its first expanded value"
            );
            current_item = item;
            expected_index = 0;
        }
        assert_eq!(
            index, expected_index,
            "each real item may be followed only by its own ordered extrapolations"
        );
        expected_index += 1;
    }

    for _ in 0..2 {
        assert_eq!(stream.next(), None);
    }
}
1643
/// Pre-loads the shared slot with 42 together with a `Complete` terminal signal, then checks
/// that a hand-built `ExpandStream` still emits `(42, 0)` before it returns `None`.
#[test]
fn expand_emits_first_value_even_if_terminal_is_already_visible() {
    let shared = SlotShared::new(NoExtra);

    let mut guard = shared
        .state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.slot = Some(42);
    guard.terminal = Some(TerminalSignal::Complete);
    // Release the lock before the stream takes ownership of `shared` and polls it.
    drop(guard);

    let mut stream = ExpandStream {
        shared,
        completion: None,
        current: None,
        expanded_once: false,
        seeded_from_upstream: false,
        expand: Arc::new(|seed| (0..3).map(move |step| (seed, step))),
    };

    assert_eq!(stream.next(), Some(Ok((42, 0))));
    assert_eq!(stream.next(), None);
}
1668
/// Builds an `ExpandStream` whose `current` iterator repeats 99 while the shared slot already
/// holds 7, and checks that the first poll returns 7. After `finish_slot` records
/// `Complete`, further polls must return `None`.
#[test]
fn extrapolate_initial_yields_to_real_element_already_in_slot() {
    let shared = SlotShared::new(NoExtra);
    // The lock guard is a temporary, so it is released at the end of this statement.
    shared
        .state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .slot = Some(7);

    let mut stream = ExpandStream {
        shared: Arc::clone(&shared),
        completion: None,
        current: Some(Box::new(std::iter::repeat(99))),
        expanded_once: false,
        seeded_from_upstream: false,
        expand: Arc::new(std::iter::repeat),
    };

    assert_eq!(stream.next(), Some(Ok(7)));

    finish_slot(&shared, TerminalSignal::Complete);
    for _ in 0..2 {
        assert_eq!(stream.next(), None);
    }
}
1699
/// With an extrapolator producing `item + 100` and no initial element, an upstream 1 must be
/// delivered as 1 first and then as 101.
#[test]
fn extrapolate_preserves_original_before_filling_gaps() {
    let (publisher, subscriber) = rate_probe(|source| {
        source.extrapolate(|item| std::iter::once(item + 100), None)
    });

    publisher.expect_request();
    publisher.send_next(1);

    subscriber.request(2);
    for expected in [1, 101] {
        assert_eq!(subscriber.expect_next(), expected);
    }
}
1712
/// With `Some(0)` as the initial element, the first downstream request yields 0 before
/// anything has been sent upstream; after 42 is sent, three more requests each yield 42.
#[test]
fn extrapolate_emits_initial_element_before_upstream_arrives() {
    let (publisher, subscriber) =
        rate_probe(|source| source.extrapolate(std::iter::repeat, Some(0)));

    subscriber.request(1);
    assert_eq!(subscriber.expect_next(), 0);

    publisher.expect_request();
    publisher.send_next(42);
    subscriber.request(3);
    for _ in 0..3 {
        assert_eq!(subscriber.expect_next(), 42);
    }
}
1728
/// Aggregates 1..=7 into vectors that are flagged ready at three elements, with no timer,
/// and expects `[1, 2, 3]`, `[4, 5, 6]` and the trailing `[7]`.
#[test]
fn aggregate_with_boundary_splits_by_size() {
    let aggregated = Source::from_iter(1..=7).aggregate_with_boundary(
        Vec::<i32>::new,
        |mut batch, item| {
            batch.push(item);
            let full = batch.len() >= 3;
            (batch, full)
        },
        |batch| batch,
        None,
    );
    let batches = aggregated.run_collect().unwrap();

    let expected = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]];
    assert_eq!(batches, expected);
}
1747
/// The aggregation callback never reports a boundary, so only the 10 ms `AggregateTimer`
/// (predicate: buffer is non-empty) can flush. After 1 and 2 are sent, a single request must
/// yield `[1, 2]`.
#[test]
fn aggregate_with_boundary_honors_timer_trigger() {
    let (publisher, mut subscriber) = rate_probe(|source| {
        let flush_when_non_empty = AggregateTimer::new(
            |buffer: &Vec<i32>| !buffer.is_empty(),
            Duration::from_millis(10),
        );
        source.aggregate_with_boundary(
            Vec::<i32>::new,
            |mut batch, item| {
                batch.push(item);
                (batch, false)
            },
            |batch| batch,
            Some(flush_when_non_empty),
        )
    });
    subscriber.set_timeout(Duration::from_millis(200));

    for element in 1..=2 {
        publisher.expect_request();
        publisher.send_next(element);
    }

    subscriber.request(1);
    assert_eq!(subscriber.expect_next(), vec![1, 2]);
}
1774
/// `detach()` over 1..=100 must collect to exactly the same hundred elements, in order.
#[test]
fn detach_passes_through_all_elements() {
    let collected = Source::from_iter(1..=100).detach().run_collect().unwrap();
    let expected = (1..=100).collect::<Vec<_>>();
    assert_eq!(collected, expected);
}
1782
/// A failed source followed by `detach()` must make `run_collect` return the original
/// `StreamError::Failed("boom")`.
#[test]
fn detach_passes_through_failure() {
    let outcome = Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
        .detach()
        .run_collect();
    let expected = Err(StreamError::Failed("boom".to_owned()));
    assert_eq!(outcome, expected);
}
1790
/// `Source::single(42).detach()` must yield 42 on the first poll and `None` on the second.
#[test]
fn detach_emits_last_element_when_completed_without_demand() {
    let detached = Source::single(42).detach();
    let (mut stream, _materializer) = materialize_buffered_stream(detached);

    assert_eq!(stream.next(), Some(Ok(42)));
    assert_eq!(stream.next(), None);
}
1798}