Skip to main content

datum/stream/
sink.rs

1use super::flow;
2use super::*;
3use crate::Attributes;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5
/// A materialized child of `Sink::combine`: the feed channel plus the child's
/// materialized value. The mat is held (type-erased) until the parent runner
/// finishes — dropping it early would trip `StreamCompletion`'s
/// cancel-on-drop and silently cancel the child before it consumes anything.
type CombinedSinkChild<In> = (
    // Feed channel: the parent pushes items, flush requests and the close
    // signal to the child through this sender.
    std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
    // The child's materialized value, erased to `Any` so that children with
    // different mat types can be kept in one collection.
    Box<dyn std::any::Any + Send>,
);
14
/// Materializes one child of `Sink::combine` against the given materializer and
/// returns its feed channel together with its type-erased materialized value.
type CombinedSinkRunner<In> =
    dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
/// Factory stored by `Sink::from_materializer`: builds the concrete sink from the
/// materializer and the attributes handed over by `Sink::run` / `Sink::run_inline`.
type DeferredSinkFactory<In, Mat> =
    dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
19
/// Message sent from the `Sink::combine` parent to one of its children.
enum CombinedSinkMessage<In> {
    /// An upstream element, or the upstream failure, handed to the child as-is.
    Item(StreamResult<In>),
    /// Flush request: the child's feed iterator acknowledges on the enclosed
    /// channel when it reads this message, then keeps waiting for more.
    Flush(std::sync::mpsc::SyncSender<()>),
    /// End of stream: the child's feed iterator finishes.
    Close,
}
25
/// A stream sink that consumes `In` elements and yields the materialized value
/// `Mat` when it is run.
pub struct Sink<In, Mat> {
    /// Regular runner, used by `run` when no deferred factory is set.
    runner: Arc<SinkRunner<In, Mat>>,
    /// Optional runner used by `run_inline`; `can_inline` reports whether it is present.
    inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
    /// Attributes attached to this sink.
    attributes: Attributes,
    /// When set, `run` / `run_inline` first build the actual sink through this
    /// factory and then run that sink instead of the runners above.
    deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
    /// Set only for recognised fold/collect/ignore sinks. `None` for all other sinks.
    pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
}
34
35/// Non-generic helper for `Sink::map_materialized_value` — accepts an erased function so
36/// that only one monomorphization exists per `(In, Mat, NextMat)` triple. This prevents
37/// the deferred-factory recursion from producing an ever-growing closure type chain.
38fn map_mat_dyn<In, Mat, NextMat>(
39    sink: Sink<In, Mat>,
40    f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
41) -> Sink<In, NextMat>
42where
43    In: Send + 'static,
44    Mat: Send + 'static,
45    NextMat: Send + 'static,
46{
47    let Sink {
48        runner,
49        inline_runner,
50        attributes,
51        deferred_factory,
52        fold_fp: _,
53    } = sink;
54    let mapped_runner = {
55        let f = Arc::clone(&f);
56        Arc::new(move |input, materializer: &Materializer| {
57            let mat = runner(input, materializer)?;
58            Ok(f(mat))
59        }) as Arc<SinkRunner<In, NextMat>>
60    };
61    let mapped_inline_runner = inline_runner.map(|ir| {
62        let f = Arc::clone(&f);
63        Arc::new(move |input, materializer: &Materializer| {
64            let result = ir(input, materializer)?;
65            Ok(f(result))
66        }) as Arc<SinkRunner<In, NextMat>>
67    });
68    let mapped_factory = deferred_factory.map(|factory| {
69        let f = Arc::clone(&f);
70        Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
71            map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
72        }) as Arc<DeferredSinkFactory<In, NextMat>>
73    });
74    Sink {
75        runner: mapped_runner,
76        inline_runner: mapped_inline_runner,
77        attributes,
78        deferred_factory: mapped_factory,
79        fold_fp: None,
80    }
81}
82
83impl<In, Mat> Clone for Sink<In, Mat> {
84    fn clone(&self) -> Self {
85        Self {
86            runner: Arc::clone(&self.runner),
87            inline_runner: self.inline_runner.as_ref().map(Arc::clone),
88            attributes: self.attributes.clone(),
89            deferred_factory: self.deferred_factory.as_ref().map(Arc::clone),
90            fold_fp: self.fold_fp.as_ref().map(Arc::clone),
91        }
92    }
93}
94
95impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
96    pub(crate) fn from_runner<F>(runner: F) -> Self
97    where
98        F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
99    {
100        Self::from_runner_parts(Arc::new(runner), None)
101    }
102
103    pub(crate) fn from_runner_parts(
104        runner: Arc<SinkRunner<In, Mat>>,
105        inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
106    ) -> Self {
107        Self {
108            runner,
109            inline_runner,
110            attributes: Attributes::default(),
111            deferred_factory: None,
112            fold_fp: None,
113        }
114    }
115
116    pub(super) fn run(
117        &self,
118        input: BoxStream<In>,
119        materializer: &Materializer,
120    ) -> StreamResult<Mat> {
121        if let Some(factory) = &self.deferred_factory {
122            let attrs = materializer.effective_attributes(&self.attributes);
123            return factory(materializer, &attrs).run(input, materializer);
124        }
125        (self.runner)(input, materializer)
126    }
127
128    pub(super) fn can_inline(&self) -> bool {
129        self.inline_runner.is_some()
130    }
131
132    pub(super) fn run_inline(
133        &self,
134        input: BoxStream<In>,
135        materializer: &Materializer,
136    ) -> StreamResult<Mat> {
137        if let Some(factory) = &self.deferred_factory {
138            let attrs = materializer.effective_attributes(&self.attributes);
139            return factory(materializer, &attrs).run_inline(input, materializer);
140        }
141        (self
142            .inline_runner
143            .as_ref()
144            .expect("inline sink runner exists"))(input, materializer)
145    }
146
147    pub fn run_with<SourceMat: Send + 'static>(
148        self,
149        source: Source<In, SourceMat>,
150    ) -> StreamResult<SourceMat> {
151        source.to(self).run()
152    }
153
154    pub fn run_with_materializer<SourceMat: Send + 'static>(
155        self,
156        source: Source<In, SourceMat>,
157        materializer: &Materializer,
158    ) -> StreamResult<SourceMat> {
159        source.to(self).run_with_materializer(materializer)
160    }
161
162    #[must_use]
163    pub fn from_materializer<F>(factory: F) -> Self
164    where
165        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
166    {
167        let factory = Arc::new(factory);
168        Self {
169            runner: Arc::new(|_input, _materializer| {
170                Err(StreamError::Failed(
171                    "deferred sink factory must be driven through Sink::run".into(),
172                ))
173            }),
174            inline_runner: None,
175            attributes: Attributes::default(),
176            deferred_factory: Some(factory),
177            fold_fp: None,
178        }
179    }
180
181    #[must_use]
182    pub fn setup<F>(factory: F) -> Self
183    where
184        F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
185    {
186        Self::from_materializer(factory)
187    }
188
189    pub fn pre_materialize(
190        &self,
191        materializer: &Materializer,
192    ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
193        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
194        let materialized = self.clone().run(
195            Box::new(std::iter::from_fn(move || receiver.recv().ok())),
196            materializer,
197        )?;
198        let sender = Arc::new(Mutex::new(Some(sender)));
199        let sink = Sink::from_runner(move |input, _materializer| {
200            let Some(sender) = sender
201                .lock()
202                .expect("pre-materialized sink poisoned")
203                .take()
204            else {
205                return Err(StreamError::Failed(
206                    "pre-materialized sink has already been materialized".into(),
207                ));
208            };
209            for item in input {
210                if sender.send(item).is_err() {
211                    break;
212                }
213            }
214            Ok(NotUsed)
215        });
216        Ok((materialized, sink.with_attributes(self.attributes.clone())))
217    }
218
219    #[must_use]
220    pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
221    where
222        NextMat: Send + 'static,
223        F: Fn(Mat) -> NextMat + Send + Sync + 'static,
224    {
225        // Delegate to a non-generic helper that accepts `Arc<dyn Fn>`. This breaks
226        // the infinite monomorphization chain that Rust 1.96 rejects: the old inline
227        // implementation called `map_materialized_value` inside the deferred-factory
228        // closure, which produced a new instantiation for a strictly deeper closure type
229        // at each level (depth > 1024 triggered the recursion limit).
230        map_mat_dyn(self, Arc::new(f))
231    }
232
233    #[must_use]
234    pub fn attributes(&self) -> &Attributes {
235        &self.attributes
236    }
237
238    #[must_use]
239    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
240        self.attributes = attributes;
241        self
242    }
243
244    #[must_use]
245    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
246        self.attributes = self.attributes.and(attributes);
247        self
248    }
249
250    #[must_use]
251    pub fn named(self, name: impl Into<String>) -> Self {
252        self.add_attributes(Attributes::named(name))
253    }
254}
255
/// A fully connected graph that can be run to obtain its materialized value `Mat`.
#[derive(Clone)]
pub struct RunnableGraph<Mat> {
    /// Runs the graph against a materializer.
    pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
    /// Attributes attached to the graph.
    attributes: Attributes,
}
261
/// How `Sink::combine` distributes upstream elements among its child sinks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SinkCombineStrategy {
    /// Every element is cloned and sent to every child that is still accepting input.
    Broadcast,
    /// Each element goes to a single child, chosen in rotation; a child that no
    /// longer accepts input is removed and the next one is tried.
    Balance,
}
267
268impl<Mat: Send + 'static> RunnableGraph<Mat> {
269    pub(super) fn from_runner<F>(runner: F) -> Self
270    where
271        F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
272    {
273        Self {
274            runner: Arc::new(runner),
275            attributes: Attributes::default(),
276        }
277    }
278
279    pub fn run(&self) -> StreamResult<Mat> {
280        Materializer::new().materialize(self)
281    }
282
283    pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
284        materializer.materialize(self)
285    }
286
287    #[must_use]
288    pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
289    where
290        Next: Send + 'static,
291        F: Fn(Mat) -> Next + Send + Sync + 'static,
292    {
293        let f = Arc::new(f);
294        RunnableGraph::from_runner(move |materializer| {
295            let mat = (self.runner)(materializer)?;
296            Ok(f(mat))
297        })
298    }
299
300    #[must_use]
301    pub fn attributes(&self) -> &Attributes {
302        &self.attributes
303    }
304
305    #[must_use]
306    pub fn with_attributes(mut self, attributes: Attributes) -> Self {
307        self.attributes = attributes;
308        self
309    }
310
311    #[must_use]
312    pub fn add_attributes(mut self, attributes: Attributes) -> Self {
313        self.attributes = self.attributes.and(attributes);
314        self
315    }
316
317    #[must_use]
318    pub fn named(self, name: impl Into<String>) -> Self {
319        self.add_attributes(Attributes::named(name))
320    }
321}
322
323impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
324    #[must_use]
325    pub fn combine<M1, M2, MRest, I>(
326        first: Sink<In, M1>,
327        second: Sink<In, M2>,
328        rest: I,
329        strategy: SinkCombineStrategy,
330    ) -> Sink<In, NotUsed>
331    where
332        M1: Send + 'static,
333        M2: Send + 'static,
334        MRest: Send + 'static,
335        I: IntoIterator<Item = Sink<In, MRest>>,
336    {
337        let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
338            Arc::new(move |materializer| {
339                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
340                let mat = first.run(
341                    Box::new(std::iter::from_fn(move || {
342                        loop {
343                            match receiver.recv().ok()? {
344                                CombinedSinkMessage::Item(item) => return Some(item),
345                                CombinedSinkMessage::Flush(ack) => {
346                                    let _ = ack.send(());
347                                }
348                                CombinedSinkMessage::Close => return None,
349                            }
350                        }
351                    })),
352                    materializer,
353                )?;
354                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
355            }),
356            Arc::new(move |materializer| {
357                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
358                let mat = second.run(
359                    Box::new(std::iter::from_fn(move || {
360                        loop {
361                            match receiver.recv().ok()? {
362                                CombinedSinkMessage::Item(item) => return Some(item),
363                                CombinedSinkMessage::Flush(ack) => {
364                                    let _ = ack.send(());
365                                }
366                                CombinedSinkMessage::Close => return None,
367                            }
368                        }
369                    })),
370                    materializer,
371                )?;
372                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
373            }),
374        ];
375        runners.extend(rest.into_iter().map(|sink| {
376            Arc::new(move |materializer: &Materializer| {
377                let (sender, receiver) = std::sync::mpsc::sync_channel(0);
378                let mat = sink.run(
379                    Box::new(std::iter::from_fn(move || {
380                        loop {
381                            match receiver.recv().ok()? {
382                                CombinedSinkMessage::Item(item) => return Some(item),
383                                CombinedSinkMessage::Flush(ack) => {
384                                    let _ = ack.send(());
385                                }
386                                CombinedSinkMessage::Close => return None,
387                            }
388                        }
389                    })),
390                    materializer,
391                )?;
392                Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
393            }) as Arc<CombinedSinkRunner<In>>
394        }));
395
396        Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
397            // Each child's materialized value is held until the feed loop is
398            // done: dropping it would trip cancel-on-drop and kill the child.
399            let mut children = runners
400                .iter()
401                .map(|runner| runner(materializer))
402                .collect::<StreamResult<Vec<_>>>()?;
403            let mut next = 0usize;
404            for item in input.by_ref() {
405                match item {
406                    Ok(value) => match strategy {
407                        SinkCombineStrategy::Broadcast => {
408                            children.retain(|(sender, _)| {
409                                sender
410                                    .send(CombinedSinkMessage::Item(Ok(value.clone())))
411                                    .is_ok()
412                            });
413                            if children.is_empty() {
414                                break;
415                            }
416                        }
417                        SinkCombineStrategy::Balance => {
418                            while !children.is_empty() {
419                                let index = next % children.len();
420                                next = next.wrapping_add(1);
421                                match children[index]
422                                    .0
423                                    .send(CombinedSinkMessage::Item(Ok(value.clone())))
424                                {
425                                    Ok(()) => break,
426                                    Err(_) => {
427                                        children.remove(index);
428                                    }
429                                }
430                            }
431                            if children.is_empty() {
432                                break;
433                            }
434                        }
435                    },
436                    Err(error) => {
437                        for (sender, _) in &children {
438                            let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
439                        }
440                        return Err(error);
441                    }
442                }
443            }
444            // Wait for each child to request the next element after the final
445            // handoff so terminal side effects have definitely run before we
446            // release its materialized value.
447            for (sender, _) in &children {
448                let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
449                if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
450                    let _ = ack_receiver.recv();
451                }
452            }
453            // Then close every feed channel so each child observes end-of-stream
454            // and only afterwards release the held mats.
455            let mats: Vec<_> = children
456                .into_iter()
457                .map(|(sender, mat)| {
458                    let _ = sender.send(CombinedSinkMessage::Close);
459                    mat
460                })
461                .collect();
462            drop(mats);
463            Ok(NotUsed)
464        })
465    }
466}
467
468impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
469    fn from_task_runner<F>(runner: F) -> Self
470    where
471        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
472    {
473        Self::from_task_runner_with_inline(runner, false)
474    }
475
476    fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
477    where
478        F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
479    {
480        let runner = Arc::new(runner);
481        let async_runner = {
482            let runner = Arc::clone(&runner);
483            Arc::new(move |input, materializer: &Materializer| {
484                let runner = Arc::clone(&runner);
485                let state = Arc::clone(&materializer.inner.state);
486                Ok(materializer.spawn_stream(move |cancelled| {
487                    runner(runtime_checked_stream(input, state, Some(cancelled)))
488                }))
489            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
490        };
491        let inline_runner = inline.then(|| {
492            let runner = Arc::clone(&runner);
493            Arc::new(move |input, materializer: &Materializer| {
494                let runner = Arc::clone(&runner);
495                let state = Arc::clone(&materializer.inner.state);
496                Ok(materializer.spawn_stream_inline(move |cancelled| {
497                    runner(runtime_checked_stream(input, state, Some(cancelled)))
498                }))
499            }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
500        });
501        Sink::from_runner_parts(async_runner, inline_runner)
502    }
503}
504
505impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
506    #[must_use]
507    pub fn collect() -> Self {
508        let task_runner = Sink::from_task_runner(|input| input.collect());
509        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
510            _phantom: std::marker::PhantomData,
511        });
512        Sink {
513            runner: task_runner.runner,
514            inline_runner: task_runner.inline_runner,
515            attributes: task_runner.attributes,
516            deferred_factory: task_runner.deferred_factory,
517            fold_fp: Some(fp),
518        }
519    }
520
521    #[must_use]
522    pub fn collection() -> Self {
523        Self::collect()
524    }
525
526    #[must_use]
527    pub fn take_last(n: usize) -> Self {
528        Sink::from_task_runner(move |input| {
529            if n == 0 {
530                for item in input {
531                    let _ = item?;
532                }
533                return Ok(Vec::new());
534            }
535            let mut buffer = VecDeque::with_capacity(n);
536            for item in input {
537                let item = item?;
538                if buffer.len() == n {
539                    buffer.pop_front();
540                }
541                buffer.push_back(item);
542            }
543            Ok(buffer.into_iter().collect())
544        })
545    }
546}
547
548impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
549    #[must_use]
550    pub fn ignore() -> Self {
551        let task_runner = Sink::from_runner(|input, materializer| {
552            let state = Arc::clone(&materializer.inner.state);
553            Ok(materializer.spawn_stream(move |cancelled| {
554                let input = runtime_checked_stream(input, state, Some(cancelled));
555                for item in input {
556                    item?;
557                }
558                Ok(NotUsed)
559            }))
560        });
561        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
562            _phantom: std::marker::PhantomData,
563        });
564        Sink {
565            runner: task_runner.runner,
566            inline_runner: task_runner.inline_runner,
567            attributes: task_runner.attributes,
568            deferred_factory: task_runner.deferred_factory,
569            fold_fp: Some(fp),
570        }
571    }
572
573    #[must_use]
574    pub fn on_complete<F>(callback: F) -> Self
575    where
576        F: FnOnce() + Send + Sync + 'static,
577    {
578        let callback = Arc::new(Mutex::new(Some(callback)));
579        Sink::from_task_runner(move |input| {
580            for item in input {
581                item?;
582            }
583            if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
584                cb();
585            }
586            Ok(NotUsed)
587        })
588    }
589
590    #[must_use]
591    pub fn never() -> Self {
592        Sink::from_runner(|input, materializer| {
593            let state = Arc::clone(&materializer.inner.state);
594            let shutdown_state = Arc::clone(&state);
595            Ok(materializer.spawn_stream(move |cancelled| {
596                let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
597                for item in input {
598                    item?;
599                }
600                loop {
601                    if shutdown_state.shutdown.load(Ordering::SeqCst) {
602                        return Err(StreamError::AbruptTermination);
603                    }
604                    if cancelled.load(Ordering::SeqCst) {
605                        return Err(StreamError::Cancelled);
606                    }
607                    thread::sleep(Duration::from_millis(1));
608                }
609            }))
610        })
611    }
612
613    #[must_use]
614    pub fn foreach<F>(f: F) -> Self
615    where
616        F: Fn(In) + Send + Sync + 'static,
617    {
618        Sink::from_task_runner(move |input| {
619            for item in input {
620                f(item?);
621            }
622            Ok(NotUsed)
623        })
624    }
625
626    #[must_use]
627    pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
628    where
629        F: Fn(In) -> Fut + Send + Sync + 'static,
630        Fut: Future<Output = StreamResult<()>> + Send + 'static,
631    {
632        Flow::identity()
633            .map_async_unordered(parallelism, f)
634            .to_mat(Sink::ignore(), Keep::right)
635    }
636
637    #[must_use]
638    pub fn foreach_result<F>(f: F) -> Self
639    where
640        F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
641    {
642        Sink::from_task_runner(move |input| {
643            for item in input {
644                f(item?)?;
645            }
646            Ok(NotUsed)
647        })
648    }
649
650    #[must_use]
651    pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
652    where
653        F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
654    {
655        Sink::from_task_runner(move |input| {
656            for item in input {
657                let item = item?;
658                match catch_unwind(AssertUnwindSafe(|| f(item)))
659                    .unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
660                {
661                    Ok(()) => {}
662                    Err(error) => match decide_supervision(&decider, &error) {
663                        SupervisionDirective::Stop => return Err(error),
664                        SupervisionDirective::Resume | SupervisionDirective::Restart => {}
665                    },
666                }
667            }
668            Ok(NotUsed)
669        })
670    }
671}
672
673impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
674    /// Materializes a sink that completes with the stream's first element, or
675    /// fails with [`StreamError::EmptyStream`] if the stream is empty.
676    ///
677    /// Fed by a synchronous bounded eager source (`Source::single`/`from_iter`/
678    /// `empty`/`failed`, optionally through inline-preserving synchronous flows
679    /// such as `map`/`filter`/`identity`), this terminal takes the inline head
680    /// fast path: the first element is produced on the calling thread *during*
681    /// materialization, without spawning a worker or a oneshot channel. As a
682    /// result `run_with_materializer()` blocks until that element is available and
683    /// the returned [`StreamCompletion`] is already resolved, so any work a caller
684    /// expected to overlap between `run_with_materializer()` returning and
685    /// `.wait()` is already done for these sources. Other sources — and any chain
686    /// that passes through a non-preserving operator, including `ActorFlow::ask`
687    /// (whose blocking cross-thread reply wait must not run inline on the caller) —
688    /// still drain on a runtime worker as before, keeping the returned
689    /// [`StreamCompletion`] non-blocking/awaitable.
690    #[must_use]
691    pub fn head() -> Self {
692        Sink::from_task_runner_with_inline(
693            |mut input| input.next().unwrap_or(Err(StreamError::EmptyStream)),
694            true,
695        )
696    }
697
698    #[must_use]
699    pub fn last() -> Self {
700        Sink::from_task_runner(|input| {
701            let mut last = None;
702            for item in input {
703                last = Some(item?);
704            }
705            last.ok_or(StreamError::EmptyStream)
706        })
707    }
708
709    #[must_use]
710    pub fn reduce<F>(f: F) -> Self
711    where
712        F: Fn(In, In) -> In + Send + Sync + 'static,
713    {
714        Sink::from_task_runner(move |mut input| {
715            let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
716            for item in input {
717                acc = f(acc, item?);
718            }
719            Ok(acc)
720        })
721    }
722
723    #[must_use]
724    pub fn reduce_result<F>(f: F) -> Self
725    where
726        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
727    {
728        Sink::from_task_runner(move |mut input| {
729            let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
730            for item in input {
731                acc = f(acc, item?)?;
732            }
733            Ok(acc)
734        })
735    }
736
737    #[must_use]
738    pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
739    where
740        In: Clone,
741        F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
742    {
743        Sink::from_task_runner(move |mut input: BoxStream<In>| {
744            let mut acc = Some(input.next().unwrap_or(Err(StreamError::EmptyStream))?);
745            for item in input {
746                let item = item?;
747                let Some(previous) = acc.take() else {
748                    acc = Some(item);
749                    continue;
750                };
751                match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
752                    .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")))
753                {
754                    Ok(next) => acc = Some(next),
755                    Err(error) => match decide_supervision(&decider, &error) {
756                        SupervisionDirective::Stop => return Err(error),
757                        SupervisionDirective::Resume => acc = Some(previous),
758                        SupervisionDirective::Restart => acc = None,
759                    },
760                }
761            }
762            acc.ok_or(StreamError::EmptyStream)
763        })
764    }
765}
766
767impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
768    /// Materializes a sink that completes with `Some(first element)`, or `None`
769    /// if the stream is empty.
770    ///
771    /// Like [`Sink::head`], fed by a synchronous bounded eager source
772    /// (`Source::single`/`from_iter`/`empty`/`failed`, optionally through
773    /// inline-preserving synchronous flows such as `map`/`filter`/`identity`) this
774    /// terminal takes the inline head fast path: the result is produced on the
775    /// calling thread *during* materialization without spawning a worker or a
776    /// oneshot channel, so `run_with_materializer()` blocks until it is available
777    /// and the returned [`StreamCompletion`] is already resolved. Any work a caller
778    /// expected to overlap between `run_with_materializer()` returning and
779    /// `.wait()` is already done for these sources; other sources — and any chain
780    /// through a non-preserving operator, including `ActorFlow::ask` (whose
781    /// blocking cross-thread reply wait must not run inline) — still drain on a
782    /// runtime worker, keeping the returned [`StreamCompletion`]
783    /// non-blocking/awaitable.
784    #[must_use]
785    pub fn head_option() -> Self {
786        Sink::from_task_runner_with_inline(
787            |mut input| match input.next() {
788                Some(Ok(item)) => Ok(Some(item)),
789                Some(Err(error)) => Err(error),
790                None => Ok(None),
791            },
792            true,
793        )
794    }
795
796    #[must_use]
797    pub fn last_option() -> Self {
798        Sink::from_task_runner(|input| {
799            let mut last = None;
800            for item in input {
801                last = Some(item?);
802            }
803            Ok(last)
804        })
805    }
806}
807
808impl<In: Send + 'static> Sink<In, NotUsed> {
809    #[must_use]
810    pub fn cancelled() -> Self {
811        Sink::from_runner(|_input, _materializer| Ok(NotUsed))
812    }
813
814    #[must_use]
815    pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
816    where
817        InnerMat: Send + 'static,
818        F: Fn() -> Fut + Send + Sync + 'static,
819        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
820    {
821        Self::lazy_future_sink(future)
822    }
823
824    #[must_use]
825    pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
826    where
827        InnerMat: Send + 'static,
828        F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
829    {
830        let create = Arc::new(create);
831        Sink::from_runner(move |input, materializer| {
832            let create = Arc::clone(&create);
833            let state = Arc::clone(&materializer.inner.state);
834            let worker_materializer =
835                materializer.with_name_prefix(materializer.name_prefix().to_owned());
836            Ok(materializer.spawn_stream(move |cancelled| {
837                let input = runtime_checked_stream(input, state, Some(cancelled));
838                run_lazy_sink(input, &worker_materializer, move || {
839                    catch_unwind_failed("lazy_sink factory", || create())
840                })
841            }))
842        })
843    }
844
845    #[must_use]
846    pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
847    where
848        InnerMat: Send + 'static,
849        F: Fn() -> Fut + Send + Sync + 'static,
850        Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
851    {
852        let create = Arc::new(create);
853        Sink::from_runner(move |input, materializer| {
854            let create = Arc::clone(&create);
855            let state = Arc::clone(&materializer.inner.state);
856            let worker_materializer =
857                materializer.with_name_prefix(materializer.name_prefix().to_owned());
858            Ok(materializer.spawn_stream(move |cancelled| {
859                let input = runtime_checked_stream(input, state, Some(cancelled));
860                run_lazy_sink(input, &worker_materializer, move || {
861                    catch_unwind_failed("lazy_future_sink factory", || create())
862                        .and_then(flow::run_future_inline_or_spawn)
863                })
864            }))
865        })
866    }
867
868    #[must_use]
869    pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
870    where
871        Acc: Clone + Send + Sync + 'static,
872        F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
873    {
874        let f_arc = Arc::new(f);
875        let zero_clone = zero.clone();
876        let f_arc2 = Arc::clone(&f_arc);
877        let task_runner = {
878            let zero = zero;
879            Sink::from_task_runner(move |input| {
880                let mut acc = zero.clone();
881                for item in input {
882                    acc = f_arc(acc, item?);
883                }
884                Ok(acc)
885            })
886        };
887        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
888            zero: zero_clone,
889            f: f_arc2,
890        });
891        Sink {
892            runner: task_runner.runner,
893            inline_runner: task_runner.inline_runner,
894            attributes: task_runner.attributes,
895            deferred_factory: task_runner.deferred_factory,
896            fold_fp: Some(fp),
897        }
898    }
899
900    #[must_use]
901    pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
902    where
903        Acc: Clone + Send + Sync + 'static,
904        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
905    {
906        let f_arc = Arc::new(f);
907        let zero_clone = zero.clone();
908        let f_arc2 = Arc::clone(&f_arc);
909        let task_runner = {
910            let zero = zero;
911            Sink::from_task_runner(move |input| {
912                let mut acc = zero.clone();
913                for item in input {
914                    acc = f_arc(acc, item?)?;
915                }
916                Ok(acc)
917            })
918        };
919        let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
920            zero: zero_clone,
921            f: f_arc2,
922        });
923        Sink {
924            runner: task_runner.runner,
925            inline_runner: task_runner.inline_runner,
926            attributes: task_runner.attributes,
927            deferred_factory: task_runner.deferred_factory,
928            fold_fp: Some(fp),
929        }
930    }
931
932    #[must_use]
933    pub fn fold_result_with_supervision<Acc, F>(
934        zero: Acc,
935        f: F,
936        decider: SupervisionDecider,
937    ) -> Sink<In, StreamCompletion<Acc>>
938    where
939        Acc: Clone + Send + Sync + 'static,
940        F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
941    {
942        Sink::from_task_runner(move |input| {
943            let mut acc = zero.clone();
944            for item in input {
945                let item = item?;
946                let previous = acc;
947                match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
948                    .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")))
949                {
950                    Ok(next) => acc = next,
951                    Err(error) => match decide_supervision(&decider, &error) {
952                        SupervisionDirective::Stop => return Err(error),
953                        SupervisionDirective::Resume => acc = previous,
954                        SupervisionDirective::Restart => acc = zero.clone(),
955                    },
956                }
957            }
958            Ok(acc)
959        })
960    }
961}
962
963fn run_lazy_sink<In, InnerMat, F>(
964    mut input: BoxStream<In>,
965    materializer: &Materializer,
966    create: F,
967) -> StreamResult<InnerMat>
968where
969    In: Send + 'static,
970    InnerMat: Send + 'static,
971    F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
972{
973    let first = match input.next() {
974        Some(Ok(item)) => item,
975        Some(Err(error)) => return Err(error),
976        None => {
977            return Err(StreamError::Failed(
978                "lazy sink was never materialized".into(),
979            ));
980        }
981    };
982    let sink = create()?;
983    sink.run(prepend_first_stream(first, input), materializer)
984}
985
986fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
987where
988    In: Send + 'static,
989{
990    let mut first = Some(first);
991    Box::new(std::iter::from_fn(move || {
992        if let Some(item) = first.take() {
993            Some(Ok(item))
994        } else {
995            rest.next()
996        }
997    }))
998}
999
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Source, StreamCompletion};

    /// Regression guard: chaining `map_materialized_value` on a deferred sink
    /// must compile and still produce the fold result.
    #[test]
    fn map_materialized_value_on_deferred_sink_does_not_explode() {
        // `Sink::setup` sets `deferred_factory: Some(_)` — the branch that triggered
        // the monomorphization cascade on Rust 1.96 (infinite closure-type nesting).
        let deferred = Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |acc, x| acc + x));

        // Two nested `map_materialized_value` calls mirror the failing bench/bin
        // pattern: before the fix, each call wrapped the previous closure type in a
        // new instantiation, a chain the compiler could not terminate.
        let mapped_once =
            deferred.map_materialized_value(|completion: StreamCompletion<u64>| completion);
        let mapped_twice = mapped_once.map_materialized_value(|completion| completion);

        let completion = Source::from_iter(1u64..=3).run_with(mapped_twice).unwrap();
        let total = completion.wait().unwrap();
        assert_eq!(total, 6u64);
    }
}