1use super::*;
2
/// Policy applied by `buffer` when a new element arrives from upstream.
///
/// Each variant describes what `buffer_stage` does once the queue already
/// holds `capacity` elements; while there is room every strategy simply
/// enqueues the element.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowStrategy {
    /// Remove the element at the front of the queue (the oldest), then enqueue the new one.
    DropHead,
    /// Remove the element at the back of the queue (the newest queued), then enqueue the new one.
    DropTail,
    /// Clear the whole queue, then enqueue the new one.
    DropBuffer,
    /// Discard the incoming element and leave the queue untouched.
    DropNew,
    /// Block the producer until the consumer frees a slot or the stream is cancelled.
    Backpressure,
    /// Clear the queue and terminate the stream with a `StreamError::Failed` "Buffer overflow" error.
    Fail,
}
12
/// Timer configuration for `aggregate_with_boundary`.
///
/// Pairs a predicate over the aggregate currently being built with the
/// interval at which that predicate is evaluated. When the predicate returns
/// `true` the timer marks the aggregate as ready to be emitted.
pub struct AggregateTimer<Agg> {
    /// Decides, from a reference to the current aggregate, whether to emit it now.
    predicate: Arc<dyn Fn(&Agg) -> bool + Send + Sync>,
    /// Delay between predicate evaluations; `new` guarantees it is non-zero.
    interval: Duration,
}

// Implemented by hand: `#[derive(Clone)]` would demand `Agg: Clone`, which is
// unnecessary because only the `Arc` handle and the `Copy` interval are cloned.
impl<Agg> Clone for AggregateTimer<Agg> {
    fn clone(&self) -> Self {
        Self {
            predicate: Arc::clone(&self.predicate),
            interval: self.interval,
        }
    }
}

// Implemented by hand: the boxed predicate has no `Debug` representation, so
// only the interval is printed and the struct is marked as non-exhaustive.
impl<Agg> std::fmt::Debug for AggregateTimer<Agg> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AggregateTimer")
            .field("interval", &self.interval)
            .finish_non_exhaustive()
    }
}

impl<Agg> AggregateTimer<Agg> {
    /// Creates a timer that evaluates `predicate` every `interval`.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is zero.
    #[must_use]
    pub fn new<F>(predicate: F, interval: Duration) -> Self
    where
        F: Fn(&Agg) -> bool + Send + Sync + 'static,
    {
        assert!(
            interval > Duration::ZERO,
            "aggregate_with_boundary timer interval must be greater than zero"
        );
        Self {
            predicate: Arc::new(predicate),
            interval,
        }
    }
}
43
/// How a producer finished; stored in the shared state and read by the
/// consumer-side iterators once no more buffered data is available.
#[derive(Clone)]
enum TerminalSignal {
    /// Upstream ended normally; the consumer iterator returns `None`.
    Complete,
    /// Upstream (or the stage itself) failed; the consumer iterator yields this error.
    Error(StreamError),
}
49
50struct QueueShared<T> {
51 state: Mutex<QueueState<T>>,
52 available: Condvar,
53 cancelled: Arc<AtomicBool>,
54 capacity: usize,
55}
56
57struct QueueState<T> {
58 queue: VecDeque<T>,
59 terminal: Option<TerminalSignal>,
60}
61
62impl<T> QueueShared<T> {
63 fn new(capacity: usize) -> Arc<Self> {
64 Arc::new(Self {
65 state: Mutex::new(QueueState {
66 queue: VecDeque::with_capacity(capacity),
67 terminal: None,
68 }),
69 available: Condvar::new(),
70 cancelled: Arc::new(AtomicBool::new(false)),
71 capacity,
72 })
73 }
74}
75
76struct QueueStream<T> {
77 shared: Arc<QueueShared<T>>,
78 completion: Option<StreamCompletion<NotUsed>>,
79}
80
81impl<T> Iterator for QueueStream<T> {
82 type Item = StreamResult<T>;
83
84 fn next(&mut self) -> Option<Self::Item> {
85 let mut state = self
86 .shared
87 .state
88 .lock()
89 .unwrap_or_else(|poison| poison.into_inner());
90 loop {
91 if let Some(item) = state.queue.pop_front() {
92 self.shared.available.notify_all();
93 return Some(Ok(item));
94 }
95 if let Some(terminal) = state.terminal.clone() {
96 return match terminal {
97 TerminalSignal::Complete => None,
98 TerminalSignal::Error(error) => Some(Err(error)),
99 };
100 }
101 state = self
102 .shared
103 .available
104 .wait(state)
105 .unwrap_or_else(|poison| poison.into_inner());
106 }
107 }
108}
109
110impl<T> Drop for QueueStream<T> {
111 fn drop(&mut self) {
112 self.shared.cancelled.store(true, Ordering::SeqCst);
113 self.shared.available.notify_all();
114 let _ = self.completion.take();
115 }
116}
117
118struct SlotShared<T, Extra> {
119 state: Mutex<SlotState<T, Extra>>,
120 available: Condvar,
121 cancelled: Arc<AtomicBool>,
122}
123
124struct SlotState<T, Extra> {
125 slot: Option<T>,
126 terminal: Option<TerminalSignal>,
127 extra: Extra,
128}
129
130impl<T, Extra> SlotShared<T, Extra> {
131 fn new(extra: Extra) -> Arc<Self> {
132 Arc::new(Self {
133 state: Mutex::new(SlotState {
134 slot: None,
135 terminal: None,
136 extra,
137 }),
138 available: Condvar::new(),
139 cancelled: Arc::new(AtomicBool::new(false)),
140 })
141 }
142}
143
144struct SlotStream<T, Extra> {
145 shared: Arc<SlotShared<T, Extra>>,
146 completion: Option<StreamCompletion<NotUsed>>,
147}
148
149impl<T, Extra> Iterator for SlotStream<T, Extra> {
150 type Item = StreamResult<T>;
151
152 fn next(&mut self) -> Option<Self::Item> {
153 let mut state = self
154 .shared
155 .state
156 .lock()
157 .unwrap_or_else(|poison| poison.into_inner());
158 loop {
159 if let Some(item) = state.slot.take() {
160 self.shared.available.notify_all();
161 return Some(Ok(item));
162 }
163 if let Some(terminal) = state.terminal.clone() {
164 return match terminal {
165 TerminalSignal::Complete => None,
166 TerminalSignal::Error(error) => Some(Err(error)),
167 };
168 }
169 state = self
170 .shared
171 .available
172 .wait(state)
173 .unwrap_or_else(|poison| poison.into_inner());
174 }
175 }
176}
177
178impl<T, Extra> Drop for SlotStream<T, Extra> {
179 fn drop(&mut self) {
180 self.shared.cancelled.store(true, Ordering::SeqCst);
181 self.shared.available.notify_all();
182 let _ = self.completion.take();
183 }
184}
185
186fn finish_queue<T>(shared: &QueueShared<T>, terminal: TerminalSignal) {
187 let mut state = shared
188 .state
189 .lock()
190 .unwrap_or_else(|poison| poison.into_inner());
191 if state.terminal.is_none() {
192 state.terminal = Some(terminal);
193 }
194 drop(state);
195 shared.available.notify_all();
196}
197
198fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
199 let mut state = shared
200 .state
201 .lock()
202 .unwrap_or_else(|poison| poison.into_inner());
203 if state.terminal.is_none() {
204 state.terminal = Some(terminal);
205 }
206 drop(state);
207 shared.available.notify_all();
208}
209
210struct ProducerPanicGuard<T> {
211 shared: Arc<QueueShared<T>>,
212 armed: bool,
213}
214
215impl<T> ProducerPanicGuard<T> {
216 fn new(shared: Arc<QueueShared<T>>) -> Self {
217 Self {
218 shared,
219 armed: true,
220 }
221 }
222
223 fn disarm(&mut self) {
224 self.armed = false;
225 }
226}
227
228impl<T> Drop for ProducerPanicGuard<T> {
229 fn drop(&mut self) {
230 if self.armed {
231 finish_queue(
232 &self.shared,
233 TerminalSignal::Error(StreamError::AbruptTermination),
234 );
235 }
236 }
237}
238
239struct SlotProducerPanicGuard<T, Extra> {
240 shared: Arc<SlotShared<T, Extra>>,
241 armed: bool,
242}
243
244impl<T, Extra> SlotProducerPanicGuard<T, Extra> {
245 fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
246 Self {
247 shared,
248 armed: true,
249 }
250 }
251
252 fn disarm(&mut self) {
253 self.armed = false;
254 }
255}
256
257impl<T, Extra> Drop for SlotProducerPanicGuard<T, Extra> {
258 fn drop(&mut self) {
259 if self.armed {
260 finish_slot(
261 &self.shared,
262 TerminalSignal::Error(StreamError::AbruptTermination),
263 );
264 }
265 }
266}
267
/// Placeholder `Extra` type for slot stages that need no additional shared
/// state (used by `expand_stage`).
#[derive(Default)]
struct NoExtra;
270
/// Extra slot state used by `batch_stage`.
struct BatchExtra<In> {
    /// Cost budget left for the aggregate currently in the slot. Signed and
    /// wide so that subtracting a `u64` cost from a `u64` limit cannot wrap.
    remaining: i128,
    /// Element parked while the producer waits for the consumer to take the
    /// current aggregate.
    pending: Option<In>,
}

impl<In> BatchExtra<In> {
    /// Starts with the full `limit` as budget and nothing pending.
    fn new(limit: u64) -> Self {
        let remaining = limit.into();
        Self {
            remaining,
            pending: None,
        }
    }
}
284
/// Extra slot state used by `aggregate_with_boundary_stage`.
struct BoundaryExtra {
    /// `true` once the aggregate in the slot should be emitted; set by the
    /// producer (from the aggregate function's flag) or by the timer, and
    /// cleared by the consumer when it takes the aggregate.
    ready: bool,
}
288
289struct BoundaryStream<Agg, Emit> {
290 shared: Arc<SlotShared<Agg, BoundaryExtra>>,
291 completion: Option<StreamCompletion<NotUsed>>,
292 timer: Option<Cancellable>,
293 harvest: Arc<dyn Fn(Agg) -> Emit + Send + Sync>,
294}
295
296impl<Agg, Emit> Iterator for BoundaryStream<Agg, Emit> {
297 type Item = StreamResult<Emit>;
298
299 fn next(&mut self) -> Option<Self::Item> {
300 loop {
301 let (slot, terminal) = {
302 let mut state = self
303 .shared
304 .state
305 .lock()
306 .unwrap_or_else(|poison| poison.into_inner());
307 loop {
308 if state.extra.ready {
309 let slot = state.slot.take();
310 state.extra.ready = false;
311 self.shared.available.notify_all();
312 break (slot, None);
313 }
314 if state.terminal.is_some() {
315 if let Some(slot) = state.slot.take() {
316 self.shared.available.notify_all();
317 break (Some(slot), None);
318 }
319 break (None, state.terminal.clone());
320 }
321 state = self
322 .shared
323 .available
324 .wait(state)
325 .unwrap_or_else(|poison| poison.into_inner());
326 }
327 };
328
329 if let Some(agg) = slot {
330 return Some(Ok((self.harvest)(agg)));
331 }
332 if let Some(terminal) = terminal {
333 return match terminal {
334 TerminalSignal::Complete => None,
335 TerminalSignal::Error(error) => Some(Err(error)),
336 };
337 }
338 }
339 }
340}
341
342impl<Agg, Emit> Drop for BoundaryStream<Agg, Emit> {
343 fn drop(&mut self) {
344 self.shared.cancelled.store(true, Ordering::SeqCst);
345 self.shared.available.notify_all();
346 if let Some(timer) = self.timer.take() {
347 timer.cancel();
348 }
349 let _ = self.completion.take();
350 }
351}
352
353fn buffer_stage<T: Send + 'static>(
354 input: BoxStream<T>,
355 capacity: usize,
356 strategy: OverflowStrategy,
357 materializer: &Materializer,
358) -> StreamResult<BoxStream<T>> {
359 let shared = QueueShared::new(capacity);
360 let producer_shared = Arc::clone(&shared);
361 let cancelled = Arc::clone(&shared.cancelled);
362 let state = Arc::clone(&materializer.inner.state);
363 let completion = materializer.spawn_stream(move |_| {
364 let mut panic_guard = ProducerPanicGuard::new(Arc::clone(&producer_shared));
365 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
366 loop {
367 if cancelled.load(Ordering::SeqCst) {
368 panic_guard.disarm();
369 return Ok(NotUsed);
370 }
371
372 match input.next() {
373 Some(Ok(item)) => {
374 let mut guard = producer_shared
375 .state
376 .lock()
377 .unwrap_or_else(|poison| poison.into_inner());
378 match strategy {
379 OverflowStrategy::Backpressure => {
380 while guard.queue.len() == producer_shared.capacity
381 && !cancelled.load(Ordering::SeqCst)
382 {
383 guard = producer_shared
384 .available
385 .wait(guard)
386 .unwrap_or_else(|poison| poison.into_inner());
387 }
388 if cancelled.load(Ordering::SeqCst) {
389 panic_guard.disarm();
390 return Ok(NotUsed);
391 }
392 guard.queue.push_back(item);
393 }
394 OverflowStrategy::DropHead => {
395 if guard.queue.len() == producer_shared.capacity {
396 let _ = guard.queue.pop_front();
397 }
398 guard.queue.push_back(item);
399 }
400 OverflowStrategy::DropTail => {
401 if guard.queue.len() == producer_shared.capacity {
402 let _ = guard.queue.pop_back();
403 }
404 guard.queue.push_back(item);
405 }
406 OverflowStrategy::DropBuffer => {
407 if guard.queue.len() == producer_shared.capacity {
408 guard.queue.clear();
409 }
410 guard.queue.push_back(item);
411 }
412 OverflowStrategy::DropNew => {
413 if guard.queue.len() < producer_shared.capacity {
414 guard.queue.push_back(item);
415 }
416 }
417 OverflowStrategy::Fail => {
418 if guard.queue.len() == producer_shared.capacity {
419 guard.queue.clear();
420 drop(guard);
421 panic_guard.disarm();
422 finish_queue(
423 &producer_shared,
424 TerminalSignal::Error(StreamError::Failed(format!(
425 "Buffer overflow (max capacity was: {capacity})!"
426 ))),
427 );
428 return Ok(NotUsed);
429 }
430 guard.queue.push_back(item);
431 }
432 }
433 drop(guard);
434 producer_shared.available.notify_all();
435 }
436 Some(Err(error)) => {
437 panic_guard.disarm();
438 finish_queue(&producer_shared, TerminalSignal::Error(error));
439 return Ok(NotUsed);
440 }
441 None => {
442 panic_guard.disarm();
443 finish_queue(&producer_shared, TerminalSignal::Complete);
444 return Ok(NotUsed);
445 }
446 }
447 }
448 });
449
450 Ok(Box::new(QueueStream {
451 shared,
452 completion: Some(completion),
453 }))
454}
455
/// Builds the `batch` / `conflate` stage: while the consumer has not taken
/// the current aggregate, incoming elements are folded into it, up to a cost
/// budget of `limit`.
///
/// * `cost_fn` gives the cost of an element.
/// * `seed` starts a new aggregate from the first element after the slot was emptied.
/// * `aggregate` folds a further element into the existing aggregate.
///
/// When an element's cost exceeds the remaining budget the producer parks it
/// in `pending` and waits for the consumer to take the slot (back-pressure).
/// The producer closure is handed to `materializer.spawn_stream`; the returned
/// stream is a `SlotStream` over the shared slot.
fn batch_stage<In, Agg, Cost, Seed, Aggregate>(
    input: BoxStream<In>,
    limit: u64,
    cost_fn: Arc<Cost>,
    seed: Arc<Seed>,
    aggregate: Arc<Aggregate>,
    materializer: &Materializer,
) -> StreamResult<BoxStream<Agg>>
where
    In: Send + 'static,
    Agg: Send + 'static,
    Cost: Fn(&In) -> u64 + Send + Sync + 'static,
    Seed: Fn(In) -> Agg + Send + Sync + 'static,
    Aggregate: Fn(Agg, In) -> Agg + Send + Sync + 'static,
{
    let shared = SlotShared::new(BatchExtra::new(limit));
    let producer_shared = Arc::clone(&shared);
    let cancelled = Arc::clone(&shared.cancelled);
    let state = Arc::clone(&materializer.inner.state);
    let completion = materializer.spawn_stream(move |_| {
        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
        // Element that did not fit the previous aggregate; handled before
        // pulling from upstream again.
        let mut carry = None::<In>;

        loop {
            if cancelled.load(Ordering::SeqCst) {
                panic_guard.disarm();
                return Ok(NotUsed);
            }

            let next = if let Some(item) = carry.take() {
                Some(Ok(item))
            } else {
                input.next()
            };

            match next {
                Some(Ok(item)) => {
                    // Widened to i128 so the budget arithmetic below cannot overflow.
                    let cost = i128::from(cost_fn(&item));
                    let current = {
                        let mut guard = producer_shared
                            .state
                            .lock()
                            .unwrap_or_else(|poison| poison.into_inner());

                        if guard.slot.is_none() {
                            // Nothing buffered: this element seeds a new aggregate.
                            None
                        } else if guard.extra.remaining < cost {
                            // Budget exhausted: park the element and wait until
                            // the consumer has taken the current aggregate.
                            guard.extra.pending = Some(item);
                            while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
                                guard = producer_shared
                                    .available
                                    .wait(guard)
                                    .unwrap_or_else(|poison| poison.into_inner());
                            }
                            if cancelled.load(Ordering::SeqCst) {
                                panic_guard.disarm();
                                return Ok(NotUsed);
                            }
                            // Retry the parked element on the next iteration
                            // with a fresh budget.
                            carry = guard.extra.pending.take();
                            guard.extra.remaining = i128::from(limit);
                            continue;
                        } else {
                            // Take the aggregate out so `aggregate` can run
                            // without holding the lock.
                            let current = guard.slot.take();
                            guard.extra.remaining -= cost;
                            current
                        }
                    };

                    match current {
                        None => {
                            let next_agg = seed(item);
                            let mut guard = producer_shared
                                .state
                                .lock()
                                .unwrap_or_else(|poison| poison.into_inner());
                            // The seed element is charged against a full budget.
                            guard.extra.remaining = i128::from(limit) - cost;
                            guard.slot = Some(next_agg);
                            drop(guard);
                            producer_shared.available.notify_all();
                        }
                        Some(current) => {
                            let next_agg = aggregate(current, item);
                            let mut guard = producer_shared
                                .state
                                .lock()
                                .unwrap_or_else(|poison| poison.into_inner());
                            guard.slot = Some(next_agg);
                            drop(guard);
                            producer_shared.available.notify_all();
                        }
                    }
                }
                Some(Err(error)) => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Error(error));
                    return Ok(NotUsed);
                }
                None => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Complete);
                    return Ok(NotUsed);
                }
            }
        }
    });

    Ok(Box::new(SlotStream {
        shared,
        completion: Some(completion),
    }))
}
568
569fn expand_stage<In, Out, Expand, Iter>(
570 input: BoxStream<In>,
571 expand: Arc<Expand>,
572 initial: Option<Box<dyn Iterator<Item = Out> + Send>>,
573 materializer: &Materializer,
574) -> StreamResult<BoxStream<Out>>
575where
576 In: Send + 'static,
577 Out: Send + 'static,
578 Expand: Fn(In) -> Iter + Send + Sync + 'static,
579 Iter: Iterator<Item = Out> + Send + 'static,
580{
581 let shared = SlotShared::new(NoExtra);
582 let producer_shared = Arc::clone(&shared);
583 let cancelled = Arc::clone(&shared.cancelled);
584 let state = Arc::clone(&materializer.inner.state);
585 let completion = materializer.spawn_stream(move |_| {
586 let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
587 let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
588 loop {
589 if cancelled.load(Ordering::SeqCst) {
590 panic_guard.disarm();
591 return Ok(NotUsed);
592 }
593
594 match input.next() {
595 Some(Ok(item)) => {
596 let mut guard = producer_shared
597 .state
598 .lock()
599 .unwrap_or_else(|poison| poison.into_inner());
600 while guard.slot.is_some() && !cancelled.load(Ordering::SeqCst) {
601 guard = producer_shared
602 .available
603 .wait(guard)
604 .unwrap_or_else(|poison| poison.into_inner());
605 }
606 if cancelled.load(Ordering::SeqCst) {
607 panic_guard.disarm();
608 return Ok(NotUsed);
609 }
610 guard.slot = Some(item);
611 drop(guard);
612 producer_shared.available.notify_all();
613 }
614 Some(Err(error)) => {
615 panic_guard.disarm();
616 finish_slot(&producer_shared, TerminalSignal::Error(error));
617 return Ok(NotUsed);
618 }
619 None => {
620 panic_guard.disarm();
621 finish_slot(&producer_shared, TerminalSignal::Complete);
622 return Ok(NotUsed);
623 }
624 }
625 }
626 });
627
628 Ok(Box::new(ExpandStream {
629 shared,
630 completion: Some(completion),
631 current: initial,
632 expanded_once: false,
633 seeded_from_upstream: false,
634 expand,
635 }))
636}
637
638struct ExpandStream<In, Out, Expand, Iter>
639where
640 Expand: Fn(In) -> Iter + Send + Sync + 'static,
641 Iter: Iterator<Item = Out> + Send + 'static,
642{
643 shared: Arc<SlotShared<In, NoExtra>>,
644 completion: Option<StreamCompletion<NotUsed>>,
645 current: Option<Box<dyn Iterator<Item = Out> + Send>>,
646 expanded_once: bool,
647 seeded_from_upstream: bool,
651 expand: Arc<Expand>,
652}
653
654impl<In, Out, Expand, Iter> Iterator for ExpandStream<In, Out, Expand, Iter>
655where
656 In: Send + 'static,
657 Out: Send + 'static,
658 Expand: Fn(In) -> Iter + Send + Sync + 'static,
659 Iter: Iterator<Item = Out> + Send + 'static,
660{
661 type Item = StreamResult<Out>;
662
663 fn next(&mut self) -> Option<Self::Item> {
664 loop {
665 if let Some(current) = &mut self.current
671 && !self.expanded_once
672 && self.seeded_from_upstream
673 {
674 if let Some(item) = current.next() {
675 self.expanded_once = true;
676 return Some(Ok(item));
677 }
678 self.current = None;
679 self.expanded_once = false;
680 }
681
682 enum Decision<In> {
687 NewElement(In),
688 Extrapolate,
689 EmitInitial,
690 Terminal(TerminalSignal),
691 }
692
693 let decision = {
694 let mut state = self
695 .shared
696 .state
697 .lock()
698 .unwrap_or_else(|poison| poison.into_inner());
699 loop {
700 if let Some(item) = state.slot.take() {
701 self.shared.available.notify_all();
702 break Decision::NewElement(item);
703 }
704
705 if let Some(terminal) = state.terminal.clone() {
706 break Decision::Terminal(terminal);
707 }
708
709 if self.current.is_some() {
710 break if self.expanded_once {
711 Decision::Extrapolate
712 } else {
713 Decision::EmitInitial
716 };
717 }
718
719 state = self
720 .shared
721 .available
722 .wait(state)
723 .unwrap_or_else(|poison| poison.into_inner());
724 }
725 };
726
727 match decision {
728 Decision::NewElement(item) => {
729 self.current = Some(Box::new((self.expand)(item)));
730 self.expanded_once = false;
731 self.seeded_from_upstream = true;
732 }
733 Decision::EmitInitial => {
734 if let Some(current) = &mut self.current
735 && let Some(item) = current.next()
736 {
737 self.expanded_once = true;
738 return Some(Ok(item));
739 }
740 self.current = None;
741 self.expanded_once = false;
742 }
743 Decision::Extrapolate => {
744 if let Some(current) = &mut self.current
745 && let Some(item) = current.next()
746 {
747 return Some(Ok(item));
748 }
749 self.current = None;
750 self.expanded_once = false;
751 }
752 Decision::Terminal(terminal) => {
753 return match terminal {
754 TerminalSignal::Complete => None,
755 TerminalSignal::Error(error) => Some(Err(error)),
756 };
757 }
758 }
759 }
760 }
761}
762
763impl<In, Out, Expand, Iter> Drop for ExpandStream<In, Out, Expand, Iter>
764where
765 Expand: Fn(In) -> Iter + Send + Sync + 'static,
766 Iter: Iterator<Item = Out> + Send + 'static,
767{
768 fn drop(&mut self) {
769 self.shared.cancelled.store(true, Ordering::SeqCst);
770 self.shared.available.notify_all();
771 let _ = self.completion.take();
772 }
773}
774
/// Builds the `aggregate_with_boundary` stage.
///
/// * `allocate` creates a fresh aggregate when the slot is empty.
/// * `aggregate` folds an element into the aggregate and returns whether the
///   aggregate is now ready to be emitted.
/// * `harvest` converts a finished aggregate into the emitted element (it is
///   applied by `BoundaryStream`).
/// * `emit_on_timer`, when present, schedules a periodic check that can mark
///   the in-progress aggregate as ready.
///
/// The producer closure is handed to `materializer.spawn_stream`; the timer
/// callback is registered with `materializer.schedule_with_fixed_delay`.
fn aggregate_with_boundary_stage<In, Agg, Emit, Allocate, Aggregate, Harvest>(
    input: BoxStream<In>,
    allocate: Arc<Allocate>,
    aggregate: Arc<Aggregate>,
    harvest: Arc<Harvest>,
    emit_on_timer: Option<AggregateTimer<Agg>>,
    materializer: &Materializer,
) -> StreamResult<BoxStream<Emit>>
where
    In: Send + 'static,
    Agg: Send + 'static,
    Emit: Send + 'static,
    Allocate: Fn() -> Agg + Send + Sync + 'static,
    Aggregate: Fn(Agg, In) -> (Agg, bool) + Send + Sync + 'static,
    Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
{
    let shared = SlotShared::new(BoundaryExtra { ready: false });
    let producer_shared = Arc::clone(&shared);
    let cancelled = Arc::clone(&shared.cancelled);
    let state = Arc::clone(&materializer.inner.state);
    let completion = materializer.spawn_stream(move |_| {
        let mut panic_guard = SlotProducerPanicGuard::new(Arc::clone(&producer_shared));
        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
        loop {
            if cancelled.load(Ordering::SeqCst) {
                panic_guard.disarm();
                return Ok(NotUsed);
            }

            match input.next() {
                Some(Ok(item)) => {
                    let current = {
                        let mut guard = producer_shared
                            .state
                            .lock()
                            .unwrap_or_else(|poison| poison.into_inner());
                        // A ready aggregate must be taken by the consumer
                        // before more elements are folded in.
                        while guard.extra.ready && !cancelled.load(Ordering::SeqCst) {
                            guard = producer_shared
                                .available
                                .wait(guard)
                                .unwrap_or_else(|poison| poison.into_inner());
                        }
                        if cancelled.load(Ordering::SeqCst) {
                            panic_guard.disarm();
                            return Ok(NotUsed);
                        }
                        // Take the aggregate out so `aggregate` can run
                        // without holding the lock.
                        guard.slot.take()
                    };

                    let current = current.unwrap_or_else(|| allocate());
                    let (updated, ready) = aggregate(current, item);
                    let mut guard = producer_shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    guard.slot = Some(updated);
                    guard.extra.ready = ready;
                    drop(guard);
                    producer_shared.available.notify_all();
                }
                Some(Err(error)) => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Error(error));
                    return Ok(NotUsed);
                }
                None => {
                    panic_guard.disarm();
                    finish_slot(&producer_shared, TerminalSignal::Complete);
                    return Ok(NotUsed);
                }
            }
        }
    });

    let timer = emit_on_timer.map(|timer| {
        let shared = Arc::clone(&shared);
        let cancelled = Arc::clone(&shared.cancelled);
        materializer.schedule_with_fixed_delay(timer.interval, timer.interval, move || {
            if cancelled.load(Ordering::SeqCst) {
                return;
            }
            // Evaluate the predicate under the lock, but only when there is an
            // aggregate that is neither ready nor already terminated. A panic
            // in the predicate is caught and reported as a stream error below.
            let should_emit = {
                let state = shared
                    .state
                    .lock()
                    .unwrap_or_else(|poison| poison.into_inner());
                if state.slot.is_none() || state.extra.ready || state.terminal.is_some() {
                    None
                } else {
                    Some(std::panic::catch_unwind(std::panic::AssertUnwindSafe(
                        || (timer.predicate)(state.slot.as_ref().expect("aggregate present")),
                    )))
                }
            };
            match should_emit {
                Some(Ok(true)) => {
                    let mut state = shared
                        .state
                        .lock()
                        .unwrap_or_else(|poison| poison.into_inner());
                    // The lock was released in between, so re-validate the
                    // state before flagging the aggregate as ready.
                    if state.slot.is_some() && !state.extra.ready && state.terminal.is_none() {
                        state.extra.ready = true;
                    }
                    drop(state);
                    shared.available.notify_all();
                }
                Some(Ok(false)) | None => {}
                Some(Err(_)) => {
                    finish_slot(
                        &shared,
                        TerminalSignal::Error(StreamError::AbruptTermination),
                    );
                }
            }
        })
    });

    Ok(Box::new(BoundaryStream {
        shared,
        completion: Some(completion),
        timer,
        harvest,
    }))
}
899
/// Buffering and rate-adaptation operators for `Flow`.
impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
    /// Inserts a bounded buffer of `size` elements; `strategy` decides what
    /// happens when an element arrives while the buffer is full.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Flow<In, Out, Mat> {
        assert!(size > 0, "buffer size must be greater than zero");
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            buffer_stage(input, size, strategy, materializer)
        }))
    }

    /// Folds elements together while the consumer has not taken the current
    /// aggregate: `seed` starts an aggregate, `aggregate` adds to it.
    ///
    /// Implemented as a batch stage with limit `1` and a constant element
    /// cost of `0`.
    pub fn conflate_with_seed<Agg, Seed, Aggregate>(
        self,
        seed: Seed,
        aggregate: Aggregate,
    ) -> Flow<In, Agg, Mat>
    where
        Agg: Send + 'static,
        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
    {
        let seed = Arc::new(seed);
        let aggregate = Arc::new(aggregate);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            batch_stage(
                input,
                1,
                Arc::new(|_: &Out| 0),
                Arc::clone(&seed),
                Arc::clone(&aggregate),
                materializer,
            )
        }))
    }

    /// Like [`Self::conflate_with_seed`], using the element itself as the seed.
    pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
        self.conflate_with_seed(|item| item, aggregate)
    }

    /// Batches elements while the consumer has not taken the current
    /// aggregate, charging a cost of `1` per element against a budget of `max`.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero.
    pub fn batch<Agg, Seed, Aggregate>(
        self,
        max: u64,
        seed: Seed,
        aggregate: Aggregate,
    ) -> Flow<In, Agg, Mat>
    where
        Agg: Send + 'static,
        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
    {
        assert!(max > 0, "batch max must be greater than zero");
        let seed = Arc::new(seed);
        let aggregate = Arc::new(aggregate);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            batch_stage(
                input,
                max,
                Arc::new(|_: &Out| 1),
                Arc::clone(&seed),
                Arc::clone(&aggregate),
                materializer,
            )
        }))
    }

    /// Like [`Self::batch`], but each element's cost is computed by `cost_fn`.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero.
    pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
        self,
        max: u64,
        cost_fn: Cost,
        seed: Seed,
        aggregate: Aggregate,
    ) -> Flow<In, Agg, Mat>
    where
        Agg: Send + 'static,
        Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
    {
        assert!(max > 0, "batch_weighted max must be greater than zero");
        let cost_fn = Arc::new(cost_fn);
        let seed = Arc::new(seed);
        let aggregate = Arc::new(aggregate);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            batch_stage(
                input,
                max,
                Arc::clone(&cost_fn),
                Arc::clone(&seed),
                Arc::clone(&aggregate),
                materializer,
            )
        }))
    }

    /// Turns every element into an iterator via `expand` and emits from that
    /// iterator until the next element arrives. No initial iterator is used.
    pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Flow<In, Next, Mat>
    where
        Next: Send + 'static,
        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
        Iter: Iterator<Item = Next> + Send + 'static,
    {
        let expand = Arc::new(expand);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            expand_stage(input, Arc::clone(&expand), None, materializer)
        }))
    }

    /// Emits each element followed by the values produced by
    /// `extrapolator(element)` until the next element arrives.
    ///
    /// `initial`, when given, is offered as a one-element iterator before any
    /// upstream element has arrived.
    pub fn extrapolate<Expand, Iter>(
        self,
        extrapolator: Expand,
        initial: Option<Out>,
    ) -> Flow<In, Out, Mat>
    where
        Out: Clone + Sync,
        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
        Iter: Iterator<Item = Out> + Send + 'static,
    {
        let extrapolator = Arc::new(extrapolator);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            let extrapolator = Arc::clone(&extrapolator);
            // Each run of this transform gets its own copy of the initial element.
            let initial = initial.clone().map(|item| {
                Box::new(std::iter::once(item)) as Box<dyn Iterator<Item = Out> + Send>
            });
            expand_stage(
                input,
                Arc::new(move |item: Out| {
                    // The element itself comes first, then its extrapolation.
                    std::iter::once(item.clone()).chain((extrapolator)(item))
                }),
                initial,
                materializer,
            )
        }))
    }

    /// Aggregates elements until `aggregate` reports a boundary (its returned
    /// flag is `true`) or the optional timer predicate fires, then emits
    /// `harvest(aggregate)`.
    ///
    /// `allocate` creates the aggregate for the first element after an emission.
    pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
        self,
        allocate: Allocate,
        aggregate: Aggregate,
        harvest: Harvest,
        emit_on_timer: Option<AggregateTimer<Agg>>,
    ) -> Flow<In, Emit, Mat>
    where
        Agg: Send + 'static,
        Emit: Send + 'static,
        Allocate: Fn() -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
        Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
    {
        let allocate = Arc::new(allocate);
        let aggregate = Arc::new(aggregate);
        let harvest = Arc::new(harvest);
        self.via(Flow::from_runtime_transform(move |input, materializer| {
            aggregate_with_boundary_stage(
                input,
                Arc::clone(&allocate),
                Arc::clone(&aggregate),
                Arc::clone(&harvest),
                emit_on_timer.clone(),
                materializer,
            )
        }))
    }

    /// Shorthand for `buffer(1, OverflowStrategy::Backpressure)`.
    pub fn detach(self) -> Flow<In, Out, Mat> {
        self.buffer(1, OverflowStrategy::Backpressure)
    }
}
1063
/// Buffering and rate-adaptation operators for `Source`.
///
/// Every method attaches the `Flow` operator of the same name (applied to
/// `Flow::identity()`) via `self.via(...)`.
impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
    /// Attaches `Flow::buffer` with the given `size` and `strategy`.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn buffer(self, size: usize, strategy: OverflowStrategy) -> Self {
        self.via(Flow::identity().buffer(size, strategy))
    }

    /// Attaches `Flow::conflate_with_seed` with the given `seed` and `aggregate`.
    pub fn conflate_with_seed<Agg, Seed, Aggregate>(
        self,
        seed: Seed,
        aggregate: Aggregate,
    ) -> Source<Agg, Mat>
    where
        Agg: Send + 'static,
        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
    {
        self.via(Flow::identity().conflate_with_seed(seed, aggregate))
    }

    /// Attaches `Flow::conflate` with the given `aggregate`.
    pub fn conflate(self, aggregate: impl Fn(Out, Out) -> Out + Send + Sync + 'static) -> Self {
        self.via(Flow::identity().conflate(aggregate))
    }

    /// Attaches `Flow::batch` with the given `max`, `seed` and `aggregate`.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero.
    pub fn batch<Agg, Seed, Aggregate>(
        self,
        max: u64,
        seed: Seed,
        aggregate: Aggregate,
    ) -> Source<Agg, Mat>
    where
        Agg: Send + 'static,
        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
    {
        self.via(Flow::identity().batch(max, seed, aggregate))
    }

    /// Attaches `Flow::batch_weighted` with the given `max`, `cost_fn`,
    /// `seed` and `aggregate`.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero.
    pub fn batch_weighted<Agg, Cost, Seed, Aggregate>(
        self,
        max: u64,
        cost_fn: Cost,
        seed: Seed,
        aggregate: Aggregate,
    ) -> Source<Agg, Mat>
    where
        Agg: Send + 'static,
        Cost: Fn(&Out) -> u64 + Send + Sync + 'static,
        Seed: Fn(Out) -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> Agg + Send + Sync + 'static,
    {
        self.via(Flow::identity().batch_weighted(max, cost_fn, seed, aggregate))
    }

    /// Attaches `Flow::expand` with the given `expand` function.
    pub fn expand<Next, Expand, Iter>(self, expand: Expand) -> Source<Next, Mat>
    where
        Next: Send + 'static,
        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
        Iter: Iterator<Item = Next> + Send + 'static,
    {
        self.via(Flow::identity().expand(expand))
    }

    /// Attaches `Flow::extrapolate` with the given `extrapolator` and
    /// optional `initial` element.
    pub fn extrapolate<Expand, Iter>(self, extrapolator: Expand, initial: Option<Out>) -> Self
    where
        Out: Clone + Sync,
        Expand: Fn(Out) -> Iter + Send + Sync + 'static,
        Iter: Iterator<Item = Out> + Send + 'static,
    {
        self.via(Flow::identity().extrapolate(extrapolator, initial))
    }

    /// Attaches `Flow::aggregate_with_boundary` with the given callbacks and
    /// optional timer.
    pub fn aggregate_with_boundary<Agg, Emit, Allocate, Aggregate, Harvest>(
        self,
        allocate: Allocate,
        aggregate: Aggregate,
        harvest: Harvest,
        emit_on_timer: Option<AggregateTimer<Agg>>,
    ) -> Source<Emit, Mat>
    where
        Agg: Send + 'static,
        Emit: Send + 'static,
        Allocate: Fn() -> Agg + Send + Sync + 'static,
        Aggregate: Fn(Agg, Out) -> (Agg, bool) + Send + Sync + 'static,
        Harvest: Fn(Agg) -> Emit + Send + Sync + 'static,
    {
        self.via(Flow::identity().aggregate_with_boundary(
            allocate,
            aggregate,
            harvest,
            emit_on_timer,
        ))
    }

    /// Attaches `Flow::detach` (a one-element back-pressuring buffer).
    pub fn detach(self) -> Self {
        self.via(Flow::identity().detach())
    }
}
1160
1161#[cfg(test)]
1162mod tests {
1163 use super::*;
1164 use crate::testkit::{TestSink, TestSource};
1165 use std::sync::mpsc;
1166 use std::time::Duration;
1167
1168 fn materialize_buffered_stream<T: Send + 'static>(
1169 source: Source<T>,
1170 ) -> (BoxStream<T>, Materializer) {
1171 let materializer = Materializer::new();
1172 let (stream, _) = Arc::clone(&source.factory)
1173 .create(&materializer)
1174 .expect("buffer source materializes");
1175 (stream, materializer)
1176 }
1177
1178 fn buffered_probe(
1179 strategy: OverflowStrategy,
1180 ) -> (
1181 crate::testkit::TestPublisherProbe<i32>,
1182 crate::testkit::TestSubscriberProbe<i32>,
1183 ) {
1184 TestSource::probe::<i32>()
1185 .buffer(2, strategy)
1186 .to_mat(TestSink::probe(), Keep::both)
1187 .run()
1188 .expect("buffer probe materializes")
1189 }
1190
1191 fn rate_probe<T, U>(
1192 flow: impl FnOnce(
1193 Source<T, crate::testkit::TestPublisherProbe<T>>,
1194 ) -> Source<U, crate::testkit::TestPublisherProbe<T>>,
1195 ) -> (
1196 crate::testkit::TestPublisherProbe<T>,
1197 crate::testkit::TestSubscriberProbe<U>,
1198 )
1199 where
1200 T: Send + 'static,
1201 U: Send + 'static,
1202 {
1203 flow(TestSource::probe::<T>())
1204 .to_mat(TestSink::probe(), Keep::both)
1205 .run()
1206 .expect("rate probe materializes")
1207 }
1208
1209 #[test]
1210 fn buffer_drop_head_drops_oldest_buffered_elements() {
1211 let (publisher, subscriber) = buffered_probe(OverflowStrategy::DropHead);
1212 publisher.expect_request();
1213 publisher.send_next(1);
1214 publisher.expect_request();
1215 publisher.send_next(2);
1216 publisher.expect_request();
1217 publisher.send_next(3);
1218 publisher.expect_request();
1219 publisher.send_next(4);
1220 publisher.expect_request();
1221 publisher.send_complete();
1222
1223 subscriber.request(3);
1224 assert_eq!(subscriber.expect_next(), 3);
1225 assert_eq!(subscriber.expect_next(), 4);
1226 subscriber.expect_complete();
1227 }
1228
/// The buffer requests and accepts two upstream elements while the subscriber
/// has not asked for anything, and hands them over once demand arrives.
#[test]
fn buffer_pulls_eagerly_before_downstream_demand() {
    let quiet_window = Duration::from_millis(250);
    let (mut upstream, mut downstream) = buffered_probe(OverflowStrategy::Backpressure);
    upstream.set_timeout(quiet_window);
    downstream.set_timeout(quiet_window);

    for value in [1, 2] {
        upstream.expect_request();
        upstream.send_next(value);
    }
    // Nothing may reach the subscriber before it requests.
    downstream.expect_no_message(quiet_window);

    downstream.request(2);
    for expected in [1, 2] {
        assert_eq!(downstream.expect_next(), expected);
    }
}
1245
/// `DropTail` on the two-element probe buffer: after 1..=4 are pushed with no
/// downstream demand, the subscriber receives 1 and 4, then completion.
#[test]
fn buffer_drop_tail_drops_newest_buffered_element() {
    let (upstream, downstream) = buffered_probe(OverflowStrategy::DropTail);

    // Feed every element before the subscriber signals any demand.
    for value in 1..=4 {
        upstream.expect_request();
        upstream.send_next(value);
    }
    upstream.expect_request();
    upstream.send_complete();

    downstream.request(3);
    for expected in [1, 4] {
        assert_eq!(downstream.expect_next(), expected);
    }
    downstream.expect_complete();
}
1265
/// `DropBuffer` on the two-element probe buffer: after 1..=4 are pushed with
/// no downstream demand, the subscriber receives 3 and 4, then completion.
#[test]
fn buffer_drop_buffer_drops_all_buffered_elements() {
    let (upstream, downstream) = buffered_probe(OverflowStrategy::DropBuffer);

    // Feed every element before the subscriber signals any demand.
    for value in 1..=4 {
        upstream.expect_request();
        upstream.send_next(value);
    }
    upstream.expect_request();
    upstream.send_complete();

    downstream.request(3);
    for expected in [3, 4] {
        assert_eq!(downstream.expect_next(), expected);
    }
    downstream.expect_complete();
}
1285
/// `DropNew` on the two-element probe buffer: after 1..=4 are pushed with no
/// downstream demand, the subscriber receives 1 and 2, then completion.
#[test]
fn buffer_drop_new_drops_incoming_elements() {
    let (upstream, downstream) = buffered_probe(OverflowStrategy::DropNew);

    // Feed every element before the subscriber signals any demand.
    for value in 1..=4 {
        upstream.expect_request();
        upstream.send_next(value);
    }
    upstream.expect_request();
    upstream.send_complete();

    downstream.request(3);
    for expected in [1, 2] {
        assert_eq!(downstream.expect_next(), expected);
    }
    downstream.expect_complete();
}
1305
/// With `Backpressure`, two elements fill the buffer; a third is requested
/// from upstream only after the subscriber has taken the first one, and all
/// three arrive in order before completion.
///
/// NOTE(review): the name promises that pulling stops while the buffer is
/// full, but no assertion here observes the absence of an upstream request —
/// confirm whether the publisher probe offers a way to check that.
#[test]
fn buffer_backpressure_stops_pulling_when_full() {
    let (mut upstream, mut downstream) = buffered_probe(OverflowStrategy::Backpressure);
    for value in [1, 2] {
        upstream.expect_request();
        upstream.send_next(value);
    }

    let quiet_window = Duration::from_millis(250);
    upstream.set_timeout(quiet_window);
    downstream.set_timeout(quiet_window);
    downstream.expect_no_message(quiet_window);

    // Draining one element makes room for the next upstream request.
    downstream.request(1);
    assert_eq!(downstream.expect_next(), 1);
    upstream.expect_request();
    upstream.send_next(3);
    upstream.send_complete();

    downstream.request(3);
    for expected in [2, 3] {
        assert_eq!(downstream.expect_next(), expected);
    }
    downstream.expect_complete();
}
1328
/// `Fail` on the two-element probe buffer: the third element cancels upstream
/// and the subscriber observes the overflow failure.
#[test]
fn buffer_fail_surfaces_overflow_error() {
    let (upstream, downstream) = buffered_probe(OverflowStrategy::Fail);
    for value in 1..=3 {
        upstream.expect_request();
        upstream.send_next(value);
    }
    upstream.expect_cancellation();

    downstream.request(1);
    let overflow = StreamError::Failed("Buffer overflow (max capacity was: 2)!".to_owned());
    assert_eq!(downstream.expect_error(), overflow);
}
1346
/// Once the buffered stream has completed, polling it again keeps returning
/// `None`.
#[test]
fn buffer_terminal_completion_is_sticky_across_repolls() {
    let buffered = Source::from_iter([1, 2]).buffer(2, OverflowStrategy::Backpressure);
    let (mut stream, _materializer) = materialize_buffered_stream(buffered);

    // Two elements, then completion observed twice in a row.
    for expected in [Some(Ok(1)), Some(Ok(2)), None, None] {
        assert_eq!(stream.next(), expected);
    }
}
1358
/// Once the buffered stream has failed, polling it again keeps returning the
/// same error.
#[test]
fn buffer_terminal_failure_is_sticky_across_repolls() {
    let failing = Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
        .buffer(2, OverflowStrategy::Backpressure);
    let (mut stream, _materializer) = materialize_buffered_stream(failing);

    let expected = Some(Err(StreamError::Failed("boom".to_owned())));
    assert_eq!(stream.next(), expected);
    assert_eq!(stream.next(), expected);
}
1375
/// A panic inside an upstream `map` closure must show up downstream of the
/// buffer as `StreamError::AbruptTermination` on every later poll, not as a
/// hang.
///
/// The stream is polled on a helper thread so that a stuck poll turns into a
/// `recv_timeout` failure here.
#[test]
fn buffer_surfaces_producer_panics_as_stream_failure() {
    let (results_tx, results_rx) = mpsc::channel();

    std::thread::spawn(move || {
        let panicking = Source::from_iter([1, 2, 3])
            .map(|item| match item {
                2 => panic!("boom"),
                other => other,
            })
            .buffer(2, OverflowStrategy::Backpressure);
        let (mut stream, _materializer) = materialize_buffered_stream(panicking);

        let polled = (stream.next(), stream.next(), stream.next());
        results_tx
            .send(polled)
            .expect("test thread sends buffered panic results");
    });

    let (first, second, third) = results_rx
        .recv_timeout(Duration::from_secs(1))
        .expect("buffer panic path should not hang");
    assert_eq!(first, Some(Ok(1)));
    assert_eq!(second, Some(Err(StreamError::AbruptTermination)));
    assert_eq!(third, Some(Err(StreamError::AbruptTermination)));
}
1404
/// When each element is requested before it is sent, `conflate` forwards the
/// values 1..=4 unchanged.
#[test]
fn conflate_passes_through_without_rate_difference() {
    let (upstream, downstream) = rate_probe(|source| source.conflate(|acc, next| acc + next));

    for value in 1..=4 {
        downstream.request(1);
        upstream.expect_request();
        upstream.send_next(value);
        assert_eq!(downstream.expect_next(), value);
    }
}
1417
/// Elements sent while the subscriber has no demand are folded into a single
/// vector by `conflate_with_seed`, which is emitted once demand arrives.
#[test]
fn conflate_aggregates_while_downstream_is_silent() {
    let (upstream, downstream) = rate_probe(|source| {
        source.conflate_with_seed(
            |first| vec![first],
            |mut collected, next| {
                collected.push(next);
                collected
            },
        )
    });

    for value in 1..=4 {
        upstream.expect_request();
        upstream.send_next(value);
    }
    upstream.expect_request();
    upstream.send_complete();

    downstream.request(2);
    let everything = vec![1, 2, 3, 4];
    assert_eq!(downstream.expect_next(), everything);
    downstream.expect_complete();
}
1441
/// When each element is requested before it is sent, `batch` forwards the
/// values 1..=4 unchanged.
#[test]
fn batch_passes_through_without_rate_difference() {
    let (upstream, downstream) =
        rate_probe(|source| source.batch(2, |seed| seed, |acc, next| acc + next));

    for value in 1..=4 {
        downstream.request(1);
        upstream.expect_request();
        upstream.send_next(value);
        assert_eq!(downstream.expect_next(), value);
    }
}
1454
/// With a `u64::MAX` limit, ten elements sent without downstream demand end
/// up in one batch; the aggregator prepends, so the batch is in reverse
/// arrival order.
#[test]
fn batch_aggregates_while_downstream_is_silent() {
    let (upstream, downstream) = rate_probe(|source| {
        source.batch(
            u64::MAX,
            |first| vec![first],
            |mut collected, next| {
                collected.insert(0, next);
                collected
            },
        )
    });

    for value in 1..=10 {
        upstream.expect_request();
        upstream.send_next(value);
    }
    upstream.expect_request();
    upstream.send_complete();

    downstream.request(1);
    let newest_first = (1..=10).rev().collect::<Vec<_>>();
    assert_eq!(downstream.expect_next(), newest_first);
}
1481
/// Every element is given weight 4 against a limit of 3, and the test expects
/// 1, 2 and 3 to be delivered individually rather than summed.
#[test]
fn batch_weighted_keeps_heavy_elements_separate() {
    let (upstream, downstream) = rate_probe(|source| {
        source.batch_weighted(3, |_| 4, |seed| seed, |acc, next| acc + next)
    });

    for value in [1, 2] {
        upstream.expect_request();
        upstream.send_next(value);
    }

    downstream.request(1);
    assert_eq!(downstream.expect_next(), 1);

    upstream.send_next(3);
    upstream.send_complete();

    downstream.request(2);
    for expected in [2, 3] {
        assert_eq!(downstream.expect_next(), expected);
    }
}
1503
/// Sends 1..=4 through `batch(2, ..)` with summing aggregation and checks that
/// nothing is lost: the emitted values add up to 10 and each lies in 1..=7.
#[test]
fn batch_backpressures_at_max_aggregate() {
    let (upstream, downstream) =
        rate_probe(|source| source.batch(2, |seed| seed, |acc, next| acc + next));

    for value in 1..=3 {
        upstream.expect_request();
        upstream.send_next(value);
    }

    // No demand yet, so nothing may have been emitted.
    downstream.expect_no_message(Duration::from_millis(200));

    downstream.request(1);
    let mut emitted = vec![downstream.expect_next()];

    upstream.expect_request();
    upstream.send_next(4);
    upstream.send_complete();

    emitted.extend(downstream.drain_until_complete());
    let total: i32 = emitted.iter().sum();
    assert_eq!(total, 10, "all elements should be accounted for");
    for v in emitted.iter().copied() {
        assert!((1..=7).contains(&v), "batch value {v} out of range");
    }
}
1538
/// With a one-shot expansion and demand following each send, `expand`
/// forwards the values 1..=4 unchanged.
#[test]
fn expand_passes_through_without_rate_difference() {
    let (upstream, downstream) = rate_probe(|source| source.expand(std::iter::once::<i32>));

    for value in 1..=4 {
        upstream.expect_request();
        upstream.send_next(value);
        downstream.request(1);
        assert_eq!(downstream.expect_next(), value);
    }
}
1550
/// With a repeating expansion, one upstream element satisfies four downstream
/// requests; the next upstream element replaces it once demand returns.
#[test]
fn expand_elements_while_upstream_is_silent() {
    let (upstream, mut downstream) = rate_probe(|source| source.expand(std::iter::repeat));
    downstream.set_timeout(Duration::from_millis(250));

    upstream.expect_request();
    upstream.send_next(42);

    downstream.request(4);
    for _ in 0..4 {
        assert_eq!(downstream.expect_next(), 42);
    }

    upstream.expect_request();
    upstream.send_next(-42);
    // Demand is exhausted, so the new element must wait.
    downstream.expect_no_message(Duration::from_millis(250));
    downstream.request(1);
    assert_eq!(downstream.expect_next(), -42);
}
1571
/// A one-shot expansion over `[1, 2]` yields both elements and then ends.
#[test]
fn expand_does_not_drop_last_element() {
    let expanded = Source::from_iter([1, 2]).expand(std::iter::once::<i32>);
    let (mut stream, _materializer) = materialize_buffered_stream(expanded);

    for expected in [Some(Ok(1)), Some(Ok(2)), None] {
        assert_eq!(stream.next(), expected);
    }
}
1581
/// Expands each of `[1, 2]` into `(item, 0..3)` and validates the output
/// without fixing how many extrapolated values appear per item:
/// every real item contributes its index-0 value exactly once, items appear
/// in upstream order, and indices for one item count up from zero.
#[test]
fn expand_handles_finite_extrapolations() {
    let (mut stream, _materializer) = materialize_buffered_stream(
        Source::from_iter([1, 2]).expand(|item| (0..3).map(move |index| (item, index))),
    );

    let output: Vec<_> = stream
        .by_ref()
        .map(|item| item.expect("expand should not fail"))
        .collect();

    assert!(
        !output.is_empty(),
        "expand must emit at least the first real element"
    );

    let first_values = output
        .iter()
        .filter(|&&(_, index)| index == 0)
        .map(|&(item, _)| item)
        .collect::<Vec<_>>();
    assert_eq!(
        first_values,
        vec![1, 2],
        "each real upstream element must emit its first expanded value exactly once"
    );

    let (mut current_item, mut expected_index) = (output[0].0, 0);
    for (item, index) in output.iter().copied() {
        let advanced = item != current_item;
        assert!(
            !advanced || item == current_item + 1,
            "expanded output must stay on the current item or advance to the next real item"
        );
        if advanced {
            assert_eq!(
                item,
                current_item + 1,
                "real items must remain in upstream order"
            );
            assert_eq!(
                index, 0,
                "a new real item must begin with its first expanded value"
            );
            current_item = item;
            expected_index = 0;
        }
        assert_eq!(
            index, expected_index,
            "each real item may be followed only by its own ordered extrapolations"
        );
        expected_index += 1;
    }

    // Completion stays visible on repeated polls.
    assert_eq!(stream.next(), None);
    assert_eq!(stream.next(), None);
}
1636
/// Builds an `ExpandStream` by hand over a slot that already holds both an
/// element and a completion signal: the first expanded value of the element
/// is still emitted before the stream ends.
#[test]
fn expand_emits_first_value_even_if_terminal_is_already_visible() {
    let shared = SlotShared::new(NoExtra);

    // Pre-load the slot and the terminal signal, then release the lock.
    let mut slot_state = shared
        .state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    slot_state.slot = Some(42);
    slot_state.terminal = Some(TerminalSignal::Complete);
    drop(slot_state);

    let mut stream = ExpandStream {
        shared,
        completion: None,
        current: None,
        expanded_once: false,
        seeded_from_upstream: false,
        expand: Arc::new(|item| (0..3).map(move |index| (item, index))),
    };

    assert_eq!(stream.next(), Some(Ok((42, 0))));
    assert_eq!(stream.next(), None);
}
1661
/// Builds an `ExpandStream` whose `current` iterator repeats 99 while the slot
/// already holds 7: the first poll yields 7, and after the slot is finished
/// with `Complete` the stream ends and stays ended.
#[test]
fn extrapolate_initial_yields_to_real_element_already_in_slot() {
    let shared = SlotShared::new(NoExtra);
    // The lock guard is a temporary and is released at the end of the statement.
    shared
        .state
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .slot = Some(7);

    let mut stream = ExpandStream {
        shared: Arc::clone(&shared),
        completion: None,
        current: Some(Box::new(std::iter::repeat(99))),
        expanded_once: false,
        seeded_from_upstream: false,
        expand: Arc::new(std::iter::repeat),
    };

    assert_eq!(stream.next(), Some(Ok(7)));

    finish_slot(&shared, TerminalSignal::Complete);
    for _ in 0..2 {
        assert_eq!(stream.next(), None);
    }
}
1692
/// `extrapolate` without an initial value emits the upstream element itself
/// first (1) and only then the extrapolated value (101).
#[test]
fn extrapolate_preserves_original_before_filling_gaps() {
    let (upstream, downstream) = rate_probe(|source| {
        source.extrapolate(|seen| std::iter::once(seen + 100), None)
    });

    upstream.expect_request();
    upstream.send_next(1);

    downstream.request(2);
    for expected in [1, 101] {
        assert_eq!(downstream.expect_next(), expected);
    }
}
1705
/// With `Some(0)` as the initial value, the first request is answered with 0
/// before upstream has sent anything; afterwards the upstream element 42 is
/// repeated for each further request.
#[test]
fn extrapolate_emits_initial_element_before_upstream_arrives() {
    let (upstream, downstream) =
        rate_probe(|source| source.extrapolate(std::iter::repeat, Some(0)));

    downstream.request(1);
    assert_eq!(downstream.expect_next(), 0);

    upstream.expect_request();
    upstream.send_next(42);
    downstream.request(3);
    for _ in 0..3 {
        assert_eq!(downstream.expect_next(), 42);
    }
}
1721
/// Without a timer, the aggregate is emitted whenever it reaches three
/// elements, and the leftover element is emitted at the end.
#[test]
fn aggregate_with_boundary_splits_by_size() {
    let chunks = Source::from_iter(1..=7)
        .aggregate_with_boundary(
            Vec::<i32>::new,
            |mut chunk, item| {
                chunk.push(item);
                let full = chunk.len() >= 3;
                (chunk, full)
            },
            |chunk| chunk,
            None,
        )
        .run_collect()
        .unwrap();

    let expected = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]];
    assert_eq!(chunks, expected);
}
1740
/// The aggregation function never reports a boundary itself; the only trigger
/// is the 10 ms `AggregateTimer` whose predicate accepts any non-empty buffer.
/// The subscriber must still receive `[1, 2]`.
#[test]
fn aggregate_with_boundary_honors_timer_trigger() {
    let (upstream, mut downstream) = rate_probe(|source| {
        source.aggregate_with_boundary(
            Vec::<i32>::new,
            |mut pending, item| {
                pending.push(item);
                (pending, false)
            },
            |pending| pending,
            Some(AggregateTimer::new(
                |pending: &Vec<i32>| !pending.is_empty(),
                Duration::from_millis(10),
            )),
        )
    });
    downstream.set_timeout(Duration::from_millis(200));

    for value in [1, 2] {
        upstream.expect_request();
        upstream.send_next(value);
    }

    downstream.request(1);
    assert_eq!(downstream.expect_next(), vec![1, 2]);
}
1767
/// `detach` delivers all of 1..=100 in order.
#[test]
fn detach_passes_through_all_elements() {
    let collected = Source::from_iter(1..=100).detach().run_collect().unwrap();
    let expected = (1..=100).collect::<Vec<_>>();
    assert_eq!(collected, expected);
}
1775
/// A failed source stays failed with the same error after `detach`.
#[test]
fn detach_passes_through_failure() {
    let boom = || StreamError::Failed("boom".to_owned());
    let outcome = Source::<i32>::failed(boom()).detach().run_collect();
    assert_eq!(outcome, Err(boom()));
}
1783
/// A single-element source behind `detach` still yields its element, followed
/// by the end of the stream.
#[test]
fn detach_emits_last_element_when_completed_without_demand() {
    let detached = Source::single(42).detach();
    let (mut stream, _materializer) = materialize_buffered_stream(detached);

    for expected in [Some(Ok(42)), None] {
        assert_eq!(stream.next(), expected);
    }
}
1791}