1use std::{
4 collections::{BTreeMap, HashMap, VecDeque},
5 fmt,
6 future::Future,
7 hash::Hash,
8 marker::PhantomData,
9 panic::{AssertUnwindSafe, catch_unwind},
10 pin::Pin,
11 sync::{
12 Arc, Condvar, Mutex, OnceLock,
13 atomic::{AtomicBool, AtomicUsize, Ordering},
14 },
15 task::{Context, Poll},
16 thread,
17 time::Duration,
18};
19
20use futures::{channel::oneshot, executor::block_on};
21use thiserror::Error;
22use tokio::{
23 runtime::{Builder as TokioRuntimeBuilder, Runtime as TokioRuntime},
24 task::{JoinError, JoinHandle},
25};
26
27pub(crate) type BoxStream<T> = Box<dyn Iterator<Item = StreamResult<T>> + Send>;
28pub(crate) type PureTransform<In, Out> = Arc<dyn Fn(BoxStream<In>) -> BoxStream<Out> + Send + Sync>;
29pub(crate) type RuntimeTransform<In, Out> =
30 Arc<dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<BoxStream<Out>> + Send + Sync>;
31type SinkRunner<In, Mat> = dyn Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync;
32type HintedSinkRunner<In, Mat> =
33 dyn Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat> + Send + Sync;
34type RunnableGraphRunner<Mat> = dyn Fn(&Materializer) -> StreamResult<Mat> + Send + Sync;
35const STREAM_READY_SPINS: usize = 256;
36const STREAM_SPIN_BACKOFF: usize = 8;
41const STREAM_MAX_PARK: Duration = Duration::from_millis(1);
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq)]
47struct InlineMicroSourceHint {
48 max_success_items: usize,
49}
50
51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
52struct SourceHints {
53 inline_head_terminal: bool,
54 inline_micro: Option<InlineMicroSourceHint>,
57 terminal_consumer_batch: bool,
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
64pub(crate) struct SourceRuntimeHints {
65 pub(crate) inline_micro_max_success_items: Option<usize>,
66 pub(crate) terminal_consumer_batch: bool,
67}
68
69impl SourceHints {
70 const fn with_inline_micro(max_success_items: usize) -> Self {
71 Self {
72 inline_head_terminal: true,
73 inline_micro: Some(InlineMicroSourceHint { max_success_items }),
74 terminal_consumer_batch: true,
75 }
76 }
77
78 const fn with_terminal_consumer_batch() -> Self {
79 Self {
80 inline_head_terminal: false,
81 inline_micro: None,
82 terminal_consumer_batch: true,
83 }
84 }
85
86 fn after_flow(self, flow: FlowHints) -> Self {
87 if flow.preserves_inline_head_terminal {
88 Self {
91 inline_head_terminal: true,
92 inline_micro: None,
93 terminal_consumer_batch: self.terminal_consumer_batch
94 && flow.preserves_terminal_consumer_batch,
95 }
96 } else {
97 Self {
98 inline_head_terminal: false,
99 inline_micro: None,
100 terminal_consumer_batch: self.terminal_consumer_batch
101 && flow.preserves_terminal_consumer_batch,
102 }
103 }
104 }
105
106 fn without_inline_micro(self) -> Self {
107 Self {
108 inline_head_terminal: self.inline_head_terminal,
109 inline_micro: None,
110 terminal_consumer_batch: self.terminal_consumer_batch,
111 }
112 }
113
114 fn runtime(self) -> SourceRuntimeHints {
115 SourceRuntimeHints {
116 inline_micro_max_success_items: self.inline_micro.map(|hint| hint.max_success_items),
117 terminal_consumer_batch: self.terminal_consumer_batch,
118 }
119 }
120}
121
122#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
123struct FlowHints {
124 preserves_inline_head_terminal: bool,
125 preserves_terminal_consumer_batch: bool,
126}
127
128impl FlowHints {
129 const PRESERVES_INLINE_HEAD_TERMINAL: Self = Self {
130 preserves_inline_head_terminal: true,
131 preserves_terminal_consumer_batch: true,
132 };
133
134 const PRESERVES_TERMINAL_CONSUMER_BATCH: Self = Self {
135 preserves_inline_head_terminal: false,
136 preserves_terminal_consumer_batch: true,
137 };
138
139 fn then(self, next: Self) -> Self {
140 Self {
141 preserves_inline_head_terminal: self.preserves_inline_head_terminal
142 && next.preserves_inline_head_terminal,
143 preserves_terminal_consumer_batch: self.preserves_terminal_consumer_batch
144 && next.preserves_terminal_consumer_batch,
145 }
146 }
147}
148
149struct PartitionSlot<Key, Out> {
150 key: Option<Key>,
151 active: usize,
152 queued: VecDeque<(usize, Out)>,
153 in_ready_queue: bool,
154}
155
156struct AbortOnDropHandle<T> {
157 handle: JoinHandle<T>,
158}
159
160impl<T> AbortOnDropHandle<T> {
161 fn new(handle: JoinHandle<T>) -> Self {
162 Self { handle }
163 }
164}
165
166impl<T> Drop for AbortOnDropHandle<T> {
167 fn drop(&mut self) {
168 self.handle.abort();
169 }
170}
171
172impl<T> Future for AbortOnDropHandle<T> {
173 type Output = Result<T, JoinError>;
174
175 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176 Pin::new(&mut self.handle).poll(cx)
177 }
178}
179
180impl<T> Unpin for AbortOnDropHandle<T> {}
181
182pub(crate) fn stream_tokio_runtime() -> &'static TokioRuntime {
183 static RUNTIME: OnceLock<TokioRuntime> = OnceLock::new();
184 RUNTIME.get_or_init(|| {
185 TokioRuntimeBuilder::new_multi_thread()
186 .enable_all()
187 .thread_name("datum-stream-tokio")
188 .build()
189 .expect("stream tokio runtime")
190 })
191}
192
193fn spawn_tokio_task<Fut, T>(future: Fut) -> AbortOnDropHandle<T>
194where
195 Fut: Future<Output = T> + Send + 'static,
196 T: Send + 'static,
197{
198 AbortOnDropHandle::new(stream_tokio_runtime().spawn(future))
199}
200
201pub(crate) fn current_stream_cancelled() -> Option<Arc<AtomicBool>> {
202 runtime::current_stream_cancelled()
203}
204
205pub(super) fn catch_unwind_failed<T, F>(context: &'static str, f: F) -> StreamResult<T>
206where
207 F: FnOnce() -> T,
208{
209 catch_unwind(AssertUnwindSafe(f))
210 .map_err(|_| StreamError::Failed(format!("{context} panicked")))
211}
212
213impl<Key, Out> PartitionSlot<Key, Out> {
214 fn new(key: Key) -> Self {
215 Self {
216 key: Some(key),
217 active: 0,
218 queued: VecDeque::new(),
219 in_ready_queue: false,
220 }
221 }
222}
223
224#[inline(always)]
225fn partition_slot_for<Key, Out>(
226 key: Key,
227 slots_by_key: &mut HashMap<Key, usize>,
228 slots: &mut Vec<PartitionSlot<Key, Out>>,
229 free_slots: &mut Vec<usize>,
230) -> usize
231where
232 Key: Clone + Eq + Hash,
233{
234 if let Some(slot) = slots_by_key.get(&key) {
235 return *slot;
236 }
237
238 let slot = if let Some(slot) = free_slots.pop() {
239 let state = &mut slots[slot];
240 state.key = Some(key.clone());
241 state.active = 0;
242 state.queued.clear();
243 state.in_ready_queue = false;
244 slot
245 } else {
246 slots.push(PartitionSlot::new(key.clone()));
247 slots.len() - 1
248 };
249 slots_by_key.insert(key, slot);
250 slot
251}
252
253#[inline(always)]
254fn retire_partition_slot<Key, Out>(
255 slot: usize,
256 slots_by_key: &mut HashMap<Key, usize>,
257 slots: &mut [PartitionSlot<Key, Out>],
258 free_slots: &mut Vec<usize>,
259) where
260 Key: Eq + Hash,
261{
262 let state = &mut slots[slot];
263 if let Some(key) = state.key.take() {
264 slots_by_key.remove(&key);
265 }
266 state.active = 0;
267 state.queued.clear();
268 state.in_ready_queue = false;
269 free_slots.push(slot);
270}
271
272#[inline(always)]
273fn ready_partition_slot<Key, Out>(
274 slots: &mut [PartitionSlot<Key, Out>],
275 ready_slots: &mut VecDeque<usize>,
276 slot: usize,
277 per_partition: usize,
278) {
279 if let Some(state) = slots.get_mut(slot)
280 && state.key.is_some()
281 && !state.in_ready_queue
282 && state.active < per_partition
283 && !state.queued.is_empty()
284 {
285 state.in_ready_queue = true;
286 ready_slots.push_back(slot);
287 }
288}
289
290#[inline(always)]
291fn pop_ready_partition_slot<Key, Out>(
292 slots: &mut [PartitionSlot<Key, Out>],
293 ready_slots: &mut VecDeque<usize>,
294 per_partition: usize,
295) -> Option<(usize, usize, Out)> {
296 while let Some(slot) = ready_slots.pop_front() {
297 let mut requeue = false;
298 let item = if let Some(state) = slots.get_mut(slot) {
299 state.in_ready_queue = false;
300 if state.key.is_some() && state.active < per_partition {
301 let item = state.queued.pop_front().map(|(index, item)| {
302 state.active += 1;
303 (index, slot, item)
304 });
305 if !state.queued.is_empty() && state.active < per_partition {
306 state.in_ready_queue = true;
307 requeue = true;
308 }
309 item
310 } else {
311 None
312 }
313 } else {
314 None
315 };
316
317 if requeue {
318 ready_slots.push_back(slot);
319 }
320 if item.is_some() {
321 return item;
322 }
323 }
324 None
325}
326
327pub(crate) trait SourceFactory<Out, Mat>: Send + Sync {
328 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)>;
329}
330
331struct FnSourceFactory<F>(F);
332
333impl<Out, Mat, F> SourceFactory<Out, Mat> for FnSourceFactory<F>
334where
335 F: Fn(&Materializer) -> StreamResult<(BoxStream<Out>, Mat)> + Send + Sync,
336{
337 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
338 (self.0)(materializer)
339 }
340}
341
342struct MapSourceFactory<In, Out, Mat, F> {
343 source: Arc<dyn SourceFactory<In, Mat>>,
344 stage: F,
345 _marker: PhantomData<fn(In) -> Out>,
346}
347
348impl<In, Out, Mat, F> SourceFactory<Out, Mat> for MapSourceFactory<In, Out, Mat, F>
349where
350 In: Send + 'static,
351 Out: Send + 'static,
352 Mat: Send + 'static,
353 F: Fn(In) -> Out + Send + Sync + 'static,
354{
355 fn create(self: Arc<Self>, materializer: &Materializer) -> StreamResult<(BoxStream<Out>, Mat)> {
356 let (stream, mat) = Arc::clone(&self.source).create(materializer)?;
357 Ok((
358 Box::new(MapSourceStream {
359 input: stream,
360 factory: self,
361 }),
362 mat,
363 ))
364 }
365}
366
367struct MapSourceStream<In, Out, Mat, F> {
368 input: BoxStream<In>,
369 factory: Arc<MapSourceFactory<In, Out, Mat, F>>,
370}
371
372impl<In, Out, Mat, F> Iterator for MapSourceStream<In, Out, Mat, F>
373where
374 F: Fn(In) -> Out,
375{
376 type Item = StreamResult<Out>;
377
378 fn next(&mut self) -> Option<Self::Item> {
379 self.input
380 .next()
381 .map(|item| item.map(|item| (self.factory.stage)(item)))
382 }
383}
384
385fn merge_streams<Out>(streams: Vec<BoxStream<Out>>, eager_complete: bool) -> BoxStream<Out>
386where
387 Out: Send + 'static,
388{
389 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
390 let mut current = 0usize;
391 Box::new(std::iter::from_fn(move || {
392 loop {
393 let index = next_active_optional_stream(&streams, current, |_| true)?;
394 current = (index + 1) % streams.len().max(1);
395 let Some(stream) = streams[index].as_mut() else {
396 continue;
397 };
398 match stream.next() {
399 Some(item) => return Some(item),
400 None => {
401 streams[index] = None;
402 if eager_complete {
403 return None;
404 }
405 }
406 }
407 }
408 }))
409}
410
411fn merge_prioritized_streams<Out>(
412 streams: Vec<BoxStream<Out>>,
413 priorities: Vec<usize>,
414 eager_complete: bool,
415) -> BoxStream<Out>
416where
417 Out: Send + 'static,
418{
419 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
420 let schedule: Vec<usize> = priorities
421 .into_iter()
422 .enumerate()
423 .flat_map(|(index, weight)| std::iter::repeat_n(index, weight))
424 .collect();
425 let mut schedule_index = 0usize;
426 Box::new(std::iter::from_fn(move || {
427 loop {
428 if streams.iter().all(Option::is_none) {
429 return None;
430 }
431 let index = next_weighted_stream(&streams, &schedule, &mut schedule_index)?;
432 let Some(stream) = streams[index].as_mut() else {
433 continue;
434 };
435 match stream.next() {
436 Some(item) => return Some(item),
437 None => {
438 streams[index] = None;
439 if eager_complete {
440 return None;
441 }
442 }
443 }
444 }
445 }))
446}
447
448fn merge_sorted_stream<Out>(mut left: BoxStream<Out>, mut right: BoxStream<Out>) -> BoxStream<Out>
449where
450 Out: Ord + Send + 'static,
451{
452 let mut left_next: Option<Out> = None;
453 let mut right_next: Option<Out> = None;
454 let mut left_done = false;
455 let mut right_done = false;
456 Box::new(std::iter::from_fn(move || {
457 loop {
458 if left_next.is_none() && !left_done {
459 match left.next() {
460 Some(Ok(item)) => left_next = Some(item),
461 Some(Err(error)) => return Some(Err(error)),
462 None => left_done = true,
463 }
464 }
465 if right_next.is_none() && !right_done {
466 match right.next() {
467 Some(Ok(item)) => right_next = Some(item),
468 Some(Err(error)) => return Some(Err(error)),
469 None => right_done = true,
470 }
471 }
472
473 let next = match (&left_next, &right_next) {
474 (Some(left_item), Some(right_item)) => {
475 if left_item <= right_item {
476 left_next.take()
477 } else {
478 right_next.take()
479 }
480 }
481 (Some(_), None) if right_done => left_next.take(),
482 (None, Some(_)) if left_done => right_next.take(),
483 (None, None) if left_done && right_done => return None,
484 _ => continue,
485 };
486 if let Some(item) = next {
487 return Some(Ok(item));
488 }
489 }
490 }))
491}
492
493fn merge_latest_streams<Out>(
494 streams: Vec<BoxStream<Out>>,
495 eager_complete: bool,
496) -> BoxStream<Vec<Out>>
497where
498 Out: Clone + Send + 'static,
499{
500 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
501 let mut latest = vec![None; streams.len()];
502 let mut seen = 0usize;
503 let mut current = 0usize;
504 let mut pending = VecDeque::<Vec<Out>>::new();
505 Box::new(std::iter::from_fn(move || {
506 loop {
507 if let Some(output) = pending.pop_front() {
508 return Some(Ok(output));
509 }
510 if streams.iter().all(Option::is_none) {
511 return None;
512 }
513 let index = next_active_optional_stream(&streams, current, |_| true)?;
514 current = (index + 1) % streams.len().max(1);
515 let Some(stream) = streams[index].as_mut() else {
516 continue;
517 };
518 match stream.next() {
519 Some(Ok(item)) => {
520 if latest[index].is_none() {
521 seen += 1;
522 }
523 latest[index] = Some(item);
524 if seen == latest.len() {
525 pending.push_back(
526 latest
527 .iter()
528 .map(|item| item.clone().expect("merge-latest initialized"))
529 .collect(),
530 );
531 }
532 }
533 Some(Err(error)) => return Some(Err(error)),
534 None => {
535 streams[index] = None;
536 if eager_complete {
537 return None;
538 }
539 }
540 }
541 }
542 }))
543}
544
545fn zip_streams<Left, Right>(
546 mut left: BoxStream<Left>,
547 mut right: BoxStream<Right>,
548) -> BoxStream<(Left, Right)>
549where
550 Left: Send + 'static,
551 Right: Send + 'static,
552{
553 let mut left_next: Option<Left> = None;
554 let mut right_next: Option<Right> = None;
555 let mut left_done = false;
556 let mut right_done = false;
557 Box::new(std::iter::from_fn(move || {
558 loop {
559 if left_next.is_none() && !left_done {
560 match left.next() {
561 Some(Ok(item)) => left_next = Some(item),
562 Some(Err(error)) => return Some(Err(error)),
563 None => left_done = true,
564 }
565 }
566 if right_next.is_none() && !right_done {
567 match right.next() {
568 Some(Ok(item)) => right_next = Some(item),
569 Some(Err(error)) => return Some(Err(error)),
570 None => right_done = true,
571 }
572 }
573 match (left_next.take(), right_next.take()) {
574 (Some(left_item), Some(right_item)) => return Some(Ok((left_item, right_item))),
575 (left_item, right_item) => {
576 left_next = left_item;
577 right_next = right_item;
578 if (left_done && left_next.is_none()) || (right_done && right_next.is_none()) {
579 return None;
580 }
581 }
582 }
583 }
584 }))
585}
586
587fn zip_latest_with_stream<Left, Right, Out, F>(
588 mut left: BoxStream<Left>,
589 mut right: BoxStream<Right>,
590 eager_complete: bool,
591 combine: Arc<F>,
592) -> BoxStream<Out>
593where
594 Left: Clone + Send + 'static,
595 Right: Clone + Send + 'static,
596 Out: Send + 'static,
597 F: Fn(Left, Right) -> Out + Send + Sync + 'static,
598{
599 let mut left_latest: Option<Left> = None;
600 let mut right_latest: Option<Right> = None;
601 let mut left_done = false;
602 let mut right_done = false;
603 let mut turn_left = true;
604 let mut pending = VecDeque::<Out>::new();
605
606 Box::new(std::iter::from_fn(move || {
607 loop {
608 if let Some(output) = pending.pop_front() {
609 return Some(Ok(output));
610 }
611 if eager_complete && (left_done || right_done) {
612 return None;
613 }
614 if left_done && right_done {
615 return None;
616 }
617 if (left_done && left_latest.is_none()) || (right_done && right_latest.is_none()) {
621 return None;
622 }
623
624 let pull_left = if left_done {
625 false
626 } else if right_done {
627 true
628 } else {
629 let value = turn_left;
630 turn_left = !turn_left;
631 value
632 };
633
634 if pull_left {
635 match left.next() {
636 Some(Ok(item)) => {
637 left_latest = Some(item);
638 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
639 pending.push_back(combine(left_item.clone(), right_item.clone()));
640 }
641 }
642 Some(Err(error)) => return Some(Err(error)),
643 None => {
644 left_done = true;
645 if eager_complete {
646 return None;
647 }
648 }
649 }
650 } else {
651 match right.next() {
652 Some(Ok(item)) => {
653 right_latest = Some(item);
654 if let (Some(left_item), Some(right_item)) = (&left_latest, &right_latest) {
655 pending.push_back(combine(left_item.clone(), right_item.clone()));
656 }
657 }
658 Some(Err(error)) => return Some(Err(error)),
659 None => {
660 right_done = true;
661 if eager_complete {
662 return None;
663 }
664 }
665 }
666 }
667 }
668 }))
669}
670
671fn zip_all_stream<Left, Right>(
672 mut left: BoxStream<Left>,
673 mut right: BoxStream<Right>,
674 left_fill: Left,
675 right_fill: Right,
676) -> BoxStream<(Left, Right)>
677where
678 Left: Clone + Send + 'static,
679 Right: Clone + Send + 'static,
680{
681 let mut left_done = false;
682 let mut right_done = false;
683 Box::new(std::iter::from_fn(move || {
684 if left_done && right_done {
685 return None;
686 }
687
688 let left_item = if left_done {
689 None
690 } else {
691 match left.next() {
692 Some(Ok(item)) => Some(item),
693 Some(Err(error)) => return Some(Err(error)),
694 None => {
695 left_done = true;
696 None
697 }
698 }
699 };
700 let right_item = if right_done {
701 None
702 } else {
703 match right.next() {
704 Some(Ok(item)) => Some(item),
705 Some(Err(error)) => return Some(Err(error)),
706 None => {
707 right_done = true;
708 None
709 }
710 }
711 };
712
713 match (left_item, right_item) {
714 (None, None) if left_done && right_done => None,
715 (Some(left_value), Some(right_value)) => Some(Ok((left_value, right_value))),
716 (Some(left_value), None) => Some(Ok((left_value, right_fill.clone()))),
717 (None, Some(right_value)) => Some(Ok((left_fill.clone(), right_value))),
718 (None, None) => None,
719 }
720 }))
721}
722
723fn zip_n_streams<Out, Next, F>(streams: Vec<BoxStream<Out>>, zipper: Arc<F>) -> BoxStream<Next>
724where
725 Out: Send + 'static,
726 Next: Send + 'static,
727 F: Fn(Vec<Out>) -> Next + Send + Sync + 'static,
728{
729 let count = streams.len();
730 if count == 0 {
731 return Box::new(std::iter::empty());
732 }
733 let mut streams: Vec<Option<BoxStream<Out>>> = streams.into_iter().map(Some).collect();
734 let mut slots: Vec<Option<Out>> = (0..count).map(|_| None).collect();
735 let mut current = 0usize;
736 Box::new(std::iter::from_fn(move || {
737 loop {
738 if slots.iter().all(Option::is_some) {
739 let values = slots
740 .iter_mut()
741 .map(|slot| slot.take().expect("zip-n slot filled"))
742 .collect();
743 return Some(Ok(zipper(values)));
744 }
745
746 let index = next_active_optional_stream(&streams, current, |idx| slots[idx].is_none())?;
747 current = (index + 1) % count.max(1);
748 let Some(stream) = streams[index].as_mut() else {
749 continue;
750 };
751 match stream.next() {
752 Some(Ok(item)) => slots[index] = Some(item),
753 Some(Err(error)) => return Some(Err(error)),
754 None => {
755 streams[index] = None;
756 slots[index].as_ref()?;
757 }
758 }
759 }
760 }))
761}
762
763fn next_active_optional_stream<T, F>(
764 streams: &[Option<BoxStream<T>>],
765 current: usize,
766 predicate: F,
767) -> Option<usize>
768where
769 T: Send + 'static,
770 F: Fn(usize) -> bool,
771{
772 if streams.is_empty() {
773 return None;
774 }
775 for offset in 0..streams.len() {
776 let index = (current + offset) % streams.len();
777 if streams[index].is_some() && predicate(index) {
778 return Some(index);
779 }
780 }
781 None
782}
783
784fn next_weighted_stream<T>(
785 streams: &[Option<BoxStream<T>>],
786 schedule: &[usize],
787 schedule_index: &mut usize,
788) -> Option<usize>
789where
790 T: Send + 'static,
791{
792 if streams.is_empty() || schedule.is_empty() {
793 return None;
794 }
795 for _ in 0..schedule.len() {
796 let index = schedule[*schedule_index % schedule.len()];
797 *schedule_index = (*schedule_index + 1) % schedule.len();
798 if streams.get(index).is_some_and(Option::is_some) {
799 return Some(index);
800 }
801 }
802 None
803}
804
805pub(crate) mod async_boundary;
806mod completion;
807mod error;
808mod flow;
809mod rate;
810mod restart;
811mod runtime;
812mod sink;
813mod source;
814mod time;
815mod timer;
816
817pub(crate) trait SplitSegmentHookDyn: Send + Sync + 'static {
821 fn as_any_arc(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync>;
822}
823
824pub(crate) trait TerminalSourceHookDyn<In>: Send + Sync + 'static {
830 fn drain_terminal_batch(
831 &self,
832 materializer: &Materializer,
833 cancelled: &Arc<AtomicBool>,
834 batch: &mut Vec<In>,
835 ) -> StreamResult<TerminalSourceStatus>;
836
837 fn supports_direct_terminal(&self) -> bool {
838 false
839 }
840
841 fn try_register_direct_terminal(
842 &self,
843 _consumer: Box<dyn TerminalSinkConsumerDyn<In>>,
844 _cancelled: Arc<AtomicBool>,
845 ) -> Option<StreamResult<()>> {
846 None
847 }
848
849 fn cancel_terminal(&self) {}
850}
851
852#[derive(Clone, Copy, Debug, PartialEq, Eq)]
853pub(crate) enum TerminalSourceStatus {
854 Active,
855 Completed,
856}
857
858pub(crate) trait TerminalSinkConsumerDyn<In>: Send + 'static {
859 fn on_item(&mut self, item: In) -> StreamResult<()>;
860 fn finish(self: Box<Self>, result: StreamResult<()>);
861}
862
863pub(crate) trait FoldFastPathDyn<In: Send + 'static>: Send + Sync + 'static {
866 fn try_register(
870 &self,
871 hook: Arc<dyn SplitSegmentHookDyn>,
872 ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>>;
873
874 fn supports_terminal_drain(&self) -> bool {
875 false
876 }
877
878 fn try_register_direct_terminal(
879 &self,
880 _hook: Arc<dyn TerminalSourceHookDyn<In>>,
881 _materializer: &Materializer,
882 ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
883 None
884 }
885
886 fn try_register_terminal_drain(
887 &self,
888 _hook: Arc<dyn TerminalSourceHookDyn<In>>,
889 _materializer: &Materializer,
890 ) -> Option<StreamResult<Box<dyn std::any::Any + Send>>> {
891 None
892 }
893}
894
895use self::runtime::{runtime_checked_stream, set_current_stream_cancelled};
896
897pub(crate) use self::completion::StreamCancellation;
898
899pub use self::{
900 completion::{Cancellable, StreamCompletion},
901 error::{StreamError, StreamResult, Supervision, SupervisionDecider, SupervisionDirective},
902 flow::{BidiFlow, Flow},
903 rate::{AggregateTimer, OverflowStrategy},
904 restart::{RestartFlow, RestartSettings, RestartSink, RestartSource, RetryFlow},
905 runtime::{Materializer, Runtime},
906 sink::{RunnableGraph, Sink, SinkCombineStrategy},
907 source::{Demand, Keep, MaybeHandle, NotUsed, PushOutlet, Source, SourceCombineStrategy},
908 time::{DelayOverflowStrategy, ThrottleMode},
909};
910
911#[cfg(test)]
912mod tests {
913 use super::*;
914 use crate::Attributes;
915 use crate::testkit::TestSink;
916 use std::fs;
917 use std::sync::{
918 Arc as StdArc,
919 atomic::{
920 AtomicBool as StdAtomicBool, AtomicUsize as StdAtomicUsize, Ordering as StdOrdering,
921 },
922 mpsc,
923 };
924 use std::time::Duration as StdDuration;
925 use std::time::Instant;
926
927 fn wait<T>(completion: StreamCompletion<T>) -> T {
928 completion.wait().unwrap()
929 }
930
931 fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
932 let deadline = Instant::now() + timeout;
933 while Instant::now() < deadline {
934 if condition() {
935 return true;
936 }
937 thread::sleep(Duration::from_millis(2));
938 }
939 condition()
940 }
941
942 fn linux_thread_count(thread_name: &str) -> usize {
943 fs::read_dir("/proc/self/task")
944 .expect("task directory readable")
945 .filter_map(Result::ok)
946 .filter_map(|entry| fs::read_to_string(entry.path().join("comm")).ok())
947 .filter(|name| name.trim() == thread_name)
948 .count()
949 }
950
951 #[test]
952 fn source_async_boundary_preserves_results() {
953 let expected = Source::from_iter(0_u64..128)
954 .map(|item| item.wrapping_add(1))
955 .filter(|item| item % 3 != 0)
956 .map(|item| item * 2)
957 .run_collect()
958 .unwrap();
959
960 let actual = Source::from_iter(0_u64..128)
961 .map(|item| item.wrapping_add(1))
962 .async_boundary()
963 .filter(|item| item % 3 != 0)
964 .map(|item| item * 2)
965 .run_collect()
966 .unwrap();
967
968 assert_eq!(actual, expected);
969 }
970
971 #[test]
972 fn flow_async_boundary_preserves_results() {
973 let expected = Source::from_iter(0_u64..128)
974 .map(|item| item + 1)
975 .map(|item| item * 3)
976 .run_collect()
977 .unwrap();
978
979 let flow = Flow::identity()
980 .map(|item: u64| item + 1)
981 .r#async()
982 .map(|item| item * 3);
983 let actual = Source::from_iter(0_u64..128)
984 .via(flow)
985 .run_collect()
986 .unwrap();
987
988 assert_eq!(actual, expected);
989 }
990
991 #[test]
992 fn linear_async_boundary_matches_graph_async_boundary_shape() {
993 use crate::{
994 AsyncBoundary, AsyncBoundaryExecutionConfig, FusedExecutionConfig, GraphDsl,
995 GraphFlowShape, MapStage,
996 };
997
998 let graph = GraphDsl::try_create(|builder| {
999 let first = builder.add(MapStage::new(|item: u64| item + 1));
1000 let boundary = builder.add(AsyncBoundary::<u64>::new());
1001 let second = builder.add(MapStage::new(|item: u64| item * 2));
1002
1003 builder.connect(first.outlet(), boundary.inlet())?;
1004 builder.connect(boundary.outlet(), second.inlet())?;
1005
1006 Ok(GraphFlowShape::new(first.inlet(), second.outlet()))
1007 })
1008 .unwrap();
1009
1010 let linear = Source::from_iter(1_u64..=4)
1011 .map(|item| item + 1)
1012 .async_boundary_with_buffer(4)
1013 .map(|item| item * 2)
1014 .run_collect()
1015 .unwrap();
1016 let graph_output = graph.run_with_input(1_u64..=4).unwrap();
1017 let report = graph
1018 .run_async_boundary_count_with_input_report(
1019 1_u64..=4,
1020 AsyncBoundaryExecutionConfig {
1021 fused: FusedExecutionConfig { event_limit: 1024 },
1022 buffer_size: 4,
1023 },
1024 )
1025 .unwrap();
1026
1027 assert_eq!(linear, graph_output);
1028 assert_eq!(report.result, linear.len());
1029 assert_eq!(report.async_boundary_crossings, linear.len());
1030 }
1031
1032 #[test]
1033 fn async_boundary_regions_run_concurrently() {
1034 let (upstream_tx, upstream_rx) = mpsc::channel::<u64>();
1035 let (downstream_blocked_tx, downstream_blocked_rx) = mpsc::channel::<()>();
1036 let (release_tx, release_rx) = mpsc::channel::<()>();
1037 let release_rx = StdArc::new(Mutex::new(release_rx));
1038
1039 let completion = Source::from_iter(0_u64..3)
1040 .map(move |item| {
1041 upstream_tx.send(item).expect("upstream probe receives");
1042 item
1043 })
1044 .async_boundary_with_buffer(1)
1045 .map({
1046 let release_rx = StdArc::clone(&release_rx);
1047 move |item| {
1048 if item == 0 {
1049 downstream_blocked_tx
1050 .send(())
1051 .expect("downstream probe receives");
1052 release_rx
1053 .lock()
1054 .expect("release receiver lock")
1055 .recv_timeout(StdDuration::from_secs(2))
1056 .expect("downstream release arrives");
1057 }
1058 item
1059 }
1060 })
1061 .run_with(Sink::collect())
1062 .unwrap();
1063
1064 assert_eq!(
1065 downstream_blocked_rx.recv_timeout(StdDuration::from_secs(2)),
1066 Ok(())
1067 );
1068 assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1069 assert_eq!(upstream_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1070
1071 release_tx.send(()).expect("release downstream");
1072 assert_eq!(completion.wait().unwrap(), vec![0, 1, 2]);
1073 }
1074
1075 #[test]
1076 fn async_boundary_backpressures_slow_downstream() {
1077 let (produced_tx, produced_rx) = mpsc::channel::<u64>();
1078 let (release_tx, release_rx) = mpsc::channel::<()>();
1079 let release_rx = StdArc::new(Mutex::new(release_rx));
1080
1081 let completion = Source::from_iter(0_u64..8)
1082 .map(move |item| {
1083 produced_tx.send(item).expect("producer probe receives");
1084 item
1085 })
1086 .async_boundary_with_buffer(1)
1087 .map({
1088 let release_rx = StdArc::clone(&release_rx);
1089 move |item| {
1090 if item == 0 {
1091 release_rx
1092 .lock()
1093 .expect("release receiver lock")
1094 .recv_timeout(StdDuration::from_secs(2))
1095 .expect("downstream release arrives");
1096 }
1097 item
1098 }
1099 })
1100 .run_with(Sink::collect())
1101 .unwrap();
1102
1103 assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(0));
1104 assert_eq!(produced_rx.recv_timeout(StdDuration::from_secs(2)), Ok(1));
1105 if let Ok(item) = produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1106 assert_eq!(item, 2);
1107 }
1108 match produced_rx.recv_timeout(StdDuration::from_millis(100)) {
1109 Err(mpsc::RecvTimeoutError::Timeout) => {}
1110 other => panic!("async boundary handoff was not bounded: {other:?}"),
1111 }
1112
1113 release_tx.send(()).expect("release downstream");
1114 assert_eq!(completion.wait().unwrap(), (0_u64..8).collect::<Vec<_>>());
1115 }
1116
1117 #[test]
1118 fn source_blueprints_are_reusable() {
1119 let source = Source::from_iter(0..5).map(|item| item + 1);
1120
1121 assert_eq!(source.clone().run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1122 assert_eq!(source.run_collect().unwrap(), vec![1, 2, 3, 4, 5]);
1123 }
1124
1125 #[test]
1126 fn source_map_preserves_materialized_value() {
1127 let graph = Source::single(1)
1128 .map_materialized_value(|_| "source")
1129 .map(|item| item + 1)
1130 .to_mat(Sink::head(), Keep::both);
1131
1132 let materialized = graph.run().unwrap();
1133 assert_eq!(materialized.0, "source");
1134 assert_eq!(wait(materialized.1), 2);
1135 }
1136
1137 #[test]
1138 fn source_and_flow_compose() {
1139 let flow = Flow::identity()
1140 .map(|item: i32| item * 2)
1141 .filter(|item| item % 3 == 0);
1142
1143 let result = Source::from_iter(0..8).via(flow).run_collect().unwrap();
1144
1145 assert_eq!(result, vec![0, 6, 12]);
1146 }
1147
1148 #[test]
1149 fn sink_setup_sees_materializer_defaults_and_local_attributes() {
1150 let observed = StdArc::new(Mutex::new(None));
1151 let observed_in_setup = StdArc::clone(&observed);
1152 let sink = Sink::<i32, StreamCompletion<NotUsed>>::setup(move |_materializer, attrs| {
1153 *observed_in_setup.lock().unwrap() = Some((
1154 attrs.name().map(str::to_owned),
1155 attrs.input_buffer_hint(),
1156 attrs.dispatcher_hint().map(str::to_owned),
1157 ));
1158 Sink::ignore()
1159 })
1160 .add_attributes(Attributes::named("sink-inner"))
1161 .add_attributes(Attributes::input_buffer(4, 4))
1162 .add_attributes(Attributes::dispatcher("bench-dispatcher"));
1163
1164 let materializer = Materializer::new().with_attributes(Attributes::named("mat-outer"));
1165 wait(
1166 Source::from_iter([1, 2, 3])
1167 .run_with_materializer(sink, &materializer)
1168 .unwrap(),
1169 );
1170
1171 assert_eq!(
1172 *observed.lock().unwrap(),
1173 Some((
1174 Some("sink-inner".to_owned()),
1175 Some((4, 4)),
1176 Some("bench-dispatcher".to_owned())
1177 ))
1178 );
1179 }
1180
1181 #[test]
1182 fn sink_pre_materialize_feeds_existing_materialization() {
1183 let materializer = Materializer::new();
1184 let (completion, pre) = Sink::<i32, StreamCompletion<Vec<i32>>>::collect()
1185 .pre_materialize(&materializer)
1186 .unwrap();
1187
1188 Source::from_iter([1, 2, 3])
1189 .run_with_materializer(pre, &materializer)
1190 .unwrap();
1191
1192 assert_eq!(wait(completion), vec![1, 2, 3]);
1193 }
1194
1195 #[test]
1196 fn flow_from_sink_and_source_connects_both_sides() {
1197 assert_eq!(
1198 Source::from_iter([1, 2, 3])
1199 .via(Flow::from_sink_and_source(
1200 Sink::foreach(|_item: i32| {}),
1201 Source::from_iter([10, 20, 30]),
1202 ))
1203 .run_collect()
1204 .unwrap(),
1205 vec![10, 20, 30]
1206 );
1207 }
1208
1209 #[test]
1210 fn from_sink_and_source_keeps_sink_running_after_source_side_completes() {
1211 let completed = StdArc::new(StdAtomicBool::new(false));
1212 let on_complete = StdArc::clone(&completed);
1213 let flow = Flow::from_sink_and_source(
1214 Sink::on_complete(move || {
1215 on_complete.store(true, StdOrdering::SeqCst);
1216 }),
1217 Source::single(10),
1218 );
1219
1220 let result = Source::from_iter([1, 2, 3])
1221 .via(flow)
1222 .run_collect()
1223 .unwrap();
1224
1225 assert_eq!(result, vec![10]);
1226 assert!(wait_until(StdDuration::from_secs(1), || {
1227 completed.load(StdOrdering::SeqCst)
1228 }));
1229 }
1230
1231 #[test]
1232 fn from_sink_and_source_coupled_cancels_source_when_sink_finishes_first() {
1233 let cancellable = StdArc::new(Mutex::new(None));
1234 let observed = StdArc::clone(&cancellable);
1235 let flow = Flow::from_sink_and_source_coupled(
1236 Sink::ignore(),
1237 Source::tick(
1238 StdDuration::from_millis(50),
1239 StdDuration::from_millis(50),
1240 10,
1241 )
1242 .map_materialized_value(move |handle| {
1243 *observed.lock().unwrap() = Some(handle.clone());
1244 handle
1245 }),
1246 );
1247
1248 let completion = Source::from_iter(std::iter::empty::<i32>())
1249 .via(flow)
1250 .run_with(Sink::ignore())
1251 .unwrap();
1252 assert!(wait_until(StdDuration::from_secs(1), || {
1253 cancellable
1254 .lock()
1255 .unwrap()
1256 .as_ref()
1257 .is_some_and(Cancellable::is_cancelled)
1258 }));
1259 assert_eq!(wait(completion), NotUsed);
1260 }
1261
1262 #[test]
1263 fn bidi_flow_join_and_atop_compose() {
1264 let codec = BidiFlow::from_flows(
1265 Flow::identity().map(|item: i32| item + 1),
1266 Flow::identity().map(|item: i32| item * 2),
1267 )
1268 .named("codec");
1269 let framing = BidiFlow::from_flows(
1270 Flow::identity().map(|item: i32| item * 3),
1271 Flow::identity().map(|item: i32| item - 4),
1272 );
1273
1274 let joined = codec
1275 .clone()
1276 .join(Flow::identity().map(|item: i32| item - 5));
1277 let stacked = codec.atop(framing).join(Flow::identity());
1278
1279 assert_eq!(
1280 Source::single(10).via(joined).run_collect().unwrap(),
1281 vec![12]
1282 );
1283 assert_eq!(
1284 Source::single(10).via(stacked).run_collect().unwrap(),
1285 vec![58]
1286 );
1287 }
1288
1289 #[test]
1290 fn flow_buffer_then_map_runs_end_to_end() {
1291 let flow = Flow::identity()
1292 .buffer(8, OverflowStrategy::Backpressure)
1293 .map(|item: i32| item + 1);
1294
1295 let result = Source::from_iter(0..4).via(flow).run_collect().unwrap();
1296
1297 assert_eq!(result, vec![1, 2, 3, 4]);
1298 }
1299
1300 #[test]
1301 fn public_flow_combinators_preserve_runtime_transform_after_buffer() {
1302 fn buffered_flow() -> Flow<i32, i32> {
1303 Flow::identity().buffer(8, OverflowStrategy::Backpressure)
1304 }
1305
1306 assert_eq!(
1307 Source::from_iter(0..4)
1308 .via(buffered_flow().filter(|item| *item % 2 == 0))
1309 .run_collect()
1310 .unwrap(),
1311 vec![0, 2]
1312 );
1313 assert_eq!(
1314 Source::from_iter(0..4)
1315 .via(buffered_flow().filter_not(|item| *item % 2 == 0))
1316 .run_collect()
1317 .unwrap(),
1318 vec![1, 3]
1319 );
1320 assert_eq!(
1321 Source::from_iter(0..4)
1322 .via(buffered_flow().filter_map(|item| (item % 2 == 0).then_some(item + 10)))
1323 .run_collect()
1324 .unwrap(),
1325 vec![10, 12]
1326 );
1327 assert_eq!(
1328 Source::from_iter(0..3)
1329 .via(buffered_flow().map_concat(|item| [item, item + 10]))
1330 .run_collect()
1331 .unwrap(),
1332 vec![0, 10, 1, 11, 2, 12]
1333 );
1334 assert_eq!(
1335 Source::from_iter(0..3)
1336 .via(buffered_flow().stateful_map(5, |state, item| {
1337 *state += item;
1338 *state
1339 }))
1340 .run_collect()
1341 .unwrap(),
1342 vec![5, 6, 8]
1343 );
1344 assert_eq!(
1345 Source::from_iter(0..3)
1346 .via(buffered_flow().stateful_map_concat(0, |state, item| {
1347 *state += item;
1348 [*state, item]
1349 }))
1350 .run_collect()
1351 .unwrap(),
1352 vec![0, 0, 1, 1, 3, 2]
1353 );
1354 assert_eq!(
1355 Source::from_iter(0..4)
1356 .via(buffered_flow().map_async(2, |item| async move { Ok(item + 1) }))
1357 .run_collect()
1358 .unwrap(),
1359 vec![1, 2, 3, 4]
1360 );
1361 assert_eq!(
1362 Source::from_iter(0..4)
1363 .via(buffered_flow().map_async_unordered(2, |item| async move { Ok(item + 1) }))
1364 .run_collect()
1365 .unwrap(),
1366 vec![1, 2, 3, 4]
1367 );
1368 assert_eq!(
1369 Source::from_iter(0..4)
1370 .via(buffered_flow().map_async_partitioned(
1371 2,
1372 1,
1373 |item| item % 2,
1374 |item| async move { Ok(item + 1) },
1375 ))
1376 .run_collect()
1377 .unwrap(),
1378 vec![1, 2, 3, 4]
1379 );
1380 assert_eq!(
1381 Source::from_iter(0..5)
1382 .via(buffered_flow().take(3))
1383 .run_collect()
1384 .unwrap(),
1385 vec![0, 1, 2]
1386 );
1387 assert_eq!(
1388 Source::from_iter(0..5)
1389 .via(buffered_flow().drop(2))
1390 .run_collect()
1391 .unwrap(),
1392 vec![2, 3, 4]
1393 );
1394 assert_eq!(
1395 Source::from_iter(0..5)
1396 .via(buffered_flow().take_while(|item| *item < 3))
1397 .run_collect()
1398 .unwrap(),
1399 vec![0, 1, 2]
1400 );
1401 assert_eq!(
1402 Source::from_iter(0..5)
1403 .via(buffered_flow().drop_while(|item| *item < 3))
1404 .run_collect()
1405 .unwrap(),
1406 vec![3, 4]
1407 );
1408 assert_eq!(
1409 Source::from_iter(0..3)
1410 .via(buffered_flow().limit(5))
1411 .run_collect()
1412 .unwrap(),
1413 vec![0, 1, 2]
1414 );
1415 assert_eq!(
1416 Source::from_iter(0..5)
1417 .via(buffered_flow().grouped(2))
1418 .run_collect()
1419 .unwrap(),
1420 vec![vec![0, 1], vec![2, 3], vec![4]]
1421 );
1422 assert_eq!(
1423 Source::from_iter(1..=3)
1424 .via(buffered_flow().scan(0, |acc, item| acc + item))
1425 .run_collect()
1426 .unwrap(),
1427 vec![0, 1, 3, 6]
1428 );
1429 assert_eq!(
1430 Source::from_iter(1..=4)
1431 .via(buffered_flow().sliding(2, 1))
1432 .run_collect()
1433 .unwrap(),
1434 vec![vec![1, 2], vec![2, 3], vec![3, 4]]
1435 );
1436 assert_eq!(
1437 Source::from_iter(1..=4)
1438 .via(buffered_flow().fold(0, |acc, item| acc + item))
1439 .run_collect()
1440 .unwrap(),
1441 vec![10]
1442 );
1443 assert_eq!(
1444 Source::from_iter(1..=4)
1445 .via(buffered_flow().reduce(|acc, item| acc + item))
1446 .run_collect()
1447 .unwrap(),
1448 vec![10]
1449 );
1450 assert_eq!(
1451 Source::from_factory(|| {
1452 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1453 })
1454 .via(buffered_flow().map_error(|_| StreamError::Failed("mapped".into())))
1455 .run_collect(),
1456 Err(StreamError::Failed("mapped".into()))
1457 );
1458 assert_eq!(
1459 Source::<i32>::failed(StreamError::Failed("boom".into()))
1460 .via(buffered_flow().recover(|_| Some(42)))
1461 .run_collect()
1462 .unwrap(),
1463 vec![42]
1464 );
1465 assert_eq!(
1466 Source::<i32>::failed(StreamError::Failed("boom".into()))
1467 .via(buffered_flow().recover_with(|_| Some(Source::from_iter([7, 8]))))
1468 .run_collect()
1469 .unwrap(),
1470 vec![7, 8]
1471 );
1472 assert_eq!(
1473 Source::<i32>::failed(StreamError::Failed("boom".into()))
1474 .via(buffered_flow().recover_with_retries(1, |_| Some(Source::from_iter([9]))))
1475 .run_collect()
1476 .unwrap(),
1477 vec![9]
1478 );
1479 assert_eq!(
1480 Source::from_factory(|| {
1481 Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
1482 })
1483 .via(buffered_flow().on_error_complete())
1484 .run_collect()
1485 .unwrap(),
1486 vec![1]
1487 );
1488
1489 let materialized = Source::from_iter([1, 2, 3])
1490 .run_with(
1491 buffered_flow()
1492 .via(Flow::identity().map(|item| item + 1))
1493 .map_materialized_value(|_| "buffered-flow")
1494 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
1495 )
1496 .unwrap();
1497 assert_eq!(materialized.0, "buffered-flow");
1498 assert_eq!(wait(materialized.1), 9);
1499
1500 let kept = Source::from_iter([1, 2, 3])
1501 .run_with(
1502 buffered_flow()
1503 .via_mat_with(Flow::identity().map(|item| item + 1), |_, _| "combined")
1504 .to(Sink::fold(0, |acc, item| acc + item)),
1505 )
1506 .unwrap();
1507 assert_eq!(kept, "combined");
1508 }
1509
1510 #[test]
1511 fn runtime_rate_flows_compose_in_flow_form() {
1512 let conflate = Flow::identity()
1513 .conflate(|left: i32, right| left + right)
1514 .map(|item| item + 1);
1515 assert_eq!(
1516 Source::single(4).via(conflate).run_collect().unwrap(),
1517 vec![5]
1518 );
1519
1520 let batch = Flow::identity()
1521 .batch(4, |item: i32| item, |left, right| left + right)
1522 .map(|item| item + 1);
1523 assert_eq!(Source::single(4).via(batch).run_collect().unwrap(), vec![5]);
1524
1525 let expand = Flow::identity()
1526 .expand(std::iter::once::<i32>)
1527 .map(|item| item + 1);
1528 assert_eq!(
1529 Source::from_iter(0..4).via(expand).run_collect().unwrap(),
1530 vec![1, 2, 3, 4]
1531 );
1532
1533 let aggregate = Flow::identity()
1534 .aggregate_with_boundary(
1535 Vec::<i32>::new,
1536 |mut items, item| {
1537 items.push(item);
1538 let ready = !items.is_empty();
1539 (items, ready)
1540 },
1541 |items| items.into_iter().sum::<i32>(),
1542 None,
1543 )
1544 .map(|item| item + 1);
1545 assert_eq!(
1546 Source::from_iter(0..4)
1547 .via(aggregate)
1548 .run_collect()
1549 .unwrap(),
1550 vec![1, 2, 3, 4]
1551 );
1552
1553 let detached = Flow::identity().detach().map(|item: i32| item + 1);
1554 assert_eq!(
1555 Source::from_iter(0..4).via(detached).run_collect().unwrap(),
1556 vec![1, 2, 3, 4]
1557 );
1558 }
1559
1560 #[test]
1561 fn high_use_source_flow_operators_work() {
1562 let result = Source::from_iter(0..8)
1563 .drop(1)
1564 .take(5)
1565 .filter_not(|item| item % 2 == 0)
1566 .map_concat(|item| [item, item + 10])
1567 .grouped(3)
1568 .run_collect()
1569 .unwrap();
1570
1571 assert_eq!(result, vec![vec![1, 11, 3], vec![13, 5, 15]]);
1572 }
1573
1574 #[test]
1575 fn prefix_and_tail_emits_prefix_and_live_tail() {
1576 let mut outer = Source::from_iter(0..5)
1577 .prefix_and_tail(2)
1578 .run_collect()
1579 .unwrap();
1580 assert_eq!(outer.len(), 1);
1581 let (prefix, tail) = outer.pop().unwrap();
1582 assert_eq!(prefix, vec![0, 1]);
1583 assert_eq!(tail.clone().run_collect().unwrap(), vec![2, 3, 4]);
1584 assert_eq!(
1585 tail.run_collect(),
1586 Err(StreamError::Failed(
1587 "substream source cannot be materialized more than once".into()
1588 ))
1589 );
1590 }
1591
1592 #[test]
1593 fn prefix_and_tail_fails_before_prefix_is_ready() {
1594 let result = Source::from_factory(|| {
1595 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into())), Ok(2)].into_iter())
1596 })
1597 .prefix_and_tail(2)
1598 .run_collect();
1599 assert!(matches!(result, Err(StreamError::Failed(message)) if message == "boom"));
1600 }
1601
1602 #[test]
1603 fn prefix_and_tail_tail_propagates_late_upstream_failure() {
1604 let mut outer = Source::from_factory(|| {
1605 Box::new(vec![Ok(1), Ok(2), Err(StreamError::Failed("boom".into())), Ok(3)].into_iter())
1606 })
1607 .prefix_and_tail(2)
1608 .run_collect()
1609 .unwrap();
1610 let (prefix, tail) = outer.pop().unwrap();
1611 assert_eq!(prefix, vec![1, 2]);
1612 assert_eq!(tail.run_collect(), Err(StreamError::Failed("boom".into())));
1613 }
1614
1615 #[test]
1616 fn prefix_and_tail_accepts_non_clone_elements() {
1617 #[derive(Debug, PartialEq, Eq)]
1618 struct NonClone(u8);
1619
1620 let mut outer = Source::from_factory(|| {
1621 Box::new(vec![Ok(NonClone(1)), Ok(NonClone(2)), Ok(NonClone(3))].into_iter())
1622 })
1623 .prefix_and_tail(2)
1624 .run_collect()
1625 .unwrap();
1626 let (prefix, tail) = outer.pop().unwrap();
1627 assert_eq!(prefix, vec![NonClone(1), NonClone(2)]);
1628 assert_eq!(tail.run_collect().unwrap(), vec![NonClone(3)]);
1629 }
1630
1631 #[test]
1632 fn flat_map_prefix_materializes_on_short_upstream_completion() {
1633 let values = Source::from_iter([1, 2])
1634 .flat_map_prefix(3, |prefix| {
1635 let sum = prefix.into_iter().sum::<i32>();
1636 Flow::identity().prepend(Source::single(sum))
1637 })
1638 .run_collect()
1639 .unwrap();
1640 assert_eq!(values, vec![3]);
1641 }
1642
1643 #[test]
1644 fn flat_map_prefix_does_not_materialize_on_early_upstream_failure() {
1645 let invoked = StdArc::new(StdAtomicBool::new(false));
1646 let invoked_for_stage = StdArc::clone(&invoked);
1647 let result = Source::from_factory(|| {
1648 Box::new(vec![Ok(1), Err(StreamError::Failed("boom".into()))].into_iter())
1649 })
1650 .flat_map_prefix(3, move |_prefix| {
1651 invoked_for_stage.store(true, StdOrdering::SeqCst);
1652 Flow::identity()
1653 })
1654 .run_collect();
1655 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1656 assert!(!invoked.load(StdOrdering::SeqCst));
1657 }
1658
1659 #[test]
1660 fn flat_map_concat_flattens_nested_sources_sequentially() {
1661 let values = Source::from_iter([1, 2, 3])
1662 .flat_map_concat(|item| Source::from_iter(0..item))
1663 .run_collect()
1664 .unwrap();
1665 assert_eq!(values, vec![0, 0, 1, 0, 1, 2]);
1666 }
1667
1668 #[test]
1669 fn flat_map_merge_respects_breadth_bound() {
1670 let active = StdArc::new(StdAtomicUsize::new(0));
1671 let max_active = StdArc::new(StdAtomicUsize::new(0));
1672 let active_for_stage = StdArc::clone(&active);
1673 let max_for_stage = StdArc::clone(&max_active);
1674
1675 let mut values = Source::from_iter(0..6)
1676 .flat_map_merge(2, move |item| {
1677 let active = StdArc::clone(&active_for_stage);
1678 let max_active = StdArc::clone(&max_for_stage);
1679 Source::future(move || {
1680 let active = StdArc::clone(&active);
1681 let max_active = StdArc::clone(&max_active);
1682 async move {
1683 let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
1684 loop {
1685 let seen = max_active.load(StdOrdering::SeqCst);
1686 if now <= seen {
1687 break;
1688 }
1689 if max_active
1690 .compare_exchange(
1691 seen,
1692 now,
1693 StdOrdering::SeqCst,
1694 StdOrdering::SeqCst,
1695 )
1696 .is_ok()
1697 {
1698 break;
1699 }
1700 }
1701 thread::sleep(StdDuration::from_millis(20));
1702 active.fetch_sub(1, StdOrdering::SeqCst);
1703 Ok(item)
1704 }
1705 })
1706 })
1707 .run_collect()
1708 .unwrap();
1709 values.sort_unstable();
1710 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
1711 assert!(max_active.load(StdOrdering::SeqCst) <= 2);
1712 }
1713
1714 #[test]
1715 fn flat_map_merge_propagates_inner_failures() {
1716 let result = Source::from_iter([0, 1, 2])
1717 .flat_map_merge(2, |item| {
1718 if item == 1 {
1719 Source::failed(StreamError::Failed("boom".into()))
1720 } else {
1721 Source::single(item)
1722 }
1723 })
1724 .run_collect();
1725 assert_eq!(result, Err(StreamError::Failed("boom".into())));
1726 }
1727
1728 #[test]
1729 fn flat_map_merge_emits_ready_inner_output_while_upstream_is_blocked() {
1730 let (release_tx, release_rx) = mpsc::channel();
1731 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1732 let queue = Source::from_factory(move || {
1733 let release_rx = StdArc::clone(&release_rx);
1734 let mut step = 0_u8;
1735 Box::new(std::iter::from_fn(move || {
1736 let item = match step {
1737 0 => Some(Ok(0)),
1738 1 => {
1739 release_rx
1740 .lock()
1741 .unwrap()
1742 .as_ref()
1743 .expect("release receiver available")
1744 .recv_timeout(StdDuration::from_secs(1))
1745 .expect("timed out waiting to release second upstream element");
1746 Some(Ok(1))
1747 }
1748 _ => None,
1749 };
1750 step += 1;
1751 item
1752 }))
1753 })
1754 .flat_map_merge(2, |item| Source::single(item + 10))
1755 .run_with(Sink::queue())
1756 .unwrap();
1757
1758 assert_eq!(queue.pull().unwrap(), Some(10));
1759 release_tx.send(()).unwrap();
1760 assert_eq!(queue.pull().unwrap(), Some(11));
1761 assert!(queue.pull().unwrap().is_none());
1762 }
1763
1764 #[test]
1765 fn group_by_routes_keys_and_drops_closed_keys() {
1766 let outer = Source::from_iter([0, 1, 2, 3, 4])
1767 .group_by(4, |item| item % 2, false)
1768 .run_with(Sink::queue())
1769 .unwrap();
1770
1771 let even = outer.pull().unwrap().unwrap();
1772 let even_completion = even.run_with(Sink::ignore()).unwrap();
1773 let odd = outer.pull().unwrap().unwrap();
1774 drop(even_completion);
1775
1776 assert_eq!(odd.run_collect().unwrap(), vec![1, 3]);
1777 assert!(outer.pull().unwrap().is_none());
1778 }
1779
1780 #[test]
1781 fn group_by_fails_when_distinct_key_limit_is_exceeded() {
1782 let outer = Source::from_iter([0, 1, 2])
1783 .group_by(2, |item| *item, false)
1784 .run_with(Sink::queue())
1785 .unwrap();
1786
1787 let _ = outer.pull().unwrap().unwrap();
1788 let _ = outer.pull().unwrap().unwrap();
1789 assert!(matches!(
1790 outer.pull(),
1791 Err(StreamError::Failed(message)) if message == "group_by reached max_substreams (2)"
1792 ));
1793 }
1794
1795 #[test]
1796 fn group_by_can_recreate_closed_substreams_when_enabled() {
1797 let (release_tx, release_rx) = mpsc::channel();
1798 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
1799 let outer = Source::from_factory(move || {
1800 let release_rx = StdArc::clone(&release_rx);
1801 let mut step = 0_u8;
1802 Box::new(std::iter::from_fn(move || {
1803 let item = match step {
1804 0 => Some(Ok(0)),
1805 1 => Some(Ok(1)),
1806 2 => {
1807 release_rx
1808 .lock()
1809 .unwrap()
1810 .as_ref()
1811 .expect("release receiver available")
1812 .recv_timeout(StdDuration::from_secs(1))
1813 .expect("timed out waiting to release recreated key");
1814 Some(Ok(0))
1815 }
1816 _ => None,
1817 };
1818 step += 1;
1819 item
1820 }))
1821 })
1822 .group_by(4, |item| item % 2, true)
1823 .run_with(Sink::queue())
1824 .unwrap();
1825
1826 let even = outer.pull().unwrap().unwrap();
1827 assert_eq!(wait(even.run_with(Sink::head()).unwrap()), 0);
1828 release_tx.send(()).unwrap();
1829
1830 let odd = outer.pull().unwrap().unwrap();
1831 assert_eq!(odd.run_collect().unwrap(), vec![1]);
1832
1833 let recreated_even = outer.pull().unwrap().unwrap();
1834 assert_eq!(recreated_even.run_collect().unwrap(), vec![0]);
1835 assert!(outer.pull().unwrap().is_none());
1836 }
1837
1838 #[test]
1839 fn group_by_panicking_key_fn_abruptly_terminates_live_substreams() {
1840 let outer = Source::from_iter([0, 1])
1841 .group_by(
1842 4,
1843 |item| {
1844 assert_ne!(*item, 1, "boom");
1845 item % 2
1846 },
1847 false,
1848 )
1849 .run_with(Sink::queue())
1850 .unwrap();
1851
1852 let substream = outer.pull().unwrap().unwrap();
1853 let (result_tx, result_rx) = mpsc::channel();
1854 thread::spawn(move || {
1855 let _ = result_tx.send(substream.run_collect());
1856 });
1857
1858 assert_eq!(
1859 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1860 Err(StreamError::AbruptTermination)
1861 );
1862 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1863 }
1864
1865 #[test]
1866 fn split_when_starts_new_substream_on_boundary_element() {
1867 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1868 .split_when(|item| *item == 0)
1869 .run_with(Sink::queue())
1870 .unwrap();
1871
1872 let first = outer.pull().unwrap().unwrap();
1873 assert_eq!(first.run_collect().unwrap(), vec![1, 2]);
1874 let second = outer.pull().unwrap().unwrap();
1875 assert_eq!(second.run_collect().unwrap(), vec![0, 3]);
1876 let third = outer.pull().unwrap().unwrap();
1877 assert_eq!(third.run_collect().unwrap(), vec![0, 4, 5]);
1878 assert!(outer.pull().unwrap().is_none());
1879 }
1880
1881 #[test]
1882 fn split_after_ends_current_substream_on_boundary_element() {
1883 let outer = Source::from_iter([1, 2, 0, 3, 0, 4, 5])
1884 .split_after(|item| *item == 0)
1885 .run_with(Sink::queue())
1886 .unwrap();
1887
1888 let first = outer.pull().unwrap().unwrap();
1889 assert_eq!(first.run_collect().unwrap(), vec![1, 2, 0]);
1890 let second = outer.pull().unwrap().unwrap();
1891 assert_eq!(second.run_collect().unwrap(), vec![3, 0]);
1892 let third = outer.pull().unwrap().unwrap();
1893 assert_eq!(third.run_collect().unwrap(), vec![4, 5]);
1894 assert!(outer.pull().unwrap().is_none());
1895 }
1896
1897 #[test]
1898 fn split_when_panicking_predicate_abruptly_terminates_live_substreams() {
1899 let outer = Source::from_iter([1, 2])
1900 .split_when(|item| {
1901 assert_ne!(*item, 2, "boom");
1902 false
1903 })
1904 .run_with(Sink::queue())
1905 .unwrap();
1906
1907 let substream = outer.pull().unwrap().unwrap();
1908 let (result_tx, result_rx) = mpsc::channel();
1909 thread::spawn(move || {
1910 let _ = result_tx.send(substream.run_collect());
1911 });
1912
1913 assert_eq!(
1914 result_rx.recv_timeout(StdDuration::from_secs(1)).unwrap(),
1915 Err(StreamError::AbruptTermination)
1916 );
1917 assert!(matches!(outer.pull(), Err(StreamError::AbruptTermination)));
1918 }
1919
1920 #[test]
1921 fn split_when_pre_buffer_segments_match_expected_count() {
1922 let outer = Source::from_iter(0..100)
1923 .split_when(|item| *item != 0 && *item % 10 == 0)
1924 .run_with(Sink::queue())
1925 .unwrap();
1926 let mut segment_count = 0;
1927 while let Some(substream) = outer.pull().unwrap() {
1928 let items: Vec<i32> = substream.run_collect().unwrap();
1929 assert!(!items.is_empty(), "segment should not be empty");
1930 segment_count += 1;
1931 }
1932 assert_eq!(segment_count, 10, "100 elements in segments of 10");
1933 }
1934
1935 #[test]
1936 fn split_after_pre_buffer_segments_match_expected_count() {
1937 let outer = Source::from_iter(0..100)
1938 .split_after(|item| (*item + 1) % 10 == 0)
1939 .run_with(Sink::queue())
1940 .unwrap();
1941 let mut segment_count = 0;
1942 let mut total = 0_i32;
1943 while let Some(substream) = outer.pull().unwrap() {
1944 let items: Vec<i32> = substream.run_collect().unwrap();
1945 assert!(!items.is_empty(), "segment should not be empty");
1946 total += items.len() as i32;
1947 segment_count += 1;
1948 }
1949 assert_eq!(segment_count, 10);
1950 assert_eq!(total, 100);
1951 }
1952
1953 #[test]
1954 fn group_by_single_key_fused_matches_general_path() {
1955 let outer = Source::from_iter(0..1000i64)
1956 .group_by(1, |_| 0u8, false)
1957 .run_with(Sink::queue())
1958 .unwrap();
1959 let substream = outer.pull().unwrap().unwrap();
1960 let items: Vec<i64> = substream.run_collect().unwrap();
1961 assert_eq!(items.len(), 1000);
1962 assert_eq!(items[0], 0);
1963 assert_eq!(items[999], 999);
1964 assert!(outer.pull().unwrap().is_none());
1965 }
1966
1967 #[test]
1968 fn group_by_single_key_fused_handles_key_change_with_substream_limit() {
1969 let outer = Source::from_iter([0, 1, 0])
1970 .group_by(2, |item| *item, false)
1971 .run_with(Sink::queue())
1972 .unwrap();
1973 let mut sources = vec![];
1974 while let Some(source) = outer.pull().unwrap() {
1975 sources.push(source);
1976 }
1977 assert_eq!(sources.len(), 2);
1978 assert_eq!(sources[0].clone().run_collect().unwrap(), vec![0, 0]);
1979 assert_eq!(sources[1].clone().run_collect().unwrap(), vec![1]);
1980 }
1981
1982 #[test]
1983 fn flat_map_merge_lock_lighter_matches_expected_count() {
1984 let items = Source::from_iter(0..20)
1985 .flat_map_merge(2, |item| Source::single(item + 100))
1986 .run_with(Sink::queue())
1987 .unwrap();
1988 let mut count = 0;
1989 while items.pull().unwrap().is_some() {
1990 count += 1;
1991 }
1992 assert_eq!(count, 20);
1993 }
1994
1995 #[test]
2005 fn group_by_single_key_emits_substream_before_upstream_completes() {
2006 let (tx, rx) = mpsc::sync_channel::<i32>(0);
2009 let rx = StdArc::new(std::sync::Mutex::new(rx));
2010
2011 let outer = Source::from_factory({
2012 let rx = StdArc::clone(&rx);
2013 move || {
2014 let rx = StdArc::clone(&rx);
2015 Box::new(std::iter::from_fn(move || {
2016 rx.lock().unwrap().recv().ok().map(Ok)
2017 })) as BoxStream<i32>
2018 }
2019 })
2020 .group_by(1, |_| 0u8, false)
2021 .run_with(Sink::queue())
2022 .unwrap();
2023
2024 let (sub_tx, sub_rx) = mpsc::channel::<Source<i32>>();
2027 let outer_thread = thread::spawn(move || {
2028 let substream = outer.pull().unwrap().expect("expected a substream");
2029 sub_tx.send(substream).unwrap();
2030 });
2031
2032 tx.send(0).unwrap();
2035
2036 let substream = sub_rx
2038 .recv_timeout(StdDuration::from_secs(5))
2039 .expect("timed out — group_by buffered first element before emitting substream");
2040
2041 for i in 1..100_i32 {
2043 tx.send(i).unwrap();
2044 }
2045 drop(tx);
2046
2047 let items: Vec<i32> = substream.run_collect().unwrap();
2048 assert_eq!(items.len(), 100);
2049 outer_thread.join().unwrap();
2050 }
2051
2052 #[test]
2053 fn group_by_concurrent_live_substreams_do_not_hold_ready_item_stress() {
2054 const STREAMS: usize = 32;
2055 const ROUNDS: usize = 8;
2056 const ITEMS: i64 = 8;
2057
2058 for _ in 0..ROUNDS {
2059 let barrier = StdArc::new(std::sync::Barrier::new(STREAMS));
2060 let mut handles = Vec::with_capacity(STREAMS);
2061
2062 for _ in 0..STREAMS {
2063 let barrier = StdArc::clone(&barrier);
2064 handles.push(thread::spawn(move || {
2065 let (tx, rx) = mpsc::sync_channel::<i64>(0);
2066 let rx = StdArc::new(std::sync::Mutex::new(rx));
2067
2068 let outer = Source::from_factory({
2069 let rx = StdArc::clone(&rx);
2070 move || {
2071 let rx = StdArc::clone(&rx);
2072 Box::new(std::iter::from_fn(move || {
2073 rx.lock().unwrap().recv().ok().map(Ok)
2074 })) as BoxStream<i64>
2075 }
2076 })
2077 .group_by(1, |_| 0_u8, false)
2078 .run_with(Sink::queue())
2079 .unwrap();
2080
2081 barrier.wait();
2082
2083 tx.send(0).unwrap();
2084 let substream = outer.pull().unwrap().expect("expected group_by substream");
2085 let subqueue = substream.run_with(Sink::queue()).unwrap();
2086 assert_eq!(subqueue.pull().unwrap(), Some(0));
2087
2088 for item in 1..ITEMS {
2089 tx.send(item).unwrap();
2090 assert_eq!(subqueue.pull().unwrap(), Some(item));
2091 }
2092 drop(tx);
2093
2094 assert!(subqueue.pull().unwrap().is_none());
2095 assert!(outer.pull().unwrap().is_none());
2096 }));
2097 }
2098
2099 for handle in handles {
2100 handle.join().expect("group_by stress worker panicked");
2101 }
2102 }
2103 }
2104
2105 #[test]
2112 fn split_when_emits_substream_before_segment_ends() {
2113 const SEGMENT_LEN: usize = 300;
2117
2118 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2119 for i in 0..SEGMENT_LEN as i32 {
2120 tx.send(i).unwrap();
2121 }
2122 tx.send(-1).unwrap(); tx.send(99).unwrap(); drop(tx);
2125
2126 let outer = Source::from_iter(rx)
2127 .split_when(|item| *item == -1)
2128 .run_with(Sink::queue())
2129 .unwrap();
2130
2131 let (result_tx, result_rx) = mpsc::channel();
2132 thread::spawn(move || {
2133 let first = outer.pull().unwrap().expect("expected first substream");
2134 let items: Vec<i32> = first.run_collect().unwrap();
2135 let second = outer.pull().unwrap().expect("expected second substream");
2136 let items2: Vec<i32> = second.run_collect().unwrap();
2137 let done = outer.pull().unwrap().is_none();
2138 let _ = result_tx.send((items, items2, done));
2139 });
2140
2141 let (items, items2, done) = result_rx
2142 .recv_timeout(StdDuration::from_secs(5))
2143 .expect("timed out — split_when is buffering the whole segment");
2144 assert_eq!(items.len(), SEGMENT_LEN);
2145 assert_eq!(items2, vec![-1, 99]);
2146 assert!(done);
2147 }
2148
2149 #[test]
2150 fn split_after_emits_substream_before_segment_ends() {
2151 const SEGMENT_LEN: usize = 300;
2152
2153 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT_LEN * 2 + 4);
2154 for i in 0..SEGMENT_LEN as i32 {
2155 tx.send(i).unwrap();
2156 }
2157 tx.send(-1).unwrap(); tx.send(99).unwrap();
2159 drop(tx);
2160
2161 let outer = Source::from_iter(rx)
2162 .split_after(|item| *item == -1)
2163 .run_with(Sink::queue())
2164 .unwrap();
2165
2166 let (result_tx, result_rx) = mpsc::channel();
2167 thread::spawn(move || {
2168 let first = outer.pull().unwrap().expect("expected first substream");
2169 let items: Vec<i32> = first.run_collect().unwrap();
2170 let second = outer.pull().unwrap().expect("expected second substream");
2171 let items2: Vec<i32> = second.run_collect().unwrap();
2172 let done = outer.pull().unwrap().is_none();
2173 let _ = result_tx.send((items, items2, done));
2174 });
2175
2176 let (items, items2, done) = result_rx
2177 .recv_timeout(StdDuration::from_secs(5))
2178 .expect("timed out — split_after is buffering the whole segment");
2179 assert_eq!(items.len(), SEGMENT_LEN + 1);
2181 assert_eq!(items2, vec![99]);
2182 assert!(done);
2183 }
2184
2185 #[test]
2189 fn flat_map_merge_coordinator_no_lost_wakeup_stress() {
2190 for _ in 0..20 {
2191 let result = Source::from_iter(0..50_i32)
2192 .flat_map_merge(8, |item| Source::from_iter(item..item + 3))
2193 .run_with(Sink::fold(0i64, |acc, item| acc + item as i64))
2194 .unwrap()
2195 .wait();
2196 assert_eq!(result, Ok(3825), "flat_map_merge produced wrong sum");
2199 }
2200 }
2201
2202 #[test]
2207 fn flat_map_merge_single_mutex_race_stress() {
2208 for _ in 0..20 {
2209 let result = Source::from_iter(0..100_i64)
2210 .flat_map_merge(16, |item| Source::from_iter([item, item + 1000]))
2211 .run_with(Sink::fold(0i64, |acc, v| acc + v))
2212 .unwrap()
2213 .wait();
2214 assert_eq!(result, Ok(109_900), "flat_map_merge single-mutex stress");
2218 }
2219 }
2220
2221 #[test]
2228 fn split_when_bounded_memory_rendezvous() {
2229 const SEGMENT: usize = 100;
2232 let (tx, rx) = mpsc::sync_channel::<i32>(SEGMENT * 4);
2233 for i in 0..SEGMENT as i32 {
2234 tx.send(i).unwrap();
2235 }
2236 tx.send(-1).unwrap(); for i in 0..10_i32 {
2239 tx.send(i).unwrap();
2240 }
2241 drop(tx);
2242
2243 let outer = Source::from_iter(rx)
2244 .split_when(|item| *item == -1)
2245 .run_with(Sink::queue())
2246 .unwrap();
2247
2248 let (result_tx, result_rx) = mpsc::channel();
2249 thread::spawn(move || {
2250 let first = outer.pull().unwrap().expect("first segment");
2251 let seg1: Vec<i32> = first.run_collect().unwrap();
2252 let second = outer.pull().unwrap().expect("second segment");
2253 let seg2: Vec<i32> = second.run_collect().unwrap();
2254 let done = outer.pull().unwrap().is_none();
2255 result_tx.send((seg1, seg2, done)).unwrap();
2256 });
2257
2258 let (seg1, seg2, done) = result_rx
2259 .recv_timeout(StdDuration::from_secs(5))
2260 .expect("timed out — split_when writer held items past LIVE_SUBSTREAM_BATCH");
2261 assert_eq!(seg1.len(), SEGMENT, "first segment length");
2262 assert_eq!(seg2[0], -1, "boundary element starts second segment");
2263 assert_eq!(seg2.len(), 11, "second segment: boundary + 10 items");
2264 assert!(done);
2265 }
2266
2267 #[test]
2271 fn group_by_single_key_bounded_memory_rendezvous() {
2272 const N: usize = 200;
2275 let outer = Source::from_iter(0..N as i64)
2276 .group_by(1, |_| 0u8, false)
2277 .run_with(Sink::queue())
2278 .unwrap();
2279
2280 let (result_tx, result_rx) = mpsc::channel();
2281 thread::spawn(move || {
2282 let substream = outer.pull().unwrap().expect("substream");
2283 let items: Vec<i64> = substream.run_collect().unwrap();
2284 let done = outer.pull().unwrap().is_none();
2285 result_tx.send((items, done)).unwrap();
2286 });
2287
2288 let (items, done) = result_rx
2289 .recv_timeout(StdDuration::from_secs(5))
2290 .expect("timed out — group_by write batch held items beyond LIVE_SUBSTREAM_BATCH");
2291 assert_eq!(items.len(), N, "all items delivered");
2292 assert_eq!(items[0], 0);
2293 assert_eq!(items[N - 1], (N - 1) as i64);
2294 assert!(done);
2295 }
2296
2297 #[test]
2298 fn scan_emits_seed_and_accumulated_values() {
2299 let result = Source::from_iter(1..=3)
2300 .scan(0, |acc, item| acc + item)
2301 .run_collect()
2302 .unwrap();
2303
2304 assert_eq!(result, vec![0, 1, 3, 6]);
2305 }
2306
2307 #[test]
2308 fn limit_fails_after_max_elements() {
2309 let result = Source::from_iter(0..3).limit(2).run_collect();
2310
2311 assert_eq!(result, Err(StreamError::LimitExceeded { max: 2 }));
2312 }
2313
2314 #[test]
2315 fn limit_weighted_fails_with_limit_error_like_akka() {
2316 let result = Source::from_iter(["this", "is", "some", "string"])
2317 .via(Flow::identity().limit_weighted(15, |item: &&str| item.len()))
2318 .run_collect();
2319
2320 assert_eq!(result, Err(StreamError::LimitExceeded { max: 15 }));
2321 }
2322
2323 #[test]
2324 fn grouped_weighted_allows_oversized_first_element_like_akka() {
2325 let result = Source::from_iter([10_usize, 1, 2])
2326 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2327 .run_collect()
2328 .unwrap();
2329
2330 assert_eq!(result, vec![vec![10], vec![1, 2]]);
2331 }
2332
2333 #[test]
2334 fn grouped_weighted_keeps_oversized_later_element_in_current_group_like_akka() {
2335 let result = Source::from_iter([1_usize, 10, 2])
2336 .via(Flow::identity().grouped_weighted(5, |item: &usize| *item))
2337 .run_collect()
2338 .unwrap();
2339
2340 assert_eq!(result, vec![vec![1, 10], vec![2]]);
2341 }
2342
2343 #[test]
2344 fn sink_terminals_materialize_results() {
2345 let sum = Source::from_iter(1..=4)
2346 .run_with(Sink::fold(0, |acc, item| acc + item))
2347 .unwrap();
2348
2349 assert_eq!(wait(sum), 10);
2350 assert_eq!(
2351 wait(Source::from_iter(1..=4).run_with(Sink::head()).unwrap()),
2352 1
2353 );
2354 assert_eq!(
2355 wait(Source::from_iter(1..=4).run_with(Sink::last()).unwrap()),
2356 4
2357 );
2358 }
2359
2360 #[test]
2361 fn all_terminal_sink_variants_complete() {
2362 assert_eq!(
2363 wait(
2364 Source::from_iter([1, 2, 3])
2365 .run_with(Sink::collect())
2366 .unwrap()
2367 ),
2368 vec![1, 2, 3]
2369 );
2370 assert_eq!(
2371 wait(
2372 Source::<i32>::empty()
2373 .run_with(Sink::head_option())
2374 .unwrap()
2375 ),
2376 None
2377 );
2378 assert_eq!(
2379 wait(
2380 Source::from_iter([1, 2, 3])
2381 .run_with(Sink::last_option())
2382 .unwrap()
2383 ),
2384 Some(3)
2385 );
2386 assert_eq!(
2387 wait(
2388 Source::from_iter([1, 2, 3])
2389 .run_with(Sink::reduce(|acc, item| acc + item))
2390 .unwrap()
2391 ),
2392 6
2393 );
2394
2395 let seen = StdArc::new(StdAtomicUsize::new(0));
2396 let seen_by_sink = StdArc::clone(&seen);
2397 assert_eq!(
2398 wait(
2399 Source::from_iter([1_usize, 2, 3])
2400 .run_with(Sink::foreach(move |item| {
2401 seen_by_sink.fetch_add(item, StdOrdering::SeqCst);
2402 }))
2403 .unwrap()
2404 ),
2405 NotUsed
2406 );
2407 assert_eq!(seen.load(StdOrdering::SeqCst), 6);
2408 }
2409
2410 #[test]
2411 fn take_last_zero_returns_empty_vector() {
2412 let result = Source::from_iter([1, 2, 3])
2413 .run_with(Sink::take_last(0))
2414 .unwrap();
2415
2416 assert_eq!(wait(result), Vec::<i32>::new());
2417 }
2418
2419 #[test]
2420 fn bounded_head_terminals_complete_inline() {
2421 let materializer = Materializer::new();
2422
2423 let mut head = Source::from_iter(0_u64..1_000)
2424 .run_with_materializer(Sink::head(), &materializer)
2425 .unwrap();
2426 assert_eq!(materializer.active_streams(), 0);
2427 assert_eq!(head.try_wait(), Some(Ok(0)));
2428
2429 let mut filtered_head = Source::from_iter(0_u64..1_000)
2430 .filter(|item| *item >= 10)
2431 .run_with_materializer(Sink::head(), &materializer)
2432 .unwrap();
2433 assert_eq!(materializer.active_streams(), 0);
2434 assert_eq!(filtered_head.try_wait(), Some(Ok(10)));
2435
2436 let mut head_option = Source::<u64>::empty()
2437 .run_with_materializer(Sink::head_option(), &materializer)
2438 .unwrap();
2439 assert_eq!(materializer.active_streams(), 0);
2440 assert_eq!(head_option.try_wait(), Some(Ok(None)));
2441 }
2442
2443 #[test]
2444 fn bounded_head_fast_path_preserves_terminal_errors() {
2445 let materializer = Materializer::new();
2446
2447 let mut empty = Source::<u64>::empty()
2448 .run_with_materializer(Sink::head(), &materializer)
2449 .unwrap();
2450 assert_eq!(empty.try_wait(), Some(Err(StreamError::EmptyStream)));
2451
2452 let mut failed = Source::<u64>::failed(StreamError::Failed("boom".into()))
2453 .run_with_materializer(Sink::head(), &materializer)
2454 .unwrap();
2455 assert_eq!(
2456 failed.try_wait(),
2457 Some(Err(StreamError::Failed("boom".into())))
2458 );
2459 assert_eq!(materializer.active_streams(), 0);
2460 }
2461
2462 #[test]
2463 fn runnable_graph_composes_source_and_sink() {
2464 let graph = Source::from_iter(1..=4)
2465 .map(|item| item * 2)
2466 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::right);
2467
2468 assert_eq!(wait(graph.run().unwrap()), 20);
2469
2470 let graph = Source::single(1)
2471 .map_materialized_value(|_| 20)
2472 .to(Sink::ignore())
2473 .map_materialized_value(|value| value + 1);
2474 assert_eq!(graph.run().unwrap(), 21);
2475
2476 let ignored = Source::single(1).to(Sink::ignore()).run().unwrap();
2477 assert_eq!(ignored, NotUsed);
2478 }
2479
2480 #[test]
2481 fn materialized_values_follow_keep_defaults() {
2482 let source = Source::single(1).map_materialized_value(|_| "source");
2483 let flow = Flow::identity().map_materialized_value(|_| "flow");
2484
2485 let source_mat = source.clone().via(flow.clone()).to(Sink::ignore()).run();
2486 assert_eq!(source_mat.unwrap(), "source");
2487
2488 let combined = source
2489 .via_mat(flow, Keep::both)
2490 .to_mat(Sink::ignore(), Keep::both)
2491 .run()
2492 .unwrap();
2493 assert_eq!(combined.0, ("source", "flow"));
2494 assert_eq!(wait(combined.1), NotUsed);
2495
2496 let sink_mat = Source::single(41)
2497 .map_materialized_value(|_| "ignored source")
2498 .run_with(Sink::fold(1, |acc, item| acc + item))
2499 .unwrap();
2500 assert_eq!(wait(sink_mat), 42);
2501 }
2502
2503 #[test]
2504 fn flow_to_sink_preserves_flow_materialized_value_by_default() {
2505 let sink = Flow::identity()
2506 .map(|item: i32| item + 1)
2507 .map_materialized_value(|_| "flow")
2508 .to(Sink::fold(0, |acc, item| acc + item));
2509
2510 let materialized = Source::from_iter([1, 2, 3]).run_with(sink).unwrap();
2511
2512 assert_eq!(materialized, "flow");
2513 let explicit = Flow::identity()
2514 .map(|item: i32| item + 1)
2515 .map_materialized_value(|_| "flow")
2516 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both)
2517 .run_with(Source::from_iter([1, 2, 3]))
2518 .unwrap();
2519 assert_eq!(explicit, NotUsed);
2520
2521 let explicit = Source::from_iter([1, 2, 3])
2522 .run_with(
2523 Flow::identity()
2524 .map(|item: i32| item + 1)
2525 .map_materialized_value(|_| "flow")
2526 .to_mat(Sink::fold(0, |acc, item| acc + item), Keep::both),
2527 )
2528 .unwrap();
2529 assert_eq!(explicit.0, "flow");
2530 assert_eq!(wait(explicit.1), 9);
2531 }
2532
2533 #[test]
2534 fn materializer_shutdown_fails_materialization() {
2535 let materializer = Materializer::new();
2536 let named = materializer.with_name_prefix("test-stream");
2537 materializer.shutdown();
2538
2539 let graph = Source::single(1).to(Sink::ignore());
2540
2541 assert_eq!(named.name_prefix(), "test-stream");
2542 assert_eq!(
2543 graph.run_with_materializer(&named),
2544 Err(StreamError::AbruptTermination)
2545 );
2546 }
2547
2548 #[test]
2549 fn materializer_shutdown_fails_running_stream_completion() {
2550 let materializer = Materializer::new();
2551 let completion = Source::repeat(1)
2552 .run_with_materializer(Sink::ignore(), &materializer)
2553 .unwrap();
2554
2555 assert_eq!(materializer.active_streams(), 1);
2556 materializer.shutdown();
2557 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2558 assert_eq!(materializer.active_streams(), 0);
2559 }
2560
2561 #[test]
2562 fn dropped_stream_completion_cancels_running_stream() {
2563 let materializer = Materializer::new();
2564 let completion = Source::repeat(1)
2565 .run_with_materializer(Sink::ignore(), &materializer)
2566 .unwrap();
2567
2568 assert_eq!(materializer.active_streams(), 1);
2569 drop(completion);
2570 for _ in 0..50 {
2571 if materializer.active_streams() == 0 {
2572 break;
2573 }
2574 thread::sleep(Duration::from_millis(5));
2575 }
2576 assert_eq!(materializer.active_streams(), 0);
2577 }
2578
2579 #[test]
2580 fn runtime_timers_fire_cancel_and_stop_on_shutdown() {
2581 let materializer = Materializer::new();
2582 let (once_tx, once_rx) = mpsc::channel();
2583 let once = materializer.schedule_once(Duration::from_millis(5), move || {
2584 once_tx.send(()).unwrap();
2585 });
2586 once_rx.recv_timeout(Duration::from_millis(250)).unwrap();
2587 assert!(!once.is_cancelled());
2588
2589 let (cancelled_tx, cancelled_rx) = mpsc::channel();
2590 let cancelled = materializer.schedule_once(Duration::from_millis(25), move || {
2591 cancelled_tx.send(()).unwrap();
2592 });
2593 assert!(cancelled.cancel());
2594 assert!(!cancelled.cancel());
2595 assert!(cancelled.is_cancelled());
2596 assert!(
2597 cancelled_rx
2598 .recv_timeout(Duration::from_millis(75))
2599 .is_err()
2600 );
2601
2602 let fixed_delay_count = StdArc::new(StdAtomicUsize::new(0));
2603 let fixed_delay_task_count = StdArc::clone(&fixed_delay_count);
2604 let fixed_delay = materializer.schedule_with_fixed_delay(
2605 Duration::from_millis(1),
2606 Duration::from_millis(5),
2607 move || {
2608 fixed_delay_task_count.fetch_add(1, StdOrdering::SeqCst);
2609 },
2610 );
2611 thread::sleep(Duration::from_millis(25));
2612 assert!(fixed_delay_count.load(StdOrdering::SeqCst) > 0);
2613 fixed_delay.cancel();
2614
2615 let fixed_rate_count = StdArc::new(StdAtomicUsize::new(0));
2616 let fixed_rate_task_count = StdArc::clone(&fixed_rate_count);
2617 let fixed_rate = materializer.schedule_at_fixed_rate(
2618 Duration::from_millis(1),
2619 Duration::from_millis(5),
2620 move || {
2621 fixed_rate_task_count.fetch_add(1, StdOrdering::SeqCst);
2622 },
2623 );
2624 thread::sleep(Duration::from_millis(25));
2625 assert!(fixed_rate_count.load(StdOrdering::SeqCst) > 0);
2626 fixed_rate.cancel();
2627
2628 let shutdown_materializer = Materializer::new();
2629 let (shutdown_tx, shutdown_rx) = mpsc::channel();
2630 shutdown_materializer.schedule_once(Duration::from_millis(25), move || {
2631 shutdown_tx.send(()).unwrap();
2632 });
2633 shutdown_materializer.shutdown();
2634 assert!(shutdown_rx.recv_timeout(Duration::from_millis(75)).is_err());
2635 }
2636
2637 #[test]
2638 fn runtime_timer_driver_preserves_fixed_rate_cadence_under_slow_tasks() {
2639 use std::sync::{Condvar, Mutex};
2640
2641 #[derive(Debug)]
2642 enum TimerEvent {
2643 Started(usize, Instant),
2644 Completed(usize, Instant),
2645 }
2646
2647 let recv_event = |rx: &mpsc::Receiver<TimerEvent>, label: &str| {
2648 rx.recv_timeout(Duration::from_secs(20))
2649 .unwrap_or_else(|err| panic!("{label}: expected timer event within 20 s: {err}"))
2650 };
2651 let release = |gate: &StdArc<(Mutex<bool>, Condvar)>| {
2652 let (released, condvar) = &**gate;
2653 let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2654 *released = true;
2655 condvar.notify_all();
2656 };
2657
2658 let interval = Duration::from_secs(2);
2659 let overrun = interval + Duration::from_millis(250);
2660
2661 let rate_materializer = Materializer::new();
2662 let (rate_tx, rate_rx) = mpsc::channel();
2663 let rate_runs = StdArc::new(StdAtomicUsize::new(0));
2664 let rate_task_runs = StdArc::clone(&rate_runs);
2665 let rate_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2666 let rate_task_gate = StdArc::clone(&rate_gate);
2667 let fixed_rate =
2668 rate_materializer.schedule_at_fixed_rate(Duration::ZERO, interval, move || {
2669 let run = rate_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2670 rate_tx
2671 .send(TimerEvent::Started(run, Instant::now()))
2672 .unwrap();
2673 if run == 1 {
2674 let (released, condvar) = &*rate_task_gate;
2675 let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2676 while !*released {
2677 released = condvar
2678 .wait(released)
2679 .unwrap_or_else(|poison| poison.into_inner());
2680 }
2681 rate_tx
2682 .send(TimerEvent::Completed(run, Instant::now()))
2683 .unwrap();
2684 }
2685 });
2686 let rate_first_started = match recv_event(&rate_rx, "fixed-rate first task") {
2687 TimerEvent::Started(1, at) => at,
2688 other => panic!("fixed-rate first task: unexpected event {other:?}"),
2689 };
2690 assert!(wait_until(Duration::from_secs(20), || {
2691 rate_first_started.elapsed() >= overrun
2692 }));
2693 release(&rate_gate);
2694 let rate_first_completed = match recv_event(&rate_rx, "fixed-rate first completion") {
2695 TimerEvent::Completed(1, at) => at,
2696 other => panic!("fixed-rate first completion: unexpected event {other:?}"),
2697 };
2698 let rate_second_started = match recv_event(&rate_rx, "fixed-rate second task") {
2699 TimerEvent::Started(2, at) => at,
2700 other => panic!("fixed-rate second task: unexpected event {other:?}"),
2701 };
2702 fixed_rate.cancel();
2703 rate_materializer.shutdown();
2704
2705 let delay_materializer = Materializer::new();
2706 let (delay_tx, delay_rx) = mpsc::channel();
2707 let delay_runs = StdArc::new(StdAtomicUsize::new(0));
2708 let delay_task_runs = StdArc::clone(&delay_runs);
2709 let delay_gate = StdArc::new((Mutex::new(false), Condvar::new()));
2710 let delay_task_gate = StdArc::clone(&delay_gate);
2711 let fixed_delay =
2712 delay_materializer.schedule_with_fixed_delay(Duration::ZERO, interval, move || {
2713 let run = delay_task_runs.fetch_add(1, StdOrdering::SeqCst) + 1;
2714 delay_tx
2715 .send(TimerEvent::Started(run, Instant::now()))
2716 .unwrap();
2717 if run == 1 {
2718 let (released, condvar) = &*delay_task_gate;
2719 let mut released = released.lock().unwrap_or_else(|poison| poison.into_inner());
2720 while !*released {
2721 released = condvar
2722 .wait(released)
2723 .unwrap_or_else(|poison| poison.into_inner());
2724 }
2725 delay_tx
2726 .send(TimerEvent::Completed(run, Instant::now()))
2727 .unwrap();
2728 }
2729 });
2730 let delay_first_started = match recv_event(&delay_rx, "fixed-delay first task") {
2731 TimerEvent::Started(1, at) => at,
2732 other => panic!("fixed-delay first task: unexpected event {other:?}"),
2733 };
2734 assert!(wait_until(Duration::from_secs(20), || {
2735 delay_first_started.elapsed() >= overrun
2736 }));
2737 release(&delay_gate);
2738 let delay_first_completed = match recv_event(&delay_rx, "fixed-delay first completion") {
2739 TimerEvent::Completed(1, at) => at,
2740 other => panic!("fixed-delay first completion: unexpected event {other:?}"),
2741 };
2742 let delay_second_started = match recv_event(&delay_rx, "fixed-delay second task") {
2743 TimerEvent::Started(2, at) => at,
2744 other => panic!("fixed-delay second task: unexpected event {other:?}"),
2745 };
2746 fixed_delay.cancel();
2747 delay_materializer.shutdown();
2748
2749 let rate_task_time = rate_first_completed.duration_since(rate_first_started);
2750 let rate_catch_up = rate_second_started.duration_since(rate_first_completed);
2751 let delay_task_time = delay_first_completed.duration_since(delay_first_started);
2752 let delay_gap = delay_second_started.duration_since(delay_first_completed);
2753 assert!(
2754 rate_task_time >= interval,
2755 "fixed-rate first task should overrun its interval; ran for {rate_task_time:?}"
2756 );
2757 assert!(
2758 rate_catch_up < interval,
2759 "fixed-rate second task should catch up after an overrun; waited {rate_catch_up:?}"
2760 );
2761 assert!(
2762 delay_task_time >= interval,
2763 "fixed-delay first task should overrun its interval; ran for {delay_task_time:?}"
2764 );
2765 assert!(
2766 delay_gap >= interval,
2767 "fixed-delay second task fired before one full delay elapsed after completion: {delay_gap:?}",
2768 );
2769 }
2770
2771 #[test]
2772 fn runtime_repeating_timer_cancellation_stops_future_fires() {
2773 let materializer = Materializer::new();
2774 let (tx, rx) = mpsc::channel();
2775 let timer = materializer.schedule_at_fixed_rate(
2776 Duration::from_millis(1),
2777 Duration::from_millis(30),
2778 move || {
2779 tx.send(()).unwrap();
2780 },
2781 );
2782
2783 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2784 assert!(timer.cancel());
2785 assert!(rx.recv_timeout(Duration::from_millis(90)).is_err());
2786 materializer.shutdown();
2787 }
2788
2789 #[test]
2790 fn runtime_panicking_once_timer_does_not_kill_driver_or_later_timers() {
2791 let materializer = Materializer::new();
2792 materializer.schedule_once(Duration::from_millis(1), || {
2793 panic!("timer boom");
2794 });
2795
2796 let (tx, rx) = mpsc::channel();
2797 materializer.schedule_once(Duration::from_millis(20), move || {
2798 tx.send(()).unwrap();
2799 });
2800
2801 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2802 materializer.shutdown();
2803 }
2804
2805 #[test]
2806 fn runtime_panicking_fixed_rate_timer_stops_itself_and_leaves_driver_alive() {
2807 let materializer = Materializer::new();
2808 let panic_count = StdArc::new(StdAtomicUsize::new(0));
2809 let panic_count_task = StdArc::clone(&panic_count);
2810 materializer.schedule_at_fixed_rate(Duration::ZERO, Duration::from_millis(20), move || {
2811 panic_count_task.fetch_add(1, StdOrdering::SeqCst);
2812 panic!("fixed-rate boom");
2813 });
2814
2815 assert!(wait_until(Duration::from_millis(150), || {
2816 panic_count.load(StdOrdering::SeqCst) == 1
2817 }));
2818
2819 let (tx, rx) = mpsc::channel();
2820 materializer.schedule_once(Duration::from_millis(30), move || {
2821 tx.send(()).unwrap();
2822 });
2823 rx.recv_timeout(Duration::from_millis(250)).unwrap();
2824
2825 thread::sleep(Duration::from_millis(90));
2826 assert_eq!(panic_count.load(StdOrdering::SeqCst), 1);
2827 materializer.shutdown();
2828 }
2829
2830 #[test]
2831 fn runtime_slow_timer_task_does_not_delay_unrelated_timers() {
2832 let materializer = Materializer::new();
2833 let started = StdArc::new(StdAtomicBool::new(false));
2834 let started_task = StdArc::clone(&started);
2835 let slow_timer = materializer.schedule_at_fixed_rate(
2836 Duration::ZERO,
2837 Duration::from_millis(250),
2838 move || {
2839 started_task.store(true, StdOrdering::SeqCst);
2840 thread::sleep(Duration::from_millis(200));
2841 },
2842 );
2843
2844 assert!(wait_until(Duration::from_millis(100), || {
2845 started.load(StdOrdering::SeqCst)
2846 }));
2847
2848 let start = Instant::now();
2849 let (tx, rx) = mpsc::channel();
2850 materializer.schedule_once(Duration::from_millis(10), move || {
2851 tx.send(Instant::now()).unwrap();
2852 });
2853 let fired_at = rx.recv_timeout(Duration::from_millis(350)).unwrap();
2854 let elapsed = fired_at.duration_since(start);
2855
2856 slow_timer.cancel();
2857 materializer.shutdown();
2858 assert!(
2859 elapsed < Duration::from_millis(150),
2860 "unrelated timer was delayed by a blocking timer task: {elapsed:?}",
2861 );
2862 }
2863
2864 #[test]
2865 fn runtime_shutdown_stops_timer_driver_thread() {
2866 let materializer = Materializer::new();
2867 assert!(wait_until(Duration::from_secs(1), || materializer
2868 .timer_driver_is_live()));
2869
2870 materializer.shutdown();
2871 assert!(wait_until(Duration::from_secs(2), || !materializer
2872 .timer_driver_is_live()));
2873 }
2874
2875 #[test]
2876 fn runtime_timer_driver_orders_many_timers_by_deadline() {
2877 let materializer = Materializer::new();
2878 let (tx, rx) = mpsc::channel();
2879 let schedule = [(450_u64, 4_u8), (50, 1), (350, 3), (150, 2), (550, 5)];
2880
2881 for (delay_ms, value) in schedule {
2882 let tx = tx.clone();
2883 materializer.schedule_once(Duration::from_millis(delay_ms), move || {
2884 tx.send(value).unwrap();
2885 });
2886 }
2887 drop(tx);
2888
2889 let mut received = Vec::new();
2890 for _ in 0..schedule.len() {
2891 received.push(rx.recv_timeout(Duration::from_secs(10)).unwrap());
2892 }
2893 materializer.shutdown();
2894
2895 assert_eq!(received, vec![1, 2, 3, 4, 5]);
2896 }
2897
2898 #[test]
2899 fn runtime_timer_driver_uses_one_thread_per_runtime_regardless_of_timer_count() {
2900 let materializer = Materializer::new();
2901 let thread_name = materializer.timer_thread_name().to_owned();
2902 let linux_thread_name = thread_name.chars().take(15).collect::<String>();
2908 assert!(wait_until(Duration::from_secs(5), || {
2909 materializer.timer_driver_is_live() && linux_thread_count(&linux_thread_name) >= 1
2910 }));
2911 let live_timer_threads = linux_thread_count(&linux_thread_name);
2912
2913 for _ in 0..128 {
2914 materializer.schedule_once(Duration::from_secs(60), || {});
2915 }
2916
2917 assert!(
2918 wait_until(Duration::from_secs(5), || {
2919 materializer.timer_driver_is_live()
2920 && linux_thread_count(&linux_thread_name) == live_timer_threads
2921 }),
2922 "scheduling timers should not create extra timer threads for a runtime",
2923 );
2924 materializer.shutdown();
2925 assert!(wait_until(Duration::from_secs(5), || {
2926 !materializer.timer_driver_is_live()
2927 && linux_thread_count(&linux_thread_name) < live_timer_threads
2928 }));
2929 }
2930
2931 #[test]
2932 fn cancelled_and_never_sinks_have_distinct_materialization_results() {
2933 assert_eq!(
2934 Source::repeat(1)
2935 .run_with(Sink::cancelled())
2936 .expect("cancelled sink materializes"),
2937 NotUsed
2938 );
2939 assert_eq!(
2940 Source::single(1)
2941 .run_with(Sink::never())
2942 .expect("never sink materializes")
2943 .try_wait(),
2944 None
2945 );
2946 }
2947
2948 #[test]
2949 fn never_sink_finishes_on_materializer_shutdown() {
2950 let materializer = Materializer::new();
2951 let completion = Source::single(1)
2952 .run_with_materializer(Sink::never(), &materializer)
2953 .unwrap();
2954
2955 materializer.shutdown();
2956 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
2957 }
2958
2959 #[test]
2960 fn dropping_source_never_completion_releases_parked_worker() {
2961 let materializer = Materializer::new();
2962 let completion = Source::<i32>::never()
2963 .run_with_materializer(Sink::ignore(), &materializer)
2964 .unwrap();
2965
2966 assert!(wait_until(StdDuration::from_secs(1), || {
2967 materializer.active_streams() == 1
2968 }));
2969 assert_eq!(materializer.active_streams(), 1);
2970
2971 drop(completion);
2972
2973 assert!(wait_until(StdDuration::from_secs(15), || {
2974 materializer.active_streams() == 0
2975 }));
2976 assert_eq!(materializer.active_streams(), 0);
2977 }
2978
2979 #[test]
2980 fn future_and_maybe_sources_emit_values() {
2981 let future_value = Source::future(|| async { Ok(7) }).run_collect().unwrap();
2982 assert_eq!(future_value, vec![7]);
2983
2984 let future_source = Source::future_source(|| async { Ok(Source::from_iter([1, 2, 3])) })
2985 .run_collect()
2986 .unwrap();
2987 assert_eq!(future_source, vec![1, 2, 3]);
2988
2989 let (handle, source) = Source::maybe();
2990 assert_eq!(
2991 source.clone().run_collect(),
2992 Err(StreamError::MaybeIncomplete)
2993 );
2994 handle.complete(9).unwrap();
2995 assert_eq!(source.run_collect().unwrap(), vec![9]);
2996 }
2997
2998 #[test]
2999 fn wp6b_source_generators_emit_and_fail_like_stream_errors() {
3000 assert_eq!(
3001 Source::cycle(|| [1, 2, 3].into_iter())
3002 .take(8)
3003 .run_collect()
3004 .unwrap(),
3005 vec![1, 2, 3, 1, 2, 3, 1, 2]
3006 );
3007 assert_eq!(
3008 Source::<i32>::cycle(std::iter::empty::<i32>).run_collect(),
3009 Err(StreamError::Failed("empty iterator".into()))
3010 );
3011 assert_eq!(
3012 Source::unfold(0, |state| (state < 4).then_some((state + 1, state)))
3013 .run_collect()
3014 .unwrap(),
3015 vec![0, 1, 2, 3]
3016 );
3017 assert_eq!(
3018 Source::unfold_async(0, |state| async move {
3019 Ok((state < 4).then_some((state + 1, state * 2)))
3020 })
3021 .run_collect()
3022 .unwrap(),
3023 vec![0, 2, 4, 6]
3024 );
3025 assert!(matches!(
3026 Source::<i32>::lazy_single(|| panic!("boom")).run_collect(),
3027 Err(StreamError::Failed(message)) if message == "lazy_single factory panicked"
3028 ));
3029 }
3030
3031 #[test]
3032 fn wp6b_lazy_sources_defer_until_first_pull_and_complete_deferred_mat() {
3033 let created = StdArc::new(StdAtomicUsize::new(0));
3034 let created_for_source = StdArc::clone(&created);
3035 let source = Source::<i32>::lazy_source(move || {
3036 created_for_source.fetch_add(1, StdOrdering::SeqCst);
3037 Source::from_iter([7, 8]).map_materialized_value(|_| 99)
3038 });
3039 let materializer = Materializer::new();
3040 let (mut stream, mut mat) = StdArc::clone(&source.factory)
3041 .create(&materializer)
3042 .unwrap();
3043
3044 assert_eq!(created.load(StdOrdering::SeqCst), 0);
3045 assert!(mat.try_wait().is_none());
3046 assert_eq!(stream.next().unwrap().unwrap(), 7);
3047 assert_eq!(mat.wait().unwrap(), 99);
3048 assert_eq!(created.load(StdOrdering::SeqCst), 1);
3049 assert_eq!(stream.next().unwrap().unwrap(), 8);
3050
3051 let never_created = StdArc::new(StdAtomicUsize::new(0));
3052 let never_created_for_source = StdArc::clone(&never_created);
3053 let mat = Source::<i32>::lazy_future_source(move || {
3054 never_created_for_source.fetch_add(1, StdOrdering::SeqCst);
3055 async { Ok(Source::single(1)) }
3056 })
3057 .to(Sink::cancelled())
3058 .run()
3059 .unwrap();
3060 assert!(matches!(mat.wait(), Err(StreamError::Failed(_))));
3061 assert_eq!(never_created.load(StdOrdering::SeqCst), 0);
3062
3063 let lazy_future = StdArc::new(StdAtomicUsize::new(0));
3064 let lazy_future_for_source = StdArc::clone(&lazy_future);
3065 let source = Source::lazy_future(move || {
3066 lazy_future_for_source.fetch_add(1, StdOrdering::SeqCst);
3067 async { Ok(42) }
3068 });
3069 let (mut stream, _) = StdArc::clone(&source.factory)
3070 .create(&Materializer::new())
3071 .unwrap();
3072 assert_eq!(lazy_future.load(StdOrdering::SeqCst), 0);
3073 assert_eq!(stream.next().unwrap().unwrap(), 42);
3074 assert_eq!(lazy_future.load(StdOrdering::SeqCst), 1);
3075 }
3076
3077 #[test]
3078 fn wp6b_unfold_resource_closes_on_completion_failure_and_cancellation() {
3079 let closed = StdArc::new(StdAtomicUsize::new(0));
3080 let closed_on_complete = StdArc::clone(&closed);
3081 let values = Source::unfold_resource(
3082 || Ok(std::collections::VecDeque::from([1, 2, 3])),
3083 |items| Ok(items.pop_front()),
3084 move |_items| {
3085 closed_on_complete.fetch_add(1, StdOrdering::SeqCst);
3086 Ok(())
3087 },
3088 )
3089 .run_collect()
3090 .unwrap();
3091 assert_eq!(values, vec![1, 2, 3]);
3092 assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3093
3094 let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3095 let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3096 let failed = Source::<i32>::unfold_resource(
3097 || Ok(()),
3098 |_| Err(StreamError::Failed("read".into())),
3099 move |_| {
3100 closed_on_failure_for_close.fetch_add(1, StdOrdering::SeqCst);
3101 Err(StreamError::Failed("close".into()))
3102 },
3103 )
3104 .run_collect();
3105 assert_eq!(failed, Err(StreamError::Failed("read".into())));
3106 assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3107
3108 let closed_on_cancel = StdArc::new(StdAtomicUsize::new(0));
3109 let closed_on_cancel_for_close = StdArc::clone(&closed_on_cancel);
3110 let first = Source::unfold_resource(
3111 || Ok(0_usize),
3112 |next| {
3113 let item = *next;
3114 *next += 1;
3115 Ok(Some(item))
3116 },
3117 move |_| {
3118 closed_on_cancel_for_close.fetch_add(1, StdOrdering::SeqCst);
3119 Ok(())
3120 },
3121 )
3122 .run_with(Sink::head())
3123 .unwrap();
3124 assert_eq!(first.wait().unwrap(), 0);
3125 assert!(wait_until(Duration::from_millis(250), || {
3126 closed_on_cancel.load(StdOrdering::SeqCst) == 1
3127 }));
3128 }
3129
3130 #[test]
3131 fn wp6b_async_resource_and_async_accumulators_are_sequential() {
3132 let closed = StdArc::new(StdAtomicUsize::new(0));
3133 let closed_for_close = StdArc::clone(&closed);
3134 let values = Source::unfold_resource_async(
3135 || async { Ok(std::collections::VecDeque::from([1, 2, 3])) },
3136 |items| {
3137 let item = items.pop_front();
3138 async move { Ok(item) }
3139 },
3140 move |_items| {
3141 let closed = StdArc::clone(&closed_for_close);
3142 async move {
3143 closed.fetch_add(1, StdOrdering::SeqCst);
3144 Ok(())
3145 }
3146 },
3147 )
3148 .run_collect()
3149 .unwrap();
3150 assert_eq!(values, vec![1, 2, 3]);
3151 assert_eq!(closed.load(StdOrdering::SeqCst), 1);
3152
3153 let closed_on_failure = StdArc::new(StdAtomicUsize::new(0));
3154 let closed_on_failure_for_close = StdArc::clone(&closed_on_failure);
3155 let failed = Source::<i32>::unfold_resource_async(
3156 || async { Ok(()) },
3157 |_resource| async { Err(StreamError::Failed("read".into())) },
3158 move |_resource| {
3159 let closed_on_failure = StdArc::clone(&closed_on_failure_for_close);
3160 async move {
3161 closed_on_failure.fetch_add(1, StdOrdering::SeqCst);
3162 Err(StreamError::Failed("close".into()))
3163 }
3164 },
3165 )
3166 .run_collect();
3167 assert_eq!(failed, Err(StreamError::Failed("read".into())));
3168 assert_eq!(closed_on_failure.load(StdOrdering::SeqCst), 1);
3169
3170 let active = StdArc::new(StdAtomicUsize::new(0));
3171 let max_active = StdArc::new(StdAtomicUsize::new(0));
3172 let active_for_stage = StdArc::clone(&active);
3173 let max_for_stage = StdArc::clone(&max_active);
3174 let scanned = Source::from_iter(1..=4)
3175 .scan_async(0, move |acc, item| {
3176 let active = StdArc::clone(&active_for_stage);
3177 let max_active = StdArc::clone(&max_for_stage);
3178 async move {
3179 let now = active.fetch_add(1, StdOrdering::SeqCst) + 1;
3180 max_active.fetch_max(now, StdOrdering::SeqCst);
3181 tokio::time::sleep(Duration::from_millis(1)).await;
3182 active.fetch_sub(1, StdOrdering::SeqCst);
3183 Ok(acc + item)
3184 }
3185 })
3186 .run_collect()
3187 .unwrap();
3188 assert_eq!(scanned, vec![0, 1, 3, 6, 10]);
3189 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
3190
3191 let folded = Source::from_iter(1..=4)
3192 .fold_async(0, |acc, item| async move { Ok(acc + item) })
3193 .run_collect()
3194 .unwrap();
3195 assert_eq!(folded, vec![10]);
3196 }
3197
3198 #[test]
3199 fn wp6b_fold_async_materialization_does_not_drain_upstream() {
3200 let release = StdArc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
3201 let started = StdArc::new(StdAtomicBool::new(false));
3202 let source = {
3203 let release = StdArc::clone(&release);
3204 let started = StdArc::clone(&started);
3205 Source::from_factory(move || {
3206 let release = StdArc::clone(&release);
3207 let started = StdArc::clone(&started);
3208 let mut emitted = false;
3209 Box::new(std::iter::from_fn(move || {
3210 if emitted {
3211 return None;
3212 }
3213 emitted = true;
3214 started.store(true, StdOrdering::SeqCst);
3215 let (released, available) = &*release;
3216 let mut released = released.lock().unwrap();
3217 while !*released {
3218 released = available.wait(released).unwrap();
3219 }
3220 Some(Ok(1))
3221 }))
3222 })
3223 };
3224
3225 let (materialized_tx, materialized_rx) = mpsc::channel();
3226 let join = thread::spawn(move || {
3227 let queue = source
3228 .fold_async(0, |acc, item| async move { Ok(acc + item) })
3229 .run_with(Sink::queue())
3230 .unwrap();
3231 materialized_tx.send(queue).unwrap();
3232 });
3233
3234 let queue = match materialized_rx.recv_timeout(StdDuration::from_secs(1)) {
3235 Ok(queue) => queue,
3236 Err(error) => {
3237 let (released, available) = &*release;
3238 *released.lock().unwrap() = true;
3239 available.notify_all();
3240 let _ = join.join();
3241 panic!("fold_async materialization did not return before first pull: {error}");
3242 }
3243 };
3244 let (released, _) = &*release;
3245 assert!(
3246 !*released.lock().unwrap(),
3247 "test source was released before materialization returned"
3248 );
3249
3250 let (released, available) = &*release;
3251 *released.lock().unwrap() = true;
3252 available.notify_all();
3253 assert_eq!(queue.pull().unwrap(), Some(1));
3254 assert_eq!(queue.pull().unwrap(), None);
3255 join.join().unwrap();
3256 assert!(started.load(StdOrdering::SeqCst));
3257 }
3258
3259 #[test]
3260 fn wp6b_lazy_sink_and_flow_wait_for_first_element() {
3261 let lazy_sink_created = StdArc::new(StdAtomicUsize::new(0));
3262 let lazy_sink_created_for_factory = StdArc::clone(&lazy_sink_created);
3263 let empty_sink = Source::<i32>::empty()
3264 .run_with(Sink::lazy_sink(move || {
3265 lazy_sink_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3266 Sink::ignore()
3267 }))
3268 .unwrap();
3269 assert!(matches!(empty_sink.wait(), Err(StreamError::Failed(_))));
3270 assert_eq!(lazy_sink_created.load(StdOrdering::SeqCst), 0);
3271
3272 let foreach_sum = StdArc::new(StdAtomicUsize::new(0));
3273 let foreach_sum_for_sink = StdArc::clone(&foreach_sum);
3274 Source::from_iter([1_usize, 2, 3])
3275 .run_with(Sink::foreach_async(2, move |item| {
3276 let foreach_sum = StdArc::clone(&foreach_sum_for_sink);
3277 async move {
3278 foreach_sum.fetch_add(item, StdOrdering::SeqCst);
3279 Ok(())
3280 }
3281 }))
3282 .unwrap()
3283 .wait()
3284 .unwrap();
3285 assert_eq!(foreach_sum.load(StdOrdering::SeqCst), 6);
3286
3287 let lazy_flow_created = StdArc::new(StdAtomicUsize::new(0));
3288 let lazy_flow_created_for_factory = StdArc::clone(&lazy_flow_created);
3289 let lazy_flow = Flow::<i32, i32>::lazy_flow(move || {
3290 lazy_flow_created_for_factory.fetch_add(1, StdOrdering::SeqCst);
3291 Flow::identity()
3292 .map(|item: i32| item + 10)
3293 .map_materialized_value(|_| 123)
3294 });
3295 let mat = (lazy_flow.materialize)().unwrap();
3296 let mut stream = match lazy_flow.transform {
3297 flow::FlowTransform::Runtime(transform) => {
3298 transform(Box::new([Ok(1), Ok(2)].into_iter()), &Materializer::new()).unwrap()
3299 }
3300 flow::FlowTransform::Pure(_) => panic!("lazy flow must be runtime-backed"),
3301 };
3302 assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 0);
3303 assert_eq!(stream.next().unwrap().unwrap(), 11);
3304 assert_eq!(mat.wait().unwrap(), 123);
3305 assert_eq!(lazy_flow_created.load(StdOrdering::SeqCst), 1);
3306 assert_eq!(stream.next().unwrap().unwrap(), 12);
3307
3308 let future_flow = Source::from_iter([1, 2])
3309 .via_mat(
3310 Flow::future_flow(|| async {
3311 Ok(Flow::identity()
3312 .map(|item: i32| item * 2)
3313 .map_materialized_value(|_| 77))
3314 }),
3315 Keep::right,
3316 )
3317 .to_mat(Sink::collect(), Keep::both)
3318 .run()
3319 .unwrap();
3320 assert_eq!(future_flow.0.wait().unwrap(), 77);
3321 assert_eq!(future_flow.1.wait().unwrap(), vec![2, 4]);
3322 }
3323
3324 #[test]
3325 fn wp6b_lazy_flow_double_use_in_one_chain_pairs_instances_in_order() {
3326 for round in 0..50 {
3331 let counter = StdArc::new(StdAtomicUsize::new(1));
3332 let factory_counter = StdArc::clone(&counter);
3333 let lazy: Flow<usize, usize, _> = Flow::lazy_flow(move || {
3334 let id = factory_counter.fetch_add(1, StdOrdering::SeqCst);
3335 Flow::identity()
3336 .map(move |x: usize| x * 100 + id)
3337 .map_materialized_value(move |_| id)
3338 });
3339 let lazy_again = lazy.clone();
3340
3341 let ((first_mat, second_mat), out) = Source::from_iter([0usize])
3342 .via_mat(lazy, Keep::right)
3343 .via_mat(lazy_again, Keep::both)
3344 .to_mat(Sink::collect(), Keep::both)
3345 .run()
3346 .unwrap();
3347
3348 let first_id = first_mat.wait().unwrap();
3349 let second_id = second_mat.wait().unwrap();
3350 let element = out.wait().unwrap()[0];
3351 assert_eq!(
3352 element,
3353 first_id * 100 + second_id,
3354 "round {round}: mats ({first_id},{second_id}) cross-wired with transform order"
3355 );
3356 assert_ne!(
3357 first_id, second_id,
3358 "round {round}: same factory instance paired twice"
3359 );
3360 }
3361 }
3362
3363 #[test]
3364 fn wp6b_lazy_flow_clones_materialize_concurrently_without_cross_wiring() {
3365 for _ in 0..20 {
3366 let next_id = StdArc::new(StdAtomicUsize::new(0));
3367 let next_id_for_factory = StdArc::clone(&next_id);
3368 let flow = Flow::<i32, i32>::lazy_flow(move || {
3369 let id = next_id_for_factory.fetch_add(1, StdOrdering::SeqCst) + 1;
3370 Flow::identity()
3371 .map(move |item: i32| item + (id as i32 * 100))
3372 .map_materialized_value(move |_| id)
3373 });
3374 let barrier = StdArc::new(std::sync::Barrier::new(3));
3375
3376 let spawn_materialization = |input: i32| {
3377 let flow = flow.clone();
3378 let barrier = StdArc::clone(&barrier);
3379 thread::spawn(move || {
3380 barrier.wait();
3381 let (mat, values) = Source::single(input)
3382 .via_mat(flow, Keep::right)
3383 .to_mat(Sink::collect(), Keep::both)
3384 .run()
3385 .unwrap();
3386 (input, mat.wait().unwrap(), values.wait().unwrap())
3387 })
3388 };
3389
3390 let first = spawn_materialization(1);
3391 let second = spawn_materialization(2);
3392 barrier.wait();
3393
3394 for result in [first.join().unwrap(), second.join().unwrap()] {
3395 let (input, mat_id, values) = result;
3396 assert_eq!(values, vec![input + (mat_id as i32 * 100)]);
3397 }
3398 assert_eq!(next_id.load(StdOrdering::SeqCst), 2);
3399 }
3400 }
3401
3402 #[test]
3403 fn wp6b_map_with_resource_emits_close_item_before_terminal_error() {
3404 let queue = Source::from_factory(|| {
3405 Box::new(vec![Ok(1), Err(StreamError::Failed("upstream".into()))].into_iter())
3406 })
3407 .map_with_resource(
3408 || Ok(()),
3409 |_resource, item| Ok(item + 10),
3410 |_resource| Ok(Some(99)),
3411 )
3412 .run_with(Sink::queue())
3413 .unwrap();
3414
3415 assert_eq!(queue.pull().unwrap(), Some(11));
3416 assert_eq!(queue.pull().unwrap(), Some(99));
3417 assert_eq!(queue.pull(), Err(StreamError::Failed("upstream".into())));
3418
3419 let failed: StreamResult<Vec<i32>> = Source::single(1)
3420 .map_with_resource(
3421 || Ok(()),
3422 |_resource, _item| -> StreamResult<i32> { Err(StreamError::Failed("map".into())) },
3423 |_resource| -> StreamResult<Option<i32>> {
3424 Err(StreamError::Failed("close".into()))
3425 },
3426 )
3427 .run_collect();
3428 assert_eq!(failed, Err(StreamError::Failed("map".into())));
3429 }
3430
3431 #[test]
3432 fn stateful_and_terminal_source_operators_work() {
3433 let stateful = Source::from_iter([1, 2, 3])
3434 .stateful_map(0, |sum, item| {
3435 *sum += item;
3436 *sum
3437 })
3438 .run_collect()
3439 .unwrap();
3440 assert_eq!(stateful, vec![1, 3, 6]);
3441
3442 let concat = Source::from_iter([1, 2, 3])
3443 .stateful_map_concat(0, |sum, item| {
3444 *sum += item;
3445 [item, *sum]
3446 })
3447 .run_collect()
3448 .unwrap();
3449 assert_eq!(concat, vec![1, 1, 2, 3, 3, 6]);
3450
3451 assert_eq!(
3452 Source::from_iter([1, 2, 3])
3453 .fold(10, |acc, item| acc + item)
3454 .run_collect()
3455 .unwrap(),
3456 vec![16]
3457 );
3458 assert_eq!(
3459 Source::from_iter([1, 2, 3])
3460 .reduce(|acc, item| acc + item)
3461 .run_collect()
3462 .unwrap(),
3463 vec![6]
3464 );
3465 }
3466
3467 #[test]
3468 fn concat_and_sliding_emit_before_unbounded_upstream_finishes() {
3469 let concat = Source::single(())
3470 .map_concat(|_| 0_u64..)
3471 .take(1)
3472 .run_collect()
3473 .unwrap();
3474 assert_eq!(concat, vec![0]);
3475
3476 let sliding = Source::repeat(1_u64)
3477 .sliding(2, 1)
3478 .take(1)
3479 .run_collect()
3480 .unwrap();
3481 assert_eq!(sliding, vec![vec![1, 1]]);
3482 }
3483
3484 #[test]
3485 fn fan_in_source_operators_follow_ordering_rules() {
3486 assert_eq!(
3487 Source::from_iter([1, 2])
3488 .concat(Source::from_iter([3, 4]))
3489 .run_collect()
3490 .unwrap(),
3491 vec![1, 2, 3, 4]
3492 );
3493 assert_eq!(
3494 Source::from_iter([3, 4])
3495 .prepend(Source::from_iter([1, 2]))
3496 .run_collect()
3497 .unwrap(),
3498 vec![1, 2, 3, 4]
3499 );
3500 assert_eq!(
3501 Source::empty()
3502 .or_else(Source::from_iter([10, 20]))
3503 .run_collect()
3504 .unwrap(),
3505 vec![10, 20]
3506 );
3507 assert_eq!(
3508 Source::from_iter([1, 2])
3509 .or_else(Source::from_iter([10, 20]))
3510 .run_collect()
3511 .unwrap(),
3512 vec![1, 2]
3513 );
3514 assert_eq!(
3515 Source::from_iter([1, 2, 3])
3516 .interleave(Source::from_iter([10, 11, 12]), 2)
3517 .run_collect()
3518 .unwrap(),
3519 vec![1, 2, 10, 11, 3, 12]
3520 );
3521 }
3522
3523 #[test]
3524 fn fan_in_flow_operators_compose_with_primary_stream() {
3525 let concat = Source::from_iter([1, 2])
3526 .via(Flow::identity().concat(Source::from_iter([3, 4])))
3527 .run_collect()
3528 .unwrap();
3529 assert_eq!(concat, vec![1, 2, 3, 4]);
3530
3531 let prepend = Source::from_iter([3, 4])
3532 .via(Flow::identity().prepend(Source::from_iter([1, 2])))
3533 .run_collect()
3534 .unwrap();
3535 assert_eq!(prepend, vec![1, 2, 3, 4]);
3536
3537 let interleave = Source::from_iter([1, 2, 3])
3538 .via(Flow::identity().interleave(Source::from_iter([10, 11, 12]), 1))
3539 .run_collect()
3540 .unwrap();
3541 assert_eq!(interleave, vec![1, 10, 2, 11, 3, 12]);
3542
3543 let merge_sorted = Source::from_iter([1, 4])
3544 .via(Flow::identity().merge_sorted(Source::from_iter([2, 3, 5])))
3545 .run_collect()
3546 .unwrap();
3547 assert_eq!(merge_sorted, vec![1, 2, 3, 4, 5]);
3548
3549 let zip_latest = Source::from_iter([1, 2])
3550 .via(Flow::identity().zip_latest(Source::single(10)))
3551 .run_collect()
3552 .unwrap();
3553 assert_eq!(zip_latest, vec![(1, 10), (2, 10)]);
3554
3555 let zip_latest_with = Source::from_iter([1, 2])
3556 .via(
3557 Flow::identity()
3558 .zip_latest_with(Source::single(10), false, |left, right| left + right),
3559 )
3560 .run_collect()
3561 .unwrap();
3562 assert_eq!(zip_latest_with, vec![11, 12]);
3563 }
3564
3565 #[test]
3566 fn fan_in_operators_propagate_errors_and_eager_close() {
3567 assert!(matches!(
3568 Source::failed(StreamError::Failed("boom".into()))
3569 .or_else(Source::from_iter([1, 2]))
3570 .run_collect(),
3571 Err(StreamError::Failed(_))
3572 ));
3573 assert!(matches!(
3574 Source::from_iter([1, 2])
3575 .prepend(Source::failed(StreamError::Failed("boom".into())))
3576 .run_collect(),
3577 Err(StreamError::Failed(_))
3578 ));
3579 assert_eq!(
3580 Source::from_iter([1, 2])
3581 .interleave_all([Source::empty()], 1, true)
3582 .run_collect()
3583 .unwrap(),
3584 vec![1]
3585 );
3586 }
3587
3588 #[test]
3589 fn interleave_lazy_pulls_only_inputs_needed_for_first_segment() {
3590 use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering};
3591
3592 let pulls: Arc<[AtomicUsize; 3]> = Arc::new([
3593 AtomicUsize::new(0),
3594 AtomicUsize::new(0),
3595 AtomicUsize::new(0),
3596 ]);
3597
3598 let make_source = |idx: usize| {
3599 let pulls = Arc::clone(&pulls);
3600 Source::from_materialized_factory(move |_| {
3601 let pulls = Arc::clone(&pulls);
3602 let mut emitted = false;
3603 Ok((
3604 Box::new(std::iter::from_fn(move || {
3605 pulls[idx].fetch_add(1, Ordering::SeqCst);
3606 if !emitted && idx == 0 {
3607 emitted = true;
3608 Some(Ok(42))
3609 } else {
3610 None
3611 }
3612 })) as BoxStream<i32>,
3613 NotUsed,
3614 ))
3615 })
3616 };
3617
3618 let result = make_source(0)
3619 .interleave_all([make_source(1), make_source(2)], 1, false)
3620 .run_with(Sink::head());
3621
3622 assert_eq!(wait(result.unwrap()), 42);
3623 assert_eq!(pulls[0].load(Ordering::SeqCst), 1);
3624 assert_eq!(
3625 pulls[1].load(Ordering::SeqCst),
3626 0,
3627 "second input should not be pulled when downstream cancels after first element"
3628 );
3629 assert_eq!(
3630 pulls[2].load(Ordering::SeqCst),
3631 0,
3632 "third input should not be pulled before its turn"
3633 );
3634 }
3635
3636 #[test]
3637 fn interleave_non_eager_drains_remaining_when_one_input_completes() {
3638 assert_eq!(
3639 Source::from_iter([1, 2, 3, 4])
3640 .interleave_all(
3641 [Source::from_iter([10]), Source::from_iter([20, 21, 22])],
3642 1,
3643 false
3644 )
3645 .run_collect()
3646 .unwrap(),
3647 vec![1, 10, 20, 2, 21, 3, 22, 4]
3648 );
3649 }
3650
3651 #[test]
3652 fn remaining_merge_and_zip_family_matches_expected_ordering() {
3653 assert_eq!(
3654 Source::from_iter([1, 4])
3655 .merge_sorted(Source::from_iter([2, 3, 5]))
3656 .run_collect()
3657 .unwrap(),
3658 vec![1, 2, 3, 4, 5]
3659 );
3660
3661 assert_eq!(
3662 Source::from_iter([1, 2])
3663 .merge_latest(Source::single(10), false)
3664 .run_collect()
3665 .unwrap(),
3666 vec![vec![1, 10], vec![2, 10]]
3667 );
3668
3669 assert_eq!(
3670 Source::from_iter([1, 2, 3])
3671 .merge_all([Source::from_iter([10, 11])], false)
3672 .run_collect()
3673 .unwrap(),
3674 vec![1, 10, 2, 11, 3]
3675 );
3676
3677 assert_eq!(
3678 Source::from_iter([1, 2, 3])
3679 .zip_with(Source::from_iter([10, 11, 12]), |left, right| left + right)
3680 .run_collect()
3681 .unwrap(),
3682 vec![11, 13, 15]
3683 );
3684
3685 assert_eq!(
3686 Source::from_iter([1, 2])
3687 .zip_latest(Source::single(10))
3688 .run_collect()
3689 .unwrap(),
3690 vec![(1, 10), (2, 10)]
3691 );
3692
3693 assert_eq!(
3694 Source::from_iter([1, 2, 3])
3695 .zip_latest_with(Source::from_iter([10]), false, |left, right| left + right)
3696 .run_collect()
3697 .unwrap(),
3698 vec![11, 12, 13]
3699 );
3700
3701 assert_eq!(
3702 Source::from_iter([1, 2])
3703 .zip_all(Source::from_iter([10, 11, 12]), -1, -2)
3704 .run_collect()
3705 .unwrap(),
3706 vec![(1, 10), (2, 11), (-1, 12)]
3707 );
3708
3709 assert_eq!(
3710 Source::from_iter([5, 6, 7])
3711 .zip_with_index()
3712 .run_collect()
3713 .unwrap(),
3714 vec![(5, 0), (6, 1), (7, 2)]
3715 );
3716
3717 assert_eq!(
3718 Source::zip_n([Source::from_iter([1, 2]), Source::from_iter([10, 20])])
3719 .run_collect()
3720 .unwrap(),
3721 vec![vec![1, 10], vec![2, 20]]
3722 );
3723
3724 assert_eq!(
3725 Source::zip_with_n(
3726 [
3727 Source::from_iter([1, 2]),
3728 Source::from_iter([10, 20]),
3729 Source::from_iter([100, 200]),
3730 ],
3731 |values| values.into_iter().sum::<i32>(),
3732 )
3733 .run_collect()
3734 .unwrap(),
3735 vec![111, 222]
3736 );
3737
3738 assert_eq!(
3739 Source::merge_prioritized_n(
3740 [
3741 (Source::from_iter([1, 2, 3, 4]), 2),
3742 (Source::from_iter([10, 11]), 1),
3743 ],
3744 false,
3745 )
3746 .run_collect()
3747 .unwrap(),
3748 vec![1, 2, 10, 3, 4, 11]
3749 );
3750
3751 assert_eq!(
3752 Source::combine(
3753 Source::from_iter([1, 2, 3]),
3754 Source::from_iter([10, 11]),
3755 std::iter::empty::<Source<i32, NotUsed>>(),
3756 SourceCombineStrategy::Merge {
3757 eager_complete: false,
3758 },
3759 )
3760 .run_collect()
3761 .unwrap(),
3762 vec![1, 10, 2, 11, 3]
3763 );
3764
3765 let combined_sink = Sink::combine(
3766 Sink::ignore(),
3767 Sink::ignore(),
3768 std::iter::empty::<Sink<i32, NotUsed>>(),
3769 SinkCombineStrategy::Broadcast,
3770 );
3771 assert_eq!(
3772 Source::from_iter([1, 2, 3])
3773 .run_with(combined_sink)
3774 .unwrap(),
3775 NotUsed
3776 );
3777 }
3778
3779 #[test]
3780 fn sink_combine_broadcast_delivers_every_element_to_every_child() {
3781 let first_count = StdArc::new(StdAtomicUsize::new(0));
3785 let second_count = StdArc::new(StdAtomicUsize::new(0));
3786 let first_counter = StdArc::clone(&first_count);
3787 let second_counter = StdArc::clone(&second_count);
3788 let combined = Sink::combine(
3789 Sink::foreach(move |_: i32| {
3790 first_counter.fetch_add(1, StdOrdering::SeqCst);
3791 }),
3792 Sink::foreach(move |_: i32| {
3793 second_counter.fetch_add(1, StdOrdering::SeqCst);
3794 }),
3795 std::iter::empty::<Sink<i32, NotUsed>>(),
3796 SinkCombineStrategy::Broadcast,
3797 );
3798 assert_eq!(
3799 Source::from_iter(0..100).run_with(combined).unwrap(),
3800 NotUsed
3801 );
3802 assert!(wait_until(StdDuration::from_secs(1), || {
3807 first_count.load(StdOrdering::SeqCst) == 100
3808 && second_count.load(StdOrdering::SeqCst) == 100
3809 }));
3810 }
3811
3812 #[test]
3813 fn zip_latest_completes_when_one_side_finishes_without_emitting() {
3814 assert_eq!(
3818 Source::from_iter(std::iter::empty::<i32>())
3819 .zip_latest_with(Source::repeat(10), false, |left, right| left + right)
3820 .run_collect()
3821 .unwrap(),
3822 Vec::<i32>::new()
3823 );
3824 assert_eq!(
3825 Source::repeat(10)
3826 .zip_latest_with(
3827 Source::from_iter(std::iter::empty::<i32>()),
3828 false,
3829 |left, right| left + right,
3830 )
3831 .run_collect()
3832 .unwrap(),
3833 Vec::<i32>::new()
3834 );
3835 }
3836
3837 #[test]
3838 fn zip_family_completion_boundaries_match_expected_results() {
3839 assert_eq!(
3840 Source::from_iter([1, 2, 3])
3841 .zip_with(Source::from_iter([10]), |left, right| left + right)
3842 .run_collect()
3843 .unwrap(),
3844 vec![11]
3845 );
3846
3847 assert_eq!(
3848 Source::from_iter([1, 2, 3])
3849 .zip_latest_with(Source::from_iter([10]), true, |left, right| left + right)
3850 .run_collect()
3851 .unwrap(),
3852 vec![11, 12]
3853 );
3854
3855 assert_eq!(
3856 Source::zip_n([
3857 Source::from_iter([1, 2, 3]),
3858 Source::from_iter([10]),
3859 Source::from_iter([100, 200, 300]),
3860 ])
3861 .run_collect()
3862 .unwrap(),
3863 vec![vec![1, 10, 100]]
3864 );
3865 }
3866
3867 #[test]
3868 fn combine_strategies_follow_merge_concat_and_priority_rules() {
3869 assert_eq!(
3870 Source::combine(
3871 Source::from_iter([1, 2]),
3872 Source::from_iter([10, 11]),
3873 [Source::from_iter([100])],
3874 SourceCombineStrategy::Concat,
3875 )
3876 .run_collect()
3877 .unwrap(),
3878 vec![1, 2, 10, 11, 100]
3879 );
3880
3881 assert_eq!(
3882 Source::combine(
3883 Source::from_iter([1, 2, 3, 4]),
3884 Source::from_iter([10, 11]),
3885 std::iter::empty::<Source<i32, NotUsed>>(),
3886 SourceCombineStrategy::Prioritized {
3887 priorities: vec![2, 1],
3888 eager_complete: false,
3889 },
3890 )
3891 .run_collect()
3892 .unwrap(),
3893 vec![1, 2, 10, 3, 4, 11]
3894 );
3895 }
3896
3897 #[test]
3898 fn concat_lazy_defers_follow_on_source_until_needed() {
3899 let source_counter = StdArc::new(StdAtomicUsize::new(0));
3900 let source_counter_clone = StdArc::clone(&source_counter);
3901 let lazy_source = Source::from_materialized_factory(move |_| {
3902 source_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3903 Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3904 });
3905 let source_head = Source::single(1)
3906 .concat_lazy(lazy_source)
3907 .run_with(Sink::head());
3908 assert_eq!(wait(source_head.unwrap()), 1);
3909 assert_eq!(source_counter.load(StdOrdering::SeqCst), 0);
3910
3911 let flow_counter = StdArc::new(StdAtomicUsize::new(0));
3912 let flow_counter_clone = StdArc::clone(&flow_counter);
3913 let lazy_flow_source = Source::from_materialized_factory(move |_| {
3914 flow_counter_clone.fetch_add(1, StdOrdering::SeqCst);
3915 Ok((Box::new(std::iter::once(Ok(99))), NotUsed))
3916 });
3917 let flow_head = Source::single(1)
3918 .via(Flow::identity().concat_lazy(lazy_flow_source))
3919 .run_with(Sink::head());
3920 assert_eq!(wait(flow_head.unwrap()), 1);
3921 assert_eq!(flow_counter.load(StdOrdering::SeqCst), 0);
3922 }
3923
3924 #[test]
3925 fn also_to_completes_when_side_sink_cancels() {
3926 assert_eq!(
3927 Source::from_iter([1, 2, 3])
3928 .also_to(Sink::cancelled())
3929 .run_collect()
3930 .unwrap(),
3931 Vec::<i32>::new()
3932 );
3933 assert_eq!(
3934 Source::from_iter([1, 2, 3])
3935 .also_to_all([Sink::cancelled(), Sink::cancelled()])
3936 .run_collect()
3937 .unwrap(),
3938 Vec::<i32>::new()
3939 );
3940 }
3941
3942 #[test]
3943 fn also_to_completes_gracefully_when_side_sink_disconnects() {
3944 let result = Source::from_iter(0..100)
3945 .also_to(Sink::head())
3946 .run_collect()
3947 .unwrap();
3948 assert!(!result.is_empty(), "main should emit at least one element");
3949 assert!(
3950 result.len() < 100,
3951 "main should complete early when side disconnects"
3952 );
3953 }
3954
3955 #[test]
3956 fn also_to_propagates_original_error_when_side_is_disconnected() {
3957 let err = StreamError::Failed("distinctive-boom".into());
3958 assert!(matches!(
3959 Source::<i32>::failed(err.clone())
3960 .also_to(Sink::cancelled())
3961 .run_collect(),
3962 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3963 ));
3964 assert!(matches!(
3965 Source::<i32>::failed(err.clone())
3966 .also_to_all([Sink::cancelled()])
3967 .run_collect(),
3968 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3969 ));
3970 assert!(matches!(
3971 Source::<i32>::failed(err)
3972 .divert_to(Sink::cancelled(), |_: &i32| true)
3973 .run_collect(),
3974 Err(StreamError::Failed(msg)) if msg == "distinctive-boom"
3975 ));
3976 }
3977
3978 #[test]
3979 fn divert_to_routes_matching_elements_to_side_sink() {
3980 let diverted = Source::from_iter([1, 2, 3, 4])
3981 .divert_to(Sink::ignore(), |item| item % 2 == 0)
3982 .run_collect()
3983 .unwrap();
3984 assert_eq!(diverted, vec![1, 3]);
3985 }
3986
3987 #[test]
3988 fn wire_tap_drops_when_side_sink_backpressures() {
3989 let tapped = Source::from_iter([1, 2, 3])
3990 .wire_tap(Sink::head())
3991 .run_collect()
3992 .unwrap();
3993 assert_eq!(tapped, vec![1, 2, 3]);
3994
3995 let tapped_via_flow = Source::from_iter([1, 2, 3])
3996 .via(Flow::identity().wire_tap(Sink::head()))
3997 .run_collect()
3998 .unwrap();
3999 assert_eq!(tapped_via_flow, vec![1, 2, 3]);
4000 }
4001
4002 #[test]
4003 fn async_mapping_variants_complete() {
4004 let ordered = Source::from_iter(0..4)
4005 .map_async(2, |item| async move { Ok(item * 2) })
4006 .run_collect()
4007 .unwrap();
4008 assert_eq!(ordered, vec![0, 2, 4, 6]);
4009
4010 let unordered = Source::from_iter(0..4)
4011 .map_async_unordered(2, |item| async move { Ok(item * 2) })
4012 .run_collect()
4013 .unwrap();
4014 assert_eq!(unordered, vec![0, 2, 4, 6]);
4015
4016 let partitioned = Source::from_iter(0..4)
4017 .map_async_partitioned(4, 1, |item| item % 2, |item| async move { Ok(item + 1) })
4018 .run_collect()
4019 .unwrap();
4020 assert_eq!(partitioned, vec![1, 2, 3, 4]);
4021 }
4022
4023 #[test]
4024 fn map_async_ordered_bounds_pulls_behind_stuck_head() {
4025 let pulls = StdArc::new(StdAtomicUsize::new(0));
4026 let pulls_for_source = StdArc::clone(&pulls);
4027 let probe = Source::from_fn_iter(move || {
4028 let pulls = StdArc::clone(&pulls_for_source);
4029 std::iter::from_fn(move || {
4030 let next = pulls.fetch_add(1, StdOrdering::SeqCst);
4031 Some(next)
4032 })
4033 })
4034 .map_async(2, |item| async move {
4035 if item == 0 {
4036 tokio::time::sleep(StdDuration::from_millis(300)).await;
4037 }
4038 Ok(item)
4039 })
4040 .run_with(TestSink::probe())
4041 .unwrap();
4042
4043 probe.request(16);
4044 thread::sleep(StdDuration::from_millis(100));
4045 assert!(
4046 pulls.load(StdOrdering::SeqCst) <= 3,
4047 "pulled {} elements with parallelism=2 behind a stuck ordered head",
4048 pulls.load(StdOrdering::SeqCst)
4049 );
4050 }
4051
4052 #[test]
4053 fn async_mapping_parks_until_woken_future_completes() {
4054 struct WakeOnceFuture {
4055 value: Option<u64>,
4056 ready: StdArc<StdAtomicBool>,
4057 started: bool,
4058 polls: StdArc<StdAtomicUsize>,
4059 latest_waker: StdArc<Mutex<Option<std::task::Waker>>>,
4060 }
4061
4062 impl std::future::Future for WakeOnceFuture {
4063 type Output = StreamResult<u64>;
4064
4065 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
4066 let this = self.as_mut().get_mut();
4067 this.polls.fetch_add(1, StdOrdering::SeqCst);
4068 *this.latest_waker.lock().expect("latest wake slot mutex") =
4069 Some(cx.waker().clone());
4070 if this.ready.load(StdOrdering::SeqCst) {
4071 return Poll::Ready(Ok(this.value.take().unwrap()));
4072 }
4073
4074 if !this.started {
4075 this.started = true;
4076 let ready = StdArc::clone(&this.ready);
4077 let latest_waker = StdArc::clone(&this.latest_waker);
4078 thread::spawn(move || {
4079 thread::sleep(Duration::from_millis(20));
4080 ready.store(true, StdOrdering::SeqCst);
4081 if let Some(waker) =
4082 latest_waker.lock().expect("latest wake slot mutex").take()
4083 {
4084 waker.wake();
4085 }
4086 });
4087 }
4088 Poll::Pending
4089 }
4090 }
4091
4092 let polls = StdArc::new(StdAtomicUsize::new(0));
4093 let polls_for_stage = StdArc::clone(&polls);
4094 let start = Instant::now();
4095
4096 let values = Source::single(41)
4097 .map_async(1, move |item| WakeOnceFuture {
4098 value: Some(item + 1),
4099 ready: StdArc::new(StdAtomicBool::new(false)),
4100 started: false,
4101 polls: StdArc::clone(&polls_for_stage),
4102 latest_waker: StdArc::new(Mutex::new(None)),
4103 })
4104 .run_collect()
4105 .unwrap();
4106
4107 assert_eq!(values, vec![42]);
4108 let elapsed = start.elapsed();
4109 assert!(
4110 elapsed >= StdDuration::from_millis(15) && elapsed < StdDuration::from_millis(250),
4111 "pending future should park until woken once, elapsed={elapsed:?}"
4112 );
4113 assert!(
4114 polls.load(StdOrdering::SeqCst) < 4096,
4115 "pending future was repolled too aggressively"
4116 );
4117 }
4118
4119 #[test]
4120 fn async_mapping_emits_before_unbounded_upstream_finishes() {
4121 let ordered = Source::repeat(1)
4122 .map_async(2, |item| async move { Ok(item + 1) })
4123 .take(1)
4124 .run_collect()
4125 .unwrap();
4126 assert_eq!(ordered, vec![2]);
4127
4128 let unordered = Source::repeat(1)
4129 .map_async_unordered(2, |item| async move { Ok(item + 1) })
4130 .take(1)
4131 .run_collect()
4132 .unwrap();
4133 assert_eq!(unordered, vec![2]);
4134
4135 let partitioned = Source::repeat(1)
4136 .map_async_partitioned(2, 1, |_| 0_u8, |item| async move { Ok(item + 1) })
4137 .take(1)
4138 .run_collect()
4139 .unwrap();
4140 assert_eq!(partitioned, vec![2]);
4141 }
4142
4143 #[test]
4144 fn partitioned_async_mapping_limits_same_key_concurrency() {
4145 let active = StdArc::new(StdAtomicUsize::new(0));
4146 let max_active = StdArc::new(StdAtomicUsize::new(0));
4147 let active_for_stage = StdArc::clone(&active);
4148 let max_for_stage = StdArc::clone(&max_active);
4149
4150 let values = Source::from_iter(0..6)
4151 .map_async_partitioned(
4152 4,
4153 1,
4154 |_| 0_u8,
4155 move |item| {
4156 let active = StdArc::clone(&active_for_stage);
4157 let max_active = StdArc::clone(&max_for_stage);
4158 let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4159 max_active.fetch_max(current, StdOrdering::SeqCst);
4160 async move {
4161 thread::sleep(Duration::from_millis(1));
4162 active.fetch_sub(1, StdOrdering::SeqCst);
4163 Ok(item)
4164 }
4165 },
4166 )
4167 .run_collect()
4168 .unwrap();
4169
4170 assert_eq!(values, vec![0, 1, 2, 3, 4, 5]);
4171 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4172 }
4173
4174 #[test]
4175 fn partitioned_async_mapping_scans_past_blocked_pending_key() {
4176 let active = StdArc::new(StdAtomicUsize::new(0));
4177 let max_active = StdArc::new(StdAtomicUsize::new(0));
4178 let active_for_stage = StdArc::clone(&active);
4179 let max_for_stage = StdArc::clone(&max_active);
4180 let (release_tx, release_rx) = oneshot::channel::<()>();
4181 let release_rx = StdArc::new(std::sync::Mutex::new(Some(release_rx)));
4182 let release_rx_for_stage = StdArc::clone(&release_rx);
4183 let max_for_release = StdArc::clone(&max_active);
4184
4185 let releaser = thread::spawn(move || {
4186 let deadline = Instant::now() + StdDuration::from_secs(1);
4187 while max_for_release.load(StdOrdering::SeqCst) < 2 && Instant::now() < deadline {
4188 thread::yield_now();
4189 }
4190 let _ = release_tx.send(());
4191 });
4192
4193 let values = Source::from_iter([0, 2, 1])
4194 .map_async_partitioned(
4195 2,
4196 1,
4197 |item| item % 2,
4198 move |item| {
4199 let active = StdArc::clone(&active_for_stage);
4200 let max_active = StdArc::clone(&max_for_stage);
4201 let release_rx = StdArc::clone(&release_rx_for_stage);
4202 let current = active.fetch_add(1, StdOrdering::SeqCst) + 1;
4203 max_active.fetch_max(current, StdOrdering::SeqCst);
4204 async move {
4205 if item == 0 {
4206 let receiver = release_rx
4207 .lock()
4208 .expect("release receiver mutex")
4209 .take()
4210 .expect("release receiver present");
4211 let _ = receiver.await;
4212 }
4213 active.fetch_sub(1, StdOrdering::SeqCst);
4214 Ok(item)
4215 }
4216 },
4217 )
4218 .run_collect()
4219 .unwrap();
4220 releaser.join().unwrap();
4221
4222 assert_eq!(values, vec![0, 2, 1]);
4223 assert_eq!(max_active.load(StdOrdering::SeqCst), 2);
4224 }
4225
4226 #[test]
4227 fn partitioned_async_mapping_p1_still_evaluates_partition() {
4228 let partitions = StdArc::new(StdAtomicUsize::new(0));
4229 let partitions_for_stage = StdArc::clone(&partitions);
4230
4231 let values = Source::from_iter(0..8)
4232 .map_async_partitioned(
4233 1,
4234 1,
4235 move |item| {
4236 partitions_for_stage.fetch_add(1, StdOrdering::SeqCst);
4237 item % 2
4238 },
4239 |item| async move { Ok(item + 1) },
4240 )
4241 .run_collect()
4242 .unwrap();
4243
4244 assert_eq!(values, (1..9).collect::<Vec<_>>());
4245 assert_eq!(partitions.load(StdOrdering::SeqCst), 8);
4246 }
4247
4248 #[test]
4249 fn partitioned_async_mapping_handles_many_keys_high_parallelism() {
4250 let active_by_key =
4251 StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4252 let max_by_key = StdArc::new((0..16).map(|_| StdAtomicUsize::new(0)).collect::<Vec<_>>());
4253 let active_for_stage = StdArc::clone(&active_by_key);
4254 let max_for_stage = StdArc::clone(&max_by_key);
4255
4256 let values = Source::from_iter(0..512_usize)
4257 .map_async_partitioned(
4258 32,
4259 1,
4260 |item| item % 16,
4261 move |item| {
4262 let active = StdArc::clone(&active_for_stage);
4263 let max_active = StdArc::clone(&max_for_stage);
4264 let key = item % 16;
4265 let current = active[key].fetch_add(1, StdOrdering::SeqCst) + 1;
4266 max_active[key].fetch_max(current, StdOrdering::SeqCst);
4267 async move {
4268 active[key].fetch_sub(1, StdOrdering::SeqCst);
4269 Ok(item)
4270 }
4271 },
4272 )
4273 .run_collect()
4274 .unwrap();
4275
4276 assert_eq!(values, (0..512).collect::<Vec<_>>());
4277 for max_active in max_by_key.iter() {
4278 assert_eq!(max_active.load(StdOrdering::SeqCst), 1);
4279 }
4280 }
4281
4282 #[test]
4283 fn error_operators_map_recover_and_complete() {
4284 let mapped = Source::<i32>::failed(StreamError::Failed("boom".into()))
4285 .map_error(|_| StreamError::Failed("mapped".into()))
4286 .run_collect();
4287 assert_eq!(mapped, Err(StreamError::Failed("mapped".into())));
4288
4289 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4290 .recover(|error| match error {
4291 StreamError::Failed(_) => Some(42),
4292 _ => None,
4293 })
4294 .run_collect()
4295 .unwrap();
4296 assert_eq!(recovered, vec![42]);
4297
4298 let unrecovered = Source::<i32>::failed(StreamError::Failed("original".into()))
4299 .recover(|_| None)
4300 .run_collect();
4301 assert_eq!(unrecovered, Err(StreamError::Failed("original".into())));
4302
4303 let recovered_with = Source::<i32>::failed(StreamError::Failed("boom".into()))
4304 .recover_with_retries(1, |_| Some(Source::from_iter([1, 2])))
4305 .run_collect()
4306 .unwrap();
4307 assert_eq!(recovered_with, vec![1, 2]);
4308
4309 let declined_recover_with = Source::<i32>::failed(StreamError::Failed("declined".into()))
4310 .recover_with_retries(1, |_| None)
4311 .run_collect();
4312 assert_eq!(
4313 declined_recover_with,
4314 Err(StreamError::Failed("declined".into()))
4315 );
4316
4317 let completed = Source::from_factory(|| {
4318 Box::new(vec![Ok(1), Err(StreamError::Failed("ignored".into())), Ok(2)].into_iter())
4319 })
4320 .on_error_complete()
4321 .run_collect()
4322 .unwrap();
4323 assert_eq!(completed, vec![1]);
4324 }
4325
4326 #[test]
4327 fn sliding_matches_akka_window_semantics() {
4328 assert_eq!(
4330 Source::from_iter(1..=4)
4331 .sliding(3, 1)
4332 .run_collect()
4333 .unwrap(),
4334 vec![vec![1, 2, 3], vec![2, 3, 4]]
4335 );
4336 assert_eq!(
4337 Source::from_iter(1..=4)
4338 .sliding(2, 1)
4339 .run_collect()
4340 .unwrap(),
4341 vec![vec![1, 2], vec![2, 3], vec![3, 4]]
4342 );
4343 assert_eq!(
4345 Source::from_iter(1..=3)
4346 .sliding(3, 1)
4347 .run_collect()
4348 .unwrap(),
4349 vec![vec![1, 2, 3]]
4350 );
4351 assert_eq!(
4353 Source::from_iter(1..=2)
4354 .sliding(3, 1)
4355 .run_collect()
4356 .unwrap(),
4357 vec![vec![1, 2]]
4358 );
4359 assert_eq!(
4361 Source::from_iter(1..=3)
4362 .sliding(1, 1)
4363 .run_collect()
4364 .unwrap(),
4365 vec![vec![1], vec![2], vec![3]]
4366 );
4367 assert_eq!(
4369 Source::from_iter(1..=6)
4370 .sliding(2, 3)
4371 .run_collect()
4372 .unwrap(),
4373 vec![vec![1, 2], vec![4, 5]]
4374 );
4375 assert_eq!(
4377 Source::from_iter(1..=3)
4378 .sliding(2, 4)
4379 .run_collect()
4380 .unwrap(),
4381 vec![vec![1, 2]]
4382 );
4383 }
4384
4385 #[test]
4386 fn recover_with_retries_indefinitely_like_akka() {
4387 let attempts = StdArc::new(StdAtomicUsize::new(0));
4388 let attempts_in_stage = StdArc::clone(&attempts);
4389 let recovered = Source::<i32>::failed(StreamError::Failed("boom".into()))
4392 .recover_with(move |_error| {
4393 if attempts_in_stage.fetch_add(1, StdOrdering::SeqCst) < 5 {
4394 Some(Source::<i32>::failed(StreamError::Failed("again".into())))
4395 } else {
4396 Some(Source::from_iter([42]))
4397 }
4398 })
4399 .run_collect()
4400 .unwrap();
4401 assert_eq!(recovered, vec![42]);
4402 assert_eq!(attempts.load(StdOrdering::SeqCst), 6);
4403 }
4404
4405 #[test]
4406 fn many_concurrent_streams_do_not_starve_the_pool() {
4407 let materializer = Materializer::new();
4416 let busy = 6_usize;
4417
4418 let mut held = Vec::with_capacity(busy);
4419 for _ in 0..busy {
4420 held.push(
4421 Source::single(1_u64)
4422 .run_with_materializer(Sink::never(), &materializer)
4423 .unwrap(),
4424 );
4425 }
4426
4427 for _ in 0..400 {
4428 if materializer.active_streams() >= busy {
4429 break;
4430 }
4431 thread::sleep(Duration::from_millis(5));
4432 }
4433 assert_eq!(materializer.active_streams(), busy);
4434
4435 let sum = Source::from_iter(0_u64..5)
4438 .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
4439 .unwrap();
4440 assert_eq!(sum.wait().unwrap(), 10);
4441
4442 materializer.shutdown();
4443 for completion in held {
4444 assert_eq!(completion.wait(), Err(StreamError::AbruptTermination));
4445 }
4446 }
4447}