Skip to main content

datum/stream/
sink.rs

1//! `Sink<In, Mat>` — the terminal of a linear stream — and `RunnableGraph<Mat>`,
2//! the closed, reusable blueprint produced by `Source::to`/`to_mat`.
3//!
4//! `RunnableGraph::run` is the execution boundary and builds a fresh `Materializer`
5//! per call. Terminal sinks (`collect`/`fold`/`head`/…) materialize a
6//! `StreamCompletion<T>`.
7
8use super::flow;
9use super::*;
10use crate::Attributes;
11use crate::stream::error::{decide_supervision, panic_stream_error};
12
13/// A materialized child of `Sink::combine`: the feed channel plus the child's
14/// materialized value. The mat is held (type-erased) until the parent runner
15/// finishes — dropping it early would trip `StreamCompletion`'s
16/// cancel-on-drop and silently cancel the child before it consumes anything.
17type CombinedSinkChild<In> = (
18    std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
19    Box<dyn std::any::Any + Send>,
20);
21
22type CombinedSinkRunner<In> =
23    dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
24type DeferredSinkFactory<In, Mat> =
25    dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
26
27enum CombinedSinkMessage<In> {
28    Item(StreamResult<In>),
29    Flush(std::sync::mpsc::SyncSender<()>),
30    Close,
31}
32
33const TERMINAL_CONSUMER_BATCH: usize = 64;
34
35pub struct Sink<In, Mat> {
36    runner: Arc<SinkRunner<In, Mat>>,
37    inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
38    hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
39    raw_hinted_runner: Option<Arc<HintedSinkRunner<In, Mat>>>,
40    attributes: Attributes,
41    deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
42    /// Set only for recognised fold/collect/ignore sinks. `None` for all other sinks.
43    pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
44}
45
46/// Non-generic helper for `Sink::map_materialized_value` — accepts an erased function so
47/// that only one monomorphization exists per `(In, Mat, NextMat)` triple. This prevents
48/// the deferred-factory recursion from producing an ever-growing closure type chain.
49fn map_mat_dyn<In, Mat, NextMat>(
50    sink: Sink<In, Mat>,
51    f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
52) -> Sink<In, NextMat>
53where
54    In: Send + 'static,
55    Mat: Send + 'static,
56    NextMat: Send + 'static,
57{
58    let Sink {
59        runner,
60        inline_runner,
61        hinted_runner,
62        raw_hinted_runner,
63        attributes,
64        deferred_factory,
65        fold_fp: _,
66    } = sink;
67    let mapped_runner = {
68        let f = Arc::clone(&f);
69        Arc::new(move |input, materializer: &Materializer| {
70            let mat = runner(input, materializer)?;
71            Ok(f(mat))
72        }) as Arc<SinkRunner<In, NextMat>>
73    };
74    let mapped_inline_runner = inline_runner.map(|ir| {
75        let f = Arc::clone(&f);
76        Arc::new(move |input, materializer: &Materializer| {
77            let result = ir(input, materializer)?;
78            Ok(f(result))
79        }) as Arc<SinkRunner<In, NextMat>>
80    });
81    let mapped_hinted_runner = hinted_runner.map(|hr| {
82        let f = Arc::clone(&f);
83        Arc::new(
84            move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
85                let result = hr(input, materializer, hints)?;
86                Ok(f(result))
87            },
88        ) as Arc<HintedSinkRunner<In, NextMat>>
89    });
90    let mapped_raw_hinted_runner = raw_hinted_runner.map(|hr| {
91        let f = Arc::clone(&f);
92        Arc::new(
93            move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
94                let result = hr(input, materializer, hints)?;
95                Ok(f(result))
96            },
97        ) as Arc<HintedSinkRunner<In, NextMat>>
98    });
99    let mapped_factory = deferred_factory.map(|factory| {
100        let f = Arc::clone(&f);
101        Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
102            map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
103        }) as Arc<DeferredSinkFactory<In, NextMat>>
104    });
105    Sink {
106        runner: mapped_runner,
107        inline_runner: mapped_inline_runner,
108        hinted_runner: mapped_hinted_runner,
109        raw_hinted_runner: mapped_raw_hinted_runner,
110        attributes,
111        deferred_factory: mapped_factory,
112        fold_fp: None,
113    }
114}
115
116impl<In, Mat> Clone for Sink<In, Mat> {
117    fn clone(&self) -> Self {
118        Self {
119            runner: Arc::clone(&self.runner),
120            inline_runner: self.inline_runner.as_ref().map(Arc::clone),
121            hinted_runner: self.hinted_runner.as_ref().map(Arc::clone),
122            raw_hinted_runner: self.raw_hinted_runner.as_ref().map(Arc::clone),
123            attributes: self.attributes.clone(),
124            deferred_factory: self.deferred_factory.as_ref().map(Arc::clone),
125            fold_fp: self.fold_fp.as_ref().map(Arc::clone),
126        }
127    }
128}
129
130impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
131    pub(crate) fn from_runner<F>(runner: F) -> Self
132    where
133        F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
134    {
135        Self::from_runner_parts(Arc::new(runner), None)
136    }
137
138    pub(crate) fn from_runner_parts(
139        runner: Arc<SinkRunner<In, Mat>>,
140        inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
141    ) -> Self {
142        Self {
143            runner,
144            inline_runner,
145            hinted_runner: None,
146            raw_hinted_runner: None,
147            attributes: Attributes::default(),
148            deferred_factory: None,
149            fold_fp: None,
150        }
151    }
152
153    pub(crate) fn from_hinted_runner<F>(runner: F) -> Self
154    where
155        F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
156            + Send
157            + Sync
158            + 'static,
159    {
160        let hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
161        let fallback = {
162            let hinted_runner = Arc::clone(&hinted_runner);
163            Arc::new(move |input, materializer: &Materializer| {
164                hinted_runner(input, materializer, SourceRuntimeHints::default())
165            }) as Arc<SinkRunner<In, Mat>>
166        };
167        Self {
168            runner: fallback,
169            inline_runner: None,
170            hinted_runner: Some(hinted_runner),
171            raw_hinted_runner: None,
172            attributes: Attributes::default(),
173            deferred_factory: None,
174            fold_fp: None,
175        }
176    }
177
178    pub(crate) fn from_raw_hinted_runner<F>(runner: F) -> Self
179    where
180        F: Fn(BoxStream<In>, &Materializer, SourceRuntimeHints) -> StreamResult<Mat>
181            + Send
182            + Sync
183            + 'static,
184    {
185        let raw_hinted_runner: Arc<HintedSinkRunner<In, Mat>> = Arc::new(runner);
186        let fallback = {
187            let raw_hinted_runner = Arc::clone(&raw_hinted_runner);
188            Arc::new(move |input, materializer: &Materializer| {
189                let input =
190                    runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
191                raw_hinted_runner(input, materializer, SourceRuntimeHints::default())
192            }) as Arc<SinkRunner<In, Mat>>
193        };
194        Self {
195            runner: fallback,
196            inline_runner: None,
197            hinted_runner: None,
198            raw_hinted_runner: Some(raw_hinted_runner),
199            attributes: Attributes::default(),
200            deferred_factory: None,
201            fold_fp: None,
202        }
203    }
204
205    pub(super) fn run(
206        &self,
207        input: BoxStream<In>,
208        materializer: &Materializer,
209    ) -> StreamResult<Mat> {
210        self.run_with_source_hints(input, materializer, SourceRuntimeHints::default())
211    }
212
213    pub(super) fn run_with_source_hints(
214        &self,
215        input: BoxStream<In>,
216        materializer: &Materializer,
217        hints: SourceRuntimeHints,
218    ) -> StreamResult<Mat> {
219        if let Some(factory) = &self.deferred_factory {
220            let attrs = materializer.effective_attributes(&self.attributes);
221            return factory(materializer, &attrs).run_with_source_hints(input, materializer, hints);
222        }
223        if let Some(hinted_runner) = &self.hinted_runner {
224            return hinted_runner(input, materializer, hints);
225        }
226        (self.runner)(input, materializer)
227    }
228
229    pub(super) fn run_from_source(
230        &self,
231        input: BoxStream<In>,
232        materializer: &Materializer,
233        hints: SourceRuntimeHints,
234    ) -> StreamResult<Mat> {
235        if let Some(factory) = &self.deferred_factory {
236            let attrs = materializer.effective_attributes(&self.attributes);
237            return factory(materializer, &attrs).run_from_source(input, materializer, hints);
238        }
239        if let Some(raw_hinted_runner) = &self.raw_hinted_runner {
240            return raw_hinted_runner(input, materializer, hints);
241        }
242        let input = runtime_checked_stream(input, Arc::clone(&materializer.inner.state), None);
243        self.run_with_source_hints(input, materializer, hints)
244    }
245
246    pub(super) fn can_inline(&self) -> bool {
247        self.inline_runner.is_some()
248    }
249
250    pub(super) fn run_inline(
251        &self,
252        input: BoxStream<In>,
253        materializer: &Materializer,
254    ) -> StreamResult<Mat> {
255        if let Some(factory) = &self.deferred_factory {
256            let attrs = materializer.effective_attributes(&self.attributes);
257            return factory(materializer, &attrs).run_inline(input, materializer);
258        }
259        (self
260            .inline_runner
261            .as_ref()
262            .expect("inline sink runner exists"))(input, materializer)
263    }
264
265    pub fn run_with<SourceMat: Send + 'static>(
266        self,
267        source: Source<In, SourceMat>,
268    ) -> StreamResult<SourceMat> {
269        source.to(self).run()
270    }
271
272    pub fn run_with_materializer<SourceMat: Send + 'static>(
273        self,
274        source: Source<In, SourceMat>,
275        materializer: &Materializer,
276    ) -> StreamResult<SourceMat> {
277        source.to(self).run_with_materializer(materializer)
278    }
279
280    #[must_use]
281    pub fn from_materializer<F>(factory: F) -> Self
282    where
283        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
284    {
285        let factory = Arc::new(factory);
286        Self {
287            runner: Arc::new(|_input, _materializer| {
288                Err(StreamError::Failed(
289                    "deferred sink factory must be driven through Sink::run".into(),
290                ))
291            }),
292            inline_runner: None,
293            hinted_runner: None,
294            raw_hinted_runner: None,
295            attributes: Attributes::default(),
296            deferred_factory: Some(factory),
297            fold_fp: None,
298        }
299    }
300
301    #[must_use]
302    pub fn setup<F>(factory: F) -> Self
303    where
304        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
305    {
306        Self::from_materializer(factory)
307    }
308
309    pub fn pre_materialize(
310        &self,
311        materializer: &Materializer,
312    ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
313        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
314        let materialized = self.clone().run(
315            Box::new(std::iter::from_fn(move || receiver.recv().ok())),
316            materializer,
317        )?;
318        let sender = Arc::new(Mutex::new(Some(sender)));
319        let sink = Sink::from_runner(move |input, _materializer| {
320            let Some(sender) = sender
321                .lock()
322                .expect("pre-materialized sink poisoned")
323                .take()
324            else {
325                return Err(StreamError::Failed(
326                    "pre-materialized sink has already been materialized".into(),
327                ));
328            };
329            for item in input {
330                if sender.send(item).is_err() {
331                    break;
332                }
333            }
334            Ok(NotUsed)
335        });
336        Ok((materialized, sink.with_attributes(self.attributes.clone())))
337    }
338
339    #[must_use]
340    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
341    where
342        NextMat: Send + 'static,
343        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
344    {
345        // Delegate to a non-generic helper that accepts `Arc<dyn Fn>`. This breaks
346        // the infinite monomorphization chain that Rust 1.96 rejects: the old inline
347        // implementation called `map_materialized_value` inside the deferred-factory
348        // closure, which produced a new instantiation for a strictly deeper closure type
349        // at each level (depth > 1024 triggered the recursion limit).
350        map_mat_dyn(self, Arc::new(f))
351    }
352
353    #[must_use]
354    pub fn attributes(&self) -> &Attributes {
355        &self.attributes
356    }
357
358    #[must_use]
359    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
360        self.attributes = attributes;
361        self
362    }
363
364    #[must_use]
365    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
366        self.attributes = self.attributes.and(attributes);
367        self
368    }
369
370    #[must_use]
371    pub fn named(self, name: impl Into<String>) -> Self {
372        self.add_attributes(Attributes::named(name))
373    }
374}
375
376#[derive(Clone)]
377pub struct RunnableGraph<Mat> {
378    pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
379    attributes: Attributes,
380}
381
382#[derive(Debug, Clone, Copy, PartialEq, Eq)]
383pub enum SinkCombineStrategy {
384    Broadcast,
385    Balance,
386}
387
388impl<Mat: Send + 'static> RunnableGraph<Mat> {
389    pub(super) fn from_runner<F>(runner: F) -> Self
390    where
391        F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
392    {
393        Self {
394            runner: Arc::new(runner),
395            attributes: Attributes::default(),
396        }
397    }
398
399    pub fn run(&self) -> StreamResult<Mat> {
400        Materializer::new().materialize(self)
401    }
402
403    pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
404        materializer.materialize(self)
405    }
406
407    #[must_use]
408    pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
409    where
410        Next: Send + 'static,
411        F: Fn(Mat) -> Next + Send + Sync + 'static,
412    {
413        let f = Arc::new(f);
414        RunnableGraph::from_runner(move |materializer| {
415            let mat = (self.runner)(materializer)?;
416            Ok(f(mat))
417        })
418    }
419
420    #[must_use]
421    pub fn attributes(&self) -> &Attributes {
422        &self.attributes
423    }
424
425    #[must_use]
426    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
427        self.attributes = attributes;
428        self
429    }
430
431    #[must_use]
432    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
433        self.attributes = self.attributes.and(attributes);
434        self
435    }
436
437    #[must_use]
438    pub fn named(self, name: impl Into<String>) -> Self {
439        self.add_attributes(Attributes::named(name))
440    }
441}
442
443impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
444    #[must_use]
445    pub fn combine<M1, M2, MRest, I>(
446        first: Sink<In, M1>,
447        second: Sink<In, M2>,
448        rest: I,
449        strategy: SinkCombineStrategy,
450    ) -> Sink<In, NotUsed>
451    where
452        M1: Send + 'static,
453        M2: Send + 'static,
454        MRest: Send + 'static,
455        I: IntoIterator<Item = Sink<In, MRest>>,
456    {
457        let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
458            Arc::new(move |materializer| {
459                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
460                let mat = first.run(
461                    Box::new(std::iter::from_fn(move || {
462                        loop {
463                            match receiver.recv().ok()? {
464                                CombinedSinkMessage::Item(item) => return Some(item),
465                                CombinedSinkMessage::Flush(ack) => {
466                                    let _ = ack.send(());
467                                }
468                                CombinedSinkMessage::Close => return None,
469                            }
470                        }
471                    })),
472                    materializer,
473                )?;
474                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
475            }),
476            Arc::new(move |materializer| {
477                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
478                let mat = second.run(
479                    Box::new(std::iter::from_fn(move || {
480                        loop {
481                            match receiver.recv().ok()? {
482                                CombinedSinkMessage::Item(item) => return Some(item),
483                                CombinedSinkMessage::Flush(ack) => {
484                                    let _ = ack.send(());
485                                }
486                                CombinedSinkMessage::Close => return None,
487                            }
488                        }
489                    })),
490                    materializer,
491                )?;
492                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
493            }),
494        ];
495        runners.extend(rest.into_iter().map(|sink| {
496            Arc::new(move |materializer: &Materializer| {
497                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
498                let mat = sink.run(
499                    Box::new(std::iter::from_fn(move || {
500                        loop {
501                            match receiver.recv().ok()? {
502                                CombinedSinkMessage::Item(item) => return Some(item),
503                                CombinedSinkMessage::Flush(ack) => {
504                                    let _ = ack.send(());
505                                }
506                                CombinedSinkMessage::Close => return None,
507                            }
508                        }
509                    })),
510                    materializer,
511                )?;
512                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
513            }) as Arc<CombinedSinkRunner<In>>
514        }));
515
516        Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
517            // Each child's materialized value is held until the feed loop is
518            // done: dropping it would trip cancel-on-drop and kill the child.
519            let mut children = runners
520                .iter()
521                .map(|runner| runner(materializer))
522                .collect::<StreamResult<Vec<_>>>()?;
523            let mut next = 0usize;
524            for item in input.by_ref() {
525                match item {
526                    Ok(value) => match strategy {
527                        SinkCombineStrategy::Broadcast => {
528                            children.retain(|(sender, _)| {
529                                sender
530                                    .send(CombinedSinkMessage::Item(Ok(value.clone())))
531                                    .is_ok()
532                            });
533                            if children.is_empty() {
534                                break;
535                            }
536                        }
537                        SinkCombineStrategy::Balance => {
538                            while !children.is_empty() {
539                                let index = next % children.len();
540                                next = next.wrapping_add(1);
541                                match children[index]
542                                    .0
543                                    .send(CombinedSinkMessage::Item(Ok(value.clone())))
544                                {
545                                    Ok(()) => break,
546                                    Err(_) => {
547                                        children.remove(index);
548                                    }
549                                }
550                            }
551                            if children.is_empty() {
552                                break;
553                            }
554                        }
555                    },
556                    Err(error) => {
557                        for (sender, _) in &children {
558                            let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
559                        }
560                        return Err(error);
561                    }
562                }
563            }
564            // Wait for each child to request the next element after the final
565            // handoff so terminal side effects have definitely run before we
566            // release its materialized value.
567            for (sender, _) in &children {
568                let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
569                if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
570                    let _ = ack_receiver.recv();
571                }
572            }
573            // Then close every feed channel so each child observes end-of-stream
574            // and only afterwards release the held mats.
575            let mats: Vec<_> = children
576                .into_iter()
577                .map(|(sender, mat)| {
578                    let _ = sender.send(CombinedSinkMessage::Close);
579                    mat
580                })
581                .collect();
582            drop(mats);
583            Ok(NotUsed)
584        })
585    }
586}
587
588impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
589    fn from_task_runner<F>(runner: F) -> Self
590    where
591        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
592    {
593        Self::from_task_runner_with_inline(runner, false)
594    }
595
596    fn from_raw_terminal_task_runner<F>(runner: F) -> Self
597    where
598        F: Fn(
599                BoxStream<In>,
600                Materializer,
601                Arc<AtomicBool>,
602                SourceRuntimeHints,
603            ) -> StreamResult<Mat>
604            + Send
605            + Sync
606            + 'static,
607    {
608        let runner = Arc::new(runner);
609        let async_runner = {
610            let runner = Arc::clone(&runner);
611            Arc::new(move |input, materializer: &Materializer| {
612                let runner = Arc::clone(&runner);
613                let worker_materializer =
614                    materializer.with_name_prefix(materializer.name_prefix().to_owned());
615                Ok(materializer.spawn_stream(move |cancelled| {
616                    runner(
617                        input,
618                        worker_materializer,
619                        cancelled,
620                        SourceRuntimeHints::default(),
621                    )
622                }))
623            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
624        };
625        let raw_hinted_runner = {
626            let runner = Arc::clone(&runner);
627            Arc::new(
628                move |input, materializer: &Materializer, hints: SourceRuntimeHints| {
629                    let runner = Arc::clone(&runner);
630                    let worker_materializer =
631                        materializer.with_name_prefix(materializer.name_prefix().to_owned());
632                    Ok(materializer.spawn_stream(move |cancelled| {
633                        runner(input, worker_materializer, cancelled, hints)
634                    }))
635                },
636            ) as Arc<HintedSinkRunner<In, StreamCompletion<Mat>>>
637        };
638        Sink {
639            runner: async_runner,
640            inline_runner: None,
641            hinted_runner: None,
642            raw_hinted_runner: Some(raw_hinted_runner),
643            attributes: Attributes::default(),
644            deferred_factory: None,
645            fold_fp: None,
646        }
647    }
648
649    fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
650    where
651        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
652    {
653        let runner = Arc::new(runner);
654        let async_runner = {
655            let runner = Arc::clone(&runner);
656            Arc::new(move |input, materializer: &Materializer| {
657                let runner = Arc::clone(&runner);
658                let state = Arc::clone(&materializer.inner.state);
659                Ok(materializer.spawn_stream(move |cancelled| {
660                    runner(runtime_checked_stream(input, state, Some(cancelled)))
661                }))
662            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
663        };
664        let inline_runner = inline.then(|| {
665            let runner = Arc::clone(&runner);
666            Arc::new(move |input, materializer: &Materializer| {
667                let runner = Arc::clone(&runner);
668                let state = Arc::clone(&materializer.inner.state);
669                Ok(materializer.spawn_stream_inline(move |cancelled| {
670                    runner(runtime_checked_stream(input, state, Some(cancelled)))
671                }))
672            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
673        });
674        Sink::from_runner_parts(async_runner, inline_runner)
675    }
676}
677
678impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
679    #[must_use]
680    pub fn collect() -> Self {
681        let task_runner =
682            Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
683                run_collect_terminal(input, materializer, cancelled, hints)
684            });
685        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
686            _phantom: std::marker::PhantomData,
687        });
688        Sink {
689            runner: task_runner.runner,
690            inline_runner: task_runner.inline_runner,
691            hinted_runner: task_runner.hinted_runner,
692            raw_hinted_runner: task_runner.raw_hinted_runner,
693            attributes: task_runner.attributes,
694            deferred_factory: task_runner.deferred_factory,
695            fold_fp: Some(fp),
696        }
697    }
698
699    #[must_use]
700    pub fn collection() -> Self {
701        Self::collect()
702    }
703
704    #[must_use]
705    pub fn take_last(n: usize) -> Self {
706        Sink::from_task_runner(move |input| {
707            if n == 0 {
708                for item in input {
709                    let _ = item?;
710                }
711                return Ok(Vec::new());
712            }
713            let mut buffer = VecDeque::with_capacity(n);
714            for item in input {
715                let item = item?;
716                if buffer.len() == n {
717                    buffer.pop_front();
718                }
719                buffer.push_back(item);
720            }
721            Ok(buffer.into_iter().collect())
722        })
723    }
724}
725
726impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
727    #[must_use]
728    pub fn ignore() -> Self {
729        let task_runner =
730            Sink::from_raw_terminal_task_runner(|input, materializer, cancelled, hints| {
731                run_ignore_terminal(input, materializer, cancelled, hints)
732            });
733        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
734            _phantom: std::marker::PhantomData,
735        });
736        Sink {
737            runner: task_runner.runner,
738            inline_runner: task_runner.inline_runner,
739            hinted_runner: task_runner.hinted_runner,
740            raw_hinted_runner: task_runner.raw_hinted_runner,
741            attributes: task_runner.attributes,
742            deferred_factory: task_runner.deferred_factory,
743            fold_fp: Some(fp),
744        }
745    }
746
747    #[must_use]
748    pub fn on_complete<F>(callback: F) -> Self
749    where
750        F: FnOnce() + Send + Sync + 'static,
751    {
752        let callback = Arc::new(Mutex::new(Some(callback)));
753        Sink::from_task_runner(move |input| {
754            for item in input {
755                item?;
756            }
757            if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
758                cb();
759            }
760            Ok(NotUsed)
761        })
762    }
763
764    #[must_use]
765    pub fn never() -> Self {
766        Sink::from_runner(|input, materializer| {
767            let state = Arc::clone(&materializer.inner.state);
768            let shutdown_state = Arc::clone(&state);
769            Ok(materializer.spawn_stream(move |cancelled| {
770                let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
771                for item in input {
772                    item?;
773                }
774                loop {
775                    if shutdown_state.shutdown.load(Ordering::SeqCst) {
776                        return Err(StreamError::AbruptTermination);
777                    }
778                    if cancelled.load(Ordering::SeqCst) {
779                        return Err(StreamError::Cancelled);
780                    }
781                    thread::sleep(Duration::from_millis(1));
782                }
783            }))
784        })
785    }
786
787    #[must_use]
788    pub fn foreach<F>(f: F) -> Self
789    where
790        F: Fn(In) + Send + Sync + 'static,
791    {
792        let f_arc = Arc::new(f);
793        let f_runner = Arc::clone(&f_arc);
794        let task_runner = Sink::from_task_runner(move |input| {
795            for item in input {
796                f_runner(item?);
797            }
798            Ok(NotUsed)
799        });
800        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::ForeachDescriptor { f: f_arc });
801        Sink {
802            runner: task_runner.runner,
803            inline_runner: task_runner.inline_runner,
804            hinted_runner: task_runner.hinted_runner,
805            raw_hinted_runner: task_runner.raw_hinted_runner,
806            attributes: task_runner.attributes,
807            deferred_factory: task_runner.deferred_factory,
808            fold_fp: Some(fp),
809        }
810    }
811
812    #[must_use]
813    pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
814    where
815        F: Fn(In) -> Fut + Send + Sync + 'static,
816        Fut: Future<Output = StreamResult<()>> + Send + 'static,
817    {
818        Flow::identity()
819            .map_async_unordered(parallelism, f)
820            .to_mat(Sink::ignore(), Keep::right)
821    }
822
823    #[must_use]
824    pub fn foreach_result<F>(f: F) -> Self
825    where
826        F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
827    {
828        Sink::from_task_runner(move |input| {
829            for item in input {
830                f(item?)?;
831            }
832            Ok(NotUsed)
833        })
834    }
835
836    #[must_use]
837    pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
838    where
839        F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
840    {
841        Sink::from_task_runner(move |input| {
842            for item in input {
843                let item = item?;
844                match catch_unwind(AssertUnwindSafe(|| f(item)))
845                    .unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
846                {
847                    Ok(()) => {}
848                    Err(error) => match decide_supervision(&decider, &error) {
849                        SupervisionDirective::Stop => return Err(error),
850                        SupervisionDirective::Resume | SupervisionDirective::Restart => {}
851                    },
852                }
853            }
854            Ok(NotUsed)
855        })
856    }
857}
858
859impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
860    /// Materializes a sink that completes with the stream's first element, or
861    /// fails with [`StreamError::EmptyStream`] if the stream is empty.
862    ///
863    /// Fed by a synchronous bounded eager source (`Source::single`/`from_iter`/
864    /// `empty`/`failed`, optionally through inline-preserving synchronous flows
865    /// such as `map`/`filter`/`identity`), this terminal takes the inline head
866    /// fast path: the first element is produced on the calling thread *during*
867    /// materialization, without spawning a worker or a oneshot channel. As a
868    /// result `run_with_materializer()` blocks until that element is available and
869    /// the returned [`StreamCompletion`] is already resolved, so any work a caller
870    /// expected to overlap between `run_with_materializer()` returning and
871    /// `.wait()` is already done for these sources. Other sources — and any chain
872    /// that passes through a non-preserving operator, including `ActorFlow::ask`
873    /// (whose blocking cross-thread reply wait must not run inline on the caller) —
874    /// still drain on a runtime worker as before, keeping the returned
875    /// [`StreamCompletion`] non-blocking/awaitable.
876    #[must_use]
877    pub fn head() -> Self {
878        Sink::from_task_runner_with_inline(
879            |mut input| input.next().unwrap_or(Err(StreamError::EmptyStream)),
880            true,
881        )
882    }
883
884    #[must_use]
885    pub fn last() -> Self {
886        Sink::from_task_runner(|input| {
887            let mut last = None;
888            for item in input {
889                last = Some(item?);
890            }
891            last.ok_or(StreamError::EmptyStream)
892        })
893    }
894
895    #[must_use]
896    pub fn reduce<F>(f: F) -> Self
897    where
898        F: Fn(In, In) -> In + Send + Sync + 'static,
899    {
900        Sink::from_task_runner(move |mut input| {
901            let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
902            for item in input {
903                acc = f(acc, item?);
904            }
905            Ok(acc)
906        })
907    }
908
909    #[must_use]
910    pub fn reduce_result<F>(f: F) -> Self
911    where
912        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
913    {
914        Sink::from_task_runner(move |mut input| {
915            let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
916            for item in input {
917                acc = f(acc, item?)?;
918            }
919            Ok(acc)
920        })
921    }
922
923    #[must_use]
924    pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
925    where
926        In: Clone,
927        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
928    {
929        Sink::from_task_runner(move |mut input: BoxStream<In>| {
930            let mut acc = Some(input.next().unwrap_or(Err(StreamError::EmptyStream))?);
931            for item in input {
932                let item = item?;
933                let Some(previous) = acc.take() else {
934                    acc = Some(item);
935                    continue;
936                };
937                match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
938                    .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")))
939                {
940                    Ok(next) => acc = Some(next),
941                    Err(error) => match decide_supervision(&decider, &error) {
942                        SupervisionDirective::Stop => return Err(error),
943                        SupervisionDirective::Resume => acc = Some(previous),
944                        SupervisionDirective::Restart => acc = None,
945                    },
946                }
947            }
948            acc.ok_or(StreamError::EmptyStream)
949        })
950    }
951}
952
953impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
954    /// Materializes a sink that completes with `Some(first element)`, or `None`
955    /// if the stream is empty.
956    ///
957    /// Like [`Sink::head`], fed by a synchronous bounded eager source
958    /// (`Source::single`/`from_iter`/`empty`/`failed`, optionally through
959    /// inline-preserving synchronous flows such as `map`/`filter`/`identity`) this
960    /// terminal takes the inline head fast path: the result is produced on the
961    /// calling thread *during* materialization without spawning a worker or a
962    /// oneshot channel, so `run_with_materializer()` blocks until it is available
963    /// and the returned [`StreamCompletion`] is already resolved. Any work a caller
964    /// expected to overlap between `run_with_materializer()` returning and
965    /// `.wait()` is already done for these sources; other sources — and any chain
966    /// through a non-preserving operator, including `ActorFlow::ask` (whose
967    /// blocking cross-thread reply wait must not run inline) — still drain on a
968    /// runtime worker, keeping the returned [`StreamCompletion`]
969    /// non-blocking/awaitable.
970    #[must_use]
971    pub fn head_option() -> Self {
972        Sink::from_task_runner_with_inline(
973            |mut input| match input.next() {
974                Some(Ok(item)) => Ok(Some(item)),
975                Some(Err(error)) => Err(error),
976                None => Ok(None),
977            },
978            true,
979        )
980    }
981
982    #[must_use]
983    pub fn last_option() -> Self {
984        Sink::from_task_runner(|input| {
985            let mut last = None;
986            for item in input {
987                last = Some(item?);
988            }
989            Ok(last)
990        })
991    }
992}
993
994impl<In: Send + 'static> Sink<In, NotUsed> {
995    #[must_use]
996    pub fn cancelled() -> Self {
997        Sink::from_runner(|_input, _materializer| Ok(NotUsed))
998    }
999
1000    #[must_use]
1001    pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
1002    where
1003        InnerMat: Send + 'static,
1004        F: Fn() -> Fut + Send + Sync + 'static,
1005        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
1006    {
1007        Self::lazy_future_sink(future)
1008    }
1009
1010    #[must_use]
1011    pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1012    where
1013        InnerMat: Send + 'static,
1014        F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
1015    {
1016        let create = Arc::new(create);
1017        Sink::from_runner(move |input, materializer| {
1018            let create = Arc::clone(&create);
1019            let state = Arc::clone(&materializer.inner.state);
1020            let worker_materializer =
1021                materializer.with_name_prefix(materializer.name_prefix().to_owned());
1022            Ok(materializer.spawn_stream(move |cancelled| {
1023                let input = runtime_checked_stream(input, state, Some(cancelled));
1024                run_lazy_sink(input, &worker_materializer, move || {
1025                    catch_unwind_failed("lazy_sink factory", || create())
1026                })
1027            }))
1028        })
1029    }
1030
1031    #[must_use]
1032    pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
1033    where
1034        InnerMat: Send + 'static,
1035        F: Fn() -> Fut + Send + Sync + 'static,
1036        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
1037    {
1038        let create = Arc::new(create);
1039        Sink::from_runner(move |input, materializer| {
1040            let create = Arc::clone(&create);
1041            let state = Arc::clone(&materializer.inner.state);
1042            let worker_materializer =
1043                materializer.with_name_prefix(materializer.name_prefix().to_owned());
1044            Ok(materializer.spawn_stream(move |cancelled| {
1045                let input = runtime_checked_stream(input, state, Some(cancelled));
1046                run_lazy_sink(input, &worker_materializer, move || {
1047                    catch_unwind_failed("lazy_future_sink factory", || create())
1048                        .and_then(flow::run_future_inline_or_spawn)
1049                })
1050            }))
1051        })
1052    }
1053
1054    #[must_use]
1055    pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1056    where
1057        Acc: Clone + Send + Sync + 'static,
1058        F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
1059    {
1060        let f_arc = Arc::new(f);
1061        let zero_clone = zero.clone();
1062        let f_arc2 = Arc::clone(&f_arc);
1063        let task_runner = {
1064            let zero = zero;
1065            Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1066                run_fold_terminal(
1067                    input,
1068                    materializer,
1069                    cancelled,
1070                    hints,
1071                    zero.clone(),
1072                    f_arc.as_ref(),
1073                )
1074            })
1075        };
1076        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
1077            zero: zero_clone,
1078            f: f_arc2,
1079        });
1080        Sink {
1081            runner: task_runner.runner,
1082            inline_runner: task_runner.inline_runner,
1083            hinted_runner: task_runner.hinted_runner,
1084            raw_hinted_runner: task_runner.raw_hinted_runner,
1085            attributes: task_runner.attributes,
1086            deferred_factory: task_runner.deferred_factory,
1087            fold_fp: Some(fp),
1088        }
1089    }
1090
1091    #[must_use]
1092    pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
1093    where
1094        Acc: Clone + Send + Sync + 'static,
1095        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1096    {
1097        let f_arc = Arc::new(f);
1098        let zero_clone = zero.clone();
1099        let f_arc2 = Arc::clone(&f_arc);
1100        let task_runner = {
1101            let zero = zero;
1102            Sink::from_raw_terminal_task_runner(move |input, materializer, cancelled, hints| {
1103                run_fold_result_terminal(
1104                    input,
1105                    materializer,
1106                    cancelled,
1107                    hints,
1108                    zero.clone(),
1109                    f_arc.as_ref(),
1110                )
1111            })
1112        };
1113        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
1114            zero: zero_clone,
1115            f: f_arc2,
1116        });
1117        Sink {
1118            runner: task_runner.runner,
1119            inline_runner: task_runner.inline_runner,
1120            hinted_runner: task_runner.hinted_runner,
1121            raw_hinted_runner: task_runner.raw_hinted_runner,
1122            attributes: task_runner.attributes,
1123            deferred_factory: task_runner.deferred_factory,
1124            fold_fp: Some(fp),
1125        }
1126    }
1127
1128    #[must_use]
1129    pub fn fold_result_with_supervision<Acc, F>(
1130        zero: Acc,
1131        f: F,
1132        decider: SupervisionDecider,
1133    ) -> Sink<In, StreamCompletion<Acc>>
1134    where
1135        Acc: Clone + Send + Sync + 'static,
1136        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
1137    {
1138        Sink::from_task_runner(move |input| {
1139            let mut acc = zero.clone();
1140            for item in input {
1141                let item = item?;
1142                let previous = acc;
1143                match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
1144                    .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")))
1145                {
1146                    Ok(next) => acc = next,
1147                    Err(error) => match decide_supervision(&decider, &error) {
1148                        SupervisionDirective::Stop => return Err(error),
1149                        SupervisionDirective::Resume => acc = previous,
1150                        SupervisionDirective::Restart => acc = zero.clone(),
1151                    },
1152                }
1153            }
1154            Ok(acc)
1155        })
1156    }
1157}
1158
1159fn run_lazy_sink<In, InnerMat, F>(
1160    mut input: BoxStream<In>,
1161    materializer: &Materializer,
1162    create: F,
1163) -> StreamResult<InnerMat>
1164where
1165    In: Send + 'static,
1166    InnerMat: Send + 'static,
1167    F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
1168{
1169    let first = match input.next() {
1170        Some(Ok(item)) => item,
1171        Some(Err(error)) => return Err(error),
1172        None => {
1173            return Err(StreamError::Failed(
1174                "lazy sink was never materialized".into(),
1175            ));
1176        }
1177    };
1178    let sink = create()?;
1179    sink.run(prepend_first_stream(first, input), materializer)
1180}
1181
1182fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
1183where
1184    In: Send + 'static,
1185{
1186    let mut first = Some(first);
1187    Box::new(std::iter::from_fn(move || {
1188        if let Some(item) = first.take() {
1189            Some(Ok(item))
1190        } else {
1191            rest.next()
1192        }
1193    }))
1194}
1195
1196fn terminal_consumer_status(
1197    materializer: &Materializer,
1198    cancelled: &Arc<AtomicBool>,
1199) -> StreamResult<()> {
1200    if materializer.is_shutdown() {
1201        Err(StreamError::AbruptTermination)
1202    } else if cancelled.load(Ordering::SeqCst) {
1203        Err(StreamError::Cancelled)
1204    } else {
1205        Ok(())
1206    }
1207}
1208
1209fn run_collect_terminal<In: Send + 'static>(
1210    mut input: BoxStream<In>,
1211    materializer: Materializer,
1212    cancelled: Arc<AtomicBool>,
1213    hints: SourceRuntimeHints,
1214) -> StreamResult<Vec<In>> {
1215    if !hints.terminal_consumer_batch {
1216        let input = runtime_checked_stream(
1217            input,
1218            Arc::clone(&materializer.inner.state),
1219            Some(cancelled),
1220        );
1221        return input.collect();
1222    }
1223
1224    let mut items = Vec::with_capacity(hints.inline_micro_max_success_items.unwrap_or(0));
1225    loop {
1226        terminal_consumer_status(&materializer, &cancelled)?;
1227        {
1228            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1229            for _ in 0..TERMINAL_CONSUMER_BATCH {
1230                match input.next() {
1231                    Some(Ok(item)) => items.push(item),
1232                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1233                        Ok(()) => return Err(error),
1234                        Err(status) => return Err(status),
1235                    },
1236                    None => {
1237                        return terminal_consumer_status(&materializer, &cancelled).map(|()| items);
1238                    }
1239                }
1240            }
1241        }
1242    }
1243}
1244
1245fn run_ignore_terminal<In: Send + 'static>(
1246    input: BoxStream<In>,
1247    materializer: Materializer,
1248    cancelled: Arc<AtomicBool>,
1249    hints: SourceRuntimeHints,
1250) -> StreamResult<NotUsed> {
1251    if !hints.terminal_consumer_batch {
1252        let input = runtime_checked_stream(
1253            input,
1254            Arc::clone(&materializer.inner.state),
1255            Some(cancelled),
1256        );
1257        for item in input {
1258            item?;
1259        }
1260        return Ok(NotUsed);
1261    }
1262
1263    let mut input = input;
1264    loop {
1265        terminal_consumer_status(&materializer, &cancelled)?;
1266        {
1267            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1268            for _ in 0..TERMINAL_CONSUMER_BATCH {
1269                match input.next() {
1270                    Some(Ok(_)) => {}
1271                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1272                        Ok(()) => return Err(error),
1273                        Err(status) => return Err(status),
1274                    },
1275                    None => {
1276                        return terminal_consumer_status(&materializer, &cancelled)
1277                            .map(|()| NotUsed);
1278                    }
1279                }
1280            }
1281        }
1282    }
1283}
1284
1285fn run_fold_terminal<In, Acc, F>(
1286    input: BoxStream<In>,
1287    materializer: Materializer,
1288    cancelled: Arc<AtomicBool>,
1289    hints: SourceRuntimeHints,
1290    zero: Acc,
1291    f: &F,
1292) -> StreamResult<Acc>
1293where
1294    In: Send + 'static,
1295    Acc: Send + 'static,
1296    F: Fn(Acc, In) -> Acc,
1297{
1298    if !hints.terminal_consumer_batch {
1299        let input = runtime_checked_stream(
1300            input,
1301            Arc::clone(&materializer.inner.state),
1302            Some(cancelled),
1303        );
1304        let mut acc = zero;
1305        for item in input {
1306            acc = f(acc, item?);
1307        }
1308        return Ok(acc);
1309    }
1310
1311    let mut input = input;
1312    let mut acc = zero;
1313    loop {
1314        terminal_consumer_status(&materializer, &cancelled)?;
1315        {
1316            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1317            for _ in 0..TERMINAL_CONSUMER_BATCH {
1318                match input.next() {
1319                    Some(Ok(item)) => acc = f(acc, item),
1320                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1321                        Ok(()) => return Err(error),
1322                        Err(status) => return Err(status),
1323                    },
1324                    None => {
1325                        return terminal_consumer_status(&materializer, &cancelled).map(|()| acc);
1326                    }
1327                }
1328            }
1329        }
1330    }
1331}
1332
1333fn run_fold_result_terminal<In, Acc, F>(
1334    input: BoxStream<In>,
1335    materializer: Materializer,
1336    cancelled: Arc<AtomicBool>,
1337    hints: SourceRuntimeHints,
1338    zero: Acc,
1339    f: &F,
1340) -> StreamResult<Acc>
1341where
1342    In: Send + 'static,
1343    Acc: Send + 'static,
1344    F: Fn(Acc, In) -> StreamResult<Acc>,
1345{
1346    if !hints.terminal_consumer_batch {
1347        let input = runtime_checked_stream(
1348            input,
1349            Arc::clone(&materializer.inner.state),
1350            Some(cancelled),
1351        );
1352        let mut acc = zero;
1353        for item in input {
1354            acc = f(acc, item?)?;
1355        }
1356        return Ok(acc);
1357    }
1358
1359    let mut input = input;
1360    let mut acc = Some(zero);
1361    loop {
1362        terminal_consumer_status(&materializer, &cancelled)?;
1363        {
1364            let _cancel_scope = set_current_stream_cancelled(&cancelled);
1365            for _ in 0..TERMINAL_CONSUMER_BATCH {
1366                match input.next() {
1367                    Some(Ok(item)) => {
1368                        let previous = acc.take().expect("fold accumulator present");
1369                        match f(previous, item) {
1370                            Ok(next) => acc = Some(next),
1371                            Err(error) => {
1372                                return match terminal_consumer_status(&materializer, &cancelled) {
1373                                    Ok(()) => Err(error),
1374                                    Err(status) => Err(status),
1375                                };
1376                            }
1377                        }
1378                    }
1379                    Some(Err(error)) => match terminal_consumer_status(&materializer, &cancelled) {
1380                        Ok(()) => return Err(error),
1381                        Err(status) => return Err(status),
1382                    },
1383                    None => {
1384                        return terminal_consumer_status(&materializer, &cancelled)
1385                            .map(|()| acc.expect("fold accumulator present"));
1386                    }
1387                }
1388            }
1389        }
1390    }
1391}
1392
1393#[cfg(test)]
1394mod tests {
1395    use super::*;
1396    use crate::{Source, StreamCompletion};
1397    use std::time::Instant;
1398
1399    fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
1400        let deadline = Instant::now() + timeout;
1401        while Instant::now() < deadline {
1402            if condition() {
1403                return true;
1404            }
1405            thread::sleep(Duration::from_millis(5));
1406        }
1407        condition()
1408    }
1409
1410    #[test]
1411    fn map_materialized_value_on_deferred_sink_does_not_explode() {
1412        // Sink::setup sets deferred_factory: Some(_) — the branch that triggered the
1413        // monomorphization cascade on Rust 1.96 (infinite closure-type nesting).
1414        let sink = Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |acc, x| acc + x));
1415        // Two nested map_materialized_value calls mirror the failing bench/bin pattern.
1416        // Without the fix each call produced a new instantiation wrapping the previous
1417        // closure type, creating an infinite chain the compiler could not terminate.
1418        let sink = sink
1419            .map_materialized_value(|sc: StreamCompletion<u64>| sc)
1420            .map_materialized_value(|sc| sc);
1421
1422        let sum = Source::from_iter(1u64..=3)
1423            .run_with(sink)
1424            .unwrap()
1425            .wait()
1426            .unwrap();
1427        assert_eq!(sum, 6u64);
1428    }
1429
1430    #[test]
1431    fn batched_terminal_fold_observes_completion_drop_cancellation() {
1432        let materializer = Materializer::new();
1433        let completion = Source::repeat(1_u64)
1434            .run_with_materializer(Sink::fold(0_u64, |acc, item| acc + item), &materializer)
1435            .expect("fold terminal materializes");
1436
1437        assert!(wait_until(Duration::from_secs(1), || {
1438            materializer.active_streams() == 1
1439        }));
1440        drop(completion);
1441
1442        assert!(wait_until(Duration::from_secs(5), || {
1443            materializer.active_streams() == 0
1444        }));
1445    }
1446}