1use super::flow;
2use super::*;
3use crate::Attributes;
4use crate::stream::error::{decide_supervision, panic_stream_error};
5
6type CombinedSinkChild<In> = (
11 std::sync::mpsc::SyncSender<CombinedSinkMessage<In>>,
12 Box<dyn std::any::Any + Send>,
13);
14
15type CombinedSinkRunner<In> =
16 dyn Fn(&Materializer) -> StreamResult<CombinedSinkChild<In>> + Send + Sync;
17type DeferredSinkFactory<In, Mat> =
18 dyn Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync;
19
20enum CombinedSinkMessage<In> {
21 Item(StreamResult<In>),
22 Flush(std::sync::mpsc::SyncSender<()>),
23 Close,
24}
25
26pub struct Sink<In, Mat> {
27 runner: Arc<SinkRunner<In, Mat>>,
28 inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
29 attributes: Attributes,
30 deferred_factory: Option<Arc<DeferredSinkFactory<In, Mat>>>,
31 pub(crate) fold_fp: Option<Arc<dyn FoldFastPathDyn<In>>>,
33}
34
35fn map_mat_dyn<In, Mat, NextMat>(
39 sink: Sink<In, Mat>,
40 f: Arc<dyn Fn(Mat) -> NextMat + Send + Sync + 'static>,
41) -> Sink<In, NextMat>
42where
43 In: Send + 'static,
44 Mat: Send + 'static,
45 NextMat: Send + 'static,
46{
47 let Sink {
48 runner,
49 inline_runner,
50 attributes,
51 deferred_factory,
52 fold_fp: _,
53 } = sink;
54 let mapped_runner = {
55 let f = Arc::clone(&f);
56 Arc::new(move |input, materializer: &Materializer| {
57 let mat = runner(input, materializer)?;
58 Ok(f(mat))
59 }) as Arc<SinkRunner<In, NextMat>>
60 };
61 let mapped_inline_runner = inline_runner.map(|ir| {
62 let f = Arc::clone(&f);
63 Arc::new(move |input, materializer: &Materializer| {
64 let result = ir(input, materializer)?;
65 Ok(f(result))
66 }) as Arc<SinkRunner<In, NextMat>>
67 });
68 let mapped_factory = deferred_factory.map(|factory| {
69 let f = Arc::clone(&f);
70 Arc::new(move |materializer: &Materializer, attrs: &Attributes| {
71 map_mat_dyn(factory(materializer, attrs), Arc::clone(&f))
72 }) as Arc<DeferredSinkFactory<In, NextMat>>
73 });
74 Sink {
75 runner: mapped_runner,
76 inline_runner: mapped_inline_runner,
77 attributes,
78 deferred_factory: mapped_factory,
79 fold_fp: None,
80 }
81}
82
83impl<In, Mat> Clone for Sink<In, Mat> {
84 fn clone(&self) -> Self {
85 Self {
86 runner: Arc::clone(&self.runner),
87 inline_runner: self.inline_runner.as_ref().map(Arc::clone),
88 attributes: self.attributes.clone(),
89 deferred_factory: self.deferred_factory.as_ref().map(Arc::clone),
90 fold_fp: self.fold_fp.as_ref().map(Arc::clone),
91 }
92 }
93}
94
95impl<In: Send + 'static, Mat: Send + 'static> Sink<In, Mat> {
96 pub(crate) fn from_runner<F>(runner: F) -> Self
97 where
98 F: Fn(BoxStream<In>, &Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
99 {
100 Self::from_runner_parts(Arc::new(runner), None)
101 }
102
103 pub(crate) fn from_runner_parts(
104 runner: Arc<SinkRunner<In, Mat>>,
105 inline_runner: Option<Arc<SinkRunner<In, Mat>>>,
106 ) -> Self {
107 Self {
108 runner,
109 inline_runner,
110 attributes: Attributes::default(),
111 deferred_factory: None,
112 fold_fp: None,
113 }
114 }
115
116 pub(super) fn run(
117 &self,
118 input: BoxStream<In>,
119 materializer: &Materializer,
120 ) -> StreamResult<Mat> {
121 if let Some(factory) = &self.deferred_factory {
122 let attrs = materializer.effective_attributes(&self.attributes);
123 return factory(materializer, &attrs).run(input, materializer);
124 }
125 (self.runner)(input, materializer)
126 }
127
128 pub(super) fn can_inline(&self) -> bool {
129 self.inline_runner.is_some()
130 }
131
132 pub(super) fn run_inline(
133 &self,
134 input: BoxStream<In>,
135 materializer: &Materializer,
136 ) -> StreamResult<Mat> {
137 if let Some(factory) = &self.deferred_factory {
138 let attrs = materializer.effective_attributes(&self.attributes);
139 return factory(materializer, &attrs).run_inline(input, materializer);
140 }
141 (self
142 .inline_runner
143 .as_ref()
144 .expect("inline sink runner exists"))(input, materializer)
145 }
146
147 pub fn run_with<SourceMat: Send + 'static>(
148 self,
149 source: Source<In, SourceMat>,
150 ) -> StreamResult<SourceMat> {
151 source.to(self).run()
152 }
153
154 pub fn run_with_materializer<SourceMat: Send + 'static>(
155 self,
156 source: Source<In, SourceMat>,
157 materializer: &Materializer,
158 ) -> StreamResult<SourceMat> {
159 source.to(self).run_with_materializer(materializer)
160 }
161
162 #[must_use]
163 pub fn from_materializer<F>(factory: F) -> Self
164 where
165 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
166 {
167 let factory = Arc::new(factory);
168 Self {
169 runner: Arc::new(|_input, _materializer| {
170 Err(StreamError::Failed(
171 "deferred sink factory must be driven through Sink::run".into(),
172 ))
173 }),
174 inline_runner: None,
175 attributes: Attributes::default(),
176 deferred_factory: Some(factory),
177 fold_fp: None,
178 }
179 }
180
181 #[must_use]
182 pub fn setup<F>(factory: F) -> Self
183 where
184 F: Fn(&Materializer, &Attributes) -> Sink<In, Mat> + Send + Sync + 'static,
185 {
186 Self::from_materializer(factory)
187 }
188
189 pub fn pre_materialize(
190 &self,
191 materializer: &Materializer,
192 ) -> StreamResult<(Mat, Sink<In, NotUsed>)> {
193 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
194 let materialized = self.clone().run(
195 Box::new(std::iter::from_fn(move || receiver.recv().ok())),
196 materializer,
197 )?;
198 let sender = Arc::new(Mutex::new(Some(sender)));
199 let sink = Sink::from_runner(move |input, _materializer| {
200 let Some(sender) = sender
201 .lock()
202 .expect("pre-materialized sink poisoned")
203 .take()
204 else {
205 return Err(StreamError::Failed(
206 "pre-materialized sink has already been materialized".into(),
207 ));
208 };
209 for item in input {
210 if sender.send(item).is_err() {
211 break;
212 }
213 }
214 Ok(NotUsed)
215 });
216 Ok((materialized, sink.with_attributes(self.attributes.clone())))
217 }
218
219 #[must_use]
220 pub fn map_materialized_value<NextMat, F>(self, f: F) -> Sink<In, NextMat>
221 where
222 NextMat: Send + 'static,
223 F: Fn(Mat) -> NextMat + Send + Sync + 'static,
224 {
225 map_mat_dyn(self, Arc::new(f))
231 }
232
233 #[must_use]
234 pub fn attributes(&self) -> &Attributes {
235 &self.attributes
236 }
237
238 #[must_use]
239 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
240 self.attributes = attributes;
241 self
242 }
243
244 #[must_use]
245 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
246 self.attributes = self.attributes.and(attributes);
247 self
248 }
249
250 #[must_use]
251 pub fn named(self, name: impl Into<String>) -> Self {
252 self.add_attributes(Attributes::named(name))
253 }
254}
255
256#[derive(Clone)]
257pub struct RunnableGraph<Mat> {
258 pub(super) runner: Arc<RunnableGraphRunner<Mat>>,
259 attributes: Attributes,
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq)]
263pub enum SinkCombineStrategy {
264 Broadcast,
265 Balance,
266}
267
268impl<Mat: Send + 'static> RunnableGraph<Mat> {
269 pub(super) fn from_runner<F>(runner: F) -> Self
270 where
271 F: Fn(&Materializer) -> StreamResult<Mat> + Send + Sync + 'static,
272 {
273 Self {
274 runner: Arc::new(runner),
275 attributes: Attributes::default(),
276 }
277 }
278
279 pub fn run(&self) -> StreamResult<Mat> {
280 Materializer::new().materialize(self)
281 }
282
283 pub fn run_with_materializer(&self, materializer: &Materializer) -> StreamResult<Mat> {
284 materializer.materialize(self)
285 }
286
287 #[must_use]
288 pub fn map_materialized_value<Next, F>(self, f: F) -> RunnableGraph<Next>
289 where
290 Next: Send + 'static,
291 F: Fn(Mat) -> Next + Send + Sync + 'static,
292 {
293 let f = Arc::new(f);
294 RunnableGraph::from_runner(move |materializer| {
295 let mat = (self.runner)(materializer)?;
296 Ok(f(mat))
297 })
298 }
299
300 #[must_use]
301 pub fn attributes(&self) -> &Attributes {
302 &self.attributes
303 }
304
305 #[must_use]
306 pub fn with_attributes(mut self, attributes: Attributes) -> Self {
307 self.attributes = attributes;
308 self
309 }
310
311 #[must_use]
312 pub fn add_attributes(mut self, attributes: Attributes) -> Self {
313 self.attributes = self.attributes.and(attributes);
314 self
315 }
316
317 #[must_use]
318 pub fn named(self, name: impl Into<String>) -> Self {
319 self.add_attributes(Attributes::named(name))
320 }
321}
322
323impl<In: Clone + Send + 'static> Sink<In, NotUsed> {
324 #[must_use]
325 pub fn combine<M1, M2, MRest, I>(
326 first: Sink<In, M1>,
327 second: Sink<In, M2>,
328 rest: I,
329 strategy: SinkCombineStrategy,
330 ) -> Sink<In, NotUsed>
331 where
332 M1: Send + 'static,
333 M2: Send + 'static,
334 MRest: Send + 'static,
335 I: IntoIterator<Item = Sink<In, MRest>>,
336 {
337 let mut runners: Vec<Arc<CombinedSinkRunner<In>>> = vec![
338 Arc::new(move |materializer| {
339 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
340 let mat = first.run(
341 Box::new(std::iter::from_fn(move || {
342 loop {
343 match receiver.recv().ok()? {
344 CombinedSinkMessage::Item(item) => return Some(item),
345 CombinedSinkMessage::Flush(ack) => {
346 let _ = ack.send(());
347 }
348 CombinedSinkMessage::Close => return None,
349 }
350 }
351 })),
352 materializer,
353 )?;
354 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
355 }),
356 Arc::new(move |materializer| {
357 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
358 let mat = second.run(
359 Box::new(std::iter::from_fn(move || {
360 loop {
361 match receiver.recv().ok()? {
362 CombinedSinkMessage::Item(item) => return Some(item),
363 CombinedSinkMessage::Flush(ack) => {
364 let _ = ack.send(());
365 }
366 CombinedSinkMessage::Close => return None,
367 }
368 }
369 })),
370 materializer,
371 )?;
372 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
373 }),
374 ];
375 runners.extend(rest.into_iter().map(|sink| {
376 Arc::new(move |materializer: &Materializer| {
377 let (sender, receiver) = std::sync::mpsc::sync_channel(0);
378 let mat = sink.run(
379 Box::new(std::iter::from_fn(move || {
380 loop {
381 match receiver.recv().ok()? {
382 CombinedSinkMessage::Item(item) => return Some(item),
383 CombinedSinkMessage::Flush(ack) => {
384 let _ = ack.send(());
385 }
386 CombinedSinkMessage::Close => return None,
387 }
388 }
389 })),
390 materializer,
391 )?;
392 Ok((sender, Box::new(mat) as Box<dyn std::any::Any + Send>))
393 }) as Arc<CombinedSinkRunner<In>>
394 }));
395
396 Sink::from_runner(move |mut input: BoxStream<In>, materializer| {
397 let mut children = runners
400 .iter()
401 .map(|runner| runner(materializer))
402 .collect::<StreamResult<Vec<_>>>()?;
403 let mut next = 0usize;
404 for item in input.by_ref() {
405 match item {
406 Ok(value) => match strategy {
407 SinkCombineStrategy::Broadcast => {
408 children.retain(|(sender, _)| {
409 sender
410 .send(CombinedSinkMessage::Item(Ok(value.clone())))
411 .is_ok()
412 });
413 if children.is_empty() {
414 break;
415 }
416 }
417 SinkCombineStrategy::Balance => {
418 while !children.is_empty() {
419 let index = next % children.len();
420 next = next.wrapping_add(1);
421 match children[index]
422 .0
423 .send(CombinedSinkMessage::Item(Ok(value.clone())))
424 {
425 Ok(()) => break,
426 Err(_) => {
427 children.remove(index);
428 }
429 }
430 }
431 if children.is_empty() {
432 break;
433 }
434 }
435 },
436 Err(error) => {
437 for (sender, _) in &children {
438 let _ = sender.send(CombinedSinkMessage::Item(Err(error.clone())));
439 }
440 return Err(error);
441 }
442 }
443 }
444 for (sender, _) in &children {
448 let (ack_sender, ack_receiver) = std::sync::mpsc::sync_channel(0);
449 if sender.send(CombinedSinkMessage::Flush(ack_sender)).is_ok() {
450 let _ = ack_receiver.recv();
451 }
452 }
453 let mats: Vec<_> = children
456 .into_iter()
457 .map(|(sender, mat)| {
458 let _ = sender.send(CombinedSinkMessage::Close);
459 mat
460 })
461 .collect();
462 drop(mats);
463 Ok(NotUsed)
464 })
465 }
466}
467
468impl<In: Send + 'static, Mat: Send + 'static> Sink<In, StreamCompletion<Mat>> {
469 fn from_task_runner<F>(runner: F) -> Self
470 where
471 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
472 {
473 Self::from_task_runner_with_inline(runner, false)
474 }
475
476 fn from_task_runner_with_inline<F>(runner: F, inline: bool) -> Self
477 where
478 F: Fn(BoxStream<In>) -> StreamResult<Mat> + Send + Sync + 'static,
479 {
480 let runner = Arc::new(runner);
481 let async_runner = {
482 let runner = Arc::clone(&runner);
483 Arc::new(move |input, materializer: &Materializer| {
484 let runner = Arc::clone(&runner);
485 let state = Arc::clone(&materializer.inner.state);
486 Ok(materializer.spawn_stream(move |cancelled| {
487 runner(runtime_checked_stream(input, state, Some(cancelled)))
488 }))
489 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
490 };
491 let inline_runner = inline.then(|| {
492 let runner = Arc::clone(&runner);
493 Arc::new(move |input, materializer: &Materializer| {
494 let runner = Arc::clone(&runner);
495 let state = Arc::clone(&materializer.inner.state);
496 Ok(materializer.spawn_stream_inline(move |cancelled| {
497 runner(runtime_checked_stream(input, state, Some(cancelled)))
498 }))
499 }) as Arc<SinkRunner<In, StreamCompletion<Mat>>>
500 });
501 Sink::from_runner_parts(async_runner, inline_runner)
502 }
503}
504
505impl<In: Send + 'static> Sink<In, StreamCompletion<Vec<In>>> {
506 #[must_use]
507 pub fn collect() -> Self {
508 let task_runner = Sink::from_task_runner(|input| input.collect());
509 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::CollectDescriptor::<In> {
510 _phantom: std::marker::PhantomData,
511 });
512 Sink {
513 runner: task_runner.runner,
514 inline_runner: task_runner.inline_runner,
515 attributes: task_runner.attributes,
516 deferred_factory: task_runner.deferred_factory,
517 fold_fp: Some(fp),
518 }
519 }
520
521 #[must_use]
522 pub fn collection() -> Self {
523 Self::collect()
524 }
525
526 #[must_use]
527 pub fn take_last(n: usize) -> Self {
528 Sink::from_task_runner(move |input| {
529 if n == 0 {
530 for item in input {
531 let _ = item?;
532 }
533 return Ok(Vec::new());
534 }
535 let mut buffer = VecDeque::with_capacity(n);
536 for item in input {
537 let item = item?;
538 if buffer.len() == n {
539 buffer.pop_front();
540 }
541 buffer.push_back(item);
542 }
543 Ok(buffer.into_iter().collect())
544 })
545 }
546}
547
548impl<In: Send + 'static> Sink<In, StreamCompletion<NotUsed>> {
549 #[must_use]
550 pub fn ignore() -> Self {
551 let task_runner = Sink::from_runner(|input, materializer| {
552 let state = Arc::clone(&materializer.inner.state);
553 Ok(materializer.spawn_stream(move |cancelled| {
554 let input = runtime_checked_stream(input, state, Some(cancelled));
555 for item in input {
556 item?;
557 }
558 Ok(NotUsed)
559 }))
560 });
561 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::IgnoreDescriptor::<In> {
562 _phantom: std::marker::PhantomData,
563 });
564 Sink {
565 runner: task_runner.runner,
566 inline_runner: task_runner.inline_runner,
567 attributes: task_runner.attributes,
568 deferred_factory: task_runner.deferred_factory,
569 fold_fp: Some(fp),
570 }
571 }
572
573 #[must_use]
574 pub fn on_complete<F>(callback: F) -> Self
575 where
576 F: FnOnce() + Send + Sync + 'static,
577 {
578 let callback = Arc::new(Mutex::new(Some(callback)));
579 Sink::from_task_runner(move |input| {
580 for item in input {
581 item?;
582 }
583 if let Some(cb) = callback.lock().expect("on_complete poisoned").take() {
584 cb();
585 }
586 Ok(NotUsed)
587 })
588 }
589
590 #[must_use]
591 pub fn never() -> Self {
592 Sink::from_runner(|input, materializer| {
593 let state = Arc::clone(&materializer.inner.state);
594 let shutdown_state = Arc::clone(&state);
595 Ok(materializer.spawn_stream(move |cancelled| {
596 let input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
597 for item in input {
598 item?;
599 }
600 loop {
601 if shutdown_state.shutdown.load(Ordering::SeqCst) {
602 return Err(StreamError::AbruptTermination);
603 }
604 if cancelled.load(Ordering::SeqCst) {
605 return Err(StreamError::Cancelled);
606 }
607 thread::sleep(Duration::from_millis(1));
608 }
609 }))
610 })
611 }
612
613 #[must_use]
614 pub fn foreach<F>(f: F) -> Self
615 where
616 F: Fn(In) + Send + Sync + 'static,
617 {
618 Sink::from_task_runner(move |input| {
619 for item in input {
620 f(item?);
621 }
622 Ok(NotUsed)
623 })
624 }
625
626 #[must_use]
627 pub fn foreach_async<F, Fut>(parallelism: usize, f: F) -> Self
628 where
629 F: Fn(In) -> Fut + Send + Sync + 'static,
630 Fut: Future<Output = StreamResult<()>> + Send + 'static,
631 {
632 Flow::identity()
633 .map_async_unordered(parallelism, f)
634 .to_mat(Sink::ignore(), Keep::right)
635 }
636
637 #[must_use]
638 pub fn foreach_result<F>(f: F) -> Self
639 where
640 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
641 {
642 Sink::from_task_runner(move |input| {
643 for item in input {
644 f(item?)?;
645 }
646 Ok(NotUsed)
647 })
648 }
649
650 #[must_use]
651 pub fn foreach_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
652 where
653 F: Fn(In) -> StreamResult<()> + Send + Sync + 'static,
654 {
655 Sink::from_task_runner(move |input| {
656 for item in input {
657 let item = item?;
658 match catch_unwind(AssertUnwindSafe(|| f(item)))
659 .unwrap_or_else(|_| Err(panic_stream_error("foreach_result callback")))
660 {
661 Ok(()) => {}
662 Err(error) => match decide_supervision(&decider, &error) {
663 SupervisionDirective::Stop => return Err(error),
664 SupervisionDirective::Resume | SupervisionDirective::Restart => {}
665 },
666 }
667 }
668 Ok(NotUsed)
669 })
670 }
671}
672
673impl<In: Send + 'static> Sink<In, StreamCompletion<In>> {
674 #[must_use]
691 pub fn head() -> Self {
692 Sink::from_task_runner_with_inline(
693 |mut input| input.next().unwrap_or(Err(StreamError::EmptyStream)),
694 true,
695 )
696 }
697
698 #[must_use]
699 pub fn last() -> Self {
700 Sink::from_task_runner(|input| {
701 let mut last = None;
702 for item in input {
703 last = Some(item?);
704 }
705 last.ok_or(StreamError::EmptyStream)
706 })
707 }
708
709 #[must_use]
710 pub fn reduce<F>(f: F) -> Self
711 where
712 F: Fn(In, In) -> In + Send + Sync + 'static,
713 {
714 Sink::from_task_runner(move |mut input| {
715 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
716 for item in input {
717 acc = f(acc, item?);
718 }
719 Ok(acc)
720 })
721 }
722
723 #[must_use]
724 pub fn reduce_result<F>(f: F) -> Self
725 where
726 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
727 {
728 Sink::from_task_runner(move |mut input| {
729 let mut acc = input.next().unwrap_or(Err(StreamError::EmptyStream))?;
730 for item in input {
731 acc = f(acc, item?)?;
732 }
733 Ok(acc)
734 })
735 }
736
737 #[must_use]
738 pub fn reduce_result_with_supervision<F>(f: F, decider: SupervisionDecider) -> Self
739 where
740 In: Clone,
741 F: Fn(In, In) -> StreamResult<In> + Send + Sync + 'static,
742 {
743 Sink::from_task_runner(move |mut input: BoxStream<In>| {
744 let mut acc = Some(input.next().unwrap_or(Err(StreamError::EmptyStream))?);
745 for item in input {
746 let item = item?;
747 let Some(previous) = acc.take() else {
748 acc = Some(item);
749 continue;
750 };
751 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
752 .unwrap_or_else(|_| Err(panic_stream_error("reduce_result callback")))
753 {
754 Ok(next) => acc = Some(next),
755 Err(error) => match decide_supervision(&decider, &error) {
756 SupervisionDirective::Stop => return Err(error),
757 SupervisionDirective::Resume => acc = Some(previous),
758 SupervisionDirective::Restart => acc = None,
759 },
760 }
761 }
762 acc.ok_or(StreamError::EmptyStream)
763 })
764 }
765}
766
767impl<In: Send + 'static> Sink<In, StreamCompletion<Option<In>>> {
768 #[must_use]
785 pub fn head_option() -> Self {
786 Sink::from_task_runner_with_inline(
787 |mut input| match input.next() {
788 Some(Ok(item)) => Ok(Some(item)),
789 Some(Err(error)) => Err(error),
790 None => Ok(None),
791 },
792 true,
793 )
794 }
795
796 #[must_use]
797 pub fn last_option() -> Self {
798 Sink::from_task_runner(|input| {
799 let mut last = None;
800 for item in input {
801 last = Some(item?);
802 }
803 Ok(last)
804 })
805 }
806}
807
808impl<In: Send + 'static> Sink<In, NotUsed> {
809 #[must_use]
810 pub fn cancelled() -> Self {
811 Sink::from_runner(|_input, _materializer| Ok(NotUsed))
812 }
813
814 #[must_use]
815 pub fn future_sink<InnerMat, F, Fut>(future: F) -> Sink<In, StreamCompletion<InnerMat>>
816 where
817 InnerMat: Send + 'static,
818 F: Fn() -> Fut + Send + Sync + 'static,
819 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
820 {
821 Self::lazy_future_sink(future)
822 }
823
824 #[must_use]
825 pub fn lazy_sink<InnerMat, F>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
826 where
827 InnerMat: Send + 'static,
828 F: Fn() -> Sink<In, InnerMat> + Send + Sync + 'static,
829 {
830 let create = Arc::new(create);
831 Sink::from_runner(move |input, materializer| {
832 let create = Arc::clone(&create);
833 let state = Arc::clone(&materializer.inner.state);
834 let worker_materializer =
835 materializer.with_name_prefix(materializer.name_prefix().to_owned());
836 Ok(materializer.spawn_stream(move |cancelled| {
837 let input = runtime_checked_stream(input, state, Some(cancelled));
838 run_lazy_sink(input, &worker_materializer, move || {
839 catch_unwind_failed("lazy_sink factory", || create())
840 })
841 }))
842 })
843 }
844
845 #[must_use]
846 pub fn lazy_future_sink<InnerMat, F, Fut>(create: F) -> Sink<In, StreamCompletion<InnerMat>>
847 where
848 InnerMat: Send + 'static,
849 F: Fn() -> Fut + Send + Sync + 'static,
850 Fut: Future<Output = StreamResult<Sink<In, InnerMat>>> + Send + 'static,
851 {
852 let create = Arc::new(create);
853 Sink::from_runner(move |input, materializer| {
854 let create = Arc::clone(&create);
855 let state = Arc::clone(&materializer.inner.state);
856 let worker_materializer =
857 materializer.with_name_prefix(materializer.name_prefix().to_owned());
858 Ok(materializer.spawn_stream(move |cancelled| {
859 let input = runtime_checked_stream(input, state, Some(cancelled));
860 run_lazy_sink(input, &worker_materializer, move || {
861 catch_unwind_failed("lazy_future_sink factory", || create())
862 .and_then(flow::run_future_inline_or_spawn)
863 })
864 }))
865 })
866 }
867
868 #[must_use]
869 pub fn fold<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
870 where
871 Acc: Clone + Send + Sync + 'static,
872 F: Fn(Acc, In) -> Acc + Send + Sync + 'static,
873 {
874 let f_arc = Arc::new(f);
875 let zero_clone = zero.clone();
876 let f_arc2 = Arc::clone(&f_arc);
877 let task_runner = {
878 let zero = zero;
879 Sink::from_task_runner(move |input| {
880 let mut acc = zero.clone();
881 for item in input {
882 acc = f_arc(acc, item?);
883 }
884 Ok(acc)
885 })
886 };
887 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldDescriptor {
888 zero: zero_clone,
889 f: f_arc2,
890 });
891 Sink {
892 runner: task_runner.runner,
893 inline_runner: task_runner.inline_runner,
894 attributes: task_runner.attributes,
895 deferred_factory: task_runner.deferred_factory,
896 fold_fp: Some(fp),
897 }
898 }
899
900 #[must_use]
901 pub fn fold_result<Acc, F>(zero: Acc, f: F) -> Sink<In, StreamCompletion<Acc>>
902 where
903 Acc: Clone + Send + Sync + 'static,
904 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
905 {
906 let f_arc = Arc::new(f);
907 let zero_clone = zero.clone();
908 let f_arc2 = Arc::clone(&f_arc);
909 let task_runner = {
910 let zero = zero;
911 Sink::from_task_runner(move |input| {
912 let mut acc = zero.clone();
913 for item in input {
914 acc = f_arc(acc, item?)?;
915 }
916 Ok(acc)
917 })
918 };
919 let fp: Arc<dyn FoldFastPathDyn<In>> = Arc::new(flow::FoldResultDescriptor {
920 zero: zero_clone,
921 f: f_arc2,
922 });
923 Sink {
924 runner: task_runner.runner,
925 inline_runner: task_runner.inline_runner,
926 attributes: task_runner.attributes,
927 deferred_factory: task_runner.deferred_factory,
928 fold_fp: Some(fp),
929 }
930 }
931
932 #[must_use]
933 pub fn fold_result_with_supervision<Acc, F>(
934 zero: Acc,
935 f: F,
936 decider: SupervisionDecider,
937 ) -> Sink<In, StreamCompletion<Acc>>
938 where
939 Acc: Clone + Send + Sync + 'static,
940 F: Fn(Acc, In) -> StreamResult<Acc> + Send + Sync + 'static,
941 {
942 Sink::from_task_runner(move |input| {
943 let mut acc = zero.clone();
944 for item in input {
945 let item = item?;
946 let previous = acc;
947 match catch_unwind(AssertUnwindSafe(|| f(previous.clone(), item)))
948 .unwrap_or_else(|_| Err(panic_stream_error("fold_result callback")))
949 {
950 Ok(next) => acc = next,
951 Err(error) => match decide_supervision(&decider, &error) {
952 SupervisionDirective::Stop => return Err(error),
953 SupervisionDirective::Resume => acc = previous,
954 SupervisionDirective::Restart => acc = zero.clone(),
955 },
956 }
957 }
958 Ok(acc)
959 })
960 }
961}
962
963fn run_lazy_sink<In, InnerMat, F>(
964 mut input: BoxStream<In>,
965 materializer: &Materializer,
966 create: F,
967) -> StreamResult<InnerMat>
968where
969 In: Send + 'static,
970 InnerMat: Send + 'static,
971 F: FnOnce() -> StreamResult<Sink<In, InnerMat>>,
972{
973 let first = match input.next() {
974 Some(Ok(item)) => item,
975 Some(Err(error)) => return Err(error),
976 None => {
977 return Err(StreamError::Failed(
978 "lazy sink was never materialized".into(),
979 ));
980 }
981 };
982 let sink = create()?;
983 sink.run(prepend_first_stream(first, input), materializer)
984}
985
986fn prepend_first_stream<In>(first: In, mut rest: BoxStream<In>) -> BoxStream<In>
987where
988 In: Send + 'static,
989{
990 let mut first = Some(first);
991 Box::new(std::iter::from_fn(move || {
992 if let Some(item) = first.take() {
993 Some(Ok(item))
994 } else {
995 rest.next()
996 }
997 }))
998}
999
1000#[cfg(test)]
1001mod tests {
1002 use super::*;
1003 use crate::{Source, StreamCompletion};
1004
1005 #[test]
1006 fn map_materialized_value_on_deferred_sink_does_not_explode() {
1007 let sink = Sink::<u64, _>::setup(|_, _| Sink::fold(0u64, |acc, x| acc + x));
1010 let sink = sink
1014 .map_materialized_value(|sc: StreamCompletion<u64>| sc)
1015 .map_materialized_value(|sc| sc);
1016
1017 let sum = Source::from_iter(1u64..=3)
1018 .run_with(sink)
1019 .unwrap()
1020 .wait()
1021 .unwrap();
1022 assert_eq!(sum, 6u64);
1023 }
1024}