1use std::{
4 collections::{BTreeMap, HashMap, VecDeque},
5 fmt,
6 future::Future,
7 hash::Hash,
8 marker::PhantomData,
9 panic::{AssertUnwindSafe, catch_unwind},
10 pin::Pin,
11 sync::{
12 Arc, Condvar, Mutex, OnceLock,
13 atomic::{AtomicBool, AtomicUsize, Ordering},
14 },
15 task::{Context, Poll},
16 thread,
17 time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23 runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24 task::{JoinError, JoinHandle},
25};
26
/// Boxed, `Send` iterator whose items are `StreamResult<T>` values.
pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
/// Shared (`Arc`), `Send + Sync` function mapping one [`BoxStream`] to another without a
/// failure channel of its own.
pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
/// Shared (`Arc`), `Send + Sync` function that receives the input stream together with the
/// `Materializer` and may return an error instead of an output stream.
pub(crate) type RuntimeTransform<In, Out> =
    Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
/// Unsized callable that consumes a stream with a `Materializer` and yields `Mat` or an error.
type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
/// Same shape as [`SinkRunner`], with an extra by-value [`SourceRuntimeHints`] argument.
type HintedSinkRunner<In, Mat> =
    dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
/// Unsized callable that only needs a `Materializer` to yield `Mat` or an error.
type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
// NOTE(review): the three constants below are not used in the visible part of the file;
// judging by the names they tune a spin-then-park wait loop — confirm at the use sites.
const STREAM_READY_SPINS: usize = 256;
const STREAM_SPIN_BACKOFF: usize = 8;
const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
42
/// Payload of the "inline micro" source hint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct InlineMicroSourceHint {
    // NOTE(review): presumably an upper bound on successfully emitted items — confirm with
    // the code that consumes `SourceRuntimeHints::inline_micro_max_success_items`.
    max_success_items: usize,
}
50
/// Hints tracked for a source while a pipeline is assembled.
///
/// The derived `Default` leaves every hint off (`false` / `None`).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct SourceHints {
    // Overwritten by `after_flow` with the flow's `preserves_inline_head_terminal` value.
    inline_head_terminal: bool,
    // Cleared by `after_flow` and `without_inline_micro`; exported through `runtime`.
    inline_micro: Option<InlineMicroSourceHint>,
    // Survives `after_flow` only when the flow preserves it; exported through `runtime`.
    terminal_consumer_batch: bool,
}
62
/// Flattened, crate-visible view of the source hints, produced by `SourceHints::runtime`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct SourceRuntimeHints {
    /// `max_success_items` of the inline-micro hint, or `None` when that hint is absent.
    pub(crate) inline_micro_max_success_items: Option<usize>,
    /// Copy of `SourceHints::terminal_consumer_batch`.
    pub(crate) terminal_consumer_batch: bool,
}
68
69impl SourceHints {
70 const fn with_inline_micro(max_success_items: usize) -> Self {
71 Self {
72 inline_head_terminal: true,
73 inline_micro: Some(InlineMicroSourceHint { max_success_items }),
74 terminal_consumer_batch: true,
75 }
76 }
77
78 const fn with_terminal_consumer_batch() -> Self {
79 Self {
80 inline_head_terminal: false,
81 inline_micro: None,
82 terminal_consumer_batch: true,
83 }
84 }
85
86 fn after_flow(self, flow: FlowHints) -> Self {
87 if flow.preserves_inline_head_terminal {
88 Self {
91 inline_head_terminal: true,
92 inline_micro: None,
93 terminal_consumer_batch: self.terminal_consumer_batch
94 && flow.preserves_terminal_consumer_batch,
95 }
96 } else {
97 Self {
98 inline_head_terminal: false,
99 inline_micro: None,
100 terminal_consumer_batch: self.terminal_consumer_batch
101 && flow.preserves_terminal_consumer_batch,
102 }
103 }
104 }
105
106 fn without_inline_micro(self) -> Self {
107 Self {
108 inline_head_terminal: self.inline_head_terminal,
109 inline_micro: None,
110 terminal_consumer_batch: self.terminal_consumer_batch,
111 }
112 }
113
114 fn runtime(self) -> SourceRuntimeHints {
115 SourceRuntimeHints {
116 inline_micro_max_success_items: self.inline_micro.map(|hint| hint.max_success_items),
117 terminal_consumer_batch: self.terminal_consumer_batch,
118 }
119 }
120}
121
/// Describes which source hints survive when a stream passes through a flow.
///
/// The derived `Default` preserves nothing.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
struct FlowHints {
    preserves_inline_head_terminal: bool,
    preserves_terminal_consumer_batch: bool,
}

impl FlowHints {
    /// Both hints are preserved.
    const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
        preserves_inline_head_terminal: true,
        preserves_terminal_consumer_batch: true,
    };

    /// Only the terminal-consumer-batch hint is preserved.
    const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
        preserves_inline_head_terminal: false,
        preserves_terminal_consumer_batch: true,
    };

    /// Hints for `self` followed by `next`: a hint is preserved only if both stages preserve it.
    fn then(self, next: Self) -> Self {
        let keeps_inline_head =
            self.preserves_inline_head_terminal && next.preserves_inline_head_terminal;
        let keeps_consumer_batch =
            self.preserves_terminal_consumer_batch && next.preserves_terminal_consumer_batch;
        Self {
            preserves_inline_head_terminal: keeps_inline_head,
            preserves_terminal_consumer_batch: keeps_consumer_batch,
        }
    }
}
148
/// Per-key bookkeeping slot used by the partition helpers in this file
/// (`partition_slot_for`, `retire_partition_slot`, `ready_partition_slot`,
/// `pop_ready_partition_slot`).
struct PartitionSlot<Key, Out> {
    // `Some(key)` while the slot is assigned; taken (set to `None`) when the slot is retired.
    key: Option<Key>,
    // Incremented each time an item is popped for this slot; compared against `per_partition`.
    active: usize,
    // Waiting `(index, item)` pairs, handed out front-first.
    queued: VecDeque<(usize, Out)>,
    // Set when the slot index is pushed onto the ready queue, cleared when it is popped.
    in_ready_queue: bool,
}
155
156struct AbortOnDropHandle<T> {
157 handle: JoinHandle<T>,
158}
159
160impl<T> AbortOnDropHandle<T> {
161 fn new(handle: JoinHandle<T>) -> Self {
162 Self { handle }
163 }
164}
165
166impl<T> Drop for AbortOnDropHandle<T> {
167 fn drop(&mut self) {
168 self.handle.abort();
169 }
170}
171
172impl<T> Future for AbortOnDropHandle<T> {
173 type Output = Result<T, JoinError>;
174
175 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176 Pin::new(&mut self.handle).poll(cx)
177 }
178}
179
180impl<T> Unpin for AbortOnDropHandle<T> {}
181
182pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
183 static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
184 RUNTIME.get_or_init(|| {
185 TokioRuntimeBuilder::new_multi_thread()
186 .enable_all()
187 .thread_name("datum-stream-tokio")
188 .build()
189 .expect("stream tokio runtime")
190 })
191}
192
193fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
194where
195 Fut: Future<Output = T> + Send + 'static,
196 T: Send + 'static,
197{
198 AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
199}
200
/// Crate-visible re-export of `runtime::current_stream_cancelled`; the result is forwarded
/// unchanged.
// NOTE(review): presumably `None` means no stream is active on the calling thread — the
// `runtime` module is not visible here, confirm there.
pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
    runtime::current_stream_cancelled()
}
204
205pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
206where
207 F: FnOnce() -> T,
208{
209 catch_unwind(AssertUnwindSafe(f))
210 .map_err(|_| StreamError::Failed(format!("{context} panicked")))
211}
212
213impl<Key, Out> PartitionSlot<Key, Out> {
214 fn new(key: Key) -> Self {
215 Self {
216 key: Some(key),
217 active: 0,
218 queued: VecDeque::new(),
219 in_ready_queue: false,
220 }
221 }
222}
223
224#[inline(always)]
225fn partition_slot_for<Key, Out>(
226 key: Key,
227 slots_by_key: &mut HashMap<Key, usize>,
228 slots: &mut Vec<PartitionSlot<Key, Out>>,
229 free_slots: &mut Vec<usize>,
230) -> usize
231where
232 Key: Clone + Eq + Hash,
233{
234 if let Some(slot) = slots_by_key.get(&key) {
235 return *slot;
236 }
237
238 let slot = if let Some(slot) = free_slots.pop() {
239 let state = &mut slots[slot];
240 state.key = Some(key.clone());
241 state.active = 0;
242 state.queued.clear();
243 state.in_ready_queue = false;
244 slot
245 } else {
246 slots.push(PartitionSlot::new(key.clone()));
247 slots.len() - 1
248 };
249 slots_by_key.insert(key, slot);
250 slot
251}
252
253#[inline(always)]
254fn retire_partition_slot<Key, Out>(
255 slot: usize,
256 slots_by_key: &mut HashMap<Key, usize>,
257 slots: &mut [PartitionSlot<Key, Out>],
258 free_slots: &mut Vec<usize>,
259) where
260 Key: Eq + Hash,
261{
262 let state = &mut slots[slot];
263 if let Some(key) = state.key.take() {
264 slots_by_key.remove(&key);
265 }
266 state.active = 0;
267 state.queued.clear();
268 state.in_ready_queue = false;
269 free_slots.push(slot);
270}
271
272#[inline(always)]
273fn ready_partition_slot<Key, Out>(
274 slots: &mut [PartitionSlot<Key, Out>],
275 ready_slots: &mut VecDeque<usize>,
276 slot: usize,
277 per_partition: usize,
278) {
279 if let Some(state) = slots.get_mut(slot)
280 && state.key.is_some()
281 && !state.in_ready_queue
282 && state.active < per_partition
283 && !state.queued.is_empty()
284 {
285 state.in_ready_queue = true;
286 ready_slots.push_back(slot);
287 }
288}
289
290#[inline(always)]
291fn pop_ready_partition_slot<Key, Out>(
292 slots: &mut [PartitionSlot<Key, Out>],
293 ready_slots: &mut VecDeque<usize>,
294 per_partition: usize,
295) -> Option<(usize, usize, Out)> {
296 while let Some(slot) = ready_slots.pop_front() {
297 let mut requeue = false;
298 let item = if let Some(state) = slots.get_mut(slot) {
299 state.in_ready_queue = false;
300 if state.key.is_some() && state.active < per_partition {
301 let item = state.queued.pop_front().map(|(index, item)| {
302 state.active += 1;
303 (index, slot, item)
304 });
305 if !state.queued.is_empty() && state.active < per_partition {
306 state.in_ready_queue = true;
307 requeue = true;
308 }
309 item
310 } else {
311 None
312 }
313 } else {
314 None
315 };
316
317 if requeue {
318 ready_slots.push_back(slot);
319 }
320 if item.is_some() {
321 return item;
322 }
323 }
324 None
325}
326
/// Object-safe factory that materializes a source.
///
/// `create` is invoked on an `Arc<Self>` with the active `Materializer` and returns the
/// element stream together with the materialized value, or an error.
pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
    fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
}
330
331struct FnSourceFactory<F>(F);
332
333impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
334where
335 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
336{
337 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
338 (self.0)(materializer)
339 }
340}
341
342struct MapSourceFactory<In, Out, Mat, F> {
343 source: Arc<dyn SourceFactory<In, Mat>>,
344 stage: F,
345 _marker: PhantomData<fn(In) -> Out>,
346}
347
348impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
349where
350 In: Send + 'static,
351 Out: Send + 'static,
352 Mat: Send + 'static,
353 F: Fn(In) -> Out + Send + Sync + 'static,
354{
355 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
356 let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
357 Ok((
358 Box::new(MapSourceStream {
359 input: stream,
360 factory: self,
361 }),
362 mat,
363 ))
364 }
365}
366
367struct MapSourceStream<In, Out, Mat, F> {
368 input: BoxStream<In>,
369 factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
370}
371
372impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
373where
374 F: Fn(In) -> Out,
375{
376 type Item = StreamResult<Out>;
377
378 fn next(&mut self) -> Option<Self::Item> {
379 self.input
380 .next()
381 .map(|item| item.map(|item| (self.factory.stage)(item)))
382 }
383}
384
385fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
386where
387 Out: Send + 'static,
388{
389 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
390 let mut current = 0usize;
391 Box::new(std::iter::from_fn(move || {
392 loop {
393 let index = next_active_optional_stream(&streams, current, |_| true)?;
394 current = (index + 1) % streams.len().max(1);
395 let Some(stream) = streams[index].as_mut() else {
396 continue;
397 };
398 match stream.next() {
399 Some(item) => return Some(item),
400 None => {
401 streams[index] = None;
402 if eager_complete {
403 return None;
404 }
405 }
406 }
407 }
408 }))
409}
410
411fn merge_prioritized_streams<Out>(
412 streams: Vec<BoxStream<Out>>,
413 priorities: Vec<usize>,
414 eager_complete: bool,
415) -> BoxStream<Out>
416where
417 Out: Send + 'static,
418{
419 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
420 let schedule: Vec<usize> = priorities
421 .into_iter()
422 .enumerate()
423 .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
424 .collect();
425 let mut schedule_index = 0usize;
426 Box::new(std::iter::from_fn(move || {
427 loop {
428 if streams.iter().all(Option::is_none) {
429 return None;
430 }
431 let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
432 let Some(stream) = streams[index].as_mut() else {
433 continue;
434 };
435 match stream.next() {
436 Some(item) => return Some(item),
437 None => {
438 streams[index] = None;
439 if eager_complete {
440 return None;
441 }
442 }
443 }
444 }
445 }))
446}
447
448fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
449where
450 Out: Ord + Send + 'static,
451{
452 let mut left_next: Option<Out> = None;
453 let mut right_next: Option<Out> = None;
454 let mut left_done = false;
455 let mut right_done = false;
456 Box::new(std::iter::from_fn(move || {
457 loop {
458 if left_next.is_none() && !left_done {
459 match left.next() {
460 Some(Ok(item)) => left_next = Some(item),
461 Some(Err(error)) => return Some(Err(error)),
462 None => left_done = true,
463 }
464 }
465 if right_next.is_none() && !right_done {
466 match right.next() {
467 Some(Ok(item)) => right_next = Some(item),
468 Some(Err(error)) => return Some(Err(error)),
469 None => right_done = true,
470 }
471 }
472
473 let next = match (&left_next, &right_next) {
474 (Some(left_item), Some(right_item)) => {
475 if left_item <= right_item {
476 left_next.take()
477 } else {
478 right_next.take()
479 }
480 }
481 (Some(_), None) if right_done => left_next.take(),
482 (None, Some(_)) if left_done => right_next.take(),
483 (None, None) if left_done && right_done => return None,
484 _ => continue,
485 };
486 if let Some(item) = next {
487 return Some(Ok(item));
488 }
489 }
490 }))
491}
492
493fn merge_latest_streams<Out>(
494 streams: Vec<BoxStream<Out>>,
495 eager_complete: bool,
496) -> BoxStream<Vec<Out>>
497where
498 Out: Clone + Send + 'static,
499{
500 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
501 let mut latest = vec![None; streams.len()];
502 let mut seen = 0usize;
503 let mut current = 0usize;
504 let mut pending = VecDeque::<Vec<Out>>::new();
505 Box::new(std::iter::from_fn(move || {
506 loop {
507 if let Some(output) = pending.pop_front() {
508 return Some(Ok(output));
509 }
510 if streams.iter().all(Option::is_none) {
511 return None;
512 }
513 let index = next_active_optional_stream(&streams, current, |_| true)?;
514 current = (index + 1) % streams.len().max(1);
515 let Some(stream) = streams[index].as_mut() else {
516 continue;
517 };
518 match stream.next() {
519 Some(Ok(item)) => {
520 if latest[index].is_none() {
521 seen += 1;
522 }
523 latest[index] = Some(item);
524 if seen == latest.len() {
525 pending.push_back(
526 latest
527 .iter()
528 .map(|item| item.clone().expect("merge-latest initialized"))
529 .collect(),
530 );
531 }
532 }
533 Some(Err(error)) => return Some(Err(error)),
534 None => {
535 streams[index] = None;
536 if eager_complete {
537 return None;
538 }
539 }
540 }
541 }
542 }))
543}
544
545fn zip_streams<Left, Right>(
546 mut left: BoxStream<Left>,
547 mut right: BoxStream<Right>,
548) -> BoxStream<(Left, Right)>
549where
550 Left: Send + 'static,
551 Right: Send + 'static,
552{
553 let mut left_next: Option<Left> = None;
554 let mut right_next: Option<Right> = None;
555 let mut left_done = false;
556 let mut right_done = false;
557 Box::new(std::iter::from_fn(move || {
558 loop {
559 if left_next.is_none() && !left_done {
560 match left.next() {
561 Some(Ok(item)) => left_next = Some(item),
562 Some(Err(error)) => return Some(Err(error)),
563 None => left_done = true,
564 }
565 }
566 if right_next.is_none() && !right_done {
567 match right.next() {
568 Some(Ok(item)) => right_next = Some(item),
569 Some(Err(error)) => return Some(Err(error)),
570 None => right_done = true,
571 }
572 }
573 match (left_next.take(), right_next.take()) {
574 (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
575 (left_item, right_item) => {
576 left_next = left_item;
577 right_next = right_item;
578 if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
579 return None;
580 }
581 }
582 }
583 }
584 }))
585}
586
587fn zip_latest_with_stream<Left, Right, Out, F>(
588 mut left: BoxStream<Left>,
589 mut right: BoxStream<Right>,
590 eager_complete: bool,
591 combine: Arc<F>,
592) -> BoxStream<Out>
593where
594 Left: Clone + Send + 'static,
595 Right: Clone + Send + 'static,
596 Out: Send + 'static,
597 F: Fn(Left, Right) -> Out + Send + Sync + 'static,
598{
599 let mut left_latest: Option<Left> = None;
600 let mut right_latest: Option<Right> = None;
601 let mut left_done = false;
602 let mut right_done = false;
603 let mut turn_left = true;
604 let mut pending = VecDeque::<Out>::new();
605
606 Box::new(std::iter::from_fn(move || {
607 loop {
608 if let Some(output) = pending.pop_front() {
609 return Some(Ok(output));
610 }
611 if eager_complete && (left_done || right_done) {
612 return None;
613 }
614 if left_done && right_done {
615 return None;
616 }
617 if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
621 return None;
622 }
623
624 let pull_left = if left_done {
625 false
626 } else if right_done {
627 true
628 } else {
629 let value = turn_left;
630 turn_left = !turn_left;
631 value
632 };
633
634 if pull_left {
635 match left.next() {
636 Some(Ok(item)) => {
637 left_latest = Some(item);
638 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
639 pending.push_back(combine(left_item.clone(), right_item.clone()));
640 }
641 }
642 Some(Err(error)) => return Some(Err(error)),
643 None => {
644 left_done = true;
645 if eager_complete {
646 return None;
647 }
648 }
649 }
650 } else {
651 match right.next() {
652 Some(Ok(item)) => {
653 right_latest = Some(item);
654 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
655 pending.push_back(combine(left_item.clone(), right_item.clone()));
656 }
657 }
658 Some(Err(error)) => return Some(Err(error)),
659 None => {
660 right_done = true;
661 if eager_complete {
662 return None;
663 }
664 }
665 }
666 }
667 }
668 }))
669}
670
671fn zip_all_stream<Left, Right>(
672 mut left: BoxStream<Left>,
673 mut right: BoxStream<Right>,
674 left_fill: Left,
675 right_fill: Right,
676) -> BoxStream<(Left, Right)>
677where
678 Left: Clone + Send + 'static,
679 Right: Clone + Send + 'static,
680{
681 let mut left_done = false;
682 let mut right_done = false;
683 Box::new(std::iter::from_fn(move || {
684 if left_done && right_done {
685 return None;
686 }
687
688 let left_item = if left_done {
689 None
690 } else {
691 match left.next() {
692 Some(Ok(item)) => Some(item),
693 Some(Err(error)) => return Some(Err(error)),
694 None => {
695 left_done = true;
696 None
697 }
698 }
699 };
700 let right_item = if right_done {
701 None
702 } else {
703 match right.next() {
704 Some(Ok(item)) => Some(item),
705 Some(Err(error)) => return Some(Err(error)),
706 None => {
707 right_done = true;
708 None
709 }
710 }
711 };
712
713 match (left_item, right_item) {
714 (None, None) if left_done && right_done => None,
715 (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
716 (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
717 (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
718 (None, None) => None,
719 }
720 }))
721}
722
723fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
724where
725 Out: Send + 'static,
726 Next: Send + 'static,
727 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
728{
729 let count = streams.len();
730 if count == 0 {
731 return Box::new(std::iter::empty());
732 }
733 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
734 let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
735 let mut current = 0usize;
736 Box::new(std::iter::from_fn(move || {
737 loop {
738 if slots.iter().all(Option::is_some) {
739 let values = slots
740 .iter_mut()
741 .map(|slot| slot.take().expect("zip-n slot filled"))
742 .collect();
743 return Some(Ok(zipper(values)));
744 }
745
746 let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
747 current = (index + 1) % count.max(1);
748 let Some(stream) = streams[index].as_mut() else {
749 continue;
750 };
751 match stream.next() {
752 Some(Ok(item)) => slots[index] = Some(item),
753 Some(Err(error)) => return Some(Err(error)),
754 None => {
755 streams[index] = None;
756 slots[index].as_ref()?;
757 }
758 }
759 }
760 }))
761}
762
763fn next_active_optional_stream<T, F>(
764 streams: &[Option<BoxStream<T>>],
765 current: usize,
766 predicate: F,
767) -> Option<usize>
768where
769 T: Send + 'static,
770 F: Fn(usize) -> bool,
771{
772 if streams.is_empty() {
773 return None;
774 }
775 for offset in 0..streams.len() {
776 let index = (current + offset) % streams.len();
777 if streams[index].is_some() && predicate(index) {
778 return Some(index);
779 }
780 }
781 None
782}
783
784fn next_weighted_stream<T>(
785 streams: &[Option<BoxStream<T>>],
786 schedule: &[usize],
787 schedule_index: &mut usize,
788) -> Option<usize>
789where
790 T: Send + 'static,
791{
792 if streams.is_empty() || schedule.is_empty() {
793 return None;
794 }
795 for _ in 0..schedule.len() {
796 let index = schedule[*schedule_index % schedule.len()];
797 *schedule_index = (*schedule_index + 1) % schedule.len();
798 if streams.get(index).is_some_and(Option::is_some) {
799 return Some(index);
800 }
801 }
802 None
803}
804
805mod completion;
806mod error;
807mod flow;
808mod rate;
809mod restart;
810mod runtime;
811mod sink;
812mod source;
813mod time;
814mod timer;
815
/// Type-erased hook object that can be converted into `Arc<dyn Any + Send + Sync>`.
// NOTE(review): the name suggests these hooks belong to split-segment handling; only the
// `Any` conversion is visible here — confirm the intended use with the implementors.
pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
    /// Converts the shared hook into an `Any` trait object, e.g. so a caller can downcast it.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
}
822
/// Type-erased registration point that accepts a [`SplitSegmentHookDyn`].
// NOTE(review): presumably an optional fast path for fold-style sinks — the implementors are
// not visible here, confirm the exact contract with them.
pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
    /// Offers `hook` to the implementation.
    ///
    /// The return type allows three outcomes: `None`, `Some(Err(_))`, or `Some(Ok(value))`
    /// with a boxed `Any` value.
    // NOTE(review): presumably `None` means "fast path not applicable" — confirm.
    fn try_register(
        &self,
        hook: Arc<dyn SplitSegmentHookDyn>,
    ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
}
834
835use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
836
837pub use self::{
838 completion::{Cancellable, StreamCompletion},
839 error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
840 flow::{BidiFlow, Flow},
841 rate::{AggregateTimer, OverflowStrategy},
842 restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
843 runtime::{Materializer, Runtime},
844 sink::{RunnableGraph, Sink, SinkCombineStrategy},
845 source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
846 time::{DelayOverflowStrategy, ThrottleMode},
847};
848
849#[cfg(test)]
850mod tests {
851 use super::*;
852 use crate::Attributes;
853 use crate::testkit::TestSink;
854 use std::fs;
855 use std::sync::{
856 Arc as StdArc,
857 atomic::{
858 AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
859 },
860 mpsc,
861 };
862 use std::time::Duration as StdDuration;
863 use std::time::Instant;
864
    // Test helper: waits on a stream completion and unwraps the result, panicking on error.
    fn wait<T>(completion: StreamCompletion<T>) -> T {
        completion.wait().unwrap()
    }
868
869 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
870 let deadline = Instant::now() + timeout;
871 while Instant::now() < deadline {
872 if condition() {
873 return true;
874 }
875 thread::sleep(Duration::from_millis(2));
876 }
877 condition()
878 }
879
880 fn linux_thread_count(thread_name: &str) -> usize {
881 fs::read_dir("/proc/self/task")
882 .expect("task directory readable")
883 .filter_map(Result::ok)
884 .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
885 .filter(|name| name.trim() == thread_name)
886 .count()
887 }
888
    // Running a cloned source and then the original must produce the same elements.
    #[test]
    fn source_blueprints_are_reusable() {
        let source = Source::from_iter(0..5).map(|item| item + 1);

        assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
        assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
    }
896
    // `map` applied after `map_materialized_value` must leave the materialized value intact
    // while still transforming the element.
    #[test]
    fn source_map_preserves_materialized_value() {
        let graph = Source::single(1)
            .map_materialized_value(|_| "source")
            .map(|item| item + 1)
            .to_mat(Sink::head(), Keep::both);

        let materialized = graph.run().unwrap();
        assert_eq!(materialized.0, "source");
        assert_eq!(wait(materialized.1), 2);
    }
908
    // A flow built from `map` + `filter` applied via `via`: doubles 0..8 and keeps the
    // multiples of three.
    #[test]
    fn source_and_flow_compose() {
        let flow = Flow::identity()
            .map(|item: i32| item * 2)
            .filter(|item| item % 3 == 0);

        let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();

        assert_eq!(result, vec![0, 6, 12]);
    }
919
    // The `Sink::setup` callback records the attributes it is given; the assertion expects
    // the sink's own name ("sink-inner"), input buffer and dispatcher to be visible even
    // though the materializer carries a different name ("mat-outer").
    #[test]
    fn sink_setup_sees_materializer_defaults_and_local_attributes() {
        let observed = StdArc::new(Mutex::new(None));
        let observed_in_setup = StdArc::clone(&observed);
        let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
            // Capture what the setup callback sees so it can be asserted after the run.
            *observed_in_setup.lock().unwrap() = Some((
                attrs.name().map(str::to_owned),
                attrs.input_buffer_hint(),
                attrs.dispatcher_hint().map(str::to_owned),
            ));
            Sink::ignore()
        })
        .add_attributes(Attributes::named("sink-inner"))
        .add_attributes(Attributes::input_buffer(4, 4))
        .add_attributes(Attributes::dispatcher("bench-dispatcher"));

        let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
        wait(
            Source::from_iter([1, 2, 3])
                .run_with_materializer(sink, &materializer)
                .unwrap(),
        );

        assert_eq!(
            *observed.lock().unwrap(),
            Some((
                Some("sink-inner".to_owned()),
                Some((4, 4)),
                Some("bench-dispatcher".to_owned())
            ))
        );
    }
952
    // A pre-materialized collect sink: running a source into the returned sink must complete
    // the completion handle obtained up front with the collected elements.
    #[test]
    fn sink_pre_materialize_feeds_existing_materialization() {
        let materializer = Materializer::new();
        let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
            .pre_materialize(&materializer)
            .unwrap();

        Source::from_iter([1, 2, 3])
            .run_with_materializer(pre, &materializer)
            .unwrap();

        assert_eq!(wait(completion), vec![1, 2, 3]);
    }
966
    // With `Flow::from_sink_and_source`, the downstream output is expected to be the
    // embedded source's elements (10, 20, 30), not the upstream input.
    #[test]
    fn flow_from_sink_and_source_connects_both_sides() {
        assert_eq!(
            Source::from_iter([1, 2, 3])
                .via(Flow::from_sink_and_source(
                    Sink::foreach(|_item: i32| {}),
                    Source::from_iter([10, 20, 30]),
                ))
                .run_collect()
                .unwrap(),
            vec![10, 20, 30]
        );
    }
980
    // The embedded source emits a single element; the embedded sink's `on_complete` callback
    // must still fire (observed within one second) after the output side has finished.
    #[test]
    fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
        let completed = StdArc::new(StdAtomicBool::new(false));
        let on_complete = StdArc::clone(&completed);
        let flow = Flow::from_sink_and_source(
            Sink::on_complete(move || {
                on_complete.store(true, StdOrdering::SeqCst);
            }),
            Source::single(10),
        );

        let result = Source::from_iter([1, 2, 3])
            .via(flow)
            .run_collect()
            .unwrap();

        assert_eq!(result, vec![10]);
        // Completion of the sink side is asynchronous, so poll for it.
        assert!(wait_until(StdDuration::from_secs(1), || {
            completed.load(StdOrdering::SeqCst)
        }));
    }
1002
    // Coupled variant: the upstream is empty, so the embedded sink finishes first; the
    // embedded tick source's handle must then report cancelled within one second and the
    // overall stream must complete.
    #[test]
    fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
        let cancellable = StdArc::new(Mutex::new(None));
        let observed = StdArc::clone(&cancellable);
        let flow = Flow::from_sink_and_source_coupled(
            Sink::ignore(),
            Source::tick(
                StdDuration::from_millis(50),
                StdDuration::from_millis(50),
                10,
            )
            // Keep a clone of the tick source's handle so the test can inspect it.
            .map_materialized_value(move |handle| {
                *observed.lock().unwrap() = Some(handle.clone());
                handle
            }),
        );

        let completion = Source::from_iter(std::iter::empty::<i32>())
            .via(flow)
            .run_with(Sink::ignore())
            .unwrap();
        assert!(wait_until(StdDuration::from_secs(1), || {
            cancellable
                .lock()
                .unwrap()
                .as_ref()
                .is_some_and(Cancellable::is_cancelled)
        }));
        assert_eq!(wait(completion), NotUsed);
    }
1033
    // Exercises `BidiFlow::join` and `BidiFlow::atop`.
    #[test]
    fn bidi_flow_join_and_atop_compose() {
        let codec = BidiFlow::from_flows(
            Flow::identity().map(|item: i32| item + 1),
            Flow::identity().map(|item: i32| item * 2),
        )
        .named("codec");
        let framing = BidiFlow::from_flows(
            Flow::identity().map(|item: i32| item * 3),
            Flow::identity().map(|item: i32| item - 4),
        );

        let joined = codec
            .clone()
            .join(Flow::identity().map(|item: i32| item - 5));
        let stacked = codec.atop(framing).join(Flow::identity());

        // Expected value matches ((10 + 1) - 5) * 2 = 12.
        assert_eq!(
            Source::single(10).via(joined).run_collect().unwrap(),
            vec![12]
        );
        // Expected value matches (((10 + 1) * 3) - 4) * 2 = 58.
        assert_eq!(
            Source::single(10).via(stacked).run_collect().unwrap(),
            vec![58]
        );
    }
1060
    // A buffered flow followed by `map` must deliver every element, in order.
    #[test]
    fn flow_buffer_then_map_runs_end_to_end() {
        let flow = Flow::identity()
            .buffer(8, OverflowStrategy::Backpressure)
            .map(|item: i32| item + 1);

        let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();

        assert_eq!(result, vec![1, 2, 3, 4]);
    }
1071
    // Applies each public flow combinator on top of a buffered flow and checks that the
    // result is what the combinator would produce on its own.
    #[test]
    fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
        // Fresh buffered identity flow for every case.
        fn buffered_flow() -> Flow<i32, i32> {
            Flow::identity().buffer(8, OverflowStrategy::Backpressure)
        }

        // Filtering and element-wise mapping.
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().filter(|item| *item % 2 == 0))
                .run_collect()
                .unwrap(),
            vec![0, 2]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().filter_not(|item| *item % 2 == 0))
                .run_collect()
                .unwrap(),
            vec![1, 3]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
                .run_collect()
                .unwrap(),
            vec![10, 12]
        );
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().map_concat(|item| [item, item + 10]))
                .run_collect()
                .unwrap(),
            vec![0, 10, 1, 11, 2, 12]
        );
        // Stateful mapping.
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().stateful_map(5, |state, item| {
                    *state += item;
                    *state
                }))
                .run_collect()
                .unwrap(),
            vec![5, 6, 8]
        );
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().stateful_map_concat(0, |state, item| {
                    *state += item;
                    [*state, item]
                }))
                .run_collect()
                .unwrap(),
            vec![0, 0, 1, 1, 3, 2]
        );
        // Asynchronous mapping.
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
                .run_collect()
                .unwrap(),
            vec![1, 2, 3, 4]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
                .run_collect()
                .unwrap(),
            vec![1, 2, 3, 4]
        );
        assert_eq!(
            Source::from_iter(0..4)
                .via(buffered_flow().map_async_partitioned(
                    2,
                    1,
                    |item| item % 2,
                    |item| async move { Ok(item + 1) },
                ))
                .run_collect()
                .unwrap(),
            vec![1, 2, 3, 4]
        );
        // Truncation and skipping.
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().take(3))
                .run_collect()
                .unwrap(),
            vec![0, 1, 2]
        );
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().drop(2))
                .run_collect()
                .unwrap(),
            vec![2, 3, 4]
        );
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().take_while(|item| *item < 3))
                .run_collect()
                .unwrap(),
            vec![0, 1, 2]
        );
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().drop_while(|item| *item < 3))
                .run_collect()
                .unwrap(),
            vec![3, 4]
        );
        assert_eq!(
            Source::from_iter(0..3)
                .via(buffered_flow().limit(5))
                .run_collect()
                .unwrap(),
            vec![0, 1, 2]
        );
        // Grouping and aggregation.
        assert_eq!(
            Source::from_iter(0..5)
                .via(buffered_flow().grouped(2))
                .run_collect()
                .unwrap(),
            vec![vec![0, 1], vec![2, 3], vec![4]]
        );
        assert_eq!(
            Source::from_iter(1..=3)
                .via(buffered_flow().scan(0, |acc, item| acc + item))
                .run_collect()
                .unwrap(),
            vec![0, 1, 3, 6]
        );
        assert_eq!(
            Source::from_iter(1..=4)
                .via(buffered_flow().sliding(2, 1))
                .run_collect()
                .unwrap(),
            vec![vec![1, 2], vec![2, 3], vec![3, 4]]
        );
        assert_eq!(
            Source::from_iter(1..=4)
                .via(buffered_flow().fold(0, |acc, item| acc + item))
                .run_collect()
                .unwrap(),
            vec![10]
        );
        assert_eq!(
            Source::from_iter(1..=4)
                .via(buffered_flow().reduce(|acc, item| acc + item))
                .run_collect()
                .unwrap(),
            vec![10]
        );
        // Error mapping and recovery.
        assert_eq!(
            Source::from_factory(|| {
                Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
            })
            .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
            .run_collect(),
            Err(StreamError::Failed("mapped".into()))
        );
        assert_eq!(
            Source::<i32>::failed(StreamError::Failed("boom".into()))
                .via(buffered_flow().recover(|_| Some(42)))
                .run_collect()
                .unwrap(),
            vec![42]
        );
        assert_eq!(
            Source::<i32>::failed(StreamError::Failed("boom".into()))
                .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
                .run_collect()
                .unwrap(),
            vec![7, 8]
        );
        assert_eq!(
            Source::<i32>::failed(StreamError::Failed("boom".into()))
                .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
                .run_collect()
                .unwrap(),
            vec![9]
        );
        assert_eq!(
            Source::from_factory(|| {
                Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
            })
            .via(buffered_flow().on_error_complete())
            .run_collect()
            .unwrap(),
            vec![1]
        );

        // Materialized-value plumbing: `map_materialized_value` + `to_mat(.., Keep::both)`.
        let materialized = Source::from_iter([1, 2, 3])
            .run_with(
                buffered_flow()
                    .via(Flow::identity().map(|item| item + 1))
                    .map_materialized_value(|_| "buffered-flow")
                    .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
            )
            .unwrap();
        assert_eq!(materialized.0, "buffered-flow");
        assert_eq!(wait(materialized.1), 9);

        // Materialized-value plumbing: `via_mat_with` combiner followed by `to`.
        let kept = Source::from_iter([1, 2, 3])
            .run_with(
                buffered_flow()
                    .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
                    .to(Sink::fold(0, |acc, item| acc + item)),
            )
            .unwrap();
        assert_eq!(kept, "combined");
    }
1281
1282 #[test]
1283 fn runtime_rate_flows_compose_in_flow_form() {
1284 let conflate = Flow::identity()
1285 .conflate(|left: i32, right| left + right)
1286 .map(|item| item + 1);
1287 assert_eq!(
1288 Source::single(4).via(conflate).run_collect().unwrap(),
1289 vec![5]
1290 );
1291
1292 let batch = Flow::identity()
1293 .batch(4, |item: i32| item, |left, right| left + right)
1294 .map(|item| item + 1);
1295 assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1296
1297 let expand = Flow::identity()
1298 .expand(std::iter::once::<i32>)
1299 .map(|item| item + 1);
1300 assert_eq!(
1301 Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1302 vec![1, 2, 3, 4]
1303 );
1304
1305 let aggregate = Flow::identity()
1306 .aggregate_with_boundary(
1307 Vec::<i32>::new,
1308 |mut items, item| {
1309 items.push(item);
1310 let ready = !items.is_empty();
1311 (items, ready)
1312 },
1313 |items| items.into_iter().sum::<i32>(),
1314 None,
1315 )
1316 .map(|item| item + 1);
1317 assert_eq!(
1318 Source::from_iter(0..4)
1319 .via(aggregate)
1320 .run_collect()
1321 .unwrap(),
1322 vec![1, 2, 3, 4]
1323 );
1324
1325 let detached = Flow::identity().detach().map(|item: i32| item + 1);
1326 assert_eq!(
1327 Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1328 vec![1, 2, 3, 4]
1329 );
1330 }
1331
1332 #[test]
1333 fn high_use_source_flow_operators_work() {
1334 let result = Source::from_iter(0..8)
1335 .drop(1)
1336 .take(5)
1337 .filter_not(|item| item % 2 == 0)
1338 .map_concat(|item| [item, item + 10])
1339 .grouped(3)
1340 .run_collect()
1341 .unwrap();
1342
1343 assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1344 }
1345
1346 #[test]
1347 fn prefix_and_tail_emits_prefix_and_live_tail() {
1348 let mut outer = Source::from_iter(0..5)
1349 .prefix_and_tail(2)
1350 .run_collect()
1351 .unwrap();
1352 assert_eq!(outer.len(), 1);
1353 let (prefix, tail) = outer.pop().unwrap();
1354 assert_eq!(prefix, vec![0, 1]);
1355 assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1356 assert_eq!(
1357 tail.run_collect(),
1358 Err(StreamError::Failed(
1359 "substream source cannot be materialized more than once".into()
1360 ))
1361 );
1362 }
1363
1364 #[test]
1365 fn prefix_and_tail_fails_before_prefix_is_ready() {
1366 let result = Source::from_factory(|| {
1367 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1368 })
1369 .prefix_and_tail(2)
1370 .run_collect();
1371 assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1372 }
1373
1374 #[test]
1375 fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1376 let mut outer = Source::from_factory(|| {
1377 Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1378 })
1379 .prefix_and_tail(2)
1380 .run_collect()
1381 .unwrap();
1382 let (prefix, tail) = outer.pop().unwrap();
1383 assert_eq!(prefix, vec![1, 2]);
1384 assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1385 }
1386
1387 #[test]
1388 fn prefix_and_tail_accepts_non_clone_elements() {
1389 #[derive(Debug, PartialEq, Eq)]
1390 struct NonClone(u8);
1391
1392 let mut outer = Source::from_factory(|| {
1393 Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1394 })
1395 .prefix_and_tail(2)
1396 .run_collect()
1397 .unwrap();
1398 let (prefix, tail) = outer.pop().unwrap();
1399 assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1400 assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1401 }
1402
1403 #[test]
1404 fn flat_map_prefix_materializes_on_short_upstream_completion() {
1405 let values = Source::from_iter([1, 2])
1406 .flat_map_prefix(3, |prefix| {
1407 let sum = prefix.into_iter().sum::<i32>();
1408 Flow::identity().prepend(Source::single(sum))
1409 })
1410 .run_collect()
1411 .unwrap();
1412 assert_eq!(values, vec![3]);
1413 }
1414
1415 #[test]
1416 fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1417 let invoked = StdArc::new(StdAtomicBool::new(false));
1418 let invoked_for_stage = StdArc::clone(&invoked);
1419 let result = Source::from_factory(|| {
1420 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1421 })
1422 .flat_map_prefix(3, move |_prefix| {
1423 invoked_for_stage.store(true, StdOrdering::SeqCst);
1424 Flow::identity()
1425 })
1426 .run_collect();
1427 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1428 assert!(!invoked.load(StdOrdering::SeqCst));
1429 }
1430
1431 #[test]
1432 fn flat_map_concat_flattens_nested_sources_sequentially() {
1433 let values = Source::from_iter([1, 2, 3])
1434 .flat_map_concat(|item| Source::from_iter(0..item))
1435 .run_collect()
1436 .unwrap();
1437 assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1438 }
1439
1440 #[test]
1441 fn flat_map_merge_respects_breadth_bound() {
1442 let active = StdArc::new(StdAtomicUsize::new(0));
1443 let max_active = StdArc::new(StdAtomicUsize::new(0));
1444 let active_for_stage = StdArc::clone(&active);
1445 let max_for_stage = StdArc::clone(&max_active);
1446
1447 let mut values = Source::from_iter(0..6)
1448 .flat_map_merge(2, move |item| {
1449 let active = StdArc::clone(&active_for_stage);
1450 let max_active = StdArc::clone(&max_for_stage);
1451 Source::future(move || {
1452 let active = StdArc::clone(&active);
1453 let max_active = StdArc::clone(&max_active);
1454 async move {
1455 let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1456 loop {
1457 let seen = max_active.load(StdOrdering::SeqCst);
1458 if now <= seen {
1459 break;
1460 }
1461 if max_active
1462 .compare_exchange(
1463 seen,
1464 now,
1465 StdOrdering::SeqCst,
1466 StdOrdering::SeqCst,
1467 )
1468 .is_ok()
1469 {
1470 break;
1471 }
1472 }
1473 thread::sleep(StdDuration::from_millis(20));
1474 active.fetch_sub(1, StdOrdering::SeqCst);
1475 Ok(item)
1476 }
1477 })
1478 })
1479 .run_collect()
1480 .unwrap();
1481 values.sort_unstable();
1482 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1483 assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1484 }
1485
1486 #[test]
1487 fn flat_map_merge_propagates_inner_failures() {
1488 let result = Source::from_iter([0, 1, 2])
1489 .flat_map_merge(2, |item| {
1490 if item == 1 {
1491 Source::failed(StreamError::Failed("boom".into()))
1492 } else {
1493 Source::single(item)
1494 }
1495 })
1496 .run_collect();
1497 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1498 }
1499
    /// The upstream iterator yields `0`, then waits on a channel (for up to
    /// one second) before yielding `1`. The test pulls `10` from the sink
    /// queue *before* sending the release signal, so the first inner result
    /// has to be delivered while the upstream is still waiting.
    #[test]
    fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
        let (release_tx, release_rx) = mpsc::channel();
        // Shared so the factory closure can hand the receiver to the iterator
        // it builds.
        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
        let queue = Source::from_factory(move || {
            let release_rx = StdArc::clone(&release_rx);
            // Position in the scripted upstream sequence.
            let mut step = 0_u8;
            Box::new(std::iter::from_fn(move || {
                let item = match step {
                    0 => Some(Ok(0)),
                    1 => {
                        // Hold the second element back until the test thread
                        // signals; a missing signal panics after the timeout.
                        release_rx
                            .lock()
                            .unwrap()
                            .as_ref()
                            .expect("release receiver available")
                            .recv_timeout(StdDuration::from_secs(1))
                            .expect("timed out waiting to release second upstream element");
                        Some(Ok(1))
                    }
                    _ => None,
                };
                step += 1;
                item
            }))
        })
        .flat_map_merge(2, |item| Source::single(item + 10))
        .run_with(Sink::queue())
        .unwrap();

        // Must be available before the upstream is released.
        assert_eq!(queue.pull().unwrap(), Some(10));
        release_tx.send(()).unwrap();
        assert_eq!(queue.pull().unwrap(), Some(11));
        assert!(queue.pull().unwrap().is_none());
    }
1535
1536 #[test]
1537 fn group_by_routes_keys_and_drops_closed_keys() {
1538 let outer = Source::from_iter([0, 1, 2, 3, 4])
1539 .group_by(4, |item| item % 2, false)
1540 .run_with(Sink::queue())
1541 .unwrap();
1542
1543 let even = outer.pull().unwrap().unwrap();
1544 let even_completion = even.run_with(Sink::ignore()).unwrap();
1545 let odd = outer.pull().unwrap().unwrap();
1546 drop(even_completion);
1547
1548 assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
1549 assert!(outer.pull().unwrap().is_none());
1550 }
1551
1552 #[test]
1553 fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1554 let outer = Source::from_iter([0, 1, 2])
1555 .group_by(2, |item| *item, false)
1556 .run_with(Sink::queue())
1557 .unwrap();
1558
1559 let _ = outer.pull().unwrap().unwrap();
1560 let _ = outer.pull().unwrap().unwrap();
1561 assert!(matches!(
1562 outer.pull(),
1563 Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1564 ));
1565 }
1566
    /// With the third `group_by` argument set to `true`, a key whose substream
    /// has already finished can show up again as a new substream.
    ///
    /// The upstream yields `0`, `1`, then waits for a release signal before
    /// yielding `0` again. The signal is sent only after the first even
    /// substream has been consumed through `Sink::head()`.
    #[test]
    fn group_by_can_recreate_closed_substreams_when_enabled() {
        let (release_tx, release_rx) = mpsc::channel();
        // Shared so the factory closure can hand the receiver to the iterator
        // it builds.
        let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
        let outer = Source::from_factory(move || {
            let release_rx = StdArc::clone(&release_rx);
            // Position in the scripted upstream sequence.
            let mut step = 0_u8;
            Box::new(std::iter::from_fn(move || {
                let item = match step {
                    0 => Some(Ok(0)),
                    1 => Some(Ok(1)),
                    2 => {
                        // Hold the repeated key back until the test signals; a
                        // missing signal panics after the timeout.
                        release_rx
                            .lock()
                            .unwrap()
                            .as_ref()
                            .expect("release receiver available")
                            .recv_timeout(StdDuration::from_secs(1))
                            .expect("timed out waiting to release recreated key");
                        Some(Ok(0))
                    }
                    _ => None,
                };
                step += 1;
                item
            }))
        })
        .group_by(4, |item| item % 2, true)
        .run_with(Sink::queue())
        .unwrap();

        // Consume only the head of the first even substream, then let the
        // upstream produce the second `0`.
        let even = outer.pull().unwrap().unwrap();
        assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
        release_tx.send(()).unwrap();

        let odd = outer.pull().unwrap().unwrap();
        assert_eq!(odd.run_collect().unwrap(), vec![1]);

        // The second `0` arrives in a fresh substream.
        let recreated_even = outer.pull().unwrap().unwrap();
        assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
        assert!(outer.pull().unwrap().is_none());
    }
1609
1610 #[test]
1611 fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1612 let outer = Source::from_iter([0, 1])
1613 .group_by(
1614 4,
1615 |item| {
1616 assert_ne!(*item, 1, "boom");
1617 item % 2
1618 },
1619 false,
1620 )
1621 .run_with(Sink::queue())
1622 .unwrap();
1623
1624 let substream = outer.pull().unwrap().unwrap();
1625 let (result_tx, result_rx) = mpsc::channel();
1626 thread::spawn(move || {
1627 let _ = result_tx.send(substream.run_collect());
1628 });
1629
1630 assert_eq!(
1631 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1632 Err(StreamError::AbruptTermination)
1633 );
1634 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1635 }
1636
1637 #[test]
1638 fn split_when_starts_new_substream_on_boundary_element() {
1639 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1640 .split_when(|item| *item == 0)
1641 .run_with(Sink::queue())
1642 .unwrap();
1643
1644 let first = outer.pull().unwrap().unwrap();
1645 assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1646 let second = outer.pull().unwrap().unwrap();
1647 assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1648 let third = outer.pull().unwrap().unwrap();
1649 assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1650 assert!(outer.pull().unwrap().is_none());
1651 }
1652
1653 #[test]
1654 fn split_after_ends_current_substream_on_boundary_element() {
1655 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1656 .split_after(|item| *item == 0)
1657 .run_with(Sink::queue())
1658 .unwrap();
1659
1660 let first = outer.pull().unwrap().unwrap();
1661 assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1662 let second = outer.pull().unwrap().unwrap();
1663 assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1664 let third = outer.pull().unwrap().unwrap();
1665 assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1666 assert!(outer.pull().unwrap().is_none());
1667 }
1668
1669 #[test]
1670 fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1671 let outer = Source::from_iter([1, 2])
1672 .split_when(|item| {
1673 assert_ne!(*item, 2, "boom");
1674 false
1675 })
1676 .run_with(Sink::queue())
1677 .unwrap();
1678
1679 let substream = outer.pull().unwrap().unwrap();
1680 let (result_tx, result_rx) = mpsc::channel();
1681 thread::spawn(move || {
1682 let _ = result_tx.send(substream.run_collect());
1683 });
1684
1685 assert_eq!(
1686 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1687 Err(StreamError::AbruptTermination)
1688 );
1689 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1690 }
1691
1692 #[test]
1693 fn split_when_pre_buffer_segments_match_expected_count() {
1694 let outer = Source::from_iter(0..100)
1695 .split_when(|item| *item != 0 && *item % 10 == 0)
1696 .run_with(Sink::queue())
1697 .unwrap();
1698 let mut segment_count = 0;
1699 while let Some(substream) = outer.pull().unwrap() {
1700 let items: Vec<i32> = substream.run_collect().unwrap();
1701 assert!(!items.is_empty(), "segment should not be empty");
1702 segment_count += 1;
1703 }
1704 assert_eq!(segment_count, 10, "100 elements in segments of 10");
1705 }
1706
1707 #[test]
1708 fn split_after_pre_buffer_segments_match_expected_count() {
1709 let outer = Source::from_iter(0..100)
1710 .split_after(|item| (*item + 1) % 10 == 0)
1711 .run_with(Sink::queue())
1712 .unwrap();
1713 let mut segment_count = 0;
1714 let mut total = 0_i32;
1715 while let Some(substream) = outer.pull().unwrap() {
1716 let items: Vec<i32> = substream.run_collect().unwrap();
1717 assert!(!items.is_empty(), "segment should not be empty");
1718 total += items.len() as i32;
1719 segment_count += 1;
1720 }
1721 assert_eq!(segment_count, 10);
1722 assert_eq!(total, 100);
1723 }
1724
1725 #[test]
1726 fn group_by_single_key_fused_matches_general_path() {
1727 let outer = Source::from_iter(0..1000i64)
1728 .group_by(1, |_| 0u8, false)
1729 .run_with(Sink::queue())
1730 .unwrap();
1731 let substream = outer.pull().unwrap().unwrap();
1732 let items: Vec<i64> = substream.run_collect().unwrap();
1733 assert_eq!(items.len(), 1000);
1734 assert_eq!(items[0], 0);
1735 assert_eq!(items[999], 999);
1736 assert!(outer.pull().unwrap().is_none());
1737 }
1738
1739 #[test]
1740 fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
1741 let outer = Source::from_iter([0, 1, 0])
1742 .group_by(2, |item| *item, false)
1743 .run_with(Sink::queue())
1744 .unwrap();
1745 let mut sources = vec![];
1746 while let Some(source) = outer.pull().unwrap() {
1747 sources.push(source);
1748 }
1749 assert_eq!(sources.len(), 2);
1750 assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
1751 assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
1752 }
1753
1754 #[test]
1755 fn flat_map_merge_lock_lighter_matches_expected_count() {
1756 let items = Source::from_iter(0..20)
1757 .flat_map_merge(2, |item| Source::single(item + 100))
1758 .run_with(Sink::queue())
1759 .unwrap();
1760 let mut count = 0;
1761 while items.pull().unwrap().is_some() {
1762 count += 1;
1763 }
1764 assert_eq!(count, 20);
1765 }
1766
    /// The upstream is fed through a zero-capacity channel, so it only
    /// advances when the test sends. After a single element has been sent,
    /// the outer queue must already hand out the substream (within five
    /// seconds); the remaining 99 elements are sent afterwards.
    #[test]
    fn group_by_single_key_emits_substream_before_upstream_completes() {
        // Capacity 0: every `send` blocks until the stream side receives.
        let (tx, rx) = mpsc::sync_channel::<i32>(0);
        let rx = StdArc::new(std::sync::Mutex::new(rx));

        let outer = Source::from_factory({
            let rx = StdArc::clone(&rx);
            move || {
                let rx = StdArc::clone(&rx);
                // Ends the stream once every sender has been dropped.
                Box::new(std::iter::from_fn(move || {
                    rx.lock().unwrap().recv().ok().map(Ok)
                })) as BoxStream<i32>
            }
        })
        .group_by(1, |_| 0u8, false)
        .run_with(Sink::queue())
        .unwrap();

        // Pull the substream on a helper thread so the main thread can apply
        // a timeout to its arrival.
        let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
        let outer_thread = thread::spawn(move || {
            let substream = outer.pull().unwrap().expect("expected a substream");
            sub_tx.send(substream).unwrap();
        });

        // Only the first element has been produced at this point.
        tx.send(0).unwrap();

        let substream = sub_rx
            .recv_timeout(StdDuration::from_secs(5))
            .expect("timed out — group_by buffered first element before emitting substream");

        for i in 1..100_i32 {
            tx.send(i).unwrap();
        }
        // Dropping the sender completes the upstream.
        drop(tx);

        let items: Vec<i32> = substream.run_collect().unwrap();
        assert_eq!(items.len(), 100);
        outer_thread.join().unwrap();
    }
1823
    /// Stress test: for `ROUNDS` rounds, `STREAMS` threads each run their own
    /// single-key `group_by` pipeline fed through a zero-capacity channel.
    /// After every send, the matching element must be pullable from the
    /// substream queue before the next one is sent.
    #[test]
    fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
        const STREAMS: usize = 32;
        const ROUNDS: usize = 8;
        const ITEMS: i64 = 8;

        for _ in 0..ROUNDS {
            // All workers wait here after materializing their stream and
            // before sending the first element.
            let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
            let mut handles = Vec::with_capacity(STREAMS);

            for _ in 0..STREAMS {
                let barrier = StdArc::clone(&barrier);
                handles.push(thread::spawn(move || {
                    // Capacity 0: every `send` blocks until the stream side
                    // receives.
                    let (tx, rx) = mpsc::sync_channel::<i64>(0);
                    let rx = StdArc::new(std::sync::Mutex::new(rx));

                    let outer = Source::from_factory({
                        let rx = StdArc::clone(&rx);
                        move || {
                            let rx = StdArc::clone(&rx);
                            // Ends the stream once the sender is dropped.
                            Box::new(std::iter::from_fn(move || {
                                rx.lock().unwrap().recv().ok().map(Ok)
                            })) as BoxStream<i64>
                        }
                    })
                    .group_by(1, |_| 0_u8, false)
                    .run_with(Sink::queue())
                    .unwrap();

                    barrier.wait();

                    tx.send(0).unwrap();
                    let substream = outer.pull().unwrap().expect("expected group_by substream");
                    let subqueue = substream.run_with(Sink::queue()).unwrap();
                    assert_eq!(subqueue.pull().unwrap(), Some(0));

                    // Lock-step: send one element, then require it downstream.
                    for item in 1..ITEMS {
                        tx.send(item).unwrap();
                        assert_eq!(subqueue.pull().unwrap(), Some(item));
                    }
                    // Dropping the sender completes the upstream.
                    drop(tx);

                    assert!(subqueue.pull().unwrap().is_none());
                    assert!(outer.pull().unwrap().is_none());
                }));
            }

            for handle in handles {
                handle.join().expect("group_by stress worker panicked");
            }
        }
    }
1876
1877 #[test]
1884 fn split_when_emits_substream_before_segment_ends() {
1885 const SEGMENT_LEN: usize = 300;
1889
1890 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1891 for i in 0..SEGMENT_LEN as i32 {
1892 tx.send(i).unwrap();
1893 }
1894 tx.send(-1).unwrap(); tx.send(99).unwrap(); drop(tx);
1897
1898 let outer = Source::from_iter(rx)
1899 .split_when(|item| *item == -1)
1900 .run_with(Sink::queue())
1901 .unwrap();
1902
1903 let (result_tx, result_rx) = mpsc::channel();
1904 thread::spawn(move || {
1905 let first = outer.pull().unwrap().expect("expected first substream");
1906 let items: Vec<i32> = first.run_collect().unwrap();
1907 let second = outer.pull().unwrap().expect("expected second substream");
1908 let items2: Vec<i32> = second.run_collect().unwrap();
1909 let done = outer.pull().unwrap().is_none();
1910 let _ = result_tx.send((items, items2, done));
1911 });
1912
1913 let (items, items2, done) = result_rx
1914 .recv_timeout(StdDuration::from_secs(5))
1915 .expect("timed out — split_when is buffering the whole segment");
1916 assert_eq!(items.len(), SEGMENT_LEN);
1917 assert_eq!(items2, vec![-1, 99]);
1918 assert!(done);
1919 }
1920
1921 #[test]
1922 fn split_after_emits_substream_before_segment_ends() {
1923 const SEGMENT_LEN: usize = 300;
1924
1925 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
1926 for i in 0..SEGMENT_LEN as i32 {
1927 tx.send(i).unwrap();
1928 }
1929 tx.send(-1).unwrap(); tx.send(99).unwrap();
1931 drop(tx);
1932
1933 let outer = Source::from_iter(rx)
1934 .split_after(|item| *item == -1)
1935 .run_with(Sink::queue())
1936 .unwrap();
1937
1938 let (result_tx, result_rx) = mpsc::channel();
1939 thread::spawn(move || {
1940 let first = outer.pull().unwrap().expect("expected first substream");
1941 let items: Vec<i32> = first.run_collect().unwrap();
1942 let second = outer.pull().unwrap().expect("expected second substream");
1943 let items2: Vec<i32> = second.run_collect().unwrap();
1944 let done = outer.pull().unwrap().is_none();
1945 let _ = result_tx.send((items, items2, done));
1946 });
1947
1948 let (items, items2, done) = result_rx
1949 .recv_timeout(StdDuration::from_secs(5))
1950 .expect("timed out — split_after is buffering the whole segment");
1951 assert_eq!(items.len(), SEGMENT_LEN + 1);
1953 assert_eq!(items2, vec![99]);
1954 assert!(done);
1955 }
1956
1957 #[test]
1961 fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
1962 for _ in 0..20 {
1963 let result = Source::from_iter(0..50_i32)
1964 .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
1965 .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
1966 .unwrap()
1967 .wait();
1968 assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
1971 }
1972 }
1973
1974 #[test]
1979 fn flat_map_merge_single_mutex_race_stress() {
1980 for _ in 0..20 {
1981 let result = Source::from_iter(0..100_i64)
1982 .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
1983 .run_with(Sink::fold(0i64, |acc, v| acc + v))
1984 .unwrap()
1985 .wait();
1986 assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
1990 }
1991 }
1992
1993 #[test]
2000 fn split_when_bounded_memory_rendezvous() {
2001 const SEGMENT: usize = 100;
2004 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
2005 for i in 0..SEGMENT as i32 {
2006 tx.send(i).unwrap();
2007 }
2008 tx.send(-1).unwrap(); for i in 0..10_i32 {
2011 tx.send(i).unwrap();
2012 }
2013 drop(tx);
2014
2015 let outer = Source::from_iter(rx)
2016 .split_when(|item| *item == -1)
2017 .run_with(Sink::queue())
2018 .unwrap();
2019
2020 let (result_tx, result_rx) = mpsc::channel();
2021 thread::spawn(move || {
2022 let first = outer.pull().unwrap().expect("first segment");
2023 let seg1: Vec<i32> = first.run_collect().unwrap();
2024 let second = outer.pull().unwrap().expect("second segment");
2025 let seg2: Vec<i32> = second.run_collect().unwrap();
2026 let done = outer.pull().unwrap().is_none();
2027 result_tx.send((seg1, seg2, done)).unwrap();
2028 });
2029
2030 let (seg1, seg2, done) = result_rx
2031 .recv_timeout(StdDuration::from_secs(5))
2032 .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
2033 assert_eq!(seg1.len(), SEGMENT, "first segment length");
2034 assert_eq!(seg2[0], -1, "boundary element starts second segment");
2035 assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
2036 assert!(done);
2037 }
2038
2039 #[test]
2043 fn group_by_single_key_bounded_memory_rendezvous() {
2044 const N: usize = 200;
2047 let outer = Source::from_iter(0..N as i64)
2048 .group_by(1, |_| 0u8, false)
2049 .run_with(Sink::queue())
2050 .unwrap();
2051
2052 let (result_tx, result_rx) = mpsc::channel();
2053 thread::spawn(move || {
2054 let substream = outer.pull().unwrap().expect("substream");
2055 let items: Vec<i64> = substream.run_collect().unwrap();
2056 let done = outer.pull().unwrap().is_none();
2057 result_tx.send((items, done)).unwrap();
2058 });
2059
2060 let (items, done) = result_rx
2061 .recv_timeout(StdDuration::from_secs(5))
2062 .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
2063 assert_eq!(items.len(), N, "all items delivered");
2064 assert_eq!(items[0], 0);
2065 assert_eq!(items[N - 1], (N - 1) as i64);
2066 assert!(done);
2067 }
2068
2069 #[test]
2070 fn scan_emits_seed_and_accumulated_values() {
2071 let result = Source::from_iter(1..=3)
2072 .scan(0, |acc, item| acc + item)
2073 .run_collect()
2074 .unwrap();
2075
2076 assert_eq!(result, vec![0, 1, 3, 6]);
2077 }
2078
2079 #[test]
2080 fn limit_fails_after_max_elements() {
2081 let result = Source::from_iter(0..3).limit(2).run_collect();
2082
2083 assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
2084 }
2085
2086 #[test]
2087 fn limit_weighted_fails_with_limit_error_like_akka() {
2088 let result = Source::from_iter(["this", "is", "some", "string"])
2089 .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
2090 .run_collect();
2091
2092 assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
2093 }
2094
2095 #[test]
2096 fn grouped_weighted_allows_oversized_first_element_like_akka() {
2097 let result = Source::from_iter([10_usize, 1, 2])
2098 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2099 .run_collect()
2100 .unwrap();
2101
2102 assert_eq!(result, vec![vec![10], vec![1, 2]]);
2103 }
2104
2105 #[test]
2106 fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2107 let result = Source::from_iter([1_usize, 10, 2])
2108 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2109 .run_collect()
2110 .unwrap();
2111
2112 assert_eq!(result, vec![vec![1, 10], vec![2]]);
2113 }
2114
2115 #[test]
2116 fn sink_terminals_materialize_results() {
2117 let sum = Source::from_iter(1..=4)
2118 .run_with(Sink::fold(0, |acc, item| acc + item))
2119 .unwrap();
2120
2121 assert_eq!(wait(sum), 10);
2122 assert_eq!(
2123 wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2124 1
2125 );
2126 assert_eq!(
2127 wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2128 4
2129 );
2130 }
2131
2132 #[test]
2133 fn all_terminal_sink_variants_complete() {
2134 assert_eq!(
2135 wait(
2136 Source::from_iter([1, 2, 3])
2137 .run_with(Sink::collect())
2138 .unwrap()
2139 ),
2140 vec![1, 2, 3]
2141 );
2142 assert_eq!(
2143 wait(
2144 Source::<i32>::empty()
2145 .run_with(Sink::head_option())
2146 .unwrap()
2147 ),
2148 None
2149 );
2150 assert_eq!(
2151 wait(
2152 Source::from_iter([1, 2, 3])
2153 .run_with(Sink::last_option())
2154 .unwrap()
2155 ),
2156 Some(3)
2157 );
2158 assert_eq!(
2159 wait(
2160 Source::from_iter([1, 2, 3])
2161 .run_with(Sink::reduce(|acc, item| acc + item))
2162 .unwrap()
2163 ),
2164 6
2165 );
2166
2167 let seen = StdArc::new(StdAtomicUsize::new(0));
2168 let seen_by_sink = StdArc::clone(&seen);
2169 assert_eq!(
2170 wait(
2171 Source::from_iter([1_usize, 2, 3])
2172 .run_with(Sink::foreach(move |item| {
2173 seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2174 }))
2175 .unwrap()
2176 ),
2177 NotUsed
2178 );
2179 assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2180 }
2181
2182 #[test]
2183 fn take_last_zero_returns_empty_vector() {
2184 let result = Source::from_iter([1, 2, 3])
2185 .run_with(Sink::take_last(0))
2186 .unwrap();
2187
2188 assert_eq!(wait(result), Vec::<i32>::new());
2189 }
2190
2191 #[test]
2192 fn bounded_head_terminals_complete_inline() {
2193 let materializer = Materializer::new();
2194
2195 let mut head = Source::from_iter(0_u64..1_000)
2196 .run_with_materializer(Sink::head(), &materializer)
2197 .unwrap();
2198 assert_eq!(materializer.active_streams(), 0);
2199 assert_eq!(head.try_wait(), Some(Ok(0)));
2200
2201 let mut filtered_head = Source::from_iter(0_u64..1_000)
2202 .filter(|item| *item >= 10)
2203 .run_with_materializer(Sink::head(), &materializer)
2204 .unwrap();
2205 assert_eq!(materializer.active_streams(), 0);
2206 assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2207
2208 let mut head_option = Source::<u64>::empty()
2209 .run_with_materializer(Sink::head_option(), &materializer)
2210 .unwrap();
2211 assert_eq!(materializer.active_streams(), 0);
2212 assert_eq!(head_option.try_wait(), Some(Ok(None)));
2213 }
2214
2215 #[test]
2216 fn bounded_head_fast_path_preserves_terminal_errors() {
2217 let materializer = Materializer::new();
2218
2219 let mut empty = Source::<u64>::empty()
2220 .run_with_materializer(Sink::head(), &materializer)
2221 .unwrap();
2222 assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2223
2224 let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2225 .run_with_materializer(Sink::head(), &materializer)
2226 .unwrap();
2227 assert_eq!(
2228 failed.try_wait(),
2229 Some(Err(StreamError::Failed("boom".into())))
2230 );
2231 assert_eq!(materializer.active_streams(), 0);
2232 }
2233
2234 #[test]
2235 fn runnable_graph_composes_source_and_sink() {
2236 let graph = Source::from_iter(1..=4)
2237 .map(|item| item * 2)
2238 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2239
2240 assert_eq!(wait(graph.run().unwrap()), 20);
2241
2242 let graph = Source::single(1)
2243 .map_materialized_value(|_| 20)
2244 .to(Sink::ignore())
2245 .map_materialized_value(|value| value + 1);
2246 assert_eq!(graph.run().unwrap(), 21);
2247
2248 let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2249 assert_eq!(ignored, NotUsed);
2250 }
2251
2252 #[test]
2253 fn materialized_values_follow_keep_defaults() {
2254 let source = Source::single(1).map_materialized_value(|_| "source");
2255 let flow = Flow::identity().map_materialized_value(|_| "flow");
2256
2257 let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2258 assert_eq!(source_mat.unwrap(), "source");
2259
2260 let combined = source
2261 .via_mat(flow, Keep::both)
2262 .to_mat(Sink::ignore(), Keep::both)
2263 .run()
2264 .unwrap();
2265 assert_eq!(combined.0, ("source", "flow"));
2266 assert_eq!(wait(combined.1), NotUsed);
2267
2268 let sink_mat = Source::single(41)
2269 .map_materialized_value(|_| "ignored source")
2270 .run_with(Sink::fold(1, |acc, item| acc + item))
2271 .unwrap();
2272 assert_eq!(wait(sink_mat), 42);
2273 }
2274
2275 #[test]
2276 fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2277 let sink = Flow::identity()
2278 .map(|item: i32| item + 1)
2279 .map_materialized_value(|_| "flow")
2280 .to(Sink::fold(0, |acc, item| acc + item));
2281
2282 let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2283
2284 assert_eq!(materialized, "flow");
2285 let explicit = Flow::identity()
2286 .map(|item: i32| item + 1)
2287 .map_materialized_value(|_| "flow")
2288 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2289 .run_with(Source::from_iter([1, 2, 3]))
2290 .unwrap();
2291 assert_eq!(explicit, NotUsed);
2292
2293 let explicit = Source::from_iter([1, 2, 3])
2294 .run_with(
2295 Flow::identity()
2296 .map(|item: i32| item + 1)
2297 .map_materialized_value(|_| "flow")
2298 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2299 )
2300 .unwrap();
2301 assert_eq!(explicit.0, "flow");
2302 assert_eq!(wait(explicit.1), 9);
2303 }
2304
2305 #[test]
2306 fn materializer_shutdown_fails_materialization() {
2307 let materializer = Materializer::new();
2308 let named = materializer.with_name_prefix("test-stream");
2309 materializer.shutdown();
2310
2311 let graph = Source::single(1).to(Sink::ignore());
2312
2313 assert_eq!(named.name_prefix(), "test-stream");
2314 assert_eq!(
2315 graph.run_with_materializer(&named),
2316 Err(StreamError::AbruptTermination)
2317 );
2318 }
2319
/// Shutting down the materializer while an endless stream is running makes its
/// completion report `AbruptTermination` and drops the active-stream count to zero.
#[test]
fn materializer_shutdown_fails_running_stream_completion() {
    let mat = Materializer::new();
    let running = Source::repeat(1)
        .run_with_materializer(Sink::ignore(), &mat)
        .unwrap();

    assert_eq!(mat.active_streams(), 1);
    mat.shutdown();

    let outcome = running.wait();
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
    assert_eq!(mat.active_streams(), 0);
}
2332
/// Dropping the completion handle of an endless stream must cancel it: the
/// active-stream count is polled (up to 50 times, 5 ms apart) until it reaches zero.
#[test]
fn dropped_stream_completion_cancels_running_stream() {
    let mat = Materializer::new();
    let running = Source::repeat(1)
        .run_with_materializer(Sink::ignore(), &mat)
        .unwrap();

    assert_eq!(mat.active_streams(), 1);
    drop(running);

    // Bounded poll: stop as soon as the stream is gone or the budget is spent.
    let mut polls_left = 50;
    while polls_left > 0 && mat.active_streams() != 0 {
        thread::sleep(Duration::from_millis(5));
        polls_left -= 1;
    }
    assert_eq!(mat.active_streams(), 0);
}
2350
/// Walks through the timer API of a `Materializer`: a one-shot timer that fires, a
/// one-shot timer cancelled before firing, both repeating flavours, and a pending
/// timer on a materializer that gets shut down.
#[test]
fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
    let timers = Materializer::new();

    // One-shot: the task runs and the handle does not report cancellation afterwards.
    let (fired_tx, fired_rx) = mpsc::channel();
    let one_shot = timers.schedule_once(Duration::from_millis(5), move || {
        fired_tx.send(()).unwrap();
    });
    fired_rx.recv_timeout(Duration::from_millis(250)).unwrap();
    assert!(!one_shot.is_cancelled());

    // Cancelled one-shot: only the first `cancel` succeeds and nothing is ever sent.
    let (skipped_tx, skipped_rx) = mpsc::channel();
    let aborted = timers.schedule_once(Duration::from_millis(25), move || {
        skipped_tx.send(()).unwrap();
    });
    assert!(aborted.cancel());
    assert!(!aborted.cancel());
    assert!(aborted.is_cancelled());
    assert!(skipped_rx
        .recv_timeout(Duration::from_millis(75))
        .is_err());

    // Fixed-delay timer: must have ticked at least once within 25 ms.
    let delay_ticks = StdArc::new(StdAtomicUsize::new(0));
    let delay_timer = {
        let ticks = StdArc::clone(&delay_ticks);
        timers.schedule_with_fixed_delay(
            Duration::from_millis(1),
            Duration::from_millis(5),
            move || {
                ticks.fetch_add(1, StdOrdering::SeqCst);
            },
        )
    };
    thread::sleep(Duration::from_millis(25));
    assert!(delay_ticks.load(StdOrdering::SeqCst) > 0);
    delay_timer.cancel();

    // Fixed-rate timer: same expectation.
    let rate_ticks = StdArc::new(StdAtomicUsize::new(0));
    let rate_timer = {
        let ticks = StdArc::clone(&rate_ticks);
        timers.schedule_at_fixed_rate(
            Duration::from_millis(1),
            Duration::from_millis(5),
            move || {
                ticks.fetch_add(1, StdOrdering::SeqCst);
            },
        )
    };
    thread::sleep(Duration::from_millis(25));
    assert!(rate_ticks.load(StdOrdering::SeqCst) > 0);
    rate_timer.cancel();

    // A timer still pending when its materializer shuts down must never fire.
    let doomed = Materializer::new();
    let (late_tx, late_rx) = mpsc::channel();
    doomed.schedule_once(Duration::from_millis(25), move || {
        late_tx.send(()).unwrap();
    });
    doomed.shutdown();
    assert!(late_rx.recv_timeout(Duration::from_millis(75)).is_err());
}
2408
/// Contrasts fixed-rate and fixed-delay scheduling when the first task overruns its interval.
///
/// In both halves the first run is parked on a condvar gate until more than one interval
/// has passed since it started. The closing assertions require the fixed-rate timer to
/// start its second run less than one interval after the first run completed, and the
/// fixed-delay timer to wait at least one full interval after completion.
#[test]
fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
    use std::sync::{Condvar, Mutex};

    // Events reported by the timer task: run number plus the instant it was observed.
    #[derive(Debug)]
    enum TimerEvent {
        Started(usize, Instant),
        Completed(usize, Instant),
    }

    // Receives the next event, panicking with a labelled message after 20 s of silence.
    let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
        rx.recv_timeout(Duration::from_secs(20))
            .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
    };
    // Opens a gate and wakes every waiter; a poisoned mutex is recovered rather than propagated.
    let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
        let (released, condvar) = &**gate;
        let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
        *released = true;
        condvar.notify_all();
    };

    let interval = Duration::from_secs(2);
    // How long the first run is held: one interval plus a 250 ms margin.
    let overrun = interval + Duration::from_millis(250);

    // --- Fixed-rate half ---
    let rate_materializer = Materializer::new();
    let (rate_tx, rate_rx) = mpsc::channel();
    let rate_runs = StdArc::new(StdAtomicUsize::new(0));
    let rate_task_runs = StdArc::clone(&rate_runs);
    let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
    let rate_task_gate = StdArc::clone(&rate_gate);
    let fixed_rate =
        rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
            let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
            rate_tx
                .send(TimerEvent::Started(run, Instant::now()))
                .unwrap();
            // Only the first run blocks on the gate; later runs return immediately.
            if run == 1 {
                let (released, condvar) = &*rate_task_gate;
                let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
                while !*released {
                    released = condvar
                        .wait(released)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
                rate_tx
                    .send(TimerEvent::Completed(run, Instant::now()))
                    .unwrap();
            }
        });
    let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
        TimerEvent::Started(1, at) => at,
        other => panic!("fixed-rate first task: unexpected event {other:?}"),
    };
    // Keep the first run parked until it has overrun the interval, then open the gate.
    assert!(wait_until(Duration::from_secs(20), || {
        rate_first_started.elapsed() >= overrun
    }));
    release(&rate_gate);
    let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
        TimerEvent::Completed(1, at) => at,
        other => panic!("fixed-rate first completion: unexpected event {other:?}"),
    };
    let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
        TimerEvent::Started(2, at) => at,
        other => panic!("fixed-rate second task: unexpected event {other:?}"),
    };
    fixed_rate.cancel();
    rate_materializer.shutdown();

    // --- Fixed-delay half: same scenario on a fresh materializer ---
    let delay_materializer = Materializer::new();
    let (delay_tx, delay_rx) = mpsc::channel();
    let delay_runs = StdArc::new(StdAtomicUsize::new(0));
    let delay_task_runs = StdArc::clone(&delay_runs);
    let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
    let delay_task_gate = StdArc::clone(&delay_gate);
    let fixed_delay =
        delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
            let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
            delay_tx
                .send(TimerEvent::Started(run, Instant::now()))
                .unwrap();
            // Only the first run blocks on the gate; later runs return immediately.
            if run == 1 {
                let (released, condvar) = &*delay_task_gate;
                let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
                while !*released {
                    released = condvar
                        .wait(released)
                        .unwrap_or_else(|poison| poison.into_inner());
                }
                delay_tx
                    .send(TimerEvent::Completed(run, Instant::now()))
                    .unwrap();
            }
        });
    let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
        TimerEvent::Started(1, at) => at,
        other => panic!("fixed-delay first task: unexpected event {other:?}"),
    };
    assert!(wait_until(Duration::from_secs(20), || {
        delay_first_started.elapsed() >= overrun
    }));
    release(&delay_gate);
    let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
        TimerEvent::Completed(1, at) => at,
        other => panic!("fixed-delay first completion: unexpected event {other:?}"),
    };
    let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
        TimerEvent::Started(2, at) => at,
        other => panic!("fixed-delay second task: unexpected event {other:?}"),
    };
    fixed_delay.cancel();
    delay_materializer.shutdown();

    // Derive the measured spans and compare each against the interval.
    let rate_task_time = rate_first_completed.duration_since(rate_first_started);
    let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
    let delay_task_time = delay_first_completed.duration_since(delay_first_started);
    let delay_gap = delay_second_started.duration_since(delay_first_completed);
    assert!(
        rate_task_time >= interval,
        "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
    );
    assert!(
        rate_catch_up < interval,
        "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
    );
    assert!(
        delay_task_time >= interval,
        "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
    );
    assert!(
        delay_gap >= interval,
        "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
    );
}
2542
/// After a fixed-rate timer has fired once and is cancelled, no further tick may
/// arrive within the following 90 ms.
#[test]
fn runtime_repeating_timer_cancellation_stops_future_fires() {
    let mat = Materializer::new();
    let (tick_tx, tick_rx) = mpsc::channel();
    let initial_delay = Duration::from_millis(1);
    let interval = Duration::from_millis(30);
    let repeating = mat.schedule_at_fixed_rate(initial_delay, interval, move || {
        tick_tx.send(()).unwrap();
    });

    // Wait for the first tick, then cancel and listen for three more intervals.
    tick_rx.recv_timeout(Duration::from_millis(250)).unwrap();
    assert!(repeating.cancel());
    assert!(tick_rx.recv_timeout(Duration::from_millis(90)).is_err());
    mat.shutdown();
}
2560
/// A one-shot timer whose task panics must not prevent a timer scheduled afterwards
/// from firing.
#[test]
fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
    let mat = Materializer::new();
    mat.schedule_once(Duration::from_millis(1), || {
        panic!("timer boom");
    });

    // Scheduled after the panicking timer; it must still deliver.
    let (alive_tx, alive_rx) = mpsc::channel();
    mat.schedule_once(Duration::from_millis(20), move || {
        alive_tx.send(()).unwrap();
    });

    alive_rx.recv_timeout(Duration::from_millis(250)).unwrap();
    mat.shutdown();
}
2576
/// A fixed-rate timer whose task panics must run exactly once, while a one-shot
/// timer scheduled afterwards on the same materializer still fires.
#[test]
fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
    let mat = Materializer::new();
    let attempts = StdArc::new(StdAtomicUsize::new(0));
    {
        let attempts = StdArc::clone(&attempts);
        mat.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
            attempts.fetch_add(1, StdOrdering::SeqCst);
            panic!("fixed-rate boom");
        });
    }

    assert!(wait_until(Duration::from_millis(150), || {
        attempts.load(StdOrdering::SeqCst) == 1
    }));

    // An unrelated timer must still be served after the panic.
    let (alive_tx, alive_rx) = mpsc::channel();
    mat.schedule_once(Duration::from_millis(30), move || {
        alive_tx.send(()).unwrap();
    });
    alive_rx.recv_timeout(Duration::from_millis(250)).unwrap();

    // Several intervals later the panicking task must not have run again.
    thread::sleep(Duration::from_millis(90));
    assert_eq!(attempts.load(StdOrdering::SeqCst), 1);
    mat.shutdown();
}
2601
/// While a repeating timer task is sleeping for 200 ms, a 10 ms one-shot timer
/// scheduled on the same materializer must still fire in under 150 ms.
#[test]
fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
    let mat = Materializer::new();
    let slow_running = StdArc::new(StdAtomicBool::new(false));
    let slow_timer = {
        let slow_running = StdArc::clone(&slow_running);
        mat.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(250), move || {
            slow_running.store(true, StdOrdering::SeqCst);
            thread::sleep(Duration::from_millis(200));
        })
    };

    // Do not start measuring before the slow task is actually executing.
    assert!(wait_until(Duration::from_millis(100), || {
        slow_running.load(StdOrdering::SeqCst)
    }));

    let scheduled_at = Instant::now();
    let (fired_tx, fired_rx) = mpsc::channel();
    mat.schedule_once(Duration::from_millis(10), move || {
        fired_tx.send(Instant::now()).unwrap();
    });
    let fired_at = fired_rx.recv_timeout(Duration::from_millis(350)).unwrap();
    let elapsed = fired_at.duration_since(scheduled_at);

    // Clean up before asserting so a failure does not leave the timer running.
    slow_timer.cancel();
    mat.shutdown();
    assert!(
        elapsed < Duration::from_millis(150),
        "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
    );
}
2635
/// The timer driver must be reported live within 1 s of creating the materializer
/// and no longer live within 2 s of shutting it down.
#[test]
fn runtime_shutdown_stops_timer_driver_thread() {
    let mat = Materializer::new();
    let driver_live = || mat.timer_driver_is_live();

    assert!(wait_until(Duration::from_secs(1), driver_live));

    mat.shutdown();
    assert!(wait_until(Duration::from_secs(2), || !driver_live()));
}
2646
/// One-shot timers registered in shuffled order must fire in ascending order of
/// their delays.
#[test]
fn runtime_timer_driver_orders_many_timers_by_deadline() {
    let mat = Materializer::new();
    let (order_tx, order_rx) = mpsc::channel();
    // (delay in ms, value sent); the values encode the expected firing rank.
    let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];

    for &(delay_ms, value) in &schedule {
        let order_tx = order_tx.clone();
        mat.schedule_once(Duration::from_millis(delay_ms), move || {
            order_tx.send(value).unwrap();
        });
    }
    drop(order_tx);

    // Pull exactly one value per scheduled timer, in arrival order.
    let received: Vec<u8> = schedule
        .iter()
        .map(|_| order_rx.recv_timeout(Duration::from_secs(10)).unwrap())
        .collect();
    mat.shutdown();

    assert_eq!(received, vec![1, 2, 3, 4, 5]);
}
2669
/// Scheduling 128 long-lived timers must leave the number of threads carrying the
/// timer thread name unchanged, and shutdown must reduce that number.
#[test]
fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
    let mat = Materializer::new();
    let full_name = mat.timer_thread_name().to_owned();
    // Only the first 15 characters are used for the lookup — presumably because Linux
    // truncates thread names to that length; confirm against `linux_thread_count`.
    let short_name = full_name.chars().take(15).collect::<String>();

    assert!(wait_until(Duration::from_secs(5), || {
        mat.timer_driver_is_live() && linux_thread_count(&short_name) >= 1
    }));
    let baseline_threads = linux_thread_count(&short_name);

    for _ in 0..128 {
        mat.schedule_once(Duration::from_secs(60), || {});
    }

    assert!(
        wait_until(Duration::from_secs(5), || {
            mat.timer_driver_is_live() && linux_thread_count(&short_name) == baseline_threads
        }),
        "scheduling timers should not create extra timer threads for a runtime",
    );

    mat.shutdown();
    assert!(wait_until(Duration::from_secs(5), || {
        !mat.timer_driver_is_live() && linux_thread_count(&short_name) < baseline_threads
    }));
}
2702
/// `Sink::cancelled` materializes to `NotUsed`, whereas `Sink::never` materializes to
/// a handle whose `try_wait` yields `None`.
#[test]
fn cancelled_and_never_sinks_have_distinct_materialization_results() {
    let cancelled_mat = Source::repeat(1)
        .run_with(Sink::cancelled())
        .expect("cancelled sink materializes");
    assert_eq!(cancelled_mat, NotUsed);

    // The handle is deliberately kept as a temporary and queried in place.
    assert_eq!(
        Source::single(1)
            .run_with(Sink::never())
            .expect("never sink materializes")
            .try_wait(),
        None
    );
}
2719
/// The completion of a `Sink::never` stream resolves to `AbruptTermination` once its
/// materializer is shut down.
#[test]
fn never_sink_finishes_on_materializer_shutdown() {
    let mat = Materializer::new();
    let pending = Source::single(1)
        .run_with_materializer(Sink::never(), &mat)
        .unwrap();

    mat.shutdown();

    let outcome = pending.wait();
    assert_eq!(outcome, Err(StreamError::AbruptTermination));
}
2730
/// A stream fed by `Source::never` counts as active until its completion handle is
/// dropped; afterwards the active-stream count must fall back to zero within 15 s.
#[test]
fn dropping_source_never_completion_releases_parked_worker() {
    let mat = Materializer::new();
    let parked = Source::<i32>::never()
        .run_with_materializer(Sink::ignore(), &mat)
        .unwrap();

    // Wait for the stream to register, then re-check for a readable failure value.
    assert!(wait_until(StdDuration::from_secs(1), || {
        mat.active_streams() == 1
    }));
    assert_eq!(mat.active_streams(), 1);

    drop(parked);

    assert!(wait_until(StdDuration::from_secs(15), || {
        mat.active_streams() == 0
    }));
    assert_eq!(mat.active_streams(), 0);
}
2750
/// `Source::future` emits the resolved value, `Source::future_source` emits the
/// elements of the resolved source, and `Source::maybe` fails with `MaybeIncomplete`
/// until its handle is completed.
#[test]
fn future_and_maybe_sources_emit_values() {
    let from_future = Source::future(|| async { Ok(7) }).run_collect().unwrap();
    assert_eq!(from_future, vec![7]);

    let flattened = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
        .run_collect()
        .unwrap();
    assert_eq!(flattened, vec![1, 2, 3]);

    let (promise, maybe) = Source::maybe();
    // Running before the handle is completed reports the incomplete state.
    let before_completion = maybe.clone().run_collect();
    assert_eq!(before_completion, Err(StreamError::MaybeIncomplete));

    promise.complete(9).unwrap();
    let after_completion = maybe.run_collect().unwrap();
    assert_eq!(after_completion, vec![9]);
}
2769
/// Covers the generator-style sources: `cycle` (including the empty-iterator failure),
/// `unfold`, `unfold_async`, and the error produced when a `lazy_single` factory panics.
#[test]
fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
    let cycled = Source::cycle(|| [1, 2, 3].into_iter())
        .take(8)
        .run_collect()
        .unwrap();
    assert_eq!(cycled, vec![1, 2, 3, 1, 2, 3, 1, 2]);

    // Cycling an iterator that never yields anything is reported as a failure.
    let empty_cycle = Source::<i32>::cycle(std::iter::empty::<i32>).run_collect();
    assert_eq!(empty_cycle, Err(StreamError::Failed("empty iterator".into())));

    let unfolded = Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
        .run_collect()
        .unwrap();
    assert_eq!(unfolded, vec![0, 1, 2, 3]);

    let unfolded_async = Source::unfold_async(0, |state| async move {
        Ok((state < 4).then_some((state + 1, state * 2)))
    })
    .run_collect()
    .unwrap();
    assert_eq!(unfolded_async, vec![0, 2, 4, 6]);

    // A panicking factory surfaces as a `Failed` error with a fixed message.
    let panicked = Source::<i32>::lazy_single(|| panic!("boom")).run_collect();
    assert!(matches!(
        panicked,
        Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
    ));
}
2802
/// Lazy sources must not invoke their factory before the first element is pulled:
/// checked for `lazy_source` (with its deferred materialized value), for
/// `lazy_future_source` behind a cancelled sink, and for `lazy_future`.
#[test]
fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
    // lazy_source: the factory runs on the first pull and the deferred value resolves then.
    let builds = StdArc::new(StdAtomicUsize::new(0));
    let builds_in_factory = StdArc::clone(&builds);
    let deferred = Source::<i32>::lazy_source(move || {
        builds_in_factory.fetch_add(1, StdOrdering::SeqCst);
        Source::from_iter([7, 8]).map_materialized_value(|_| 99)
    });
    let materializer = Materializer::new();
    let (mut items, mut deferred_mat) = StdArc::clone(&deferred.factory)
        .create(&materializer)
        .unwrap();

    assert_eq!(builds.load(StdOrdering::SeqCst), 0);
    assert!(deferred_mat.try_wait().is_none());
    assert_eq!(items.next().unwrap().unwrap(), 7);
    assert_eq!(deferred_mat.wait().unwrap(), 99);
    assert_eq!(builds.load(StdOrdering::SeqCst), 1);
    assert_eq!(items.next().unwrap().unwrap(), 8);

    // lazy_future_source with a cancelled sink: the value fails, the factory never runs.
    let unused_builds = StdArc::new(StdAtomicUsize::new(0));
    let unused_builds_in_factory = StdArc::clone(&unused_builds);
    let cancelled_mat = Source::<i32>::lazy_future_source(move || {
        unused_builds_in_factory.fetch_add(1, StdOrdering::SeqCst);
        async { Ok(Source::single(1)) }
    })
    .to(Sink::cancelled())
    .run()
    .unwrap();
    assert!(matches!(cancelled_mat.wait(), Err(StreamError::Failed(_))));
    assert_eq!(unused_builds.load(StdOrdering::SeqCst), 0);

    // lazy_future: the factory runs exactly when the first element is requested.
    let future_builds = StdArc::new(StdAtomicUsize::new(0));
    let future_builds_in_factory = StdArc::clone(&future_builds);
    let lazy_value = Source::lazy_future(move || {
        future_builds_in_factory.fetch_add(1, StdOrdering::SeqCst);
        async { Ok(42) }
    });
    let (mut values, _) = StdArc::clone(&lazy_value.factory)
        .create(&Materializer::new())
        .unwrap();
    assert_eq!(future_builds.load(StdOrdering::SeqCst), 0);
    assert_eq!(values.next().unwrap().unwrap(), 42);
    assert_eq!(future_builds.load(StdOrdering::SeqCst), 1);
}
2848
/// `unfold_resource` must call its close callback exactly once in each of three cases:
/// normal completion, a failing read (whose error wins over the close error), and
/// downstream cancellation through `Sink::head`.
#[test]
fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
    // Completion: the resource is drained and then closed once.
    let completion_closes = StdArc::new(StdAtomicUsize::new(0));
    let completion_counter = StdArc::clone(&completion_closes);
    let drained = Source::unfold_resource(
        || Ok(std::collections::VecDeque::from([1, 2, 3])),
        |queue| Ok(queue.pop_front()),
        move |_queue| {
            completion_counter.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(drained, vec![1, 2, 3]);
    assert_eq!(completion_closes.load(StdOrdering::SeqCst), 1);

    // Failure: close still runs, and the read error is the one reported.
    let failure_closes = StdArc::new(StdAtomicUsize::new(0));
    let failure_counter = StdArc::clone(&failure_closes);
    let read_failure = Source::<i32>::unfold_resource(
        || Ok(()),
        |_| Err(StreamError::Failed("read".into())),
        move |_| {
            failure_counter.fetch_add(1, StdOrdering::SeqCst);
            Err(StreamError::Failed("close".into()))
        },
    )
    .run_collect();
    assert_eq!(read_failure, Err(StreamError::Failed("read".into())));
    assert_eq!(failure_closes.load(StdOrdering::SeqCst), 1);

    // Cancellation: an endless counter is cut short by `Sink::head`.
    let cancel_closes = StdArc::new(StdAtomicUsize::new(0));
    let cancel_counter = StdArc::clone(&cancel_closes);
    let head = Source::unfold_resource(
        || Ok(0_usize),
        |cursor| {
            let current = *cursor;
            *cursor += 1;
            Ok(Some(current))
        },
        move |_| {
            cancel_counter.fetch_add(1, StdOrdering::SeqCst);
            Ok(())
        },
    )
    .run_with(Sink::head())
    .unwrap();
    assert_eq!(head.wait().unwrap(), 0);
    // The test allows up to 250 ms for the close callback to be observed.
    assert!(wait_until(Duration::from_millis(250), || {
        cancel_closes.load(StdOrdering::SeqCst) == 1
    }));
}
2901
/// Covers the async variants: `unfold_resource_async` closes once on completion and
/// on read failure, `scan_async` never has more than one accumulator future in
/// flight, and `fold_async` emits the single folded value.
#[test]
fn wp6b_async_resource_and_async_accumulators_are_sequential() {
    // Completion path of the async resource source.
    let completion_closes = StdArc::new(StdAtomicUsize::new(0));
    let completion_counter = StdArc::clone(&completion_closes);
    let drained = Source::unfold_resource_async(
        || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
        |queue| {
            let next = queue.pop_front();
            async move { Ok(next) }
        },
        move |_queue| {
            let counter = StdArc::clone(&completion_counter);
            async move {
                counter.fetch_add(1, StdOrdering::SeqCst);
                Ok(())
            }
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(drained, vec![1, 2, 3]);
    assert_eq!(completion_closes.load(StdOrdering::SeqCst), 1);

    // Failure path: close runs once and the read error is the one reported.
    let failure_closes = StdArc::new(StdAtomicUsize::new(0));
    let failure_counter = StdArc::clone(&failure_closes);
    let read_failure = Source::<i32>::unfold_resource_async(
        || async { Ok(()) },
        |_resource| async { Err(StreamError::Failed("read".into())) },
        move |_resource| {
            let counter = StdArc::clone(&failure_counter);
            async move {
                counter.fetch_add(1, StdOrdering::SeqCst);
                Err(StreamError::Failed("close".into()))
            }
        },
    )
    .run_collect();
    assert_eq!(read_failure, Err(StreamError::Failed("read".into())));
    assert_eq!(failure_closes.load(StdOrdering::SeqCst), 1);

    // scan_async: track concurrently running accumulator futures and their peak.
    let in_flight = StdArc::new(StdAtomicUsize::new(0));
    let peak = StdArc::new(StdAtomicUsize::new(0));
    let in_flight_for_stage = StdArc::clone(&in_flight);
    let peak_for_stage = StdArc::clone(&peak);
    let running_sums = Source::from_iter(1..=4)
        .scan_async(0, move |acc, item| {
            let in_flight = StdArc::clone(&in_flight_for_stage);
            let peak = StdArc::clone(&peak_for_stage);
            async move {
                let now = in_flight.fetch_add(1, StdOrdering::SeqCst) + 1;
                peak.fetch_max(now, StdOrdering::SeqCst);
                tokio::time::sleep(Duration::from_millis(1)).await;
                in_flight.fetch_sub(1, StdOrdering::SeqCst);
                Ok(acc + item)
            }
        })
        .run_collect()
        .unwrap();
    assert_eq!(running_sums, vec![0, 1, 3, 6, 10]);
    assert_eq!(peak.load(StdOrdering::SeqCst), 1);

    let total = Source::from_iter(1..=4)
        .fold_async(0, |acc, item| async move { Ok(acc + item) })
        .run_collect()
        .unwrap();
    assert_eq!(total, vec![10]);
}
2969
/// Materializing `fold_async` into a queue sink must return without waiting for the
/// upstream's first element.
///
/// The test source blocks inside its first pull until a gate is opened. A helper thread
/// materializes the stream and hands the queue back over a channel; the main thread
/// expects that hand-off within one second while the gate is still closed.
#[test]
fn wp6b_fold_async_materialization_does_not_drain_upstream() {
    // Gate shared with the source: (released flag, condvar used to wake the blocked pull).
    let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
    // Set by the source when its first pull begins.
    let started = StdArc::new(StdAtomicBool::new(false));
    let source = {
        let release = StdArc::clone(&release);
        let started = StdArc::clone(&started);
        Source::from_factory(move || {
            let release = StdArc::clone(&release);
            let started = StdArc::clone(&started);
            let mut emitted = false;
            // Yields exactly one element, and only after the gate has been opened.
            Box::new(std::iter::from_fn(move || {
                if emitted {
                    return None;
                }
                emitted = true;
                started.store(true, StdOrdering::SeqCst);
                let (released, available) = &*release;
                let mut released = released.lock().unwrap();
                while !*released {
                    released = available.wait(released).unwrap();
                }
                Some(Ok(1))
            }))
        })
    };

    // Materialize on a separate thread so a blocking materialization cannot hang the test.
    let (materialized_tx, materialized_rx) = mpsc::channel();
    let join = thread::spawn(move || {
        let queue = source
            .fold_async(0, |acc, item| async move { Ok(acc + item) })
            .run_with(Sink::queue())
            .unwrap();
        materialized_tx.send(queue).unwrap();
    });

    let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
        Ok(queue) => queue,
        Err(error) => {
            // Open the gate and join the helper before failing so it is not left blocked.
            let (released, available) = &*release;
            *released.lock().unwrap() = true;
            available.notify_all();
            let _ = join.join();
            panic!("fold_async materialization did not return before first pull: {error}");
        }
    };
    // Sanity check: the gate is still closed at this point.
    let (released, _) = &*release;
    assert!(
        !*released.lock().unwrap(),
        "test source was released before materialization returned"
    );

    // Open the gate; the folded value and then end-of-stream must come through the queue.
    let (released, available) = &*release;
    *released.lock().unwrap() = true;
    available.notify_all();
    assert_eq!(queue.pull().unwrap(), Some(1));
    assert_eq!(queue.pull().unwrap(), None);
    join.join().unwrap();
    assert!(started.load(StdOrdering::SeqCst));
}
3030
/// Lazy sinks and flows must not build their inner stage before the first element arrives.
///
/// Four scenarios: a lazy sink on an empty source, `foreach_async` summing its inputs,
/// a lazy flow driven directly through its transform, and `future_flow` used via `via_mat`.
#[test]
fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
    // Lazy sink on an empty source: the completion fails and the factory is never invoked.
    let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
    let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
    let empty_sink = Source::<i32>::empty()
        .run_with(Sink::lazy_sink(move || {
            lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
            Sink::ignore()
        }))
        .unwrap();
    assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
    assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);

    // foreach_async: every element must be handed to the callback (1 + 2 + 3 = 6).
    // NOTE(review): the leading `2` is presumably a parallelism limit — confirm.
    let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
    let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
    Source::from_iter([1_usize, 2, 3])
        .run_with(Sink::foreach_async(2, move |item| {
            let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
            async move {
                foreach_sum.fetch_add(item, StdOrdering::SeqCst);
                Ok(())
            }
        }))
        .unwrap()
        .wait()
        .unwrap();
    assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);

    // Lazy flow, driven through its internal `materialize` and `transform` fields.
    let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
    let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
    let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
        lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
        Flow::identity()
            .map(|item: i32| item + 10)
            .map_materialized_value(|_| 123)
    });
    let mat = (lazy_flow.materialize)().unwrap();
    let mut stream = match lazy_flow.transform {
        flow::FlowTransform::Runtime(transform) => {
            transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
        }
        flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
    };
    // Nothing has been pulled yet, so the factory must not have run.
    assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
    assert_eq!(stream.next().unwrap().unwrap(), 11);
    // After the first element the inner flow's materialized value is available.
    assert_eq!(mat.wait().unwrap(), 123);
    assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
    assert_eq!(stream.next().unwrap().unwrap(), 12);

    // future_flow: both the inner materialized value and the doubled elements come through.
    let future_flow = Source::from_iter([1, 2])
        .via_mat(
            Flow::future_flow(|| async {
                Ok(Flow::identity()
                    .map(|item: i32| item * 2)
                    .map_materialized_value(|_| 77))
            }),
            Keep::right,
        )
        .to_mat(Sink::collect(), Keep::both)
        .run()
        .unwrap();
    assert_eq!(future_flow.0.wait().unwrap(), 77);
    assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
}
3095
/// Using the same lazy flow twice in one chain must create two distinct instances,
/// and each materialized id must belong to the transform at the same position.
#[test]
fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
    for round in 0..50 {
        // Ids start at 1 so `x * 100 + id` records the order in which stages were applied.
        let ids = StdArc::new(StdAtomicUsize::new(1));
        let ids_for_factory = StdArc::clone(&ids);
        let first_use: Flow<usize, usize, _> = Flow::lazy_flow(move || {
            let id = ids_for_factory.fetch_add(1, StdOrdering::SeqCst);
            Flow::identity()
                .map(move |x: usize| x * 100 + id)
                .map_materialized_value(move |_| id)
        });
        let second_use = first_use.clone();

        let ((first_mat, second_mat), collected) = Source::from_iter([0usize])
            .via_mat(first_use, Keep::right)
            .via_mat(second_use, Keep::both)
            .to_mat(Sink::collect(), Keep::both)
            .run()
            .unwrap();

        let first_id = first_mat.wait().unwrap();
        let second_id = second_mat.wait().unwrap();
        let element = collected.wait().unwrap()[0];
        // 0 passes through the first stage as `first_id`, then becomes
        // `first_id * 100 + second_id` in the second stage.
        assert_eq!(
            element,
            first_id * 100 + second_id,
            "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
        );
        assert_ne!(
            first_id, second_id,
            "round {round}: same factory instance paired twice"
        );
    }
}
3134
/// Two clones of one lazy flow, materialized from two threads released together by a
/// barrier, must each see an element transformed by the instance whose id they got as
/// materialized value. Repeated 20 times.
#[test]
fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
    for _ in 0..20 {
        let next_id = StdArc::new(StdAtomicUsize::new(0));
        let next_id_for_factory = StdArc::clone(&next_id);
        // Each factory call takes a fresh id (1, 2, ...) and bakes it into both the
        // element transform and the materialized value.
        let flow = Flow::<i32, i32>::lazy_flow(move || {
            let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
            Flow::identity()
                .map(move |item: i32| item + (id as i32 * 100))
                .map_materialized_value(move |_| id)
        });
        // Three parties: the two worker threads and this test thread.
        let barrier = StdArc::new(std::sync::Barrier::new(3));

        // Spawns a worker that waits on the barrier, runs `input` through a clone of the
        // flow, and returns (input, materialized id, collected output).
        let spawn_materialization = |input: i32| {
            let flow = flow.clone();
            let barrier = StdArc::clone(&barrier);
            thread::spawn(move || {
                barrier.wait();
                let (mat, values) = Source::single(input)
                    .via_mat(flow, Keep::right)
                    .to_mat(Sink::collect(), Keep::both)
                    .run()
                    .unwrap();
                (input, mat.wait().unwrap(), values.wait().unwrap())
            })
        };

        let first = spawn_materialization(1);
        let second = spawn_materialization(2);
        // Release both workers at once.
        barrier.wait();

        for result in [first.join().unwrap(), second.join().unwrap()] {
            let (input, mat_id, values) = result;
            // The output must carry the same id that was reported as materialized value.
            assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
        }
        // Exactly two factory invocations: one per materialization.
        assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
    }
}
3173
/// When upstream fails, `map_with_resource` must emit the item returned by its close
/// callback before the error; when the mapper itself fails, the mapper's error is
/// reported even though close fails too.
#[test]
fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
    // Upstream yields one element and then fails.
    let pulled = Source::from_factory(|| {
        Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
    })
    .map_with_resource(
        || Ok(()),
        |_state, item| Ok(item + 10),
        |_state| Ok(Some(99)),
    )
    .run_with(Sink::queue())
    .unwrap();

    // Mapped element, then the close item, then the upstream error.
    assert_eq!(pulled.pull().unwrap(), Some(11));
    assert_eq!(pulled.pull().unwrap(), Some(99));
    assert_eq!(pulled.pull(), Err(StreamError::Failed("upstream".into())));

    let map_failure: StreamResult<Vec<i32>> = Source::single(1)
        .map_with_resource(
            || Ok(()),
            |_state, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
            |_state| -> StreamResult<Option<i32>> {
                Err(StreamError::Failed("close".into()))
            },
        )
        .run_collect();
    assert_eq!(map_failure, Err(StreamError::Failed("map".into())));
}
3202
/// Checks `stateful_map`, `stateful_map_concat`, `fold` and `reduce` on the input
/// `[1, 2, 3]`.
#[test]
fn stateful_and_terminal_source_operators_work() {
    // Running sum carried in the operator's state.
    let running_sums = Source::from_iter([1, 2, 3])
        .stateful_map(0, |total, item| {
            *total += item;
            *total
        })
        .run_collect()
        .unwrap();
    assert_eq!(running_sums, vec![1, 3, 6]);

    // Each input expands to the pair (item, running sum).
    let expanded = Source::from_iter([1, 2, 3])
        .stateful_map_concat(0, |total, item| {
            *total += item;
            [item, *total]
        })
        .run_collect()
        .unwrap();
    assert_eq!(expanded, vec![1, 1, 2, 3, 3, 6]);

    let folded = Source::from_iter([1, 2, 3])
        .fold(10, |acc, item| acc + item)
        .run_collect()
        .unwrap();
    assert_eq!(folded, vec![16]);

    let reduced = Source::from_iter([1, 2, 3])
        .reduce(|acc, item| acc + item)
        .run_collect()
        .unwrap();
    assert_eq!(reduced, vec![6]);
}
3238
/// `map_concat` and `sliding` are each fed an unbounded input (`0_u64..` and
/// `Source::repeat`); with `take(1)` downstream, collecting must still return.
#[test]
fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
    let first_of_range = Source::single(())
        .map_concat(|_| 0_u64..)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_of_range, vec![0]);

    let first_window = Source::repeat(1_u64)
        .sliding(2, 1)
        .take(1)
        .run_collect()
        .unwrap();
    assert_eq!(first_window, vec![vec![1, 1]]);
}
3255
/// Pins the element order produced by the source-level fan-in operators `concat`,
/// `prepend`, `or_else` and `interleave`.
#[test]
fn fan_in_source_operators_follow_ordering_rules() {
    let concatenated = Source::from_iter([1, 2])
        .concat(Source::from_iter([3, 4]))
        .run_collect()
        .unwrap();
    assert_eq!(concatenated, vec![1, 2, 3, 4]);

    let prepended = Source::from_iter([3, 4])
        .prepend(Source::from_iter([1, 2]))
        .run_collect()
        .unwrap();
    assert_eq!(prepended, vec![1, 2, 3, 4]);

    // `or_else` emits the fallback only when the primary source is empty.
    let fallback_used = Source::empty()
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_used, vec![10, 20]);

    let fallback_ignored = Source::from_iter([1, 2])
        .or_else(Source::from_iter([10, 20]))
        .run_collect()
        .unwrap();
    assert_eq!(fallback_ignored, vec![1, 2]);

    // Segments of two: two from the left, two from the right, then the leftovers.
    let interleaved = Source::from_iter([1, 2, 3])
        .interleave(Source::from_iter([10, 11, 12]), 2)
        .run_collect()
        .unwrap();
    assert_eq!(interleaved, vec![1, 2, 10, 11, 3, 12]);
}
3294
/// Applies the fan-in operators through `Flow::identity()` + `via` and checks
/// that the primary stream and the attached source combine as expected.
#[test]
fn fan_in_flow_operators_compose_with_primary_stream() {
    assert_eq!(
        Source::from_iter([1, 2])
            .via(Flow::identity().concat(Source::from_iter([3, 4])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4]
    );

    assert_eq!(
        Source::from_iter([3, 4])
            .via(Flow::identity().prepend(Source::from_iter([1, 2])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4]
    );

    // Segment size 1: strict alternation between the two inputs.
    assert_eq!(
        Source::from_iter([1, 2, 3])
            .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
            .run_collect()
            .unwrap(),
        vec![1, 10, 2, 11, 3, 12]
    );

    assert_eq!(
        Source::from_iter([1, 4])
            .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3, 4, 5]
    );

    // The single right-hand value is paired with every left-hand element.
    assert_eq!(
        Source::from_iter([1, 2])
            .via(Flow::identity().zip_latest(Source::single(10)))
            .run_collect()
            .unwrap(),
        vec![(1, 10), (2, 10)]
    );

    assert_eq!(
        Source::from_iter([1, 2])
            .via(
                Flow::identity()
                    .zip_latest_with(Source::single(10), false, |left, right| left + right),
            )
            .run_collect()
            .unwrap(),
        vec![11, 12]
    );
}
3336
/// Verifies that a failing input surfaces as `StreamError::Failed` through
/// `or_else` and `prepend`, and pins the output of `interleave_all` with the
/// eager-close flag set and an empty secondary input.
#[test]
fn fan_in_operators_propagate_errors_and_eager_close() {
    // A failed primary must not be masked by the fallback.
    let failed_primary = Source::failed(StreamError::Failed("boom".into()))
        .or_else(Source::from_iter([1, 2]))
        .run_collect();
    assert!(matches!(failed_primary, Err(StreamError::Failed(_))));

    // A failed prepended source fails the whole stream.
    let failed_prefix = Source::from_iter([1, 2])
        .prepend(Source::failed(StreamError::Failed("boom".into())))
        .run_collect();
    assert!(matches!(failed_prefix, Err(StreamError::Failed(_))));

    // Only the first primary element is expected before the stream ends.
    let eagerly_closed = Source::from_iter([1, 2])
        .interleave_all([Source::empty()], 1, true)
        .run_collect()
        .unwrap();
    assert_eq!(eagerly_closed, vec![1]);
}
3359
/// Counts how often each of three interleaved inputs is pulled when the
/// downstream only wants the head element: input 0 must be pulled exactly
/// once and inputs 1 and 2 not at all.
#[test]
fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    // One pull counter per input, shared with the iterators built below.
    let pull_counts: Arc<[AtomicUsize; 3]> =
        Arc::new(std::array::from_fn(|_| AtomicUsize::new(0)));

    // Builds an input that records every pull; only input 0 ever emits (once).
    let counting_source = |input: usize| {
        let counts = Arc::clone(&pull_counts);
        Source::from_materialized_factory(move |_| {
            let counts = Arc::clone(&counts);
            let mut already_emitted = false;
            let stream: BoxStream<i32> = Box::new(std::iter::from_fn(move || {
                counts[input].fetch_add(1, Ordering::SeqCst);
                let emit_now = !already_emitted && input == 0;
                if emit_now {
                    already_emitted = true;
                }
                emit_now.then_some(Ok(42))
            }));
            Ok((stream, NotUsed))
        })
    };

    let head = counting_source(0)
        .interleave_all([counting_source(1), counting_source(2)], 1, false)
        .run_with(Sink::head());

    assert_eq!(wait(head.unwrap()), 42);
    assert_eq!(pull_counts[0].load(Ordering::SeqCst), 1);
    assert_eq!(
        pull_counts[1].load(Ordering::SeqCst),
        0,
        "second input should not be pulled when downstream cancels after first element"
    );
    assert_eq!(
        pull_counts[2].load(Ordering::SeqCst),
        0,
        "third input should not be pulled before its turn"
    );
}
3407
/// With the eager-close flag off, `interleave_all` is expected to keep
/// rotating over the inputs that still have elements after a shorter input
/// has completed.
#[test]
fn interleave_non_eager_drains_remaining_when_one_input_completes() {
    let secondary_inputs = [Source::from_iter([10]), Source::from_iter([20, 21, 22])];

    let interleaved = Source::from_iter([1, 2, 3, 4])
        .interleave_all(secondary_inputs, 1, false)
        .run_collect()
        .unwrap();

    assert_eq!(interleaved, vec![1, 10, 20, 2, 21, 3, 22, 4]);
}
3422
/// Walks through the merge/zip/combine family on small finite inputs and pins
/// the exact output each operator is expected to produce.
#[test]
fn remaining_merge_and_zip_family_matches_expected_ordering() {
    let merged_sorted = Source::from_iter([1, 4])
        .merge_sorted(Source::from_iter([2, 3, 5]))
        .run_collect()
        .unwrap();
    assert_eq!(merged_sorted, vec![1, 2, 3, 4, 5]);

    let merged_latest = Source::from_iter([1, 2])
        .merge_latest(Source::single(10), false)
        .run_collect()
        .unwrap();
    assert_eq!(merged_latest, vec![vec![1, 10], vec![2, 10]]);

    let merged_all = Source::from_iter([1, 2, 3])
        .merge_all([Source::from_iter([10, 11])], false)
        .run_collect()
        .unwrap();
    assert_eq!(merged_all, vec![1, 10, 2, 11, 3]);

    let zipped_sums = Source::from_iter([1, 2, 3])
        .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_sums, vec![11, 13, 15]);

    let zipped_latest = Source::from_iter([1, 2])
        .zip_latest(Source::single(10))
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest, vec![(1, 10), (2, 10)]);

    let zipped_latest_sums = Source::from_iter([1, 2, 3])
        .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest_sums, vec![11, 12, 13]);

    // The shorter left side is padded with -1 once it runs out.
    let zipped_all = Source::from_iter([1, 2])
        .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_all, vec![(1, 10), (2, 11), (-1, 12)]);

    let indexed = Source::from_iter([5, 6, 7])
        .zip_with_index()
        .run_collect()
        .unwrap();
    assert_eq!(indexed, vec![(5, 0), (6, 1), (7, 2)]);

    let zipped_n = Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
        .run_collect()
        .unwrap();
    assert_eq!(zipped_n, vec![vec![1, 10], vec![2, 20]]);

    let zipped_with_n = Source::zip_with_n(
        [
            Source::from_iter([1, 2]),
            Source::from_iter([10, 20]),
            Source::from_iter([100, 200]),
        ],
        |values| values.into_iter().sum::<i32>(),
    )
    .run_collect()
    .unwrap();
    assert_eq!(zipped_with_n, vec![111, 222]);

    // Priorities 2:1: two elements from the first input per one from the second.
    let prioritized = Source::merge_prioritized_n(
        [
            (Source::from_iter([1, 2, 3, 4]), 2),
            (Source::from_iter([10, 11]), 1),
        ],
        false,
    )
    .run_collect()
    .unwrap();
    assert_eq!(prioritized, vec![1, 2, 10, 3, 4, 11]);

    let combined_by_merge = Source::combine(
        Source::from_iter([1, 2, 3]),
        Source::from_iter([10, 11]),
        std::iter::empty::<Source<i32, NotUsed>>(),
        SourceCombineStrategy::Merge {
            eager_complete: false,
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(combined_by_merge, vec![1, 10, 2, 11, 3]);

    // A broadcast of two ignoring sinks materializes to `NotUsed`.
    let broadcast_sink = Sink::combine(
        Sink::ignore(),
        Sink::ignore(),
        std::iter::empty::<Sink<i32, NotUsed>>(),
        SinkCombineStrategy::Broadcast,
    );
    let materialized = Source::from_iter([1, 2, 3])
        .run_with(broadcast_sink)
        .unwrap();
    assert_eq!(materialized, NotUsed);
}
3550
/// Feeds 100 elements into a broadcast of two counting sinks and waits (up to
/// one second) until both counters have reached 100.
#[test]
fn sink_combine_broadcast_delivers_every_element_to_every_child() {
    let seen_by_first = StdArc::new(StdAtomicUsize::new(0));
    let seen_by_second = StdArc::new(StdAtomicUsize::new(0));

    // Scope the sink-side handles so only the observer handles remain below.
    let broadcast = {
        let first_handle = StdArc::clone(&seen_by_first);
        let second_handle = StdArc::clone(&seen_by_second);
        Sink::combine(
            Sink::foreach(move |_: i32| {
                first_handle.fetch_add(1, StdOrdering::SeqCst);
            }),
            Sink::foreach(move |_: i32| {
                second_handle.fetch_add(1, StdOrdering::SeqCst);
            }),
            std::iter::empty::<Sink<i32, NotUsed>>(),
            SinkCombineStrategy::Broadcast,
        )
    };

    assert_eq!(
        Source::from_iter(0..100).run_with(broadcast).unwrap(),
        NotUsed
    );
    // The counters are polled rather than read once right after `run_with`.
    assert!(wait_until(StdDuration::from_secs(1), || {
        seen_by_first.load(StdOrdering::SeqCst) == 100
            && seen_by_second.load(StdOrdering::SeqCst) == 100
    }));
}
3583
/// Pairs an empty source with a never-ending `repeat` source on either side
/// of `zip_latest_with`; in both arrangements the stream must finish with no
/// output instead of running forever.
#[test]
fn zip_latest_completes_when_one_side_finishes_without_emitting() {
    // Empty side on the left, unbounded side on the right.
    let empty_left = Source::from_iter(std::iter::empty::<i32>())
        .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(empty_left, Vec::<i32>::new());

    // Same pairing with the sides swapped.
    let empty_right = Source::repeat(10)
        .zip_latest_with(
            Source::from_iter(std::iter::empty::<i32>()),
            false,
            |left, right| left + right,
        )
        .run_collect()
        .unwrap();
    assert_eq!(empty_right, Vec::<i32>::new());
}
3608
/// Pins where `zip_with`, `zip_latest_with` (eager flag set), and `zip_n`
/// stop emitting when one input is shorter than the others.
#[test]
fn zip_family_completion_boundaries_match_expected_results() {
    // Plain zip: one pair, because the right side has a single element.
    let zipped = Source::from_iter([1, 2, 3])
        .zip_with(Source::from_iter([10]), |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped, vec![11]);

    let zipped_latest_eager = Source::from_iter([1, 2, 3])
        .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
        .run_collect()
        .unwrap();
    assert_eq!(zipped_latest_eager, vec![11, 12]);

    // N-way zip is bounded by the shortest input.
    let zipped_n = Source::zip_n([
        Source::from_iter([1, 2, 3]),
        Source::from_iter([10]),
        Source::from_iter([100, 200, 300]),
    ])
    .run_collect()
    .unwrap();
    assert_eq!(zipped_n, vec![vec![1, 10, 100]]);
}
3638
/// Checks `Source::combine` under the `Concat` and `Prioritized` strategies.
#[test]
fn combine_strategies_follow_merge_concat_and_priority_rules() {
    // Concat: first, second, then the extra sources, each drained in turn.
    let concatenated = Source::combine(
        Source::from_iter([1, 2]),
        Source::from_iter([10, 11]),
        [Source::from_iter([100])],
        SourceCombineStrategy::Concat,
    )
    .run_collect()
    .unwrap();
    assert_eq!(concatenated, vec![1, 2, 10, 11, 100]);

    // Priorities [2, 1]: two elements from the first source per one from the second.
    let prioritized = Source::combine(
        Source::from_iter([1, 2, 3, 4]),
        Source::from_iter([10, 11]),
        std::iter::empty::<Source<i32, NotUsed>>(),
        SourceCombineStrategy::Prioritized {
            priorities: vec![2, 1],
            eager_complete: false,
        },
    )
    .run_collect()
    .unwrap();
    assert_eq!(prioritized, vec![1, 2, 10, 3, 4, 11]);
}
3668
/// Appends a factory-backed source with `concat_lazy` (directly and through a
/// flow), takes only the head, and asserts the factory was never invoked.
#[test]
fn concat_lazy_defers_follow_on_source_until_needed() {
    // Source-level operator.
    let source_builds = StdArc::new(StdAtomicUsize::new(0));
    let lazy_tail = {
        let builds = StdArc::clone(&source_builds);
        Source::from_materialized_factory(move |_| {
            builds.fetch_add(1, StdOrdering::SeqCst);
            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
        })
    };
    let head_from_source = Source::single(1)
        .concat_lazy(lazy_tail)
        .run_with(Sink::head());
    assert_eq!(wait(head_from_source.unwrap()), 1);
    assert_eq!(source_builds.load(StdOrdering::SeqCst), 0);

    // Flow-level operator attached through `via`.
    let flow_builds = StdArc::new(StdAtomicUsize::new(0));
    let lazy_flow_tail = {
        let builds = StdArc::clone(&flow_builds);
        Source::from_materialized_factory(move |_| {
            builds.fetch_add(1, StdOrdering::SeqCst);
            Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
        })
    };
    let head_from_flow = Source::single(1)
        .via(Flow::identity().concat_lazy(lazy_flow_tail))
        .run_with(Sink::head());
    assert_eq!(wait(head_from_flow.unwrap()), 1);
    assert_eq!(flow_builds.load(StdOrdering::SeqCst), 0);
}
3695
/// Attaches already-cancelled side sinks via `also_to` / `also_to_all` and
/// expects the main stream to complete successfully with no elements.
#[test]
fn also_to_completes_when_side_sink_cancels() {
    let single_side = Source::from_iter([1, 2, 3])
        .also_to(Sink::cancelled())
        .run_collect()
        .unwrap();
    assert_eq!(single_side, Vec::<i32>::new());

    let multiple_sides = Source::from_iter([1, 2, 3])
        .also_to_all([Sink::cancelled(), Sink::cancelled()])
        .run_collect()
        .unwrap();
    assert_eq!(multiple_sides, Vec::<i32>::new());
}
3713
/// Uses `Sink::head()` as the side sink of `also_to` over 100 elements and
/// expects the main output to be non-empty but shorter than the input.
#[test]
fn also_to_completes_gracefully_when_side_sink_disconnects() {
    let main_output = Source::from_iter(0..100)
        .also_to(Sink::head())
        .run_collect()
        .unwrap();

    let emitted = main_output.len();
    assert!(emitted != 0, "main should emit at least one element");
    assert!(
        emitted < 100,
        "main should complete early when side disconnects"
    );
}
3726
/// A failed upstream combined with a cancelled side sink must still report
/// the upstream's own error message, for `also_to`, `also_to_all`, and
/// `divert_to` alike.
#[test]
fn also_to_propagates_original_error_when_side_is_disconnected() {
    let upstream_failure = StreamError::Failed("distinctive-boom".into());

    let via_also_to = Source::<i32>::failed(upstream_failure.clone())
        .also_to(Sink::cancelled())
        .run_collect();
    assert!(matches!(
        via_also_to,
        Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
    ));

    let via_also_to_all = Source::<i32>::failed(upstream_failure.clone())
        .also_to_all([Sink::cancelled()])
        .run_collect();
    assert!(matches!(
        via_also_to_all,
        Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
    ));

    // Last use: the error value is moved rather than cloned.
    let via_divert_to = Source::<i32>::failed(upstream_failure)
        .divert_to(Sink::cancelled(), |_: &i32| true)
        .run_collect();
    assert!(matches!(
        via_divert_to,
        Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
    ));
}
3749
/// Diverts even numbers to an ignoring side sink and expects only the odd
/// numbers on the main output.
#[test]
fn divert_to_routes_matching_elements_to_side_sink() {
    assert_eq!(
        Source::from_iter([1, 2, 3, 4])
            .divert_to(Sink::ignore(), |item| item % 2 == 0)
            .run_collect()
            .unwrap(),
        vec![1, 3]
    );
}
3758
/// Taps the stream with `Sink::head()` (both as a source operator and through
/// a flow) and expects the main output to contain every element regardless.
#[test]
fn wire_tap_drops_when_side_sink_backpressures() {
    // Source-level tap.
    assert_eq!(
        Source::from_iter([1, 2, 3])
            .wire_tap(Sink::head())
            .run_collect()
            .unwrap(),
        vec![1, 2, 3]
    );

    // Flow-level tap attached through `via`.
    assert_eq!(
        Source::from_iter([1, 2, 3])
            .via(Flow::identity().wire_tap(Sink::head()))
            .run_collect()
            .unwrap(),
        vec![1, 2, 3]
    );
}
3773
/// Smoke-tests `map_async`, `map_async_unordered`, and
/// `map_async_partitioned` on `0..4` with futures that resolve immediately.
#[test]
fn async_mapping_variants_complete() {
    // The ordered variant must emit results in upstream order.
    let ordered = Source::from_iter(0..4)
        .map_async(2, |item| async move { Ok(item * 2) })
        .run_collect()
        .unwrap();
    assert_eq!(ordered, vec![0, 2, 4, 6]);

    // The unordered variant presumably emits in completion order (as its name
    // suggests), so pinning one exact interleaving would over-specify it and
    // risk a flaky test. Compare the sorted results instead: every element
    // must be present exactly once.
    let mut unordered = Source::from_iter(0..4)
        .map_async_unordered(2, |item| async move { Ok(item * 2) })
        .run_collect()
        .unwrap();
    unordered.sort_unstable();
    assert_eq!(unordered, vec![0, 2, 4, 6]);

    // Two partitions (even/odd keys); output is expected in upstream order.
    let partitioned = Source::from_iter(0..4)
        .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
        .run_collect()
        .unwrap();
    assert_eq!(partitioned, vec![1, 2, 3, 4]);
}
3794
/// Runs `map_async` with parallelism 2 over an endless counting source whose
/// first element takes 300 ms to resolve, and checks after 100 ms that at most
/// three elements have been pulled from upstream even though 16 were requested.
#[test]
fn map_async_ordered_bounds_pulls_behind_stuck_head() {
    let pulls = StdArc::new(StdAtomicUsize::new(0));

    // Endless source: every pull bumps the shared counter and yields its old value.
    let counting_source = {
        let shared = StdArc::clone(&pulls);
        Source::from_fn_iter(move || {
            let counter = StdArc::clone(&shared);
            std::iter::from_fn(move || Some(counter.fetch_add(1, StdOrdering::SeqCst)))
        })
    };

    let probe = counting_source
        .map_async(2, |item| async move {
            // Only the head element is slow; everything behind it is instant.
            if item == 0 {
                tokio::time::sleep(StdDuration::from_millis(300)).await;
            }
            Ok(item)
        })
        .run_with(TestSink::probe())
        .unwrap();

    probe.request(16);
    // Sample the counter while the head element is still sleeping.
    thread::sleep(StdDuration::from_millis(100));
    assert!(
        pulls.load(StdOrdering::SeqCst) <= 3,
        "pulled {} elements with parallelism=2 behind a stuck ordered head",
        pulls.load(StdOrdering::SeqCst)
    );
}
3823
/// Drives `map_async` with a hand-written future that returns `Pending` until
/// a helper thread flips a flag (after 20 ms) and wakes it. The test bounds
/// both the elapsed time and the number of polls.
#[test]
fn async_mapping_parks_until_woken_future_completes() {
    /// Future that becomes ready only after its helper thread has run.
    struct WakeOnceFuture {
        value: Option<u64>,
        ready: StdArc<StdAtomicBool>,
        started: bool,
        polls: StdArc<StdAtomicUsize>,
        latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
    }

    impl std::future::Future for WakeOnceFuture {
        type Output = StreamResult<u64>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Every field is `Unpin`, so the pin can be unwrapped directly.
            let fut = self.get_mut();
            fut.polls.fetch_add(1, StdOrdering::SeqCst);

            // Publish the newest waker before looking at the readiness flag.
            {
                let mut slot = fut.latest_waker.lock().expect("latest wake slot mutex");
                *slot = Some(cx.waker().clone());
            }

            if fut.ready.load(StdOrdering::SeqCst) {
                return Poll::Ready(Ok(fut.value.take().unwrap()));
            }

            // The first pending poll starts the helper thread; later polls do not.
            let already_started = std::mem::replace(&mut fut.started, true);
            if !already_started {
                let ready_flag = StdArc::clone(&fut.ready);
                let waker_slot = StdArc::clone(&fut.latest_waker);
                thread::spawn(move || {
                    thread::sleep(Duration::from_millis(20));
                    ready_flag.store(true, StdOrdering::SeqCst);
                    if let Some(waker) =
                        waker_slot.lock().expect("latest wake slot mutex").take()
                    {
                        waker.wake();
                    }
                });
            }
            Poll::Pending
        }
    }

    let polls = StdArc::new(StdAtomicUsize::new(0));
    let stage_polls = StdArc::clone(&polls);
    let start = Instant::now();

    let values = Source::single(41)
        .map_async(1, move |item| WakeOnceFuture {
            value: Some(item + 1),
            ready: StdArc::new(StdAtomicBool::new(false)),
            started: false,
            polls: StdArc::clone(&stage_polls),
            latest_waker: StdArc::new(Mutex::new(None)),
        })
        .run_collect()
        .unwrap();

    assert_eq!(values, vec![42]);

    // Lower bound: the helper sleeps 20 ms. Upper bound: generous slack.
    let elapsed = start.elapsed();
    let accepted = StdDuration::from_millis(15)..StdDuration::from_millis(250);
    assert!(
        accepted.contains(&elapsed),
        "pending future should park until woken once, elapsed={elapsed:?}"
    );
    assert!(
        polls.load(StdOrdering::SeqCst) < 4096,
        "pending future was repolled too aggressively"
    );
}
3890
/// Applies each async mapping variant to a never-ending `repeat` source and
/// takes one element; the test only terminates if results are emitted without
/// waiting for upstream completion.
#[test]
fn async_mapping_emits_before_unbounded_upstream_finishes() {
    assert_eq!(
        Source::repeat(1)
            .map_async(2, |item| async move { Ok(item + 1) })
            .take(1)
            .run_collect()
            .unwrap(),
        vec![2]
    );

    assert_eq!(
        Source::repeat(1)
            .map_async_unordered(2, |item| async move { Ok(item + 1) })
            .take(1)
            .run_collect()
            .unwrap(),
        vec![2]
    );

    // Every element maps to the same partition key here.
    assert_eq!(
        Source::repeat(1)
            .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
            .take(1)
            .run_collect()
            .unwrap(),
        vec![2]
    );
}
3914
/// Sends six elements with one shared partition key through
/// `map_async_partitioned(4, 1, ..)` and records the peak number of
/// simultaneously active mappings, which must stay at 1.
#[test]
fn partitioned_async_mapping_limits_same_key_concurrency() {
    let in_flight = StdArc::new(StdAtomicUsize::new(0));
    let peak_in_flight = StdArc::new(StdAtomicUsize::new(0));

    let mapper = {
        let in_flight = StdArc::clone(&in_flight);
        let peak_in_flight = StdArc::clone(&peak_in_flight);
        move |item| {
            let in_flight = StdArc::clone(&in_flight);
            let peak_in_flight = StdArc::clone(&peak_in_flight);
            // Counted when the future is created, released when it finishes.
            let now_active = in_flight.fetch_add(1, StdOrdering::SeqCst) + 1;
            peak_in_flight.fetch_max(now_active, StdOrdering::SeqCst);
            async move {
                thread::sleep(Duration::from_millis(1));
                in_flight.fetch_sub(1, StdOrdering::SeqCst);
                Ok(item)
            }
        }
    };

    let values = Source::from_iter(0..6)
        .map_async_partitioned(4, 1, |_| 0_u8, mapper)
        .run_collect()
        .unwrap();

    assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
    assert_eq!(peak_in_flight.load(StdOrdering::SeqCst), 1);
}
3945
/// Blocks the first element (key 0) on a oneshot channel and checks that an
/// element with a different key still starts: the recorded peak concurrency
/// must reach 2 while the output order stays the upstream order.
#[test]
fn partitioned_async_mapping_scans_past_blocked_pending_key() {
    let in_flight = StdArc::new(StdAtomicUsize::new(0));
    let peak_in_flight = StdArc::new(StdAtomicUsize::new(0));
    let in_flight_for_stage = StdArc::clone(&in_flight);
    let peak_for_stage = StdArc::clone(&peak_in_flight);
    let (release_tx, release_rx) = oneshot::channel::<()>();
    // The receiver is taken exactly once, by the future for item 0.
    let release_slot = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
    let release_slot_for_stage = StdArc::clone(&release_slot);
    let peak_for_releaser = StdArc::clone(&peak_in_flight);

    // Unblocks item 0 once two mappings overlap, or after one second at most.
    let releaser = thread::spawn(move || {
        let deadline = Instant::now() + StdDuration::from_secs(1);
        loop {
            if peak_for_releaser.load(StdOrdering::SeqCst) >= 2 || Instant::now() >= deadline {
                break;
            }
            thread::yield_now();
        }
        let _ = release_tx.send(());
    });

    let values = Source::from_iter([0, 2, 1])
        .map_async_partitioned(
            2,
            1,
            |item| item % 2,
            move |item| {
                let in_flight = StdArc::clone(&in_flight_for_stage);
                let peak = StdArc::clone(&peak_for_stage);
                let release_slot = StdArc::clone(&release_slot_for_stage);
                let now_active = in_flight.fetch_add(1, StdOrdering::SeqCst) + 1;
                peak.fetch_max(now_active, StdOrdering::SeqCst);
                async move {
                    if item == 0 {
                        let receiver = release_slot
                            .lock()
                            .expect("release receiver mutex")
                            .take()
                            .expect("release receiver present");
                        let _ = receiver.await;
                    }
                    in_flight.fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            },
        )
        .run_collect()
        .unwrap();
    releaser.join().unwrap();

    assert_eq!(values, vec![0, 2, 1]);
    assert_eq!(peak_in_flight.load(StdOrdering::SeqCst), 2);
}
3997
/// With parallelism 1 the partition function must still be invoked once per
/// element: eight inputs, eight recorded calls.
#[test]
fn partitioned_async_mapping_p1_still_evaluates_partition() {
    let partition_calls = StdArc::new(StdAtomicUsize::new(0));

    let partitioner = {
        let calls = StdArc::clone(&partition_calls);
        move |item| {
            calls.fetch_add(1, StdOrdering::SeqCst);
            item % 2
        }
    };

    let values = Source::from_iter(0..8)
        .map_async_partitioned(1, 1, partitioner, |item| async move { Ok(item + 1) })
        .run_collect()
        .unwrap();

    let expected: Vec<_> = (1..9).collect();
    assert_eq!(values, expected);
    assert_eq!(partition_calls.load(StdOrdering::SeqCst), 8);
}
4019
/// Pushes 512 elements spread over 16 keys through
/// `map_async_partitioned(32, 1, ..)`, tracking active mappings per key.
/// Output must be in upstream order and no key may exceed one active mapping.
#[test]
fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
    // One counter per key, for the current and for the peak activity.
    let counters = || StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
    let active_by_key = counters();
    let peak_by_key = counters();
    let active_for_stage = StdArc::clone(&active_by_key);
    let peak_for_stage = StdArc::clone(&peak_by_key);

    let values = Source::from_iter(0..512_usize)
        .map_async_partitioned(
            32,
            1,
            |item| item % 16,
            move |item| {
                let active = StdArc::clone(&active_for_stage);
                let peak = StdArc::clone(&peak_for_stage);
                let key = item % 16;
                let now_active = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
                peak[key].fetch_max(now_active, StdOrdering::SeqCst);
                async move {
                    active[key].fetch_sub(1, StdOrdering::SeqCst);
                    Ok(item)
                }
            },
        )
        .run_collect()
        .unwrap();

    assert_eq!(values, (0..512).collect::<Vec<_>>());
    for key_peak in peak_by_key.iter() {
        assert_eq!(key_peak.load(StdOrdering::SeqCst), 1);
    }
}
4053
/// Covers the error-handling operators: `map_error`, `recover`,
/// `recover_with_retries`, and `on_error_complete`, including the cases where
/// the handler declines and the original error must come through unchanged.
#[test]
fn error_operators_map_recover_and_complete() {
    // `map_error` replaces the error value.
    assert_eq!(
        Source::<i32>::failed(StreamError::Failed("boom".into()))
            .map_error(|_| StreamError::Failed("mapped".into()))
            .run_collect(),
        Err(StreamError::Failed("mapped".into()))
    );

    // `recover` turns a matching error into one final element.
    assert_eq!(
        Source::<i32>::failed(StreamError::Failed("boom".into()))
            .recover(|error| matches!(error, StreamError::Failed(_)).then_some(42))
            .run_collect()
            .unwrap(),
        vec![42]
    );

    // A handler that returns `None` leaves the original error in place.
    assert_eq!(
        Source::<i32>::failed(StreamError::Failed("original".into()))
            .recover(|_| None)
            .run_collect(),
        Err(StreamError::Failed("original".into()))
    );

    // `recover_with_retries` switches to the replacement source.
    assert_eq!(
        Source::<i32>::failed(StreamError::Failed("boom".into()))
            .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
            .run_collect()
            .unwrap(),
        vec![1, 2]
    );

    // Declining the replacement keeps the original error.
    assert_eq!(
        Source::<i32>::failed(StreamError::Failed("declined".into()))
            .recover_with_retries(1, |_| None)
            .run_collect(),
        Err(StreamError::Failed("declined".into()))
    );

    // `on_error_complete` ends the stream at the first error; the element
    // queued after the error is never emitted.
    let elements_then_error = || {
        Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
    };
    assert_eq!(
        Source::from_factory(elements_then_error)
            .on_error_complete()
            .run_collect()
            .unwrap(),
        vec![1]
    );
}
4097
/// Table-driven check of `sliding(window, step)` over `1..=last`.
///
/// Each row is `(last, window, step, expected windows)`; the rows cover full
/// windows, input shorter than the window, window size 1, and steps larger
/// than the window.
#[test]
fn sliding_matches_akka_window_semantics() {
    let cases = [
        (4, 3, 1, vec![vec![1, 2, 3], vec![2, 3, 4]]),
        (4, 2, 1, vec![vec![1, 2], vec![2, 3], vec![3, 4]]),
        // Input length equals the window: exactly one window.
        (3, 3, 1, vec![vec![1, 2, 3]]),
        // Input shorter than the window: one partial window.
        (2, 3, 1, vec![vec![1, 2]]),
        (3, 1, 1, vec![vec![1], vec![2], vec![3]]),
        // Step larger than the window skips elements between windows.
        (6, 2, 3, vec![vec![1, 2], vec![4, 5]]),
        (3, 2, 4, vec![vec![1, 2]]),
    ];

    for (last, window, step, expected) in cases {
        let windows = Source::from_iter(1..=last)
            .sliding(window, step)
            .run_collect()
            .unwrap();
        assert_eq!(windows, expected);
    }
}
4156
/// `recover_with` is handed a replacement that fails five times in a row and
/// succeeds on the sixth call; the stream must end with `[42]` after exactly
/// six handler invocations.
#[test]
fn recover_with_retries_indefinitely_like_akka() {
    let handler_calls = StdArc::new(StdAtomicUsize::new(0));

    let handler = {
        let calls = StdArc::clone(&handler_calls);
        move |_error| {
            // `fetch_add` returns the previous count: calls 0..=4 fail again.
            let previous_calls = calls.fetch_add(1, StdOrdering::SeqCst);
            if previous_calls < 5 {
                Some(Source::<i32>::failed(StreamError::Failed("again".into())))
            } else {
                Some(Source::from_iter([42]))
            }
        }
    };

    let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
        .recover_with(handler)
        .run_collect()
        .unwrap();

    assert_eq!(recovered, vec![42]);
    assert_eq!(handler_calls.load(StdOrdering::SeqCst), 6);
}
4176
/// Parks six never-completing streams on one materializer, then checks that
/// an additional short stream still runs to completion on it. Shutting the
/// materializer down must fail the parked streams with `AbruptTermination`.
#[test]
fn many_concurrent_streams_do_not_starve_the_pool() {
    let materializer = Materializer::new();
    let busy = 6_usize;

    // Streams attached to `Sink::never()` stay active until shutdown.
    let held: Vec<_> = (0..busy)
        .map(|_| {
            Source::single(1_u64)
                .run_with_materializer(Sink::never(), &materializer)
                .unwrap()
        })
        .collect();

    // Poll (at most 400 times, 5 ms apart) until all of them are registered.
    let mut polls = 0;
    while polls < 400 && materializer.active_streams() < busy {
        thread::sleep(Duration::from_millis(5));
        polls += 1;
    }
    assert_eq!(materializer.active_streams(), busy);

    // The extra stream must finish despite the six parked ones.
    let sum = Source::from_iter(0_u64..5)
        .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
        .unwrap();
    assert_eq!(sum.wait().unwrap(), 10);

    materializer.shutdown();
    for completion in held {
        assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
    }
}
4219}