1use std::{
13 collections::{BTreeMap, HashMap, VecDeque},
14 fmt,
15 future::Future,
16 hash::Hash,
17 marker::PhantomData,
18 panic::{AssertUnwindSafe, catch_unwind},
19 pin::Pin,
20 sync::{
21 Arc, Condvar, Mutex, OnceLock,
22 atomic::{AtomicBool, AtomicUsize, Ordering},
23 },
24 task::{Context, Poll},
25 thread,
26 time::Duration,
27};
28
29use futures::{channel::oneshot, executor::block_on};
30use thiserror::Error;
31use tokio::{
32 runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
33 task::{JoinError, JoinHandle},
34};
35
/// Boxed, `Send` iterator whose items are `StreamResult<T>`; the stream
/// representation passed between the helpers in this file.
pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
/// Shared (`Arc`) function that turns one `BoxStream` into another. It receives
/// no `Materializer` and has no failure path of its own.
pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
/// Shared (`Arc`) function that turns one `BoxStream` into another, is handed the
/// `Materializer`, and may fail with a `StreamResult` error.
pub(crate) type RuntimeTransform<In, Out> =
    Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
/// Unsized callable that consumes a stream with a `Materializer` and yields `Mat`.
type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
/// Same shape as `SinkRunner`, with an additional `SourceRuntimeHints` argument.
type HintedSinkRunner<In, Mat> =
    dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
/// Unsized callable that yields `Mat` from a `Materializer` alone.
type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
// NOTE(review): the three constants below are not referenced in the visible part
// of this file. Their names suggest spin/park tuning for waiting code — confirm
// against the submodules that use them.
const STREAM_READY_SPINS: usize = 256;
const STREAM_SPIN_BACKOFF: usize = 8;
const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
51
/// Payload of the "inline micro" source hint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct InlineMicroSourceHint {
    /// Item bound; surfaced as `SourceRuntimeHints::inline_micro_max_success_items`.
    max_success_items: usize,
}

/// Build-time hints attached to a source and threaded through flows.
///
/// The `Default` value has every hint switched off.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct SourceHints {
    inline_head_terminal: bool,
    inline_micro: Option<InlineMicroSourceHint>,
    terminal_consumer_batch: bool,
}

/// Subset of `SourceHints` that is passed on at run time (see `SourceHints::runtime`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct SourceRuntimeHints {
    pub(crate) inline_micro_max_success_items: Option<usize>,
    pub(crate) terminal_consumer_batch: bool,
}

impl SourceHints {
    /// Hints with every flag enabled and an inline-micro bound of `max_success_items`.
    const fn with_inline_micro(max_success_items: usize) -> Self {
        Self {
            inline_micro: Some(InlineMicroSourceHint { max_success_items }),
            inline_head_terminal: true,
            terminal_consumer_batch: true,
        }
    }

    /// Hints with only `terminal_consumer_batch` enabled.
    const fn with_terminal_consumer_batch() -> Self {
        Self {
            inline_micro: None,
            inline_head_terminal: false,
            terminal_consumer_batch: true,
        }
    }

    /// Hints that remain after the source is piped through a flow described by `flow`.
    ///
    /// * `inline_head_terminal` is taken from `flow` alone; the value previously
    ///   held by `self` is not consulted.
    /// * `inline_micro` is always dropped.
    /// * `terminal_consumer_batch` survives only if both `self` and `flow` allow it.
    fn after_flow(self, flow: FlowHints) -> Self {
        let keeps_batch = self.terminal_consumer_batch && flow.preserves_terminal_consumer_batch;
        Self {
            inline_head_terminal: flow.preserves_inline_head_terminal,
            inline_micro: None,
            terminal_consumer_batch: keeps_batch,
        }
    }

    /// Copy of `self` with the inline-micro hint removed; the other flags are untouched.
    fn without_inline_micro(self) -> Self {
        Self {
            inline_micro: None,
            ..self
        }
    }

    /// Projects the hints onto the run-time representation.
    fn runtime(self) -> SourceRuntimeHints {
        let inline_micro_max_success_items = match self.inline_micro {
            Some(InlineMicroSourceHint { max_success_items }) => Some(max_success_items),
            None => None,
        };
        SourceRuntimeHints {
            inline_micro_max_success_items,
            terminal_consumer_batch: self.terminal_consumer_batch,
        }
    }
}

/// Describes which source hints a flow keeps intact.
///
/// The `Default` value preserves nothing.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct FlowHints {
    preserves_inline_head_terminal: bool,
    preserves_terminal_consumer_batch: bool,
}

impl FlowHints {
    /// Flow that preserves both hints.
    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
        preserves_inline_head_terminal: true,
        preserves_terminal_consumer_batch: true,
    };

    /// Flow that preserves only the terminal-consumer-batch hint.
    const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
        preserves_inline_head_terminal: false,
        preserves_terminal_consumer_batch: true,
    };

    /// Hints of `self` followed by `next`: a hint is preserved only if both stages preserve it.
    fn then(self, next: Self) -> Self {
        let Self {
            preserves_inline_head_terminal: head,
            preserves_terminal_consumer_batch: batch,
        } = self;
        Self {
            preserves_inline_head_terminal: head && next.preserves_inline_head_terminal,
            preserves_terminal_consumer_batch: batch && next.preserves_terminal_consumer_batch,
        }
    }
}
157
/// Per-key bookkeeping record used by the `*_partition_slot` helpers in this file.
struct PartitionSlot<Key, Out> {
    /// Key currently assigned to the slot; `None` after `retire_partition_slot`.
    key: Option<Key>,
    /// Counter incremented each time `pop_ready_partition_slot` hands out an item
    /// from this slot. It is compared against the `per_partition` limit.
    // NOTE(review): the code that decrements it is not visible in this chunk.
    active: usize,
    /// Pending `(index, item)` pairs, consumed front-first.
    queued: VecDeque<(usize, Out)>,
    /// Set while the slot's index sits in the ready queue, so it is not enqueued twice.
    in_ready_queue: bool,
}
164
165struct AbortOnDropHandle<T> {
166 handle: JoinHandle<T>,
167}
168
169impl<T> AbortOnDropHandle<T> {
170 fn new(handle: JoinHandle<T>) -> Self {
171 Self { handle }
172 }
173}
174
175impl<T> Drop for AbortOnDropHandle<T> {
176 fn drop(&mut self) {
177 self.handle.abort();
178 }
179}
180
181impl<T> Future for AbortOnDropHandle<T> {
182 type Output = Result<T, JoinError>;
183
184 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185 Pin::new(&mut self.handle).poll(cx)
186 }
187}
188
189impl<T> Unpin for AbortOnDropHandle<T> {}
190
191pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
192 static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
193 RUNTIME.get_or_init(|| {
194 TokioRuntimeBuilder::new_multi_thread()
195 .enable_all()
196 .thread_name("datum-stream-tokio")
197 .build()
198 .expect("stream tokio runtime")
199 })
200}
201
202fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
203where
204 Fut: Future<Output = T> + Send + 'static,
205 T: Send + 'static,
206{
207 AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
208}
209
/// Crate-visible forwarder to `runtime::current_stream_cancelled()`.
///
/// Returns whatever the `runtime` submodule reports: an optional shared
/// `AtomicBool`.
// NOTE(review): presumably the cancellation flag of the stream running on the
// current thread — confirm in `runtime`.
pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
    runtime::current_stream_cancelled()
}
213
214pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
215where
216 F: FnOnce() -> T,
217{
218 catch_unwind(AssertUnwindSafe(f))
219 .map_err(|_| StreamError::Failed(format!("{context} panicked")))
220}
221
222impl<Key, Out> PartitionSlot<Key, Out> {
223 fn new(key: Key) -> Self {
224 Self {
225 key: Some(key),
226 active: 0,
227 queued: VecDeque::new(),
228 in_ready_queue: false,
229 }
230 }
231}
232
233#[inline(always)]
234fn partition_slot_for<Key, Out>(
235 key: Key,
236 slots_by_key: &mut HashMap<Key, usize>,
237 slots: &mut Vec<PartitionSlot<Key, Out>>,
238 free_slots: &mut Vec<usize>,
239) -> usize
240where
241 Key: Clone + Eq + Hash,
242{
243 if let Some(slot) = slots_by_key.get(&key) {
244 return *slot;
245 }
246
247 let slot = if let Some(slot) = free_slots.pop() {
248 let state = &mut slots[slot];
249 state.key = Some(key.clone());
250 state.active = 0;
251 state.queued.clear();
252 state.in_ready_queue = false;
253 slot
254 } else {
255 slots.push(PartitionSlot::new(key.clone()));
256 slots.len() - 1
257 };
258 slots_by_key.insert(key, slot);
259 slot
260}
261
262#[inline(always)]
263fn retire_partition_slot<Key, Out>(
264 slot: usize,
265 slots_by_key: &mut HashMap<Key, usize>,
266 slots: &mut [PartitionSlot<Key, Out>],
267 free_slots: &mut Vec<usize>,
268) where
269 Key: Eq + Hash,
270{
271 let state = &mut slots[slot];
272 if let Some(key) = state.key.take() {
273 slots_by_key.remove(&key);
274 }
275 state.active = 0;
276 state.queued.clear();
277 state.in_ready_queue = false;
278 free_slots.push(slot);
279}
280
281#[inline(always)]
282fn ready_partition_slot<Key, Out>(
283 slots: &mut [PartitionSlot<Key, Out>],
284 ready_slots: &mut VecDeque<usize>,
285 slot: usize,
286 per_partition: usize,
287) {
288 if let Some(state) = slots.get_mut(slot)
289 && state.key.is_some()
290 && !state.in_ready_queue
291 && state.active < per_partition
292 && !state.queued.is_empty()
293 {
294 state.in_ready_queue = true;
295 ready_slots.push_back(slot);
296 }
297}
298
299#[inline(always)]
300fn pop_ready_partition_slot<Key, Out>(
301 slots: &mut [PartitionSlot<Key, Out>],
302 ready_slots: &mut VecDeque<usize>,
303 per_partition: usize,
304) -> Option<(usize, usize, Out)> {
305 while let Some(slot) = ready_slots.pop_front() {
306 let mut requeue = false;
307 let item = if let Some(state) = slots.get_mut(slot) {
308 state.in_ready_queue = false;
309 if state.key.is_some() && state.active < per_partition {
310 let item = state.queued.pop_front().map(|(index, item)| {
311 state.active += 1;
312 (index, slot, item)
313 });
314 if !state.queued.is_empty() && state.active < per_partition {
315 state.in_ready_queue = true;
316 requeue = true;
317 }
318 item
319 } else {
320 None
321 }
322 } else {
323 None
324 };
325
326 if requeue {
327 ready_slots.push_back(slot);
328 }
329 if item.is_some() {
330 return item;
331 }
332 }
333 None
334}
335
/// Factory that, given a `Materializer`, produces a stream of `Out` together
/// with a materialized value `Mat`, or fails with a `StreamResult` error.
///
/// `create` takes `self: Arc<Self>`, so an implementation can move its own `Arc`
/// into the stream it returns (as `MapSourceFactory` does).
pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
    /// Builds the stream and its materialized value.
    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
}
339
340struct FnSourceFactory<F>(F);
341
342impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
343where
344 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
345{
346 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
347 (self.0)(materializer)
348 }
349}
350
351struct MapSourceFactory<In, Out, Mat, F> {
352 source: Arc<dyn SourceFactory<In, Mat>>,
353 stage: F,
354 _marker: PhantomData<fn(In) -> Out>,
355}
356
357impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
358where
359 In: Send + 'static,
360 Out: Send + 'static,
361 Mat: Send + 'static,
362 F: Fn(In) -> Out + Send + Sync + 'static,
363{
364 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
365 let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
366 Ok((
367 Box::new(MapSourceStream {
368 input: stream,
369 factory: self,
370 }),
371 mat,
372 ))
373 }
374}
375
376struct MapSourceStream<In, Out, Mat, F> {
377 input: BoxStream<In>,
378 factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
379}
380
381impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
382where
383 F: Fn(In) -> Out,
384{
385 type Item = StreamResult<Out>;
386
387 fn next(&mut self) -> Option<Self::Item> {
388 self.input
389 .next()
390 .map(|item| item.map(|item| (self.factory.stage)(item)))
391 }
392}
393
394fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
395where
396 Out: Send + 'static,
397{
398 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
399 let mut current = 0usize;
400 Box::new(std::iter::from_fn(move || {
401 loop {
402 let index = next_active_optional_stream(&streams, current, |_| true)?;
403 current = (index + 1) % streams.len().max(1);
404 let Some(stream) = streams[index].as_mut() else {
405 continue;
406 };
407 match stream.next() {
408 Some(item) => return Some(item),
409 None => {
410 streams[index] = None;
411 if eager_complete {
412 return None;
413 }
414 }
415 }
416 }
417 }))
418}
419
420fn merge_prioritized_streams<Out>(
421 streams: Vec<BoxStream<Out>>,
422 priorities: Vec<usize>,
423 eager_complete: bool,
424) -> BoxStream<Out>
425where
426 Out: Send + 'static,
427{
428 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
429 let schedule: Vec<usize> = priorities
430 .into_iter()
431 .enumerate()
432 .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
433 .collect();
434 let mut schedule_index = 0usize;
435 Box::new(std::iter::from_fn(move || {
436 loop {
437 if streams.iter().all(Option::is_none) {
438 return None;
439 }
440 let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
441 let Some(stream) = streams[index].as_mut() else {
442 continue;
443 };
444 match stream.next() {
445 Some(item) => return Some(item),
446 None => {
447 streams[index] = None;
448 if eager_complete {
449 return None;
450 }
451 }
452 }
453 }
454 }))
455}
456
457fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
458where
459 Out: Ord + Send + 'static,
460{
461 let mut left_next: Option<Out> = None;
462 let mut right_next: Option<Out> = None;
463 let mut left_done = false;
464 let mut right_done = false;
465 Box::new(std::iter::from_fn(move || {
466 loop {
467 if left_next.is_none() && !left_done {
468 match left.next() {
469 Some(Ok(item)) => left_next = Some(item),
470 Some(Err(error)) => return Some(Err(error)),
471 None => left_done = true,
472 }
473 }
474 if right_next.is_none() && !right_done {
475 match right.next() {
476 Some(Ok(item)) => right_next = Some(item),
477 Some(Err(error)) => return Some(Err(error)),
478 None => right_done = true,
479 }
480 }
481
482 let next = match (&left_next, &right_next) {
483 (Some(left_item), Some(right_item)) => {
484 if left_item <= right_item {
485 left_next.take()
486 } else {
487 right_next.take()
488 }
489 }
490 (Some(_), None) if right_done => left_next.take(),
491 (None, Some(_)) if left_done => right_next.take(),
492 (None, None) if left_done && right_done => return None,
493 _ => continue,
494 };
495 if let Some(item) = next {
496 return Some(Ok(item));
497 }
498 }
499 }))
500}
501
502fn merge_latest_streams<Out>(
503 streams: Vec<BoxStream<Out>>,
504 eager_complete: bool,
505) -> BoxStream<Vec<Out>>
506where
507 Out: Clone + Send + 'static,
508{
509 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
510 let mut latest = vec![None; streams.len()];
511 let mut seen = 0usize;
512 let mut current = 0usize;
513 let mut pending = VecDeque::<Vec<Out>>::new();
514 Box::new(std::iter::from_fn(move || {
515 loop {
516 if let Some(output) = pending.pop_front() {
517 return Some(Ok(output));
518 }
519 if streams.iter().all(Option::is_none) {
520 return None;
521 }
522 let index = next_active_optional_stream(&streams, current, |_| true)?;
523 current = (index + 1) % streams.len().max(1);
524 let Some(stream) = streams[index].as_mut() else {
525 continue;
526 };
527 match stream.next() {
528 Some(Ok(item)) => {
529 if latest[index].is_none() {
530 seen += 1;
531 }
532 latest[index] = Some(item);
533 if seen == latest.len() {
534 pending.push_back(
535 latest
536 .iter()
537 .map(|item| item.clone().expect("merge-latest initialized"))
538 .collect(),
539 );
540 }
541 }
542 Some(Err(error)) => return Some(Err(error)),
543 None => {
544 streams[index] = None;
545 if eager_complete {
546 return None;
547 }
548 }
549 }
550 }
551 }))
552}
553
554fn zip_streams<Left, Right>(
555 mut left: BoxStream<Left>,
556 mut right: BoxStream<Right>,
557) -> BoxStream<(Left, Right)>
558where
559 Left: Send + 'static,
560 Right: Send + 'static,
561{
562 let mut left_next: Option<Left> = None;
563 let mut right_next: Option<Right> = None;
564 let mut left_done = false;
565 let mut right_done = false;
566 Box::new(std::iter::from_fn(move || {
567 loop {
568 if left_next.is_none() && !left_done {
569 match left.next() {
570 Some(Ok(item)) => left_next = Some(item),
571 Some(Err(error)) => return Some(Err(error)),
572 None => left_done = true,
573 }
574 }
575 if right_next.is_none() && !right_done {
576 match right.next() {
577 Some(Ok(item)) => right_next = Some(item),
578 Some(Err(error)) => return Some(Err(error)),
579 None => right_done = true,
580 }
581 }
582 match (left_next.take(), right_next.take()) {
583 (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
584 (left_item, right_item) => {
585 left_next = left_item;
586 right_next = right_item;
587 if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
588 return None;
589 }
590 }
591 }
592 }
593 }))
594}
595
596fn zip_latest_with_stream<Left, Right, Out, F>(
597 mut left: BoxStream<Left>,
598 mut right: BoxStream<Right>,
599 eager_complete: bool,
600 combine: Arc<F>,
601) -> BoxStream<Out>
602where
603 Left: Clone + Send + 'static,
604 Right: Clone + Send + 'static,
605 Out: Send + 'static,
606 F: Fn(Left, Right) -> Out + Send + Sync + 'static,
607{
608 let mut left_latest: Option<Left> = None;
609 let mut right_latest: Option<Right> = None;
610 let mut left_done = false;
611 let mut right_done = false;
612 let mut turn_left = true;
613 let mut pending = VecDeque::<Out>::new();
614
615 Box::new(std::iter::from_fn(move || {
616 loop {
617 if let Some(output) = pending.pop_front() {
618 return Some(Ok(output));
619 }
620 if eager_complete && (left_done || right_done) {
621 return None;
622 }
623 if left_done && right_done {
624 return None;
625 }
626 if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
630 return None;
631 }
632
633 let pull_left = if left_done {
634 false
635 } else if right_done {
636 true
637 } else {
638 let value = turn_left;
639 turn_left = !turn_left;
640 value
641 };
642
643 if pull_left {
644 match left.next() {
645 Some(Ok(item)) => {
646 left_latest = Some(item);
647 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
648 pending.push_back(combine(left_item.clone(), right_item.clone()));
649 }
650 }
651 Some(Err(error)) => return Some(Err(error)),
652 None => {
653 left_done = true;
654 if eager_complete {
655 return None;
656 }
657 }
658 }
659 } else {
660 match right.next() {
661 Some(Ok(item)) => {
662 right_latest = Some(item);
663 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
664 pending.push_back(combine(left_item.clone(), right_item.clone()));
665 }
666 }
667 Some(Err(error)) => return Some(Err(error)),
668 None => {
669 right_done = true;
670 if eager_complete {
671 return None;
672 }
673 }
674 }
675 }
676 }
677 }))
678}
679
680fn zip_all_stream<Left, Right>(
681 mut left: BoxStream<Left>,
682 mut right: BoxStream<Right>,
683 left_fill: Left,
684 right_fill: Right,
685) -> BoxStream<(Left, Right)>
686where
687 Left: Clone + Send + 'static,
688 Right: Clone + Send + 'static,
689{
690 let mut left_done = false;
691 let mut right_done = false;
692 Box::new(std::iter::from_fn(move || {
693 if left_done && right_done {
694 return None;
695 }
696
697 let left_item = if left_done {
698 None
699 } else {
700 match left.next() {
701 Some(Ok(item)) => Some(item),
702 Some(Err(error)) => return Some(Err(error)),
703 None => {
704 left_done = true;
705 None
706 }
707 }
708 };
709 let right_item = if right_done {
710 None
711 } else {
712 match right.next() {
713 Some(Ok(item)) => Some(item),
714 Some(Err(error)) => return Some(Err(error)),
715 None => {
716 right_done = true;
717 None
718 }
719 }
720 };
721
722 match (left_item, right_item) {
723 (None, None) if left_done && right_done => None,
724 (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
725 (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
726 (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
727 (None, None) => None,
728 }
729 }))
730}
731
732fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
733where
734 Out: Send + 'static,
735 Next: Send + 'static,
736 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
737{
738 let count = streams.len();
739 if count == 0 {
740 return Box::new(std::iter::empty());
741 }
742 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
743 let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
744 let mut current = 0usize;
745 Box::new(std::iter::from_fn(move || {
746 loop {
747 if slots.iter().all(Option::is_some) {
748 let values = slots
749 .iter_mut()
750 .map(|slot| slot.take().expect("zip-n slot filled"))
751 .collect();
752 return Some(Ok(zipper(values)));
753 }
754
755 let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
756 current = (index + 1) % count.max(1);
757 let Some(stream) = streams[index].as_mut() else {
758 continue;
759 };
760 match stream.next() {
761 Some(Ok(item)) => slots[index] = Some(item),
762 Some(Err(error)) => return Some(Err(error)),
763 None => {
764 streams[index] = None;
765 slots[index].as_ref()?;
766 }
767 }
768 }
769 }))
770}
771
772fn next_active_optional_stream<T, F>(
773 streams: &[Option<BoxStream<T>>],
774 current: usize,
775 predicate: F,
776) -> Option<usize>
777where
778 T: Send + 'static,
779 F: Fn(usize) -> bool,
780{
781 if streams.is_empty() {
782 return None;
783 }
784 for offset in 0..streams.len() {
785 let index = (current + offset) % streams.len();
786 if streams[index].is_some() && predicate(index) {
787 return Some(index);
788 }
789 }
790 None
791}
792
793fn next_weighted_stream<T>(
794 streams: &[Option<BoxStream<T>>],
795 schedule: &[usize],
796 schedule_index: &mut usize,
797) -> Option<usize>
798where
799 T: Send + 'static,
800{
801 if streams.is_empty() || schedule.is_empty() {
802 return None;
803 }
804 for _ in 0..schedule.len() {
805 let index = schedule[*schedule_index % schedule.len()];
806 *schedule_index = (*schedule_index + 1) % schedule.len();
807 if streams.get(index).is_some_and(Option::is_some) {
808 return Some(index);
809 }
810 }
811 None
812}
813
814pub(crate) mod async_boundary;
815mod completion;
816mod error;
817mod flow;
818mod rate;
819mod restart;
820mod runtime;
821mod sink;
822mod source;
823mod time;
824mod timer;
825
/// Type-erased hook that can be recovered as a concrete type via `Any`.
// NOTE(review): no implementor is visible in this chunk; what the hook does
// beyond the `Any` conversion is not shown here.
pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
    /// Converts the shared hook into `Arc<dyn Any + Send + Sync>` so callers can
    /// attempt a downcast.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
}
832
/// Object-safe interface through which a terminal consumer talks to a source
/// producing `In` items.
// NOTE(review): no implementor is visible in this chunk; the method notes below
// are derived from the signatures and default bodies only.
pub(crate) trait TerminalSourceHookDyn<In>: Send + Sync + 'static {
    /// Required method. Receives the materializer, a shared cancellation flag, and
    /// a caller-owned `batch` buffer, and reports a `TerminalSourceStatus`.
    /// Presumably appends the currently available items to `batch` — confirm
    /// against implementors.
    fn drain_terminal_batch(
        &self,
        materializer: &Materializer,
        cancelled: &Arc<AtomicBool>,
        batch: &mut Vec<In>,
    ) -> StreamResult<TerminalSourceStatus>;

    /// Whether `try_register_direct_terminal` is supported. Defaults to `false`.
    fn supports_direct_terminal(&self) -> bool {
        false
    }

    /// Offers a consumer and a cancellation flag to the source. The default
    /// implementation ignores both and returns `None`.
    fn try_register_direct_terminal(
        &self,
        _consumer: Box<dyn TerminalSinkConsumerDyn<In>>,
        _cancelled: Arc<AtomicBool>,
    ) -> Option<StreamResult<()>> {
        None
    }

    /// Cancellation notification. The default implementation does nothing.
    fn cancel_terminal(&self) {}
}
860
/// Status reported by `TerminalSourceHookDyn::drain_terminal_batch`.
// NOTE(review): presumably `Active` means more items may follow and `Completed`
// means the source is finished — confirm against implementors.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TerminalSourceStatus {
    Active,
    Completed,
}
866
/// Object-safe consumer that can be handed to
/// `TerminalSourceHookDyn::try_register_direct_terminal`.
pub(crate) trait TerminalSinkConsumerDyn<In>: Send + 'static {
    /// Receives one item; may fail with a `StreamResult` error.
    fn on_item(&mut self, item: In) -> StreamResult<()>;
    /// Consumes the boxed consumer together with the final `result`.
    fn finish(self: Box<Self>, result: StreamResult<()>);
}
871
/// Object-safe interface for registering a sink against one of the source hooks
/// declared in this file.
///
/// Every registration method returns `Option<StreamResult<Box<dyn Any + Send>>>`.
// NOTE(review): presumably `None` means "not applicable, use the regular path" and
// the boxed `Any` carries the materialized value — confirm against implementors,
// none of which are visible in this chunk.
pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
    /// Required method: attempts registration against a `SplitSegmentHookDyn`.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;

    /// Whether `try_register_terminal_drain` is supported. Defaults to `false`.
    fn supports_terminal_drain(&self) -> bool {
        false
    }

    /// Attempts registration against a `TerminalSourceHookDyn`. The default
    /// implementation ignores its arguments and returns `None`.
    fn try_register_direct_terminal(
        &self,
        _hook: Arc<dyn TerminalSourceHookDyn<In>>,
        _materializer: &Materializer,
    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
        None
    }

    /// Attempts drain-style registration against a `TerminalSourceHookDyn`. The
    /// default implementation ignores its arguments and returns `None`.
    fn try_register_terminal_drain(
        &self,
        _hook: Arc<dyn TerminalSourceHookDyn<In>>,
        _materializer: &Materializer,
    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
        None
    }
}
903
904use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
905
906pub(crate) use self::completion::StreamCancellation;
907
908pub use self::{
909 completion::{Cancellable, StreamCompletion},
910 error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
911 flow::{BidiFlow, Flow},
912 rate::{AggregateTimer, OverflowStrategy},
913 restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
914 runtime::{Materializer, Runtime},
915 sink::{RunnableGraph, Sink, SinkCombineStrategy},
916 source::{
917 Demand, IntoSource, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy,
918 },
919 time::{DelayOverflowStrategy, ThrottleMode},
920};
921
922#[cfg(test)]
923mod tests {
924 use super::*;
925 use crate::Attributes;
926 use crate::testkit::TestSink;
927 use std::fs;
928 use std::sync::{
929 Arc as StdArc,
930 atomic::{
931 AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
932 },
933 mpsc,
934 };
935 use std::time::Duration as StdDuration;
936 use std::time::Instant;
937
    /// Test helper: waits on `completion` and unwraps the result, panicking if
    /// the wait returned an error.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }
941
942 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
943 let deadline = Instant::now() + timeout;
944 while Instant::now() < deadline {
945 if condition() {
946 return true;
947 }
948 thread::sleep(Duration::from_millis(2));
949 }
950 condition()
951 }
952
953 fn linux_thread_count(thread_name: &str) -> usize {
954 fs::read_dir("/proc/self/task")
955 .expect("task directory readable")
956 .filter_map(Result::ok)
957 .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
958 .filter(|name| name.trim() == thread_name)
959 .count()
960 }
961
    /// `run_fold`, `run_reduce`, `run_foreach`, and `run_for_each` must produce
    /// the same results as `run_with` combined with the corresponding `Sink`,
    /// including `StreamError::EmptyStream` when reducing an empty source.
    #[test]
    fn source_run_terminal_shortcuts_match_explicit_sinks() {
        let explicit_fold: StreamResult<StreamCompletion<u64>> =
            Source::from_iter(1_u64..=4).run_with(Sink::fold(0_u64, |acc, item| acc + item));
        let sugared_fold: StreamResult<StreamCompletion<u64>> =
            Source::from_iter(1_u64..=4).run_fold(0_u64, |acc, item| acc + item);
        assert_eq!(wait(explicit_fold.unwrap()), 10);
        assert_eq!(wait(sugared_fold.unwrap()), 10);

        let explicit_reduce: StreamResult<StreamCompletion<u64>> =
            Source::from_iter(1_u64..=4).run_with(Sink::reduce(|left: u64, right| left + right));
        let sugared_reduce: StreamResult<StreamCompletion<u64>> =
            Source::from_iter(1_u64..=4).run_reduce(|left, right| left + right);
        assert_eq!(wait(explicit_reduce.unwrap()), 10);
        assert_eq!(wait(sugared_reduce.unwrap()), 10);

        // Reducing an empty source must fail identically on both paths.
        let explicit_empty_reduce = Source::<u64>::empty()
            .run_with(Sink::reduce(|left: u64, right| left + right))
            .unwrap()
            .wait();
        let sugared_empty_reduce = Source::<u64>::empty()
            .run_reduce(|left, right| left + right)
            .unwrap()
            .wait();
        assert_eq!(sugared_empty_reduce, explicit_empty_reduce);
        assert_eq!(sugared_empty_reduce, Err(StreamError::EmptyStream));

        let explicit_sum = StdArc::new(StdAtomicUsize::new(0));
        let explicit_sum_sink = StdArc::clone(&explicit_sum);
        let explicit_foreach: StreamResult<StreamCompletion<NotUsed>> =
            Source::from_iter(1_usize..=4).run_with(Sink::foreach(move |item| {
                explicit_sum_sink.fetch_add(item, StdOrdering::SeqCst);
            }));
        assert_eq!(wait(explicit_foreach.unwrap()), NotUsed);

        let sugared_sum = StdArc::new(StdAtomicUsize::new(0));
        let sugared_sum_sink = StdArc::clone(&sugared_sum);
        let sugared_foreach: StreamResult<StreamCompletion<NotUsed>> =
            Source::from_iter(1_usize..=4).run_foreach(move |item| {
                sugared_sum_sink.fetch_add(item, StdOrdering::SeqCst);
            });
        assert_eq!(wait(sugared_foreach.unwrap()), NotUsed);

        let alias_sum = StdArc::new(StdAtomicUsize::new(0));
        let alias_sum_sink = StdArc::clone(&alias_sum);
        let alias_for_each: StreamResult<StreamCompletion<NotUsed>> =
            Source::from_iter(1_usize..=4).run_for_each(move |item| {
                alias_sum_sink.fetch_add(item, StdOrdering::SeqCst);
            });
        assert_eq!(wait(alias_for_each.unwrap()), NotUsed);

        // All three side-effecting variants must have seen 1 + 2 + 3 + 4.
        assert_eq!(explicit_sum.load(StdOrdering::SeqCst), 10);
        assert_eq!(sugared_sum.load(StdOrdering::SeqCst), 10);
        assert_eq!(alias_sum.load(StdOrdering::SeqCst), 10);
    }
1017
    /// `Source::from` (for `Vec` and arrays) and `into_source` must yield the
    /// same items as `Source::from_iter`.
    #[test]
    fn source_constructor_sugar_matches_from_iter() {
        let expected = Source::from_iter(1_u64..=4).run_collect().unwrap();

        assert_eq!(
            Source::from(vec![1_u64, 2, 3, 4]).run_collect().unwrap(),
            expected
        );
        assert_eq!(
            Source::from([1_u64, 2, 3, 4]).run_collect().unwrap(),
            expected
        );
        assert_eq!((1_u64..=4).into_source().run_collect().unwrap(), expected);
    }
1032
    /// Every construction path (`into`, `from_iter`, `from_iterable`, `collect`,
    /// `into_source`) must type-check with the annotations used here and produce
    /// the same items.
    #[test]
    fn source_constructor_sugar_keeps_existing_inference_paths() {
        let from_vec_into: Source<u64> = vec![1, 2, 3].into();
        let from_array_into: Source<u64> = [1, 2, 3].into();
        let from_iter = Source::from_iter(1_u64..=3);
        let from_iterable = Source::from_iterable(1_u64..=3);
        let from_iterator: Source<u64> = (1_u64..=3).collect();
        let from_range_into_source: Source<u64> = (1_u64..=3).into_source();

        let expected = vec![1, 2, 3];
        assert_eq!(from_vec_into.run_collect().unwrap(), expected);
        assert_eq!(from_array_into.run_collect().unwrap(), expected);
        assert_eq!(from_iter.run_collect().unwrap(), expected);
        assert_eq!(from_iterable.run_collect().unwrap(), expected);
        assert_eq!(from_iterator.run_collect().unwrap(), expected);
        assert_eq!(from_range_into_source.run_collect().unwrap(), expected);
    }
1050
    /// Inserting `async_boundary()` into a source pipeline must not change the
    /// collected output.
    #[test]
    fn source_async_boundary_preserves_results() {
        let expected = Source::from_iter(0_u64..128)
            .map(|item| item.wrapping_add(1))
            .filter(|item| item % 3 != 0)
            .map(|item| item * 2)
            .run_collect()
            .unwrap();

        let actual = Source::from_iter(0_u64..128)
            .map(|item| item.wrapping_add(1))
            .async_boundary()
            .filter(|item| item % 3 != 0)
            .map(|item| item * 2)
            .run_collect()
            .unwrap();

        assert_eq!(actual, expected);
    }
1070
    /// A `Flow` containing `r#async()` must produce the same output as the
    /// equivalent pipeline without it.
    #[test]
    fn flow_async_boundary_preserves_results() {
        let expected = Source::from_iter(0_u64..128)
            .map(|item| item + 1)
            .map(|item| item * 3)
            .run_collect()
            .unwrap();

        let flow = Flow::identity()
            .map(|item: u64| item + 1)
            .r#async()
            .map(|item| item * 3);
        let actual = Source::from_iter(0_u64..128)
            .via(flow)
            .run_collect()
            .unwrap();

        assert_eq!(actual, expected);
    }
1090
    /// The linear `async_boundary_with_buffer` pipeline must produce the same
    /// output as an equivalent `GraphDsl` graph (map, async boundary, map), and
    /// the graph's report must count one boundary crossing per output item.
    #[test]
    fn linear_async_boundary_matches_graph_async_boundary_shape() {
        use crate::{
            AsyncBoundary, AsyncBoundaryExecutionConfig, FusedExecutionConfig, GraphDsl,
            GraphFlowShape, MapStage,
        };

        let graph = GraphDsl::try_create(|builder| {
            let first = builder.add(MapStage::new(|item: u64| item + 1));
            let boundary = builder.add(AsyncBoundary::<u64>::new());
            let second = builder.add(MapStage::new(|item: u64| item * 2));

            builder.connect(first.outlet(), boundary.inlet())?;
            builder.connect(boundary.outlet(), second.inlet())?;

            Ok(GraphFlowShape::new(first.inlet(), second.outlet()))
        })
        .unwrap();

        let linear = Source::from_iter(1_u64..=4)
            .map(|item| item + 1)
            .async_boundary_with_buffer(4)
            .map(|item| item * 2)
            .run_collect()
            .unwrap();
        let graph_output = graph.run_with_input(1_u64..=4).unwrap();
        let report = graph
            .run_async_boundary_count_with_input_report(
                1_u64..=4,
                AsyncBoundaryExecutionConfig {
                    fused: FusedExecutionConfig { event_limit: 1024 },
                    buffer_size: 4,
                },
            )
            .unwrap();

        assert_eq!(linear, graph_output);
        assert_eq!(report.result, linear.len());
        assert_eq!(report.async_boundary_crossings, linear.len());
    }
1131
    /// While the downstream stage is blocked on item 0, the upstream stage must
    /// still be able to produce items 0 and 1 through a boundary with buffer
    /// size 1, i.e. the two regions run concurrently.
    #[test]
    fn async_boundary_regions_run_concurrently() {
        let (upstream_tx, upstream_rx) = mpsc::channel::<u64>();
        let (downstream_blocked_tx, downstream_blocked_rx) = mpsc::channel::<()>();
        let (release_tx, release_rx) = mpsc::channel::<()>();
        let release_rx = StdArc::new(Mutex::new(release_rx));

        let completion = Source::from_iter(0_u64..3)
            .map(move |item| {
                // Probe: report every item the upstream region produces.
                upstream_tx.send(item).expect("upstream probe receives");
                item
            })
            .async_boundary_with_buffer(1)
            .map({
                let release_rx = StdArc::clone(&release_rx);
                move |item| {
                    if item == 0 {
                        // Signal that downstream is blocked, then wait for release.
                        downstream_blocked_tx
                            .send(())
                            .expect("downstream probe receives");
                        release_rx
                            .lock()
                            .expect("release receiver lock")
                            .recv_timeout(StdDuration::from_secs(2))
                            .expect("downstream release arrives");
                    }
                    item
                }
            })
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(
            downstream_blocked_rx.recv_timeout(StdDuration::from_secs(2)),
            Ok(())
        );
        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
        assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));

        release_tx.send(()).expect("release downstream");
        assert_eq!(completion.wait().unwrap(), vec![0, 1, 2]);
    }
1174
    /// With a boundary of buffer size 1 and downstream blocked on item 0, the
    /// producer may run ahead by a bounded amount (items 0 and 1, optionally 2)
    /// and must then stall until downstream is released.
    #[test]
    fn async_boundary_backpressures_slow_downstream() {
        let (produced_tx, produced_rx) = mpsc::channel::<u64>();
        let (release_tx, release_rx) = mpsc::channel::<()>();
        let release_rx = StdArc::new(Mutex::new(release_rx));

        let completion = Source::from_iter(0_u64..8)
            .map(move |item| {
                // Probe: report every item the producer emits.
                produced_tx.send(item).expect("producer probe receives");
                item
            })
            .async_boundary_with_buffer(1)
            .map({
                let release_rx = StdArc::clone(&release_rx);
                move |item| {
                    if item == 0 {
                        release_rx
                            .lock()
                            .expect("release receiver lock")
                            .recv_timeout(StdDuration::from_secs(2))
                            .expect("downstream release arrives");
                    }
                    item
                }
            })
            .run_with(Sink::collect())
            .unwrap();

        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
        assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
        // A third item is tolerated; anything beyond that means the handoff is unbounded.
        if let Ok(item) = produced_rx.recv_timeout(StdDuration::from_millis(100)) {
            assert_eq!(item, 2);
        }
        match produced_rx.recv_timeout(StdDuration::from_millis(100)) {
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            other => panic!("async boundary handoff was not bounded: {other:?}"),
        }

        release_tx.send(()).expect("release downstream");
        assert_eq!(completion.wait().unwrap(), (0_u64..8).collect::<Vec<_>>());
    }
1216
/// A `Source` can be cloned and run more than once, yielding the same elements each time.
#[test]
fn source_blueprints_are_reusable() {
    let blueprint = Source::from_iter(0..5).map(|value| value + 1);
    let expected = vec![1, 2, 3, 4, 5];

    let from_clone = blueprint.clone().run_collect().unwrap();
    assert_eq!(from_clone, expected);

    let from_original = blueprint.run_collect().unwrap();
    assert_eq!(from_original, expected);
}
1224
/// Applying `map` after `map_materialized_value` keeps the materialized value
/// ("source") while still transforming the element.
#[test]
fn source_map_preserves_materialized_value() {
    let tagged = Source::single(1).map_materialized_value(|_| "source");
    let graph = tagged
        .map(|value| value + 1)
        .to_mat(Sink::head(), Keep::both);

    let outcome = graph.run().unwrap();

    assert_eq!(outcome.0, "source");
    assert_eq!(wait(outcome.1), 2);
}
1236
/// A `Flow` built from `map` + `filter` can be attached to a `Source` with `via`.
#[test]
fn source_and_flow_compose() {
    // Double every element, then keep only multiples of three.
    let doubled_multiples_of_three = Flow::identity()
        .map(|value: i32| value * 2)
        .filter(|value| value % 3 == 0);

    let collected = Source::from_iter(0..8)
        .via(doubled_multiples_of_three)
        .run_collect()
        .unwrap();

    assert_eq!(collected, vec![0, 6, 12]);
}
1247
/// Records the attributes handed to a `Sink::setup` factory and checks that the
/// name, input-buffer hint and dispatcher hint added on the sink are the ones
/// observed when running under a materializer named "mat-outer".
#[test]
fn sink_setup_sees_materializer_defaults_and_local_attributes() {
    let seen = StdArc::new(Mutex::new(None));
    let recorder = StdArc::clone(&seen);

    let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
        // Snapshot the three attribute views before storing them for the assertion below.
        let name = attrs.name().map(str::to_owned);
        let buffer = attrs.input_buffer_hint();
        let dispatcher = attrs.dispatcher_hint().map(str::to_owned);
        *recorder.lock().unwrap() = Some((name, buffer, dispatcher));
        Sink::ignore()
    })
    .add_attributes(Attributes::named("sink-inner"))
    .add_attributes(Attributes::input_buffer(4, 4))
    .add_attributes(Attributes::dispatcher("bench-dispatcher"));

    let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
    let completion = Source::from_iter([1, 2, 3])
        .run_with_materializer(sink, &materializer)
        .unwrap();
    wait(completion);

    let expected = Some((
        Some("sink-inner".to_owned()),
        Some((4, 4)),
        Some("bench-dispatcher".to_owned()),
    ));
    assert_eq!(*seen.lock().unwrap(), expected);
}
1280
/// A pre-materialized collect sink yields a completion up front; running a
/// source into the returned sink delivers the elements to that completion.
#[test]
fn sink_pre_materialize_feeds_existing_materialization() {
    let materializer = Materializer::new();
    let collect_sink = Sink::<i32, StreamCompletion<Vec<i32>>>::collect();
    let (completion, attached_sink) = collect_sink.pre_materialize(&materializer).unwrap();

    Source::from_iter([1, 2, 3])
        .run_with_materializer(attached_sink, &materializer)
        .unwrap();

    let collected = wait(completion);
    assert_eq!(collected, vec![1, 2, 3]);
}
1294
/// A flow built from an independent sink and source emits the embedded
/// source's elements downstream, regardless of what is fed into the sink side.
#[test]
fn flow_from_sink_and_source_connects_both_sides() {
    let flow = Flow::from_sink_and_source(
        Sink::foreach(|_item: i32| {}),
        Source::from_iter([10, 20, 30]),
    );

    let output = Source::from_iter([1, 2, 3])
        .via(flow)
        .run_collect()
        .unwrap();

    assert_eq!(output, vec![10, 20, 30]);
}
1308
/// With the uncoupled `from_sink_and_source`, the sink side's completion
/// callback is still observed (within one second) after the single-element
/// source side has finished.
#[test]
fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
    let sink_finished = StdArc::new(StdAtomicBool::new(false));
    let flag = StdArc::clone(&sink_finished);

    let completion_sink = Sink::on_complete(move || {
        flag.store(true, StdOrdering::SeqCst);
    });
    let flow = Flow::from_sink_and_source(completion_sink, Source::single(10));

    let output = Source::from_iter([1, 2, 3])
        .via(flow)
        .run_collect()
        .unwrap();
    assert_eq!(output, vec![10]);

    let observed = wait_until(StdDuration::from_secs(1), || {
        sink_finished.load(StdOrdering::SeqCst)
    });
    assert!(observed);
}
1330
/// With the coupled variant, feeding an empty upstream into the sink side
/// leads to the tick source's materialized handle reporting cancellation
/// within one second.
#[test]
fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
    let captured_handle = StdArc::new(Mutex::new(None));
    let handle_slot = StdArc::clone(&captured_handle);

    let interval = StdDuration::from_millis(50);
    // Stash a clone of the tick handle so the test can poll its cancellation state.
    let ticking = Source::tick(interval, interval, 10).map_materialized_value(move |handle| {
        *handle_slot.lock().unwrap() = Some(handle.clone());
        handle
    });
    let flow = Flow::from_sink_and_source_coupled(Sink::ignore(), ticking);

    let completion = Source::from_iter(std::iter::empty::<i32>())
        .via(flow)
        .run_with(Sink::ignore())
        .unwrap();

    let cancelled = wait_until(StdDuration::from_secs(1), || {
        let slot = captured_handle.lock().unwrap();
        slot.as_ref().is_some_and(Cancellable::is_cancelled)
    });
    assert!(cancelled);
    assert_eq!(wait(completion), NotUsed);
}
1361
/// `BidiFlow::join` closes a bidirectional flow over an inner flow, and
/// `atop` stacks two bidirectional flows before joining.
#[test]
fn bidi_flow_join_and_atop_compose() {
    let add_one = Flow::identity().map(|value: i32| value + 1);
    let double = Flow::identity().map(|value: i32| value * 2);
    let codec = BidiFlow::from_flows(add_one, double).named("codec");

    let triple = Flow::identity().map(|value: i32| value * 3);
    let minus_four = Flow::identity().map(|value: i32| value - 4);
    let framing = BidiFlow::from_flows(triple, minus_four);

    let minus_five = Flow::identity().map(|value: i32| value - 5);
    let joined = codec.clone().join(minus_five);
    let stacked = codec.atop(framing).join(Flow::identity());

    let joined_output = Source::single(10).via(joined).run_collect().unwrap();
    assert_eq!(joined_output, vec![12]);

    let stacked_output = Source::single(10).via(stacked).run_collect().unwrap();
    assert_eq!(stacked_output, vec![58]);
}
1388
/// A backpressured `buffer` followed by `map` runs to completion and maps every element.
#[test]
fn flow_buffer_then_map_runs_end_to_end() {
    let buffered_increment = Flow::identity()
        .buffer(8, OverflowStrategy::Backpressure)
        .map(|value: i32| value + 1);

    let collected = Source::from_iter(0..4)
        .via(buffered_increment)
        .run_collect()
        .unwrap();

    assert_eq!(collected, vec![1, 2, 3, 4]);
}
1399
/// Chains each public `Flow` combinator after a backpressured `buffer` stage
/// and checks that the combined flow still produces the expected output.
#[test]
fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
    /// Builds the buffered identity flow that every case below starts from.
    fn buffered() -> Flow<i32, i32> {
        Flow::identity().buffer(8, OverflowStrategy::Backpressure)
    }

    // --- filtering ---------------------------------------------------------
    let evens = Source::from_iter(0..4)
        .via(buffered().filter(|value| *value % 2 == 0))
        .run_collect()
        .unwrap();
    assert_eq!(evens, vec![0, 2]);

    let odds = Source::from_iter(0..4)
        .via(buffered().filter_not(|value| *value % 2 == 0))
        .run_collect()
        .unwrap();
    assert_eq!(odds, vec![1, 3]);

    let shifted_evens = Source::from_iter(0..4)
        .via(buffered().filter_map(|value| (value % 2 == 0).then_some(value + 10)))
        .run_collect()
        .unwrap();
    assert_eq!(shifted_evens, vec![10, 12]);

    // --- expanding and stateful mapping -------------------------------------
    let expanded = Source::from_iter(0..3)
        .via(buffered().map_concat(|value| [value, value + 10]))
        .run_collect()
        .unwrap();
    assert_eq!(expanded, vec![0, 10, 1, 11, 2, 12]);

    let running_totals = Source::from_iter(0..3)
        .via(buffered().stateful_map(5, |total, value| {
            *total += value;
            *total
        }))
        .run_collect()
        .unwrap();
    assert_eq!(running_totals, vec![5, 6, 8]);

    let totals_with_inputs = Source::from_iter(0..3)
        .via(buffered().stateful_map_concat(0, |total, value| {
            *total += value;
            [*total, value]
        }))
        .run_collect()
        .unwrap();
    assert_eq!(totals_with_inputs, vec![0, 0, 1, 1, 3, 2]);

    // --- async mapping -------------------------------------------------------
    let async_ordered = Source::from_iter(0..4)
        .via(buffered().map_async(2, |value| async move { Ok(value + 1) }))
        .run_collect()
        .unwrap();
    assert_eq!(async_ordered, vec![1, 2, 3, 4]);

    let async_unordered = Source::from_iter(0..4)
        .via(buffered().map_async_unordered(2, |value| async move { Ok(value + 1) }))
        .run_collect()
        .unwrap();
    assert_eq!(async_unordered, vec![1, 2, 3, 4]);

    let async_partitioned = Source::from_iter(0..4)
        .via(buffered().map_async_partitioned(
            2,
            1,
            |value| value % 2,
            |value| async move { Ok(value + 1) },
        ))
        .run_collect()
        .unwrap();
    assert_eq!(async_partitioned, vec![1, 2, 3, 4]);

    // --- slicing ---------------------------------------------------------------
    let first_three = Source::from_iter(0..5)
        .via(buffered().take(3))
        .run_collect()
        .unwrap();
    assert_eq!(first_three, vec![0, 1, 2]);

    let after_two = Source::from_iter(0..5)
        .via(buffered().drop(2))
        .run_collect()
        .unwrap();
    assert_eq!(after_two, vec![2, 3, 4]);

    let below_three = Source::from_iter(0..5)
        .via(buffered().take_while(|value| *value < 3))
        .run_collect()
        .unwrap();
    assert_eq!(below_three, vec![0, 1, 2]);

    let from_three = Source::from_iter(0..5)
        .via(buffered().drop_while(|value| *value < 3))
        .run_collect()
        .unwrap();
    assert_eq!(from_three, vec![3, 4]);

    let within_limit = Source::from_iter(0..3)
        .via(buffered().limit(5))
        .run_collect()
        .unwrap();
    assert_eq!(within_limit, vec![0, 1, 2]);

    // --- grouping and aggregation ----------------------------------------------
    let pairs = Source::from_iter(0..5)
        .via(buffered().grouped(2))
        .run_collect()
        .unwrap();
    assert_eq!(pairs, vec![vec![0, 1], vec![2, 3], vec![4]]);

    let scanned = Source::from_iter(1..=3)
        .via(buffered().scan(0, |total, value| total + value))
        .run_collect()
        .unwrap();
    assert_eq!(scanned, vec![0, 1, 3, 6]);

    let windows = Source::from_iter(1..=4)
        .via(buffered().sliding(2, 1))
        .run_collect()
        .unwrap();
    assert_eq!(windows, vec![vec![1, 2], vec![2, 3], vec![3, 4]]);

    let folded = Source::from_iter(1..=4)
        .via(buffered().fold(0, |total, value| total + value))
        .run_collect()
        .unwrap();
    assert_eq!(folded, vec![10]);

    let reduced = Source::from_iter(1..=4)
        .via(buffered().reduce(|total, value| total + value))
        .run_collect()
        .unwrap();
    assert_eq!(reduced, vec![10]);

    // --- error handling ----------------------------------------------------------
    let remapped_failure = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
    })
    .via(buffered().map_error(|_| StreamError::Failed("mapped".into())))
    .run_collect();
    assert_eq!(remapped_failure, Err(StreamError::Failed("mapped".into())));

    let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
        .via(buffered().recover(|_| Some(42)))
        .run_collect()
        .unwrap();
    assert_eq!(recovered, vec![42]);

    let recovered_with_source = Source::<i32>::failed(StreamError::Failed("boom".into()))
        .via(buffered().recover_with(|_| Some(Source::from_iter([7, 8]))))
        .run_collect()
        .unwrap();
    assert_eq!(recovered_with_source, vec![7, 8]);

    let recovered_with_retry = Source::<i32>::failed(StreamError::Failed("boom".into()))
        .via(buffered().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
        .run_collect()
        .unwrap();
    assert_eq!(recovered_with_retry, vec![9]);

    let completed_on_error = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
    })
    .via(buffered().on_error_complete())
    .run_collect()
    .unwrap();
    assert_eq!(completed_on_error, vec![1]);

    // --- materialized-value plumbing ------------------------------------------------
    let labelled_sink = buffered()
        .via(Flow::identity().map(|value| value + 1))
        .map_materialized_value(|_| "buffered-flow")
        .to_mat(Sink::fold(0, |total, value| total + value), Keep::both);
    let both = Source::from_iter([1, 2, 3]).run_with(labelled_sink).unwrap();
    assert_eq!(both.0, "buffered-flow");
    assert_eq!(wait(both.1), 9);

    let combined_sink = buffered()
        .via_mat_with(Flow::identity().map(|value| value + 1), |_, _| "combined")
        .to(Sink::fold(0, |total, value| total + value));
    let label = Source::from_iter([1, 2, 3]).run_with(combined_sink).unwrap();
    assert_eq!(label, "combined");
}
1609
/// Each rate-related stage (`conflate`, `batch`, `expand`,
/// `aggregate_with_boundary`, `detach`) can be followed by `map` in flow form
/// and run against a source.
#[test]
fn runtime_rate_flows_compose_in_flow_form() {
    // conflate + map
    let conflating = Flow::identity()
        .conflate(|acc: i32, next| acc + next)
        .map(|value| value + 1);
    let conflated = Source::single(4).via(conflating).run_collect().unwrap();
    assert_eq!(conflated, vec![5]);

    // batch + map
    let batching = Flow::identity()
        .batch(4, |seed: i32| seed, |acc, next| acc + next)
        .map(|value| value + 1);
    let batched = Source::single(4).via(batching).run_collect().unwrap();
    assert_eq!(batched, vec![5]);

    // expand + map
    let expanding = Flow::identity()
        .expand(std::iter::once::<i32>)
        .map(|value| value + 1);
    let expanded = Source::from_iter(0..4)
        .via(expanding)
        .run_collect()
        .unwrap();
    assert_eq!(expanded, vec![1, 2, 3, 4]);

    // aggregate_with_boundary + map: the boundary flag is true after every push,
    // so each aggregate holds exactly one element.
    let aggregating = Flow::identity()
        .aggregate_with_boundary(
            Vec::<i32>::new,
            |mut pending, value| {
                pending.push(value);
                let ready = !pending.is_empty();
                (pending, ready)
            },
            |pending| pending.into_iter().sum::<i32>(),
            None,
        )
        .map(|value| value + 1);
    let aggregated = Source::from_iter(0..4)
        .via(aggregating)
        .run_collect()
        .unwrap();
    assert_eq!(aggregated, vec![1, 2, 3, 4]);

    // detach + map
    let detaching = Flow::identity().detach().map(|value: i32| value + 1);
    let detached = Source::from_iter(0..4)
        .via(detaching)
        .run_collect()
        .unwrap();
    assert_eq!(detached, vec![1, 2, 3, 4]);
}
1659
/// Chains `drop`, `take`, `filter_not`, `map_concat` and `grouped` on a source
/// and checks the final grouping.
#[test]
fn high_use_source_flow_operators_work() {
    let pipeline = Source::from_iter(0..8)
        .drop(1)
        .take(5)
        .filter_not(|value| value % 2 == 0)
        .map_concat(|value| [value, value + 10])
        .grouped(3);

    let groups = pipeline.run_collect().unwrap();

    let expected = vec![vec![1, 11, 3], vec![13, 5, 15]];
    assert_eq!(groups, expected);
}
1673
/// `prefix_and_tail(2)` emits one `(prefix, tail)` pair; the tail yields the
/// remaining elements once, and a second materialization fails.
#[test]
fn prefix_and_tail_emits_prefix_and_live_tail() {
    let mut emitted = Source::from_iter(0..5)
        .prefix_and_tail(2)
        .run_collect()
        .unwrap();
    assert_eq!(emitted.len(), 1);

    let (head, rest) = emitted.pop().unwrap();
    assert_eq!(head, vec![0, 1]);

    // First materialization (through a clone) drains the tail.
    let first_run = rest.clone().run_collect().unwrap();
    assert_eq!(first_run, vec![2, 3, 4]);

    // Second materialization is rejected.
    let second_run = rest.run_collect();
    let expected_failure = Err(StreamError::Failed(
        "substream source cannot be materialized more than once".into(),
    ));
    assert_eq!(second_run, expected_failure);
}
1691
/// If upstream fails before two prefix elements are available, the whole
/// stream fails with the upstream error.
#[test]
fn prefix_and_tail_fails_before_prefix_is_ready() {
    let outcome = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
    })
    .prefix_and_tail(2)
    .run_collect();

    let failed_with_boom =
        matches!(outcome, Err(StreamError::Failed(message)) if message == "boom");
    assert!(failed_with_boom);
}
1701
/// An upstream failure that occurs after the prefix is complete surfaces when
/// the tail source is run.
#[test]
fn prefix_and_tail_tail_propagates_late_upstream_failure() {
    let mut emitted = Source::from_factory(|| {
        Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
    })
    .prefix_and_tail(2)
    .run_collect()
    .unwrap();

    let (head, rest) = emitted.pop().unwrap();
    assert_eq!(head, vec![1, 2]);

    let tail_outcome = rest.run_collect();
    assert_eq!(tail_outcome, Err(StreamError::Failed("boom".into())));
}
1714
/// `prefix_and_tail` works with an element type that does not implement `Clone`.
#[test]
fn prefix_and_tail_accepts_non_clone_elements() {
    // Deliberately derives neither `Clone` nor `Copy`.
    #[derive(Debug, PartialEq, Eq)]
    struct NonClone(u8);

    let mut emitted = Source::from_factory(|| {
        Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
    })
    .prefix_and_tail(2)
    .run_collect()
    .unwrap();

    let (head, rest) = emitted.pop().unwrap();
    assert_eq!(head, vec![NonClone(1), NonClone(2)]);

    let remainder = rest.run_collect().unwrap();
    assert_eq!(remainder, vec![NonClone(3)]);
}
1730
/// When upstream completes with fewer elements than the requested prefix
/// length, the factory is still invoked with what was collected.
#[test]
fn flat_map_prefix_materializes_on_short_upstream_completion() {
    let collected = Source::from_iter([1, 2])
        .flat_map_prefix(3, |prefix| {
            let total: i32 = prefix.into_iter().sum();
            Flow::identity().prepend(Source::single(total))
        })
        .run_collect()
        .unwrap();

    assert_eq!(collected, vec![3]);
}
1742
/// If upstream fails before the prefix is complete, the stream fails with that
/// error and the flow factory is never called.
#[test]
fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
    let factory_called = StdArc::new(StdAtomicBool::new(false));
    let factory_flag = StdArc::clone(&factory_called);

    let outcome = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
    })
    .flat_map_prefix(3, move |_prefix| {
        factory_flag.store(true, StdOrdering::SeqCst);
        Flow::identity()
    })
    .run_collect();

    assert_eq!(outcome, Err(StreamError::Failed("boom".into())));
    let was_called = factory_called.load(StdOrdering::SeqCst);
    assert!(!was_called);
}
1758
/// `flat_map_concat` emits each inner source's elements in upstream order.
#[test]
fn flat_map_concat_flattens_nested_sources_sequentially() {
    let flattened = Source::from_iter([1, 2, 3])
        .flat_map_concat(|upper| Source::from_iter(0..upper))
        .run_collect()
        .unwrap();

    let expected = vec![0, 0, 1, 0, 1, 2];
    assert_eq!(flattened, expected);
}
1767
/// Runs six inner futures through `flat_map_merge` with breadth 2 and checks
/// that no more than two of them were ever in flight at once.
///
/// Each inner future bumps a shared `active` counter on entry, records the
/// high-water mark in `max_active`, sleeps briefly so that overlapping futures
/// are observable, and decrements `active` on exit.
#[test]
fn flat_map_merge_respects_breadth_bound() {
    let active = StdArc::new(StdAtomicUsize::new(0));
    let max_active = StdArc::new(StdAtomicUsize::new(0));
    let active_for_stage = StdArc::clone(&active);
    let max_for_stage = StdArc::clone(&max_active);

    let mut values = Source::from_iter(0..6)
        .flat_map_merge(2, move |item| {
            let active = StdArc::clone(&active_for_stage);
            let max_active = StdArc::clone(&max_for_stage);
            Source::future(move || {
                let active = StdArc::clone(&active);
                let max_active = StdArc::clone(&max_active);
                async move {
                    let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
                    // Atomically raise the high-water mark; replaces a manual
                    // load/compare_exchange retry loop with the same effect.
                    max_active.fetch_max(now, StdOrdering::SeqCst);
                    // Blocking sleep is deliberate: it keeps this future "active"
                    // long enough for concurrent siblings to overlap.
                    thread::sleep(StdDuration::from_millis(20));
                    active.fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            })
        })
        .run_collect()
        .unwrap();

    // Merge order is not asserted, so sort before comparing.
    values.sort_unstable();
    assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
    assert!(max_active.load(StdOrdering::SeqCst) <= 2);
}
1813
/// A failing inner source makes the merged stream fail with the same error.
#[test]
fn flat_map_merge_propagates_inner_failures() {
    let outcome = Source::from_iter([0, 1, 2])
        .flat_map_merge(2, |value| match value {
            1 => Source::failed(StreamError::Failed("boom".into())),
            other => Source::single(other),
        })
        .run_collect();

    let expected = Err(StreamError::Failed("boom".into()));
    assert_eq!(outcome, expected);
}
1827
/// The output for the first upstream element can be pulled while the upstream
/// iterator is still blocked waiting to produce its second element.
#[test]
fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
    let (release_tx, release_rx) = mpsc::channel();
    let shared_release = StdArc::new(std::sync::Mutex::new(Some(release_rx)));

    let queue = Source::from_factory(move || {
        let gate = StdArc::clone(&shared_release);
        let mut step = 0_u8;
        Box::new(std::iter::from_fn(move || {
            let current = step;
            step += 1;
            match current {
                0 => Some(Ok(0)),
                1 => {
                    // Hold back the second element until the test releases it.
                    gate.lock()
                        .unwrap()
                        .as_ref()
                        .expect("release receiver available")
                        .recv_timeout(StdDuration::from_secs(1))
                        .expect("timed out waiting to release second upstream element");
                    Some(Ok(1))
                }
                _ => None,
            }
        }))
    })
    .flat_map_merge(2, |value| Source::single(value + 10))
    .run_with(Sink::queue())
    .unwrap();

    let first = queue.pull().unwrap();
    assert_eq!(first, Some(10));

    release_tx.send(()).unwrap();
    let second = queue.pull().unwrap();
    assert_eq!(second, Some(11));

    let end = queue.pull().unwrap();
    assert!(end.is_none());
}
1863
/// Groups by parity, runs the even substream into `Sink::ignore` and drops its
/// completion, then checks that the odd substream yields `[1, 3]` and the outer
/// stream ends without emitting another substream.
#[test]
fn group_by_routes_keys_and_drops_closed_keys() {
    let substreams = Source::from_iter([0, 1, 2, 3, 4])
        .group_by(4, |value| value % 2, false)
        .run_with(Sink::queue())
        .unwrap();

    let even_stream = substreams.pull().unwrap().unwrap();
    let even_done = even_stream.run_with(Sink::ignore()).unwrap();
    let odd_stream = substreams.pull().unwrap().unwrap();
    drop(even_done);

    let odd_items = odd_stream.run_collect().unwrap();
    assert_eq!(odd_items, vec![1, 3]);

    let end = substreams.pull().unwrap();
    assert!(end.is_none());
}
1879
/// With `max_substreams = 2`, a third distinct key makes the outer stream fail
/// with a descriptive error.
#[test]
fn group_by_fails_when_distinct_key_limit_is_exceeded() {
    let substreams = Source::from_iter([0, 1, 2])
        .group_by(2, |value| *value, false)
        .run_with(Sink::queue())
        .unwrap();

    // The first two keys each open a substream; the handles are discarded immediately.
    for _ in 0..2 {
        drop(substreams.pull().unwrap().unwrap());
    }

    let third = substreams.pull();
    assert!(matches!(
        third,
        Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
    ));
}
1894
/// With the third `group_by` argument set to `true`, a key whose substream was
/// already consumed via `Sink::head` gets a fresh substream when it reappears.
#[test]
fn group_by_can_recreate_closed_substreams_when_enabled() {
    let (release_tx, release_rx) = mpsc::channel();
    let shared_release = StdArc::new(std::sync::Mutex::new(Some(release_rx)));

    let substreams = Source::from_factory(move || {
        let gate = StdArc::clone(&shared_release);
        let mut step = 0_u8;
        Box::new(std::iter::from_fn(move || {
            let current = step;
            step += 1;
            match current {
                0 => Some(Ok(0)),
                1 => Some(Ok(1)),
                2 => {
                    // Hold back the repeated key until the test signals.
                    gate.lock()
                        .unwrap()
                        .as_ref()
                        .expect("release receiver available")
                        .recv_timeout(StdDuration::from_secs(1))
                        .expect("timed out waiting to release recreated key");
                    Some(Ok(0))
                }
                _ => None,
            }
        }))
    })
    .group_by(4, |value| value % 2, true)
    .run_with(Sink::queue())
    .unwrap();

    let even_stream = substreams.pull().unwrap().unwrap();
    let even_head = wait(even_stream.run_with(Sink::head()).unwrap());
    assert_eq!(even_head, 0);
    release_tx.send(()).unwrap();

    let odd_stream = substreams.pull().unwrap().unwrap();
    assert_eq!(odd_stream.run_collect().unwrap(), vec![1]);

    let even_again = substreams.pull().unwrap().unwrap();
    assert_eq!(even_again.run_collect().unwrap(), vec![0]);

    let end = substreams.pull().unwrap();
    assert!(end.is_none());
}
1937
/// When the key function panics on the second element, both the live substream
/// and the outer stream report `AbruptTermination`.
#[test]
fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
    let substreams = Source::from_iter([0, 1])
        .group_by(
            4,
            |value| {
                // Panics for element 1.
                assert_ne!(*value, 1, "boom");
                value % 2
            },
            false,
        )
        .run_with(Sink::queue())
        .unwrap();

    let live = substreams.pull().unwrap().unwrap();

    // Collect on a helper thread so the main thread can bound the wait.
    let (outcome_tx, outcome_rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = outcome_tx.send(live.run_collect());
    });

    let substream_outcome = outcome_rx
        .recv_timeout(StdDuration::from_secs(1))
        .unwrap();
    assert_eq!(substream_outcome, Err(StreamError::AbruptTermination));

    let outer_outcome = substreams.pull();
    assert!(matches!(outer_outcome, Err(StreamError::AbruptTermination)));
}
1964
/// `split_when` starts a new substream at each element matching the predicate,
/// so the boundary element is the first item of the following segment.
#[test]
fn split_when_starts_new_substream_on_boundary_element() {
    let segments = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
        .split_when(|value| *value == 0)
        .run_with(Sink::queue())
        .unwrap();

    for expected in [vec![1, 2], vec![0, 3], vec![0, 4, 5]] {
        let segment = segments.pull().unwrap().unwrap();
        assert_eq!(segment.run_collect().unwrap(), expected);
    }

    let end = segments.pull().unwrap();
    assert!(end.is_none());
}
1980
/// `split_after` closes the current substream after each element matching the
/// predicate, so the boundary element is the last item of its segment.
#[test]
fn split_after_ends_current_substream_on_boundary_element() {
    let segments = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
        .split_after(|value| *value == 0)
        .run_with(Sink::queue())
        .unwrap();

    for expected in [vec![1, 2, 0], vec![3, 0], vec![4, 5]] {
        let segment = segments.pull().unwrap().unwrap();
        assert_eq!(segment.run_collect().unwrap(), expected);
    }

    let end = segments.pull().unwrap();
    assert!(end.is_none());
}
1996
/// When the split predicate panics on the second element, both the live
/// substream and the outer stream report `AbruptTermination`.
#[test]
fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
    let segments = Source::from_iter([1, 2])
        .split_when(|value| {
            // Panics for element 2; never signals a boundary otherwise.
            assert_ne!(*value, 2, "boom");
            false
        })
        .run_with(Sink::queue())
        .unwrap();

    let live = segments.pull().unwrap().unwrap();

    // Collect on a helper thread so the main thread can bound the wait.
    let (outcome_tx, outcome_rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = outcome_tx.send(live.run_collect());
    });

    let substream_outcome = outcome_rx
        .recv_timeout(StdDuration::from_secs(1))
        .unwrap();
    assert_eq!(substream_outcome, Err(StreamError::AbruptTermination));

    let outer_outcome = segments.pull();
    assert!(matches!(outer_outcome, Err(StreamError::AbruptTermination)));
}
2019
/// Splitting `0..100` at every non-zero multiple of ten yields ten non-empty segments.
#[test]
fn split_when_pre_buffer_segments_match_expected_count() {
    let segments = Source::from_iter(0..100)
        .split_when(|value| *value != 0 && *value % 10 == 0)
        .run_with(Sink::queue())
        .unwrap();

    // Pull lazily: each segment is drained before the next one is requested.
    let segment_count = std::iter::from_fn(|| segments.pull().unwrap())
        .map(|segment| {
            let items: Vec<i32> = segment.run_collect().unwrap();
            assert!(!items.is_empty(), "segment should not be empty");
        })
        .count();

    assert_eq!(segment_count, 10, "100 elements in segments of 10");
}
2034
/// Splitting `0..100` after every tenth element yields ten non-empty segments
/// that together contain all 100 elements.
#[test]
fn split_after_pre_buffer_segments_match_expected_count() {
    let outer = Source::from_iter(0..100)
        .split_after(|item| (*item + 1) % 10 == 0)
        .run_with(Sink::queue())
        .unwrap();

    let mut segment_count = 0_usize;
    // Accumulate lengths in `usize` so no narrowing cast of `len()` is needed.
    let mut total = 0_usize;
    while let Some(substream) = outer.pull().unwrap() {
        let items: Vec<i32> = substream.run_collect().unwrap();
        assert!(!items.is_empty(), "segment should not be empty");
        total += items.len();
        segment_count += 1;
    }

    assert_eq!(segment_count, 10);
    assert_eq!(total, 100);
}
2052
/// With a constant key and `max_substreams = 1`, all 1000 elements arrive in
/// order through a single substream and the outer stream then ends.
#[test]
fn group_by_single_key_fused_matches_general_path() {
    let substreams = Source::from_iter(0..1000i64)
        .group_by(1, |_| 0u8, false)
        .run_with(Sink::queue())
        .unwrap();

    let only = substreams.pull().unwrap().unwrap();
    let collected: Vec<i64> = only.run_collect().unwrap();

    assert_eq!(collected.len(), 1000);
    // With the length pinned above, first/last are elements 0 and 999.
    assert_eq!(collected.first(), Some(&0));
    assert_eq!(collected.last(), Some(&999));

    let end = substreams.pull().unwrap();
    assert!(end.is_none());
}
2066
/// With two distinct keys and `max_substreams = 2`, elements are routed to the
/// right substream even when the first key reappears after the second.
#[test]
fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
    let substreams = Source::from_iter([0, 1, 0])
        .group_by(2, |value| *value, false)
        .run_with(Sink::queue())
        .unwrap();

    // Drain the outer queue completely before running any substream.
    let groups: Vec<_> = std::iter::from_fn(|| substreams.pull().unwrap()).collect();
    assert_eq!(groups.len(), 2);

    let zeros = groups[0].clone().run_collect().unwrap();
    assert_eq!(zeros, vec![0, 0]);

    let ones = groups[1].clone().run_collect().unwrap();
    assert_eq!(ones, vec![1]);
}
2081
/// Twenty single-element inner sources merged with breadth 2 yield exactly
/// twenty elements through a queue sink.
#[test]
fn flat_map_merge_lock_lighter_matches_expected_count() {
    let merged = Source::from_iter(0..20)
        .flat_map_merge(2, |value| Source::single(value + 100))
        .run_with(Sink::queue())
        .unwrap();

    // Count pulls until the queue reports end-of-stream.
    let pulled = std::iter::from_fn(|| merged.pull().unwrap()).count();

    assert_eq!(pulled, 20);
}
2094
/// Feeds `group_by` from a rendezvous channel and checks that the substream is
/// handed out after the first element alone, before the remaining 99 elements
/// are sent and the channel is closed.
#[test]
fn group_by_single_key_emits_substream_before_upstream_completes() {
    // Capacity 0: every send blocks until the stream actually receives the item.
    let (feed_tx, feed_rx) = mpsc::sync_channel::<i32>(0);
    let shared_feed = StdArc::new(std::sync::Mutex::new(feed_rx));

    let substreams = Source::from_factory({
        let feed = StdArc::clone(&shared_feed);
        move || {
            let feed = StdArc::clone(&feed);
            Box::new(std::iter::from_fn(move || {
                feed.lock().unwrap().recv().ok().map(Ok)
            })) as BoxStream<i32>
        }
    })
    .group_by(1, |_| 0u8, false)
    .run_with(Sink::queue())
    .unwrap();

    // Pull the outer queue on a helper thread and hand the substream back.
    let (handoff_tx, handoff_rx) = mpsc::channel::<Source<i32>>();
    let puller = thread::spawn(move || {
        let first = substreams.pull().unwrap().expect("expected a substream");
        handoff_tx.send(first).unwrap();
    });

    feed_tx.send(0).unwrap();

    let live = handoff_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — group_by buffered first element before emitting substream");

    (1..100_i32).for_each(|value| feed_tx.send(value).unwrap());
    drop(feed_tx);

    let collected: Vec<i32> = live.run_collect().unwrap();
    assert_eq!(collected.len(), 100);
    puller.join().unwrap();
}
2151
/// Stress test: in each round, 32 threads start together on a barrier and each
/// drives its own rendezvous-fed single-key `group_by`, checking that every
/// element sent is immediately pullable from the substream.
#[test]
fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
    const STREAMS: usize = 32;
    const ROUNDS: usize = 8;
    const ITEMS: i64 = 8;

    for _ in 0..ROUNDS {
        let start_line = StdArc::new(std::sync::Barrier::new(STREAMS));

        let workers: Vec<_> = (0..STREAMS)
            .map(|_| {
                let start_line = StdArc::clone(&start_line);
                thread::spawn(move || {
                    let (feed_tx, feed_rx) = mpsc::sync_channel::<i64>(0);
                    let shared_feed = StdArc::new(std::sync::Mutex::new(feed_rx));

                    let substreams = Source::from_factory({
                        let feed = StdArc::clone(&shared_feed);
                        move || {
                            let feed = StdArc::clone(&feed);
                            Box::new(std::iter::from_fn(move || {
                                feed.lock().unwrap().recv().ok().map(Ok)
                            })) as BoxStream<i64>
                        }
                    })
                    .group_by(1, |_| 0_u8, false)
                    .run_with(Sink::queue())
                    .unwrap();

                    // Line all workers up so their streams run at the same time.
                    start_line.wait();

                    feed_tx.send(0).unwrap();
                    let live = substreams
                        .pull()
                        .unwrap()
                        .expect("expected group_by substream");
                    let live_queue = live.run_with(Sink::queue()).unwrap();
                    assert_eq!(live_queue.pull().unwrap(), Some(0));

                    // Each further item must be pullable right after it is sent.
                    for value in 1..ITEMS {
                        feed_tx.send(value).unwrap();
                        assert_eq!(live_queue.pull().unwrap(), Some(value));
                    }
                    drop(feed_tx);

                    assert!(live_queue.pull().unwrap().is_none());
                    assert!(substreams.pull().unwrap().is_none());
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("group_by stress worker panicked");
        }
    }
}
2204
/// Pre-loads a 300-element segment, a `-1` boundary and one trailing element,
/// then checks (under a 5 s timeout) that `split_when` yields the two segments
/// with the boundary at the start of the second one.
#[test]
fn split_when_emits_substream_before_segment_ends() {
    const SEGMENT_LEN: usize = 300;

    let (feed_tx, feed_rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
    (0..SEGMENT_LEN as i32).for_each(|value| feed_tx.send(value).unwrap());
    // Boundary marker, one trailing element, then close the channel.
    for value in [-1, 99] {
        feed_tx.send(value).unwrap();
    }
    drop(feed_tx);

    let segments = Source::from_iter(feed_rx)
        .split_when(|value| *value == -1)
        .run_with(Sink::queue())
        .unwrap();

    // Consume on a helper thread so a stall shows up as a timeout below.
    let (outcome_tx, outcome_rx) = mpsc::channel();
    thread::spawn(move || {
        let first = segments.pull().unwrap().expect("expected first substream");
        let first_items: Vec<i32> = first.run_collect().unwrap();
        let second = segments.pull().unwrap().expect("expected second substream");
        let second_items: Vec<i32> = second.run_collect().unwrap();
        let finished = segments.pull().unwrap().is_none();
        let _ = outcome_tx.send((first_items, second_items, finished));
    });

    let (first_items, second_items, finished) = outcome_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — split_when is buffering the whole segment");
    assert_eq!(first_items.len(), SEGMENT_LEN);
    assert_eq!(second_items, vec![-1, 99]);
    assert!(finished);
}
2248
/// Pre-loads a 300-element segment, a `-1` boundary and one trailing element,
/// then checks (under a 5 s timeout) that `split_after` yields the two segments
/// with the boundary at the end of the first one.
#[test]
fn split_after_emits_substream_before_segment_ends() {
    const SEGMENT_LEN: usize = 300;

    let (feed_tx, feed_rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
    (0..SEGMENT_LEN as i32).for_each(|value| feed_tx.send(value).unwrap());
    // Boundary marker, one trailing element, then close the channel.
    for value in [-1, 99] {
        feed_tx.send(value).unwrap();
    }
    drop(feed_tx);

    let segments = Source::from_iter(feed_rx)
        .split_after(|value| *value == -1)
        .run_with(Sink::queue())
        .unwrap();

    // Consume on a helper thread so a stall shows up as a timeout below.
    let (outcome_tx, outcome_rx) = mpsc::channel();
    thread::spawn(move || {
        let first = segments.pull().unwrap().expect("expected first substream");
        let first_items: Vec<i32> = first.run_collect().unwrap();
        let second = segments.pull().unwrap().expect("expected second substream");
        let second_items: Vec<i32> = second.run_collect().unwrap();
        let finished = segments.pull().unwrap().is_none();
        let _ = outcome_tx.send((first_items, second_items, finished));
    });

    let (first_items, second_items, finished) = outcome_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — split_after is buffering the whole segment");
    // The first segment includes the boundary element itself.
    assert_eq!(first_items.len(), SEGMENT_LEN + 1);
    assert_eq!(second_items, vec![99]);
    assert!(finished);
}
2284
#[test]
// Runs a breadth-8 `flat_map_merge` over 50 upstream items twenty times and
// checks the folded sum on every round.
fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
    const ROUNDS: usize = 20;
    // Item `i` expands to `i, i + 1, i + 2`:
    // 3 * (0 + 1 + ... + 49) + 50 * 3 = 3825.
    const EXPECTED_SUM: i64 = 3825;

    for _round in 0..ROUNDS {
        let completion = Source::from_iter(0..50_i32)
            .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
            .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
            .unwrap();
        let total = completion.wait();
        assert_eq!(total, Ok(EXPECTED_SUM), "flat_map_merge produced wrong sum");
    }
}
2301
#[test]
// Runs a breadth-16 `flat_map_merge` over 100 upstream items twenty times and
// checks the folded sum on every round.
fn flat_map_merge_single_mutex_race_stress() {
    const ROUNDS: usize = 20;
    // Item `i` expands to `i, i + 1000`:
    // 2 * (0 + 1 + ... + 99) + 100 * 1000 = 109_900.
    const EXPECTED_SUM: i64 = 109_900;

    for _round in 0..ROUNDS {
        let completion = Source::from_iter(0..100_i64)
            .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
            .run_with(Sink::fold(0i64, |acc, v| acc + v))
            .unwrap();
        let total = completion.wait();
        assert_eq!(total, Ok(EXPECTED_SUM), "flat_map_merge single-mutex stress");
    }
}
2320
#[test]
// Pushes a 100-element segment, a `-1` boundary marker and ten more items
// through `split_when`, then checks that the marker opens the second
// substream and that both substreams can be fully drained within the timeout.
fn split_when_bounded_memory_rendezvous() {
    const SEGMENT: usize = 100;
    let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
    for i in 0..SEGMENT as i32 {
        tx.send(i).unwrap();
    }
    tx.send(-1).unwrap();
    for i in 0..10_i32 {
        tx.send(i).unwrap();
    }
    drop(tx);

    let outer = Source::from_iter(rx)
        .split_when(|item| *item == -1)
        .run_with(Sink::queue())
        .unwrap();

    let (result_tx, result_rx) = mpsc::channel();
    thread::spawn(move || {
        let first = outer.pull().unwrap().expect("first segment");
        let seg1: Vec<i32> = first.run_collect().unwrap();
        let second = outer.pull().unwrap().expect("second segment");
        let seg2: Vec<i32> = second.run_collect().unwrap();
        let done = outer.pull().unwrap().is_none();
        // The receiver is gone if the main thread already timed out; ignore
        // the send error so the worker does not add a second, unrelated panic
        // to the test output.
        let _ = result_tx.send((seg1, seg2, done));
    });

    let (seg1, seg2, done) = result_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
    assert_eq!(seg1.len(), SEGMENT, "first segment length");
    // `first()` rather than `seg2[0]`: an empty second segment then fails with
    // this assertion's message instead of an index-out-of-bounds panic.
    assert_eq!(
        seg2.first(),
        Some(&-1),
        "boundary element starts second segment"
    );
    assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
    assert!(done);
}
2366
#[test]
// Routes 200 items into a single `group_by` substream (every item maps to key
// 0) and checks that the substream delivers all of them, in order, within the
// timeout, after which the outer queue is exhausted.
fn group_by_single_key_bounded_memory_rendezvous() {
    const N: usize = 200;
    let outer = Source::from_iter(0..N as i64)
        .group_by(1, |_| 0u8, false)
        .run_with(Sink::queue())
        .unwrap();

    let (result_tx, result_rx) = mpsc::channel();
    thread::spawn(move || {
        let substream = outer.pull().unwrap().expect("substream");
        let items: Vec<i64> = substream.run_collect().unwrap();
        let done = outer.pull().unwrap().is_none();
        // The receiver is gone if the main thread already timed out; ignore
        // the send error so the worker does not add a second, unrelated panic
        // to the test output.
        let _ = result_tx.send((items, done));
    });

    let (items, done) = result_rx
        .recv_timeout(StdDuration::from_secs(5))
        .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
    // The length check comes first so the indexing below cannot go out of bounds.
    assert_eq!(items.len(), N, "all items delivered");
    assert_eq!(items[0], 0);
    assert_eq!(items[N - 1], (N - 1) as i64);
    assert!(done);
}
2396
#[test]
// `scan` yields the seed first and then every running total.
fn scan_emits_seed_and_accumulated_values() {
    let expected = vec![0, 1, 3, 6];

    let running_totals = Source::from_iter(1..=3)
        .scan(0, |acc, item| acc + item)
        .run_collect()
        .unwrap();

    assert_eq!(running_totals, expected);
}
2406
#[test]
// Three elements through `limit(2)` must fail with `LimitExceeded { max: 2 }`.
fn limit_fails_after_max_elements() {
    let expected = Err(StreamError::LimitExceeded { max: 2 });

    let outcome = Source::from_iter(0..3).limit(2).run_collect();

    assert_eq!(outcome, expected);
}
2413
#[test]
// The four words weigh 4 + 2 + 4 + 6 = 16 bytes, which exceeds the weighted
// limit of 15, so the stream must fail with `LimitExceeded { max: 15 }`.
fn limit_weighted_fails_with_limit_error_like_akka() {
    let by_length = Flow::identity().limit_weighted(15, |item: &&str| item.len());

    let outcome = Source::from_iter(["this", "is", "some", "string"])
        .via(by_length)
        .run_collect();

    assert_eq!(outcome, Err(StreamError::LimitExceeded { max: 15 }));
}
2422
#[test]
// A first element heavier than the weight budget (10 > 5) is emitted as its
// own group; the lighter elements that follow are grouped together.
fn grouped_weighted_allows_oversized_first_element_like_akka() {
    let by_value = Flow::identity().grouped_weighted(5, |item: &usize| *item);

    let groups = Source::from_iter([10_usize, 1, 2])
        .via(by_value)
        .run_collect()
        .unwrap();

    assert_eq!(groups, vec![vec![10], vec![1, 2]]);
}
2432
#[test]
// An oversized element arriving after a light one (1, then 10, with budget 5)
// stays in the group that is currently open; the next element starts a new one.
fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
    let by_value = Flow::identity().grouped_weighted(5, |item: &usize| *item);

    let groups = Source::from_iter([1_usize, 10, 2])
        .via(by_value)
        .run_collect()
        .unwrap();

    assert_eq!(groups, vec![vec![1, 10], vec![2]]);
}
2442
#[test]
// `fold`, `head` and `last` sinks each materialize a completion that resolves
// to the expected terminal value for the stream 1..=4.
fn sink_terminals_materialize_results() {
    let fold_completion = Source::from_iter(1..=4)
        .run_with(Sink::fold(0, |acc, item| acc + item))
        .unwrap();
    assert_eq!(wait(fold_completion), 10);

    let head_value = wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap());
    assert_eq!(head_value, 1);

    let last_value = wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap());
    assert_eq!(last_value, 4);
}
2459
#[test]
// Exercises the `collect`, `head_option`, `last_option`, `reduce` and `foreach`
// sinks and checks the value each completion resolves to.
fn all_terminal_sink_variants_complete() {
    let collected = wait(
        Source::from_iter([1, 2, 3])
            .run_with(Sink::collect())
            .unwrap(),
    );
    assert_eq!(collected, vec![1, 2, 3]);

    let empty_head = wait(
        Source::<i32>::empty()
            .run_with(Sink::head_option())
            .unwrap(),
    );
    assert_eq!(empty_head, None);

    let last_seen = wait(
        Source::from_iter([1, 2, 3])
            .run_with(Sink::last_option())
            .unwrap(),
    );
    assert_eq!(last_seen, Some(3));

    let reduced = wait(
        Source::from_iter([1, 2, 3])
            .run_with(Sink::reduce(|acc, item| acc + item))
            .unwrap(),
    );
    assert_eq!(reduced, 6);

    // `foreach` resolves to `NotUsed`; its side effect is observed through a
    // shared counter that accumulates every visited element.
    let visited_sum = StdArc::new(StdAtomicUsize::new(0));
    let sink_counter = StdArc::clone(&visited_sum);
    let foreach_result = wait(
        Source::from_iter([1_usize, 2, 3])
            .run_with(Sink::foreach(move |item| {
                sink_counter.fetch_add(item, StdOrdering::SeqCst);
            }))
            .unwrap(),
    );
    assert_eq!(foreach_result, NotUsed);
    assert_eq!(visited_sum.load(StdOrdering::SeqCst), 6);
}
2509
#[test]
// `take_last(0)` completes with an empty vector even when upstream has items.
fn take_last_zero_returns_empty_vector() {
    let completion = Source::from_iter([1, 2, 3])
        .run_with(Sink::take_last(0))
        .unwrap();

    let kept = wait(completion);
    assert_eq!(kept, Vec::<i32>::new());
}
2518
#[test]
// For `head` / `head_option` over bounded sources the result is already
// available via `try_wait` right after materialization, and the materializer
// reports no active streams.
fn bounded_head_terminals_complete_inline() {
    let materializer = Materializer::new();

    // Plain range source.
    let mut plain = Source::from_iter(0_u64..1_000)
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 0);
    assert_eq!(plain.try_wait(), Some(Ok(0)));

    // Range source behind a filter that skips the first ten items.
    let mut filtered = Source::from_iter(0_u64..1_000)
        .filter(|item| *item >= 10)
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 0);
    assert_eq!(filtered.try_wait(), Some(Ok(10)));

    // Empty source with the optional head sink.
    let mut optional = Source::<u64>::empty()
        .run_with_materializer(Sink::head_option(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 0);
    assert_eq!(optional.try_wait(), Some(Ok(None)));
}
2542
#[test]
// `Sink::head` must still report `EmptyStream` for an empty source and forward
// the upstream failure for a failed source, leaving no active streams behind.
fn bounded_head_fast_path_preserves_terminal_errors() {
    let materializer = Materializer::new();

    let mut from_empty = Source::<u64>::empty()
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    assert_eq!(from_empty.try_wait(), Some(Err(StreamError::EmptyStream)));

    let mut from_failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
        .run_with_materializer(Sink::head(), &materializer)
        .unwrap();
    let failure = from_failed.try_wait();
    assert_eq!(failure, Some(Err(StreamError::Failed("boom".into()))));
    assert_eq!(materializer.active_streams(), 0);
}
2561
#[test]
// Builds runnable graphs with `to_mat` / `to` and checks the materialized
// value each one produces when run.
fn runnable_graph_composes_source_and_sink() {
    // `Keep::right` keeps the fold sink's completion: (1 + 2 + 3 + 4) * 2 = 20.
    let doubling_graph = Source::from_iter(1..=4)
        .map(|item| item * 2)
        .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
    let doubled_sum = wait(doubling_graph.run().unwrap());
    assert_eq!(doubled_sum, 20);

    // `to` keeps the source-side value (20), which the graph then maps to 21.
    let remapped_graph = Source::single(1)
        .map_materialized_value(|_| 20)
        .to(Sink::ignore())
        .map_materialized_value(|value| value + 1);
    let remapped = remapped_graph.run().unwrap();
    assert_eq!(remapped, 21);

    let default_mat = Source::single(1).to(Sink::ignore()).run().unwrap();
    assert_eq!(default_mat, NotUsed);
}
2579
#[test]
// Checks which materialized value survives `via` / `to` (the source's),
// `via_mat` / `to_mat` with `Keep::both` (all of them), and `run_with`
// (the sink's).
fn materialized_values_follow_keep_defaults() {
    let tagged_source = Source::single(1).map_materialized_value(|_| "source");
    let tagged_flow = Flow::identity().map_materialized_value(|_| "flow");

    let left_biased = tagged_source
        .clone()
        .via(tagged_flow.clone())
        .to(Sink::ignore())
        .run();
    assert_eq!(left_biased.unwrap(), "source");

    let (upstream_mats, sink_completion) = tagged_source
        .via_mat(tagged_flow, Keep::both)
        .to_mat(Sink::ignore(), Keep::both)
        .run()
        .unwrap();
    assert_eq!(upstream_mats, ("source", "flow"));
    assert_eq!(wait(sink_completion), NotUsed);

    let fold_completion = Source::single(41)
        .map_materialized_value(|_| "ignored source")
        .run_with(Sink::fold(1, |acc, item| acc + item))
        .unwrap();
    assert_eq!(wait(fold_completion), 42);
}
2602
#[test]
// A sink built with `Flow::to` materializes the flow's value; `to_mat` with
// `Keep::both` exposes both the flow's value and the fold completion when the
// source runs it, while running the sink *with* a source yields `NotUsed`.
fn flow_to_sink_preserves_flow_materialized_value_by_default() {
    let default_sink = Flow::identity()
        .map(|item: i32| item + 1)
        .map_materialized_value(|_| "flow")
        .to(Sink::fold(0, |acc, item| acc + item));

    let default_mat = Source::from_iter([1, 2, 3]).run_with(default_sink).unwrap();

    assert_eq!(default_mat, "flow");

    let sink_driven = Flow::identity()
        .map(|item: i32| item + 1)
        .map_materialized_value(|_| "flow")
        .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
        .run_with(Source::from_iter([1, 2, 3]))
        .unwrap();
    assert_eq!(sink_driven, NotUsed);

    let keep_both_sink = Flow::identity()
        .map(|item: i32| item + 1)
        .map_materialized_value(|_| "flow")
        .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both);
    let (flow_mat, fold_completion) = Source::from_iter([1, 2, 3])
        .run_with(keep_both_sink)
        .unwrap();
    assert_eq!(flow_mat, "flow");
    // (1 + 1) + (2 + 1) + (3 + 1) = 9.
    assert_eq!(wait(fold_completion), 9);
}
2632
#[test]
// A materializer derived via `with_name_prefix` keeps its prefix but refuses
// to materialize once the materializer it was derived from has been shut down.
fn materializer_shutdown_fails_materialization() {
    let root = Materializer::new();
    let prefixed = root.with_name_prefix("test-stream");
    root.shutdown();

    let runnable = Source::single(1).to(Sink::ignore());

    assert_eq!(prefixed.name_prefix(), "test-stream");
    let outcome = runnable.run_with_materializer(&prefixed);
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
}
2647
#[test]
// Shutting the materializer down while an endless stream is running resolves
// its completion with `AbruptTermination` and clears the active-stream count.
fn materializer_shutdown_fails_running_stream_completion() {
    let materializer = Materializer::new();
    let endless = Source::repeat(1)
        .run_with_materializer(Sink::ignore(), &materializer)
        .unwrap();
    assert_eq!(materializer.active_streams(), 1);

    materializer.shutdown();

    let outcome = endless.wait();
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
    assert_eq!(materializer.active_streams(), 0);
}
2660
#[test]
// Dropping the completion handle of an endless stream must cancel it: the
// materializer's active-stream count has to return to zero shortly afterwards.
fn dropped_stream_completion_cancels_running_stream() {
    let materializer = Materializer::new();
    let completion = Source::repeat(1)
        .run_with_materializer(Sink::ignore(), &materializer)
        .unwrap();

    assert_eq!(materializer.active_streams(), 1);
    drop(completion);

    // Poll with the shared `wait_until` helper (same 250 ms budget as the
    // previous hand-rolled 50 x 5 ms loop) instead of duplicating the loop.
    let drained = wait_until(Duration::from_millis(250), || {
        materializer.active_streams() == 0
    });
    assert!(
        drained,
        "dropping the completion should cancel the stream; {} still active",
        materializer.active_streams()
    );
}
2678
#[test]
// Smoke test for the materializer's timer API:
// - a one-shot timer fires and is not reported as cancelled afterwards;
// - a cancelled one-shot timer never fires, and `cancel` reports `true` only
//   for the first call;
// - fixed-delay and fixed-rate timers fire at least once;
// - a timer scheduled on a materializer that is shut down never fires.
fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
    let materializer = Materializer::new();
    let (once_tx, once_rx) = mpsc::channel();
    let once = materializer.schedule_once(Duration::from_millis(5), move || {
        once_tx.send(()).unwrap();
    });
    once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
    assert!(!once.is_cancelled());

    let (cancelled_tx, cancelled_rx) = mpsc::channel();
    let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
        cancelled_tx.send(()).unwrap();
    });
    assert!(cancelled.cancel());
    assert!(!cancelled.cancel());
    assert!(cancelled.is_cancelled());
    assert!(
        cancelled_rx
            .recv_timeout(Duration::from_millis(75))
            .is_err()
    );

    let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
    let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
    let fixed_delay = materializer.schedule_with_fixed_delay(
        Duration::from_millis(1),
        Duration::from_millis(5),
        move || {
            fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
        },
    );
    // Poll for the first fire instead of sleeping a fixed 25 ms: a loaded
    // machine can delay the first tick past a fixed sleep. The 250 ms budget
    // matches the tolerance granted to the one-shot timer above.
    assert!(
        wait_until(Duration::from_millis(250), || {
            fixed_delay_count.load(StdOrdering::SeqCst) > 0
        }),
        "fixed-delay timer never fired"
    );
    fixed_delay.cancel();

    let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
    let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
    let fixed_rate = materializer.schedule_at_fixed_rate(
        Duration::from_millis(1),
        Duration::from_millis(5),
        move || {
            fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
        },
    );
    assert!(
        wait_until(Duration::from_millis(250), || {
            fixed_rate_count.load(StdOrdering::SeqCst) > 0
        }),
        "fixed-rate timer never fired"
    );
    fixed_rate.cancel();

    let shutdown_materializer = Materializer::new();
    let (shutdown_tx, shutdown_rx) = mpsc::channel();
    shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
        shutdown_tx.send(()).unwrap();
    });
    shutdown_materializer.shutdown();
    assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());

    // Shut the primary materializer down explicitly, as the other timer tests do.
    materializer.shutdown();
}
2736
#[test]
// Compares fixed-rate and fixed-delay scheduling when the first task run
// deliberately overruns the 2 s interval:
// - fixed-rate: the second run must start less than one interval after the
//   first run completes;
// - fixed-delay: the second run must start at least one full interval after
//   the first run completes.
// Each first run is held on a mutex/condvar gate until the test releases it,
// and every start/completion is reported through a channel with a timestamp.
fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
    use std::sync::{Condvar, Mutex};

    // Events reported by the timer tasks: (run number, timestamp).
    #[derive(Debug)]
    enum TimerEvent {
        Started(usize, Instant),
        Completed(usize, Instant),
    }

    // Receives the next event or panics with `label` after 20 s.
    let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
        rx.recv_timeout(Duration::from_secs(20))
            .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
    };
    // Opens a gate and wakes every waiter; a poisoned lock is tolerated.
    let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
        let (released, condvar) = &**gate;
        let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
        *released = true;
        condvar.notify_all();
    };

    let interval = Duration::from_secs(2);
    // How long the first run is kept blocked: one interval plus 250 ms.
    let overrun = interval + Duration::from_millis(250);

    // --- fixed-rate half ---
    let rate_materializer = Materializer::new();
    let (rate_tx, rate_rx) = mpsc::channel();
    let rate_runs = StdArc::new(StdAtomicUsize::new(0));
    let rate_task_runs = StdArc::clone(&rate_runs);
    let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
    let rate_task_gate = StdArc::clone(&rate_gate);
    let fixed_rate =
        rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
            // 1-based run counter.
            let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
            rate_tx
                .send(TimerEvent::Started(run, Instant::now()))
                .unwrap();
            // Only the first run blocks on the gate and reports a completion.
            if run == 1 {
                let (released, condvar) = &*rate_task_gate;
                let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
                while !*released {
                    released = condvar
                        .wait(released)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
                rate_tx
                    .send(TimerEvent::Completed(run, Instant::now()))
                    .unwrap();
            }
        });
    let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
        TimerEvent::Started(1, at) => at,
        other => panic!("fixed-rate first task: unexpected event {other:?}"),
    };
    // Keep the first run blocked until it has overrun the interval.
    assert!(wait_until(Duration::from_secs(20), || {
        rate_first_started.elapsed() >= overrun
    }));
    release(&rate_gate);
    let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
        TimerEvent::Completed(1, at) => at,
        other => panic!("fixed-rate first completion: unexpected event {other:?}"),
    };
    let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
        TimerEvent::Started(2, at) => at,
        other => panic!("fixed-rate second task: unexpected event {other:?}"),
    };
    fixed_rate.cancel();
    rate_materializer.shutdown();

    // --- fixed-delay half (same choreography on a fresh materializer) ---
    let delay_materializer = Materializer::new();
    let (delay_tx, delay_rx) = mpsc::channel();
    let delay_runs = StdArc::new(StdAtomicUsize::new(0));
    let delay_task_runs = StdArc::clone(&delay_runs);
    let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
    let delay_task_gate = StdArc::clone(&delay_gate);
    let fixed_delay =
        delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
            let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
            delay_tx
                .send(TimerEvent::Started(run, Instant::now()))
                .unwrap();
            if run == 1 {
                let (released, condvar) = &*delay_task_gate;
                let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
                while !*released {
                    released = condvar
                        .wait(released)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
                delay_tx
                    .send(TimerEvent::Completed(run, Instant::now()))
                    .unwrap();
            }
        });
    let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
        TimerEvent::Started(1, at) => at,
        other => panic!("fixed-delay first task: unexpected event {other:?}"),
    };
    assert!(wait_until(Duration::from_secs(20), || {
        delay_first_started.elapsed() >= overrun
    }));
    release(&delay_gate);
    let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
        TimerEvent::Completed(1, at) => at,
        other => panic!("fixed-delay first completion: unexpected event {other:?}"),
    };
    let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
        TimerEvent::Started(2, at) => at,
        other => panic!("fixed-delay second task: unexpected event {other:?}"),
    };
    fixed_delay.cancel();
    delay_materializer.shutdown();

    // Derive the measured durations and check both scheduling contracts.
    let rate_task_time = rate_first_completed.duration_since(rate_first_started);
    let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
    let delay_task_time = delay_first_completed.duration_since(delay_first_started);
    let delay_gap = delay_second_started.duration_since(delay_first_completed);
    assert!(
        rate_task_time >= interval,
        "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
    );
    assert!(
        rate_catch_up < interval,
        "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
    );
    assert!(
        delay_task_time >= interval,
        "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
    );
    assert!(
        delay_gap >= interval,
        "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
    );
}
2870
#[test]
// After a fixed-rate timer has fired once and been cancelled, no further
// fire may be observed during the following 90 ms (three 30 ms periods).
fn runtime_repeating_timer_cancellation_stops_future_fires() {
    let materializer = Materializer::new();
    let (fire_tx, fire_rx) = mpsc::channel();
    let repeating = materializer.schedule_at_fixed_rate(
        Duration::from_millis(1),
        Duration::from_millis(30),
        move || {
            fire_tx.send(()).unwrap();
        },
    );

    fire_rx.recv_timeout(Duration::from_millis(250)).unwrap();
    assert!(repeating.cancel());

    let after_cancel = fire_rx.recv_timeout(Duration::from_millis(90));
    assert!(after_cancel.is_err());
    materializer.shutdown();
}
2888
#[test]
// A one-shot timer whose task panics must not stop a later one-shot timer on
// the same materializer from firing.
fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
    let materializer = Materializer::new();
    materializer.schedule_once(Duration::from_millis(1), || {
        panic!("timer boom");
    });

    let (survivor_tx, survivor_rx) = mpsc::channel();
    materializer.schedule_once(Duration::from_millis(20), move || {
        survivor_tx.send(()).unwrap();
    });

    survivor_rx
        .recv_timeout(Duration::from_millis(250))
        .unwrap();
    materializer.shutdown();
}
2904
#[test]
// A fixed-rate timer whose task panics must run exactly once, while a
// one-shot timer scheduled afterwards on the same materializer still fires.
fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
    let materializer = Materializer::new();
    let invocations = StdArc::new(StdAtomicUsize::new(0));
    let invocations_in_task = StdArc::clone(&invocations);
    materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
        invocations_in_task.fetch_add(1, StdOrdering::SeqCst);
        panic!("fixed-rate boom");
    });

    let panicked_once = wait_until(Duration::from_millis(150), || {
        invocations.load(StdOrdering::SeqCst) == 1
    });
    assert!(panicked_once);

    let (later_tx, later_rx) = mpsc::channel();
    materializer.schedule_once(Duration::from_millis(30), move || {
        later_tx.send(()).unwrap();
    });
    later_rx.recv_timeout(Duration::from_millis(250)).unwrap();

    // Allow several more 20 ms periods to pass; the count must stay at one.
    thread::sleep(Duration::from_millis(90));
    assert_eq!(invocations.load(StdOrdering::SeqCst), 1);
    materializer.shutdown();
}
2929
#[test]
// While a fixed-rate task is blocked in a 200 ms sleep, a 10 ms one-shot timer
// scheduled on the same materializer must still fire in under 150 ms.
fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
    let materializer = Materializer::new();
    let blocker_running = StdArc::new(StdAtomicBool::new(false));
    let blocker_flag = StdArc::clone(&blocker_running);
    let blocker = materializer.schedule_at_fixed_rate(
        Duration::ZERO,
        Duration::from_millis(250),
        move || {
            blocker_flag.store(true, StdOrdering::SeqCst);
            thread::sleep(Duration::from_millis(200));
        },
    );

    // Only start measuring once the slow task is known to be running.
    let blocker_started = wait_until(Duration::from_millis(100), || {
        blocker_running.load(StdOrdering::SeqCst)
    });
    assert!(blocker_started);

    let scheduled_at = Instant::now();
    let (fired_tx, fired_rx) = mpsc::channel();
    materializer.schedule_once(Duration::from_millis(10), move || {
        fired_tx.send(Instant::now()).unwrap();
    });
    let fired_at = fired_rx.recv_timeout(Duration::from_millis(350)).unwrap();
    let elapsed = fired_at.duration_since(scheduled_at);

    blocker.cancel();
    materializer.shutdown();
    assert!(
        elapsed < Duration::from_millis(150),
        "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
    );
}
2963
#[test]
// The timer driver is reported live after construction and no longer live
// once the materializer has been shut down.
fn runtime_shutdown_stops_timer_driver_thread() {
    let materializer = Materializer::new();
    let came_up = wait_until(Duration::from_secs(1), || {
        materializer.timer_driver_is_live()
    });
    assert!(came_up);

    materializer.shutdown();
    let went_down = wait_until(Duration::from_secs(2), || {
        !materializer.timer_driver_is_live()
    });
    assert!(went_down);
}
2974
#[test]
// One-shot timers registered out of order must fire in deadline order.
fn runtime_timer_driver_orders_many_timers_by_deadline() {
    let materializer = Materializer::new();
    let (fired_tx, fired_rx) = mpsc::channel();
    // (delay in ms, tag); the tags are numbered by ascending delay.
    let deadlines = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];

    for (delay_ms, tag) in deadlines {
        let fired_tx = fired_tx.clone();
        materializer.schedule_once(Duration::from_millis(delay_ms), move || {
            fired_tx.send(tag).unwrap();
        });
    }
    drop(fired_tx);

    let firing_order: Vec<u8> = (0..deadlines.len())
        .map(|_| fired_rx.recv_timeout(Duration::from_secs(10)).unwrap())
        .collect();
    materializer.shutdown();

    assert_eq!(firing_order, vec![1, 2, 3, 4, 5]);
}
2997
#[test]
// Scheduling 128 long-lived timers must not change the number of threads
// carrying the timer thread's name, and shutdown must reduce that number.
fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
    let materializer = Materializer::new();
    let full_name = materializer.timer_thread_name().to_owned();
    // Only the first 15 characters are used for the lookup — presumably to
    // match the OS-level truncation of thread names; confirm against
    // `linux_thread_count`.
    let probe_name: String = full_name.chars().take(15).collect();

    let driver_visible = wait_until(Duration::from_secs(5), || {
        materializer.timer_driver_is_live() && linux_thread_count(&probe_name) >= 1
    });
    assert!(driver_visible);
    let baseline = linux_thread_count(&probe_name);

    for _ in 0..128 {
        materializer.schedule_once(Duration::from_secs(60), || {});
    }

    let count_unchanged = wait_until(Duration::from_secs(5), || {
        materializer.timer_driver_is_live() && linux_thread_count(&probe_name) == baseline
    });
    assert!(
        count_unchanged,
        "scheduling timers should not create extra timer threads for a runtime",
    );

    materializer.shutdown();
    let driver_gone = wait_until(Duration::from_secs(5), || {
        !materializer.timer_driver_is_live() && linux_thread_count(&probe_name) < baseline
    });
    assert!(driver_gone);
}
3030
#[test]
// `Sink::cancelled` materializes to `NotUsed`, whereas `Sink::never`
// materializes to a completion that has no result yet when polled.
fn cancelled_and_never_sinks_have_distinct_materialization_results() {
    let cancelled_mat = Source::repeat(1)
        .run_with(Sink::cancelled())
        .expect("cancelled sink materializes");
    assert_eq!(cancelled_mat, NotUsed);

    let mut never_completion = Source::single(1)
        .run_with(Sink::never())
        .expect("never sink materializes");
    assert_eq!(never_completion.try_wait(), None);
}
3047
#[test]
// A `Sink::never` completion resolves with `AbruptTermination` once its
// materializer is shut down.
fn never_sink_finishes_on_materializer_shutdown() {
    let materializer = Materializer::new();
    let pending = Source::single(1)
        .run_with_materializer(Sink::never(), &materializer)
        .unwrap();

    materializer.shutdown();

    let outcome = pending.wait();
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
}
3058
#[test]
// A stream fed by `Source::never` counts as active until its completion handle
// is dropped, after which the active-stream count returns to zero.
fn dropping_source_never_completion_releases_parked_worker() {
    let materializer = Materializer::new();
    let parked = Source::<i32>::never()
        .run_with_materializer(Sink::ignore(), &materializer)
        .unwrap();

    let became_active = wait_until(StdDuration::from_secs(1), || {
        materializer.active_streams() == 1
    });
    assert!(became_active);
    assert_eq!(materializer.active_streams(), 1);

    drop(parked);

    let was_released = wait_until(StdDuration::from_secs(15), || {
        materializer.active_streams() == 0
    });
    assert!(was_released);
    assert_eq!(materializer.active_streams(), 0);
}
3078
#[test]
// `Source::future` emits the resolved value, `Source::future_source` emits the
// resolved source's items, and `Source::maybe` fails with `MaybeIncomplete`
// until its handle is completed.
fn future_and_maybe_sources_emit_values() {
    let from_future = Source::future(|| async { Ok(7) }).run_collect().unwrap();
    assert_eq!(from_future, vec![7]);

    let from_future_source =
        Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
            .run_collect()
            .unwrap();
    assert_eq!(from_future_source, vec![1, 2, 3]);

    let (promise, maybe_source) = Source::maybe();
    let before_completion = maybe_source.clone().run_collect();
    assert_eq!(before_completion, Err(StreamError::MaybeIncomplete));

    promise.complete(9).unwrap();
    let after_completion = maybe_source.run_collect().unwrap();
    assert_eq!(after_completion, vec![9]);
}
3097
#[test]
// Covers the generator-style sources: `cycle` (including the empty-iterator
// failure), `unfold`, `unfold_async`, and a panicking `lazy_single` factory.
fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
    let cycled = Source::cycle(|| [1, 2, 3].into_iter())
        .take(8)
        .run_collect()
        .unwrap();
    assert_eq!(cycled, vec![1, 2, 3, 1, 2, 3, 1, 2]);

    let empty_cycle = Source::<i32>::cycle(std::iter::empty::<i32>).run_collect();
    assert_eq!(
        empty_cycle,
        Err(StreamError::Failed("empty iterator".into()))
    );

    let unfolded = Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
        .run_collect()
        .unwrap();
    assert_eq!(unfolded, vec![0, 1, 2, 3]);

    let unfolded_async = Source::unfold_async(0, |state| async move {
        Ok((state < 4).then_some((state + 1, state * 2)))
    })
    .run_collect()
    .unwrap();
    assert_eq!(unfolded_async, vec![0, 2, 4, 6]);

    // The panic inside the factory surfaces as a stream failure.
    let panicking = Source::<i32>::lazy_single(|| panic!("boom")).run_collect();
    assert!(matches!(
        panicking,
        Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
    ));
}
3130
#[test]
// Verifies that the lazy source constructors do not invoke their factory until
// the stream is first pulled, and that the deferred materialized value becomes
// available afterwards. Three scenarios: `lazy_source`, `lazy_future_source`
// attached to a cancelled sink, and `lazy_future`.
fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
    // Scenario 1: `lazy_source`; the counter records factory invocations.
    let created = StdArc::new(StdAtomicUsize::new(0));
    let created_for_source = StdArc::clone(&created);
    let source = Source::<i32>::lazy_source(move || {
        created_for_source.fetch_add(1, StdOrdering::SeqCst);
        Source::from_iter([7, 8]).map_materialized_value(|_| 99)
    });
    let materializer = Materializer::new();
    // Drive the source's factory directly to obtain the raw item stream and
    // the deferred materialized value.
    let (mut stream, mut mat) = StdArc::clone(&source.factory)
        .create(&materializer)
        .unwrap();

    // Nothing has been pulled yet: the factory has not run and the
    // materialized value is still pending.
    assert_eq!(created.load(StdOrdering::SeqCst), 0);
    assert!(mat.try_wait().is_none());
    // The first pull runs the factory once and resolves the inner source's
    // materialized value (99).
    assert_eq!(stream.next().unwrap().unwrap(), 7);
    assert_eq!(mat.wait().unwrap(), 99);
    assert_eq!(created.load(StdOrdering::SeqCst), 1);
    assert_eq!(stream.next().unwrap().unwrap(), 8);

    // Scenario 2: `lazy_future_source` run into `Sink::cancelled`. The factory
    // must never run and the materialized value resolves to a failure.
    let never_created = StdArc::new(StdAtomicUsize::new(0));
    let never_created_for_source = StdArc::clone(&never_created);
    let mat = Source::<i32>::lazy_future_source(move || {
        never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
        async { Ok(Source::single(1)) }
    })
    .to(Sink::cancelled())
    .run()
    .unwrap();
    assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
    assert_eq!(never_created.load(StdOrdering::SeqCst), 0);

    // Scenario 3: `lazy_future`; the factory runs on the first pull only.
    let lazy_future = StdArc::new(StdAtomicUsize::new(0));
    let lazy_future_for_source = StdArc::clone(&lazy_future);
    let source = Source::lazy_future(move || {
        lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
        async { Ok(42) }
    });
    let (mut stream, _) = StdArc::clone(&source.factory)
        .create(&Materializer::new())
        .unwrap();
    assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
    assert_eq!(stream.next().unwrap().unwrap(), 42);
    assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
}
3176
#[test]
// `unfold_resource` must call its close callback exactly once on normal
// completion, on a read failure (where the read error wins over the close
// error), and after downstream cancels by taking only the head.
fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
    // Normal completion: the deque is drained, then closed once.
    let completion_closes = StdArc::new(StdAtomicUsize::new(0));
    let completion_closes_in_stage = StdArc::clone(&completion_closes);
    let drained = Source::unfold_resource(
        || Ok(std::collections::VecDeque::from([1, 2, 3])),
        |items| Ok(items.pop_front()),
        move |_items| {
            completion_closes_in_stage.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(drained, vec![1, 2, 3]);
    assert_eq!(completion_closes.load(StdOrdering::SeqCst), 1);

    // Read failure: close still runs once and its own error is not reported.
    let failure_closes = StdArc::new(StdAtomicUsize::new(0));
    let failure_closes_in_stage = StdArc::clone(&failure_closes);
    let read_failure = Source::<i32>::unfold_resource(
        || Ok(()),
        |_| Err(StreamError::Failed("read".into())),
        move |_| {
            failure_closes_in_stage.fetch_add(1, StdOrdering::SeqCst);
            Err(StreamError::Failed("close".into()))
        },
    )
    .run_collect();
    assert_eq!(read_failure, Err(StreamError::Failed("read".into())));
    assert_eq!(failure_closes.load(StdOrdering::SeqCst), 1);

    // Cancellation: an endless counter consumed by `Sink::head`.
    let cancel_closes = StdArc::new(StdAtomicUsize::new(0));
    let cancel_closes_in_stage = StdArc::clone(&cancel_closes);
    let head_completion = Source::unfold_resource(
        || Ok(0_usize),
        |next| {
            let item = *next;
            *next += 1;
            Ok(Some(item))
        },
        move |_| {
            cancel_closes_in_stage.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_with(Sink::head())
    .unwrap();
    assert_eq!(head_completion.wait().unwrap(), 0);
    let closed_after_cancel = wait_until(Duration::from_millis(250), || {
        cancel_closes.load(StdOrdering::SeqCst) == 1
    });
    assert!(closed_after_cancel);
}
3229
#[test]
// Covers the async counterparts of the resource and accumulator stages:
// - `unfold_resource_async` closes its resource once on completion and once on
//   a read failure (the read error is the one reported);
// - `scan_async` never has more than one accumulator future in flight;
// - `fold_async` emits a single final value.
fn wp6b_async_resource_and_async_accumulators_are_sequential() {
    // Normal completion: drain a three-element deque, then close once.
    let closed = StdArc::new(StdAtomicUsize::new(0));
    let closed_for_close = StdArc::clone(&closed);
    let values = Source::unfold_resource_async(
        || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
        |items| {
            // Pop before building the future so the future owns the item and
            // does not borrow the resource.
            let item = items.pop_front();
            async move { Ok(item) }
        },
        move |_items| {
            let closed = StdArc::clone(&closed_for_close);
            async move {
                closed.fetch_add(1, StdOrdering::SeqCst);
                Ok(())
            }
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(values, vec![1, 2, 3]);
    assert_eq!(closed.load(StdOrdering::SeqCst), 1);

    // Read failure: close still runs once; its "close" error is not the one
    // surfaced by the stream.
    let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
    let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
    let failed = Source::<i32>::unfold_resource_async(
        || async { Ok(()) },
        |_resource| async { Err(StreamError::Failed("read".into())) },
        move |_resource| {
            let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
            async move {
                closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
                Err(StreamError::Failed("close".into()))
            }
        },
    )
    .run_collect();
    assert_eq!(failed, Err(StreamError::Failed("read".into())));
    assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);

    // `scan_async`: `active` counts futures currently in flight and
    // `max_active` records the highest value it ever reached.
    let active = StdArc::new(StdAtomicUsize::new(0));
    let max_active = StdArc::new(StdAtomicUsize::new(0));
    let active_for_stage = StdArc::clone(&active);
    let max_for_stage = StdArc::clone(&max_active);
    let scanned = Source::from_iter(1..=4)
        .scan_async(0, move |acc, item| {
            let active = StdArc::clone(&active_for_stage);
            let max_active = StdArc::clone(&max_for_stage);
            async move {
                let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
                max_active.fetch_max(now, StdOrdering::SeqCst);
                // Yield for a moment so overlapping futures would be observable.
                tokio::time::sleep(Duration::from_millis(1)).await;
                active.fetch_sub(1, StdOrdering::SeqCst);
                Ok(acc + item)
            }
        })
        .run_collect()
        .unwrap();
    // Seed first, then each running total.
    assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
    assert_eq!(max_active.load(StdOrdering::SeqCst), 1);

    // `fold_async`: only the final accumulator value is emitted.
    let folded = Source::from_iter(1..=4)
        .fold_async(0, |acc, item| async move { Ok(acc + item) })
        .run_collect()
        .unwrap();
    assert_eq!(folded, vec![10]);
}
3297
// Checks that materializing `fold_async` into a queue sink returns before the
// upstream source has produced its first element.
#[test]
fn wp6b_fold_async_materialization_does_not_drain_upstream() {
    // `release` is a (flag, condvar) pair: the source's only element blocks on
    // it until the test flips the flag to `true`.
    let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
    // Set by the source when its iterator is pulled for the first time.
    let started = StdArc::new(StdAtomicBool::new(false));
    let source = {
        let release = StdArc::clone(&release);
        let started = StdArc::clone(&started);
        Source::from_factory(move || {
            let release = StdArc::clone(&release);
            let started = StdArc::clone(&started);
            let mut emitted = false;
            Box::new(std::iter::from_fn(move || {
                // Single-element stream: the second pull ends it.
                if emitted {
                    return None;
                }
                emitted = true;
                started.store(true, StdOrdering::SeqCst);
                let (released, available) = &*release;
                let mut released = released.lock().unwrap();
                // Block the pull until the test releases the source.
                while !*released {
                    released = available.wait(released).unwrap();
                }
                Some(Ok(1))
            }))
        })
    };

    // Materialize on a helper thread so the main thread can bound the wait.
    let (materialized_tx, materialized_rx) = mpsc::channel();
    let join = thread::spawn(move || {
        let queue = source
            .fold_async(0, |acc, item| async move { Ok(acc + item) })
            .run_with(Sink::queue())
            .unwrap();
        materialized_tx.send(queue).unwrap();
    });

    // The queue handle must arrive within one second while the source is
    // still blocked.
    let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
        Ok(queue) => queue,
        Err(error) => {
            // Unblock the source so the helper thread can finish before the
            // test fails.
            let (released, available) = &*release;
            *released.lock().unwrap() = true;
            available.notify_all();
            let _ = join.join();
            panic!("fold_async materialization did not return before first pull: {error}");
        }
    };
    // Sanity check: nothing has released the source up to this point.
    let (released, _) = &*release;
    assert!(
        !*released.lock().unwrap(),
        "test source was released before materialization returned"
    );

    // Release the source and verify the folded value followed by completion.
    let (released, available) = &*release;
    *released.lock().unwrap() = true;
    available.notify_all();
    assert_eq!(queue.pull().unwrap(), Some(1));
    assert_eq!(queue.pull().unwrap(), None);
    join.join().unwrap();
    assert!(started.load(StdOrdering::SeqCst));
}
3358
#[test]
fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
    // A lazy sink fed by an empty source fails and never runs its factory.
    let sink_builds = StdArc::new(StdAtomicUsize::new(0));
    let sink_builds_in_factory = StdArc::clone(&sink_builds);
    let empty_completion = Source::<i32>::empty()
        .run_with(Sink::lazy_sink(move || {
            sink_builds_in_factory.fetch_add(1, StdOrdering::SeqCst);
            Sink::ignore()
        }))
        .unwrap();
    let empty_outcome = empty_completion.wait();
    assert!(matches!(empty_outcome, Err(StreamError::Failed(_))));
    assert_eq!(sink_builds.load(StdOrdering::SeqCst), 0);

    // foreach_async visits every element; the visited items sum to 6.
    let visited_total = StdArc::new(StdAtomicUsize::new(0));
    let visited_total_in_sink = StdArc::clone(&visited_total);
    let foreach_completion = Source::from_iter([1_usize, 2, 3])
        .run_with(Sink::foreach_async(2, move |item| {
            let visited_total = StdArc::clone(&visited_total_in_sink);
            async move {
                visited_total.fetch_add(item, StdOrdering::SeqCst);
                Ok(())
            }
        }))
        .unwrap();
    foreach_completion.wait().unwrap();
    assert_eq!(visited_total.load(StdOrdering::SeqCst), 6);

    // A lazy flow builds its inner flow only once the first element is pulled.
    let flow_builds = StdArc::new(StdAtomicUsize::new(0));
    let flow_builds_in_factory = StdArc::clone(&flow_builds);
    let deferred = Flow::<i32, i32>::lazy_flow(move || {
        flow_builds_in_factory.fetch_add(1, StdOrdering::SeqCst);
        Flow::identity()
            .map(|item: i32| item + 10)
            .map_materialized_value(|_| 123)
    });
    let deferred_mat = (deferred.materialize)().unwrap();
    let flow::FlowTransform::Runtime(run_transform) = deferred.transform else {
        panic!("lazy flow must be runtime-backed")
    };
    let mut outputs =
        run_transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap();
    // Materializing and wiring the transform must not have run the factory.
    assert_eq!(flow_builds.load(StdOrdering::SeqCst), 0);
    assert_eq!(outputs.next().unwrap().unwrap(), 11);
    assert_eq!(deferred_mat.wait().unwrap(), 123);
    assert_eq!(flow_builds.load(StdOrdering::SeqCst), 1);
    assert_eq!(outputs.next().unwrap().unwrap(), 12);

    // A future flow exposes the inner flow's materialized value and output.
    let (future_mat, future_values) = Source::from_iter([1, 2])
        .via_mat(
            Flow::future_flow(|| async {
                Ok(Flow::identity()
                    .map(|item: i32| item * 2)
                    .map_materialized_value(|_| 77))
            }),
            Keep::right,
        )
        .to_mat(Sink::collect(), Keep::both)
        .run()
        .unwrap();
    assert_eq!(future_mat.wait().unwrap(), 77);
    assert_eq!(future_values.wait().unwrap(), vec![2, 4]);
}
3423
#[test]
fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
    // Repeated to give any ordering race between the two uses a chance to show.
    for round in 0..50 {
        // Each factory call takes the next id (starting at 1) and bakes it
        // into both the element transform and the materialized value.
        let next_instance = StdArc::new(StdAtomicUsize::new(1));
        let next_instance_in_factory = StdArc::clone(&next_instance);
        let first_use: Flow<usize, usize, _> = Flow::lazy_flow(move || {
            let id = next_instance_in_factory.fetch_add(1, StdOrdering::SeqCst);
            Flow::identity()
                .map(move |x: usize| x * 100 + id)
                .map_materialized_value(move |_| id)
        });
        let second_use = first_use.clone();

        let ((first_mat, second_mat), collected) = Source::from_iter([0usize])
            .via_mat(first_use, Keep::right)
            .via_mat(second_use, Keep::both)
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        let first_id = first_mat.wait().unwrap();
        let second_id = second_mat.wait().unwrap();
        let emitted = collected.wait().unwrap();
        let element = emitted[0];
        // 0 through the first instance gives `first_id`; through the second
        // instance that becomes `first_id * 100 + second_id`.
        assert_eq!(
            element,
            first_id * 100 + second_id,
            "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
        );
        assert_ne!(
            first_id, second_id,
            "round {round}: same factory instance paired twice"
        );
    }
}
3462
#[test]
fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
    for _ in 0..20 {
        // Each factory call gets a fresh 1-based id used by both the element
        // transform and the materialized value.
        let instances = StdArc::new(StdAtomicUsize::new(0));
        let instances_in_factory = StdArc::clone(&instances);
        let shared_flow = Flow::<i32, i32>::lazy_flow(move || {
            let id = instances_in_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
            Flow::identity()
                .map(move |item: i32| item + (id as i32 * 100))
                .map_materialized_value(move |_| id)
        });
        // Three parties: two materializing threads plus this test thread.
        let start_line = StdArc::new(std::sync::Barrier::new(3));

        let spawn_materialization = |input: i32| {
            let flow = shared_flow.clone();
            let start_line = StdArc::clone(&start_line);
            thread::spawn(move || {
                start_line.wait();
                let (mat, values) = Source::single(input)
                    .via_mat(flow, Keep::right)
                    .to_mat(Sink::collect(), Keep::both)
                    .run()
                    .unwrap();
                (input, mat.wait().unwrap(), values.wait().unwrap())
            })
        };

        let first = spawn_materialization(1);
        let second = spawn_materialization(2);
        start_line.wait();

        // The id reported by a run's materialized value must be the id that
        // transformed that same run's element.
        for (input, mat_id, values) in [first.join().unwrap(), second.join().unwrap()] {
            assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
        }
        assert_eq!(instances.load(StdOrdering::SeqCst), 2);
    }
}
3501
#[test]
fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
    // Upstream yields one element and then fails; the close callback's item
    // (99) is expected between the mapped element and the upstream error.
    let pulled = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
    })
    .map_with_resource(
        || Ok(()),
        |_resource, item| Ok(item + 10),
        |_resource| Ok(Some(99)),
    )
    .run_with(Sink::queue())
    .unwrap();

    let mapped = pulled.pull().unwrap();
    assert_eq!(mapped, Some(11));
    let close_item = pulled.pull().unwrap();
    assert_eq!(close_item, Some(99));
    let terminal = pulled.pull();
    assert_eq!(terminal, Err(StreamError::Failed("upstream".into())));

    // When both the mapper and the close callback fail, the mapper's error is
    // the one reported.
    let map_failure: StreamResult<Vec<i32>> = Source::single(1)
        .map_with_resource(
            || Ok(()),
            |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
            |_resource| -> StreamResult<Option<i32>> {
                Err(StreamError::Failed("close".into()))
            },
        )
        .run_collect();
    assert_eq!(map_failure, Err(StreamError::Failed("map".into())));
}
3530
#[test]
fn stateful_and_terminal_source_operators_work() {
    // stateful_map: emit the running sum after each element.
    let running_sums = Source::from_iter([1, 2, 3])
        .stateful_map(0, |total, item| {
            *total += item;
            *total
        })
        .run_collect()
        .unwrap();
    assert_eq!(running_sums, vec![1, 3, 6]);

    // stateful_map_concat: emit the element followed by the running sum.
    let item_and_sum = Source::from_iter([1, 2, 3])
        .stateful_map_concat(0, |total, item| {
            *total += item;
            [item, *total]
        })
        .run_collect()
        .unwrap();
    assert_eq!(item_and_sum, vec![1, 1, 2, 3, 3, 6]);

    // fold with a seed of 10 yields a single total.
    let folded = Source::from_iter([1, 2, 3])
        .fold(10, |sum, item| sum + item)
        .run_collect()
        .unwrap();
    assert_eq!(folded, vec![16]);

    // reduce has no seed and yields a single total.
    let reduced = Source::from_iter([1, 2, 3])
        .reduce(|sum, item| sum + item)
        .run_collect()
        .unwrap();
    assert_eq!(reduced, vec![6]);
}
3566
#[test]
fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
    // map_concat over an endless range must still hand the first element to
    // `take(1)`.
    let first_expanded = Source::single(())
        .map_concat(|_| 0_u64..)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_expanded, vec![0]);

    // sliding over an endless repeat must still produce its first window.
    let first_window = Source::repeat(1_u64)
        .sliding(2, 1)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_window, vec![vec![1, 1]]);
}
3583
#[test]
fn fan_in_source_operators_follow_ordering_rules() {
    // concat: the receiver's elements come first.
    let concatenated = Source::from_iter([1, 2])
        .concat(Source::from_iter([3, 4]))
        .run_collect()
        .unwrap();
    assert_eq!(concatenated, vec![1, 2, 3, 4]);

    // prepend: the argument's elements come first.
    let prepended = Source::from_iter([3, 4])
        .prepend(Source::from_iter([1, 2]))
        .run_collect()
        .unwrap();
    assert_eq!(prepended, vec![1, 2, 3, 4]);

    // or_else: the fallback is used when the primary is empty.
    let fallback_used = Source::empty()
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_used, vec![10, 20]);

    // or_else: the fallback is ignored when the primary emits.
    let fallback_skipped = Source::from_iter([1, 2])
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_skipped, vec![1, 2]);

    // interleave with a segment size of 2.
    let interleaved = Source::from_iter([1, 2, 3])
        .interleave(Source::from_iter([10, 11, 12]), 2)
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 2, 10, 11, 3, 12]);
}
3622
#[test]
fn fan_in_flow_operators_compose_with_primary_stream() {
    // Each fan-in operator is attached to an identity flow and run via `via`.
    assert_eq!(
        Source::from_iter([1, 2])
            .via(Flow::identity().concat(Source::from_iter([3, 4])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4]
    );

    assert_eq!(
        Source::from_iter([3, 4])
            .via(Flow::identity().prepend(Source::from_iter([1, 2])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4]
    );

    assert_eq!(
        Source::from_iter([1, 2, 3])
            .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
            .run_collect()
            .unwrap(),
        vec![1, 10, 2, 11, 3, 12]
    );

    assert_eq!(
        Source::from_iter([1, 4])
            .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4, 5]
    );

    assert_eq!(
        Source::from_iter([1, 2])
            .via(Flow::identity().zip_latest(Source::single(10)))
            .run_collect()
            .unwrap(),
        vec![(1, 10), (2, 10)]
    );

    let summing_zip_latest =
        Flow::identity().zip_latest_with(Source::single(10), false, |left, right| left + right);
    assert_eq!(
        Source::from_iter([1, 2])
            .via(summing_zip_latest)
            .run_collect()
            .unwrap(),
        vec![11, 12]
    );
}
3664
#[test]
fn fan_in_operators_propagate_errors_and_eager_close() {
    // A failed primary is not replaced by the or_else fallback.
    let failed_primary = Source::failed(StreamError::Failed("boom".into()))
        .or_else(Source::from_iter([1, 2]))
        .run_collect();
    assert!(matches!(failed_primary, Err(StreamError::Failed(_))));

    // A failed prepended source fails the combined stream.
    let failed_prefix = Source::from_iter([1, 2])
        .prepend(Source::failed(StreamError::Failed("boom".into())))
        .run_collect();
    assert!(matches!(failed_prefix, Err(StreamError::Failed(_))));

    // With the final flag set to `true`, interleaving with an empty input is
    // expected to stop after the first element.
    let eager_closed = Source::from_iter([1, 2])
        .interleave_all([Source::empty()], 1, true)
        .run_collect()
        .unwrap();
    assert_eq!(eager_closed, vec![1]);
}
3687
#[test]
fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    // One pull counter per interleaved input, indexed by input position.
    let pull_counts: Arc<[AtomicUsize; 3]> = Arc::new([
        AtomicUsize::new(0),
        AtomicUsize::new(0),
        AtomicUsize::new(0),
    ]);

    // Builds a source that records every pull; only input 0 ever emits, and
    // it emits a single 42.
    let counted_source = |input: usize| {
        let pull_counts = Arc::clone(&pull_counts);
        Source::from_materialized_factory(move |_| {
            let pull_counts = Arc::clone(&pull_counts);
            let mut already_emitted = false;
            Ok((
                Box::new(std::iter::from_fn(move || {
                    pull_counts[input].fetch_add(1, Ordering::SeqCst);
                    if already_emitted || input != 0 {
                        None
                    } else {
                        already_emitted = true;
                        Some(Ok(42))
                    }
                })) as BoxStream<i32>,
                NotUsed,
            ))
        })
    };

    let head = counted_source(0)
        .interleave_all([counted_source(1), counted_source(2)], 1, false)
        .run_with(Sink::head());

    assert_eq!(wait(head.unwrap()), 42);
    assert_eq!(pull_counts[0].load(Ordering::SeqCst), 1);
    assert_eq!(
        pull_counts[1].load(Ordering::SeqCst),
        0,
        "second input should not be pulled when downstream cancels after first element"
    );
    assert_eq!(
        pull_counts[2].load(Ordering::SeqCst),
        0,
        "third input should not be pulled before its turn"
    );
}
3735
#[test]
fn interleave_non_eager_drains_remaining_when_one_input_completes() {
    // The middle input has a single element; the other inputs are expected to
    // keep alternating after it finishes.
    let others = [Source::from_iter([10]), Source::from_iter([20, 21, 22])];
    let interleaved = Source::from_iter([1, 2, 3, 4])
        .interleave_all(others, 1, false)
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 10, 20, 2, 21, 3, 22, 4]);
}
3750
#[test]
fn remaining_merge_and_zip_family_matches_expected_ordering() {
    // merge_sorted
    let merged_sorted = Source::from_iter([1, 4])
        .merge_sorted(Source::from_iter([2, 3, 5]))
        .run_collect()
        .unwrap();
    assert_eq!(merged_sorted, vec![1, 2, 3, 4, 5]);

    // merge_latest
    let merged_latest = Source::from_iter([1, 2])
        .merge_latest(Source::single(10), false)
        .run_collect()
        .unwrap();
    assert_eq!(merged_latest, vec![vec![1, 10], vec![2, 10]]);

    // merge_all
    let merged_all = Source::from_iter([1, 2, 3])
        .merge_all([Source::from_iter([10, 11])], false)
        .run_collect()
        .unwrap();
    assert_eq!(merged_all, vec![1, 10, 2, 11, 3]);

    // zip_with
    let zipped_sums = Source::from_iter([1, 2, 3])
        .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_sums, vec![11, 13, 15]);

    // zip_latest
    let zipped_latest = Source::from_iter([1, 2])
        .zip_latest(Source::single(10))
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest, vec![(1, 10), (2, 10)]);

    // zip_latest_with
    let zipped_latest_sums = Source::from_iter([1, 2, 3])
        .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest_sums, vec![11, 12, 13]);

    // zip_all: the shorter side is padded with its fill value.
    let zipped_all = Source::from_iter([1, 2])
        .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_all, vec![(1, 10), (2, 11), (-1, 12)]);

    // zip_with_index
    let indexed = Source::from_iter([5, 6, 7])
        .zip_with_index()
        .run_collect()
        .unwrap();
    assert_eq!(indexed, vec![(5, 0), (6, 1), (7, 2)]);

    // zip_n
    let zipped_n = Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
        .run_collect()
        .unwrap();
    assert_eq!(zipped_n, vec![vec![1, 10], vec![2, 20]]);

    // zip_with_n
    let zipped_n_sums = Source::zip_with_n(
        [
            Source::from_iter([1, 2]),
            Source::from_iter([10, 20]),
            Source::from_iter([100, 200]),
        ],
        |values| values.into_iter().sum::<i32>(),
    )
    .run_collect()
    .unwrap();
    assert_eq!(zipped_n_sums, vec![111, 222]);

    // merge_prioritized_n
    let prioritized = Source::merge_prioritized_n(
        [
            (Source::from_iter([1, 2, 3, 4]), 2),
            (Source::from_iter([10, 11]), 1),
        ],
        false,
    )
    .run_collect()
    .unwrap();
    assert_eq!(prioritized, vec![1, 2, 10, 3, 4, 11]);

    // Source::combine with the Merge strategy
    let combined_sources = Source::combine(
        Source::from_iter([1, 2, 3]),
        Source::from_iter([10, 11]),
        std::iter::empty::<Source<i32, NotUsed>>(),
        SourceCombineStrategy::Merge {
            eager_complete: false,
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(combined_sources, vec![1, 10, 2, 11, 3]);

    // Sink::combine with the Broadcast strategy materializes to NotUsed.
    let broadcast_sink = Sink::combine(
        Sink::ignore(),
        Sink::ignore(),
        std::iter::empty::<Sink<i32, NotUsed>>(),
        SinkCombineStrategy::Broadcast,
    );
    let broadcast_mat = Source::from_iter([1, 2, 3])
        .run_with(broadcast_sink)
        .unwrap();
    assert_eq!(broadcast_mat, NotUsed);
}
3878
#[test]
fn sink_combine_broadcast_delivers_every_element_to_every_child() {
    // Each child sink counts the elements it receives.
    let left_seen = StdArc::new(StdAtomicUsize::new(0));
    let right_seen = StdArc::new(StdAtomicUsize::new(0));
    let left_seen_in_sink = StdArc::clone(&left_seen);
    let right_seen_in_sink = StdArc::clone(&right_seen);
    let broadcast = Sink::combine(
        Sink::foreach(move |_: i32| {
            left_seen_in_sink.fetch_add(1, StdOrdering::SeqCst);
        }),
        Sink::foreach(move |_: i32| {
            right_seen_in_sink.fetch_add(1, StdOrdering::SeqCst);
        }),
        std::iter::empty::<Sink<i32, NotUsed>>(),
        SinkCombineStrategy::Broadcast,
    );
    let materialized = Source::from_iter(0..100).run_with(broadcast).unwrap();
    assert_eq!(materialized, NotUsed);
    // Poll for up to one second until both children have seen all 100 items.
    assert!(wait_until(StdDuration::from_secs(1), || {
        left_seen.load(StdOrdering::SeqCst) == 100 && right_seen.load(StdOrdering::SeqCst) == 100
    }));
}
3911
#[test]
fn zip_latest_completes_when_one_side_finishes_without_emitting() {
    // Empty left side against an endless right side: expected to complete
    // with no output.
    let empty_left = Source::from_iter(std::iter::empty::<i32>())
        .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(empty_left, Vec::<i32>::new());

    // Endless left side against an empty right side: same expectation.
    let empty_right = Source::repeat(10)
        .zip_latest_with(
            Source::from_iter(std::iter::empty::<i32>()),
            false,
            |left, right| left + right,
        )
        .run_collect()
        .unwrap();
    assert_eq!(empty_right, Vec::<i32>::new());
}
3936
#[test]
fn zip_family_completion_boundaries_match_expected_results() {
    // zip_with stops once the shorter side is exhausted.
    let zipped = Source::from_iter([1, 2, 3])
        .zip_with(Source::from_iter([10]), |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped, vec![11]);

    // zip_latest_with with the flag set to `true` is expected to yield two sums.
    let zipped_latest = Source::from_iter([1, 2, 3])
        .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest, vec![11, 12]);

    // zip_n is bounded by the shortest of its inputs.
    let zipped_n = Source::zip_n([
        Source::from_iter([1, 2, 3]),
        Source::from_iter([10]),
        Source::from_iter([100, 200, 300]),
    ])
    .run_collect()
    .unwrap();
    assert_eq!(zipped_n, vec![vec![1, 10, 100]]);
}
3966
#[test]
fn combine_strategies_follow_merge_concat_and_priority_rules() {
    // Concat: the inputs are emitted one after another in argument order.
    let concatenated = Source::combine(
        Source::from_iter([1, 2]),
        Source::from_iter([10, 11]),
        [Source::from_iter([100])],
        SourceCombineStrategy::Concat,
    )
    .run_collect()
    .unwrap();
    assert_eq!(concatenated, vec![1, 2, 10, 11, 100]);

    // Prioritized with priorities [2, 1].
    let prioritized = Source::combine(
        Source::from_iter([1, 2, 3, 4]),
        Source::from_iter([10, 11]),
        std::iter::empty::<Source<i32, NotUsed>>(),
        SourceCombineStrategy::Prioritized {
            priorities: vec![2, 1],
            eager_complete: false,
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(prioritized, vec![1, 2, 10, 3, 4, 11]);
}
3996
#[test]
fn concat_lazy_defers_follow_on_source_until_needed() {
    // Source-level concat_lazy: taking only the head must not run the
    // follow-on source's factory.
    let source_factory_runs = StdArc::new(StdAtomicUsize::new(0));
    let source_factory_runs_inner = StdArc::clone(&source_factory_runs);
    let follow_on = Source::from_materialized_factory(move |_| {
        source_factory_runs_inner.fetch_add(1, StdOrdering::SeqCst);
        Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
    });
    let head_via_source = Source::single(1)
        .concat_lazy(follow_on)
        .run_with(Sink::head());
    assert_eq!(wait(head_via_source.unwrap()), 1);
    assert_eq!(source_factory_runs.load(StdOrdering::SeqCst), 0);

    // Flow-level concat_lazy: same expectation when attached through a flow.
    let flow_factory_runs = StdArc::new(StdAtomicUsize::new(0));
    let flow_factory_runs_inner = StdArc::clone(&flow_factory_runs);
    let follow_on_for_flow = Source::from_materialized_factory(move |_| {
        flow_factory_runs_inner.fetch_add(1, StdOrdering::SeqCst);
        Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
    });
    let head_via_flow = Source::single(1)
        .via(Flow::identity().concat_lazy(follow_on_for_flow))
        .run_with(Sink::head());
    assert_eq!(wait(head_via_flow.unwrap()), 1);
    assert_eq!(flow_factory_runs.load(StdOrdering::SeqCst), 0);
}
4023
#[test]
fn also_to_completes_when_side_sink_cancels() {
    // A single cancelled side sink: the main stream is expected to complete
    // without emitting anything.
    let with_one_side = Source::from_iter([1, 2, 3])
        .also_to(Sink::cancelled())
        .run_collect()
        .unwrap();
    assert_eq!(with_one_side, Vec::<i32>::new());

    // Several cancelled side sinks: same expectation.
    let with_many_sides = Source::from_iter([1, 2, 3])
        .also_to_all([Sink::cancelled(), Sink::cancelled()])
        .run_collect()
        .unwrap();
    assert_eq!(with_many_sides, Vec::<i32>::new());
}
4041
#[test]
fn also_to_completes_gracefully_when_side_sink_disconnects() {
    // The head side sink is satisfied after one element; the main stream is
    // expected to finish without an error and before all 100 items.
    let main_output = Source::from_iter(0..100)
        .also_to(Sink::head())
        .run_collect()
        .unwrap();
    let emitted = main_output.len();
    assert!(
        !main_output.is_empty(),
        "main should emit at least one element"
    );
    assert!(
        emitted < 100,
        "main should complete early when side disconnects"
    );
}
4054
#[test]
fn also_to_propagates_original_error_when_side_is_disconnected() {
    // The upstream failure must surface unchanged through each side-sink
    // operator even though the side sink is already cancelled.
    let err = StreamError::Failed("distinctive-boom".into());

    let via_also_to = Source::<i32>::failed(err.clone())
        .also_to(Sink::cancelled())
        .run_collect();
    assert!(matches!(
        via_also_to,
        Err(StreamError::Failed(reason)) if reason == "distinctive-boom"
    ));

    let via_also_to_all = Source::<i32>::failed(err.clone())
        .also_to_all([Sink::cancelled()])
        .run_collect();
    assert!(matches!(
        via_also_to_all,
        Err(StreamError::Failed(reason)) if reason == "distinctive-boom"
    ));

    let via_divert_to = Source::<i32>::failed(err)
        .divert_to(Sink::cancelled(), |_: &i32| true)
        .run_collect();
    assert!(matches!(
        via_divert_to,
        Err(StreamError::Failed(reason)) if reason == "distinctive-boom"
    ));
}
4077
#[test]
fn divert_to_routes_matching_elements_to_side_sink() {
    // Even elements match the predicate and go to the side sink, so only the
    // odd ones remain on the main stream.
    let kept_on_main = Source::from_iter([1, 2, 3, 4])
        .divert_to(Sink::ignore(), |candidate| candidate % 2 == 0)
        .run_collect()
        .unwrap();
    assert_eq!(kept_on_main, vec![1, 3]);
}
4086
#[test]
fn wire_tap_drops_when_side_sink_backpressures() {
    // Source-level wire_tap: the main stream still sees every element even
    // though the tap sink only takes the head.
    let main_via_source = Source::from_iter([1, 2, 3])
        .wire_tap(Sink::head())
        .run_collect()
        .unwrap();
    assert_eq!(main_via_source, vec![1, 2, 3]);

    // Flow-level wire_tap: same expectation.
    let main_via_flow = Source::from_iter([1, 2, 3])
        .via(Flow::identity().wire_tap(Sink::head()))
        .run_collect()
        .unwrap();
    assert_eq!(main_via_flow, vec![1, 2, 3]);
}
4101
#[test]
fn async_mapping_variants_complete() {
    // Ordered async map.
    assert_eq!(
        Source::from_iter(0..4)
            .map_async(2, |item| async move { Ok(item * 2) })
            .run_collect()
            .unwrap(),
        vec![0, 2, 4, 6]
    );

    // Unordered async map; with these immediately-ready futures the output is
    // expected in input order.
    assert_eq!(
        Source::from_iter(0..4)
            .map_async_unordered(2, |item| async move { Ok(item * 2) })
            .run_collect()
            .unwrap(),
        vec![0, 2, 4, 6]
    );

    // Partitioned async map keyed by parity.
    assert_eq!(
        Source::from_iter(0..4)
            .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4]
    );
}
4122
#[test]
fn map_async_ordered_bounds_pulls_behind_stuck_head() {
    // Endless source that counts every pull and emits the pull index.
    let pulls = StdArc::new(StdAtomicUsize::new(0));
    let pulls_for_source = StdArc::clone(&pulls);
    let probe = Source::from_fn_iter(move || {
        let pulls = StdArc::clone(&pulls_for_source);
        std::iter::from_fn(move || {
            let next = pulls.fetch_add(1, StdOrdering::SeqCst);
            Some(next)
        })
    })
    // The first element (0) stays pending for 300 ms; later ones resolve
    // immediately.
    .map_async(2, |item| async move {
        if item == 0 {
            tokio::time::sleep(StdDuration::from_millis(300)).await;
        }
        Ok(item)
    })
    .run_with(TestSink::probe())
    .unwrap();

    // Request far more than the stage should pull, then sample the counter
    // while the head element is still sleeping.
    probe.request(16);
    thread::sleep(StdDuration::from_millis(100));
    // Load once so the failure message reports the value that was checked;
    // the source may keep being pulled between two separate loads.
    let observed_pulls = pulls.load(StdOrdering::SeqCst);
    assert!(
        observed_pulls <= 3,
        "pulled {} elements with parallelism=2 behind a stuck ordered head",
        observed_pulls
    );
}
4151
// Checks that `map_async` waits on a pending future until its waker fires
// instead of repolling it in a tight loop.
#[test]
fn async_mapping_parks_until_woken_future_completes() {
    // Future that stays pending until a helper thread marks it ready and
    // wakes the most recently registered waker.
    struct WakeOnceFuture {
        // Value returned once the future becomes ready.
        value: Option<u64>,
        // Set by the helper thread after its delay.
        ready: StdArc<StdAtomicBool>,
        // Whether the helper thread has already been spawned.
        started: bool,
        // Counter shared with the test; incremented on every poll.
        polls: StdArc<StdAtomicUsize>,
        // Waker from the latest poll, handed over to the helper thread.
        latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
    }

    impl std::future::Future for WakeOnceFuture {
        type Output = StreamResult<u64>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.as_mut().get_mut();
            this.polls.fetch_add(1, StdOrdering::SeqCst);
            // Store the waker before checking `ready`, so the helper thread
            // finds a waker if it flips the flag right after this check.
            *this.latest_waker.lock().expect("latest wake slot mutex") =
                Some(cx.waker().clone());
            if this.ready.load(StdOrdering::SeqCst) {
                return Poll::Ready(Ok(this.value.take().unwrap()));
            }

            // First pending poll: spawn the thread that completes the future
            // after 20 ms.
            if !this.started {
                this.started = true;
                let ready = StdArc::clone(&this.ready);
                let latest_waker = StdArc::clone(&this.latest_waker);
                thread::spawn(move || {
                    thread::sleep(Duration::from_millis(20));
                    ready.store(true, StdOrdering::SeqCst);
                    if let Some(waker) =
                        latest_waker.lock().expect("latest wake slot mutex").take()
                    {
                        waker.wake();
                    }
                });
            }
            Poll::Pending
        }
    }

    let polls = StdArc::new(StdAtomicUsize::new(0));
    let polls_for_stage = StdArc::clone(&polls);
    let start = Instant::now();

    let values = Source::single(41)
        .map_async(1, move |item| WakeOnceFuture {
            value: Some(item + 1),
            ready: StdArc::new(StdAtomicBool::new(false)),
            started: false,
            polls: StdArc::clone(&polls_for_stage),
            latest_waker: StdArc::new(Mutex::new(None)),
        })
        .run_collect()
        .unwrap();

    assert_eq!(values, vec![42]);
    let elapsed = start.elapsed();
    // The lower bound reflects the helper thread's 20 ms delay; the upper
    // bound fails the test if the wake-up is picked up far too late.
    assert!(
        elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
        "pending future should park until woken once, elapsed={elapsed:?}"
    );
    // A busy-polling executor would exceed this poll budget during the delay.
    assert!(
        polls.load(StdOrdering::SeqCst) < 4096,
        "pending future was repolled too aggressively"
    );
}
4218
#[test]
fn async_mapping_emits_before_unbounded_upstream_finishes() {
    // Every async-map variant must hand its first result to `take(1)` even
    // though the upstream repeats forever.
    let first_ordered = Source::repeat(1)
        .map_async(2, |item| async move { Ok(item + 1) })
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_ordered, vec![2]);

    let first_unordered = Source::repeat(1)
        .map_async_unordered(2, |item| async move { Ok(item + 1) })
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_unordered, vec![2]);

    let first_partitioned = Source::repeat(1)
        .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_partitioned, vec![2]);
}
4242
#[test]
fn partitioned_async_mapping_limits_same_key_concurrency() {
    // `in_flight` counts created-but-unfinished futures; `peak_in_flight`
    // records the highest value ever observed.
    let in_flight = StdArc::new(StdAtomicUsize::new(0));
    let peak_in_flight = StdArc::new(StdAtomicUsize::new(0));
    let in_flight_for_stage = StdArc::clone(&in_flight);
    let peak_for_stage = StdArc::clone(&peak_in_flight);

    // Every element maps to the same partition key (0).
    let mapped = Source::from_iter(0..6)
        .map_async_partitioned(
            4,
            1,
            |_| 0_u8,
            move |item| {
                let in_flight = StdArc::clone(&in_flight_for_stage);
                let peak_in_flight = StdArc::clone(&peak_for_stage);
                // Counted when the future is created, not when it is polled.
                let concurrent = in_flight.fetch_add(1, StdOrdering::SeqCst) + 1;
                peak_in_flight.fetch_max(concurrent, StdOrdering::SeqCst);
                async move {
                    thread::sleep(Duration::from_millis(1));
                    in_flight.fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            },
        )
        .run_collect()
        .unwrap();

    assert_eq!(mapped, vec![0, 1, 2, 3, 4, 5]);
    assert_eq!(peak_in_flight.load(StdOrdering::SeqCst), 1);
}
4273
// Checks that an element with a different partition key can start while an
// earlier key is still pending, and that output order is still input order.
#[test]
fn partitioned_async_mapping_scans_past_blocked_pending_key() {
    // `active` counts created-but-unfinished futures; `max_active` keeps the
    // peak of that count.
    let active = StdArc::new(StdAtomicUsize::new(0));
    let max_active = StdArc::new(StdAtomicUsize::new(0));
    let active_for_stage = StdArc::clone(&active);
    let max_for_stage = StdArc::clone(&max_active);
    // One-shot gate that keeps the future for item 0 pending. The receiver
    // sits in a mutex-guarded Option so that future can take ownership of it.
    let (release_tx, release_rx) = oneshot::channel::<()>();
    let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
    let release_rx_for_stage = StdArc::clone(&release_rx);
    let max_for_release = StdArc::clone(&max_active);

    // Opens the gate once two futures have been in flight together, or after
    // a one-second deadline so a failing run cannot hang.
    let releaser = thread::spawn(move || {
        let deadline = Instant::now() + StdDuration::from_secs(1);
        while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
            thread::yield_now();
        }
        let _ = release_tx.send(());
    });

    // Keys are `item % 2`: 0 and 2 share a key, 1 has the other.
    // NOTE(review): the arguments `2, 1` are presumably overall parallelism
    // and per-partition parallelism; confirm against the operator signature.
    let values = Source::from_iter([0, 2, 1])
        .map_async_partitioned(
            2,
            1,
            |item| item % 2,
            move |item| {
                let active = StdArc::clone(&active_for_stage);
                let max_active = StdArc::clone(&max_for_stage);
                let release_rx = StdArc::clone(&release_rx_for_stage);
                // Counted when the future is created, not when it is polled.
                let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
                max_active.fetch_max(current, StdOrdering::SeqCst);
                async move {
                    // Only item 0 waits on the gate.
                    if item == 0 {
                        let receiver = release_rx
                            .lock()
                            .expect("release receiver mutex")
                            .take()
                            .expect("release receiver present");
                        let _ = receiver.await;
                    }
                    active.fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            },
        )
        .run_collect()
        .unwrap();
    releaser.join().unwrap();

    // Output keeps input order, and exactly two futures overlapped.
    assert_eq!(values, vec![0, 2, 1]);
    assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
}
4325
#[test]
fn partitioned_async_mapping_p1_still_evaluates_partition() {
    // Counts how many times the partitioner closure is invoked.
    let partitioner_calls = StdArc::new(StdAtomicUsize::new(0));
    let partitioner_calls_for_stage = StdArc::clone(&partitioner_calls);

    let incremented = Source::from_iter(0..8)
        .map_async_partitioned(
            1,
            1,
            move |item| {
                partitioner_calls_for_stage.fetch_add(1, StdOrdering::SeqCst);
                item % 2
            },
            |item| async move { Ok(item + 1) },
        )
        .run_collect()
        .unwrap();

    let expected = (1..9).collect::<Vec<_>>();
    assert_eq!(incremented, expected);
    // The partitioner must have run once per element.
    assert_eq!(partitioner_calls.load(StdOrdering::SeqCst), 8);
}
4347
#[test]
fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
    // Per-key in-flight counters and per-key peaks for the 16 keys.
    let in_flight_by_key = StdArc::new(
        std::iter::repeat_with(|| StdAtomicUsize::new(0))
            .take(16)
            .collect::<Vec<_>>(),
    );
    let peak_by_key = StdArc::new(
        std::iter::repeat_with(|| StdAtomicUsize::new(0))
            .take(16)
            .collect::<Vec<_>>(),
    );
    let in_flight_for_stage = StdArc::clone(&in_flight_by_key);
    let peak_for_stage = StdArc::clone(&peak_by_key);

    let mapped = Source::from_iter(0..512_usize)
        .map_async_partitioned(
            32,
            1,
            |item| item % 16,
            move |item| {
                let in_flight = StdArc::clone(&in_flight_for_stage);
                let peaks = StdArc::clone(&peak_for_stage);
                let key = item % 16;
                // Counted when the future is created, not when it is polled.
                let concurrent = in_flight[key].fetch_add(1, StdOrdering::SeqCst) + 1;
                peaks[key].fetch_max(concurrent, StdOrdering::SeqCst);
                async move {
                    in_flight[key].fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            },
        )
        .run_collect()
        .unwrap();

    assert_eq!(mapped, (0..512).collect::<Vec<_>>());
    // No key may ever have had more than one future in flight.
    peak_by_key
        .iter()
        .for_each(|peak| assert_eq!(peak.load(StdOrdering::SeqCst), 1));
}
4381
4382 #[test]
4383 fn error_operators_map_recover_and_complete() {
4384 let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
4385 .map_error(|_| StreamError::Failed("mapped".into()))
4386 .run_collect();
4387 assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
4388
4389 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4390 .recover(|error| match error {
4391 StreamError::Failed(_) => Some(42),
4392 _ => None,
4393 })
4394 .run_collect()
4395 .unwrap();
4396 assert_eq!(recovered, vec![42]);
4397
4398 let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
4399 .recover(|_| None)
4400 .run_collect();
4401 assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
4402
4403 let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
4404 .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
4405 .run_collect()
4406 .unwrap();
4407 assert_eq!(recovered_with, vec![1, 2]);
4408
4409 let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
4410 .recover_with_retries(1, |_| None)
4411 .run_collect();
4412 assert_eq!(
4413 declined_recover_with,
4414 Err(StreamError::Failed("declined".into()))
4415 );
4416
4417 let completed = Source::from_factory(|| {
4418 Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
4419 })
4420 .on_error_complete()
4421 .run_collect()
4422 .unwrap();
4423 assert_eq!(completed, vec![1]);
4424 }
4425
4426 #[test]
4427 fn sliding_matches_akka_window_semantics() {
4428 assert_eq!(
4430 Source::from_iter(1..=4)
4431 .sliding(3, 1)
4432 .run_collect()
4433 .unwrap(),
4434 vec![vec![1, 2, 3], vec![2, 3, 4]]
4435 );
4436 assert_eq!(
4437 Source::from_iter(1..=4)
4438 .sliding(2, 1)
4439 .run_collect()
4440 .unwrap(),
4441 vec![vec![1, 2], vec![2, 3], vec![3, 4]]
4442 );
4443 assert_eq!(
4445 Source::from_iter(1..=3)
4446 .sliding(3, 1)
4447 .run_collect()
4448 .unwrap(),
4449 vec![vec![1, 2, 3]]
4450 );
4451 assert_eq!(
4453 Source::from_iter(1..=2)
4454 .sliding(3, 1)
4455 .run_collect()
4456 .unwrap(),
4457 vec![vec![1, 2]]
4458 );
4459 assert_eq!(
4461 Source::from_iter(1..=3)
4462 .sliding(1, 1)
4463 .run_collect()
4464 .unwrap(),
4465 vec![vec![1], vec![2], vec![3]]
4466 );
4467 assert_eq!(
4469 Source::from_iter(1..=6)
4470 .sliding(2, 3)
4471 .run_collect()
4472 .unwrap(),
4473 vec![vec![1, 2], vec![4, 5]]
4474 );
4475 assert_eq!(
4477 Source::from_iter(1..=3)
4478 .sliding(2, 4)
4479 .run_collect()
4480 .unwrap(),
4481 vec![vec![1, 2]]
4482 );
4483 }
4484
4485 #[test]
4486 fn recover_with_retries_indefinitely_like_akka() {
4487 let attempts = StdArc::new(StdAtomicUsize::new(0));
4488 let attempts_in_stage = StdArc::clone(&attempts);
4489 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4492 .recover_with(move |_error| {
4493 if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
4494 Some(Source::<i32>::failed(StreamError::Failed("again".into())))
4495 } else {
4496 Some(Source::from_iter([42]))
4497 }
4498 })
4499 .run_collect()
4500 .unwrap();
4501 assert_eq!(recovered, vec![42]);
4502 assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
4503 }
4504
4505 #[test]
4506 fn many_concurrent_streams_do_not_starve_the_pool() {
4507 let materializer = Materializer::new();
4516 let busy = 6_usize;
4517
4518 let mut held = Vec::with_capacity(busy);
4519 for _ in 0..busy {
4520 held.push(
4521 Source::single(1_u64)
4522 .run_with_materializer(Sink::never(), &materializer)
4523 .unwrap(),
4524 );
4525 }
4526
4527 for _ in 0..400 {
4528 if materializer.active_streams() >= busy {
4529 break;
4530 }
4531 thread::sleep(Duration::from_millis(5));
4532 }
4533 assert_eq!(materializer.active_streams(), busy);
4534
4535 let sum = Source::from_iter(0_u64..5)
4538 .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
4539 .unwrap();
4540 assert_eq!(sum.wait().unwrap(), 10);
4541
4542 materializer.shutdown();
4543 for completion in held {
4544 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4545 }
4546 }
4547}