1#![allow(clippy::type_complexity, clippy::should_implement_trait)]
19
20use alloc::boxed::Box;
21use core::marker::PhantomData;
22
23use crate::batch::{Batch, BatchPolicy, BatchStage, BatchStageBytes, ByteSize};
24use crate::driver::{RunStats, SyncDriver};
25use crate::emit::{Emit, EmitError};
26use crate::error::{Error, ErrorPolicy, Result, StageError, StageFailure};
27use crate::sink::Sink;
28use crate::source::{IterSource, Source};
29use crate::stage::Stage;
30use crate::stage_id::StageId;
31
/// One message travelling down the type-erased stage chain.
///
/// A stage function receives any number of `Process` messages, and at the end
/// of input the runner sends `Flush` followed by `Close`.
#[doc(hidden)]
pub enum StageOp<T> {
    /// Push a single item into the stage.
    Process(T),
    /// Emit anything buffered, then forward the flush downstream.
    Flush,
    /// Final signal, forwarded after `Flush`; a sink receives it as `Sink::close`.
    Close,
}
45
46#[doc(hidden)]
52pub type BoxedStageFn<T> = Box<dyn FnMut(StageOp<T>) -> Result<()> + Send + 'static>;
53
54struct StageEmit<'a, U> {
59 next_fn: &'a mut BoxedStageFn<U>,
60 cached_err: Option<Error>,
61}
62
/// Operation delivered to the installed dead-letter callback.
#[doc(hidden)]
#[cfg(feature = "std")]
pub enum DeadLetterOp {
    /// Record one stage failure that the error policy routed to the dead letter.
    Send(StageFailure),
    /// Flush whatever the dead-letter sink has buffered.
    Flush,
    /// Close the dead-letter sink; issued after `Flush` when the run finishes.
    Close,
}
78
/// Boxed callback that applies a [`DeadLetterOp`] to the user's dead-letter sink.
#[doc(hidden)]
#[cfg(feature = "std")]
pub type DeadLetterFn = Box<dyn FnMut(DeadLetterOp) -> Result<()> + Send + 'static>;
83
/// Shared, cloneable handle to the optional dead-letter callback.
///
/// Every clone points at the same slot. A stage that captured a clone while
/// the pipeline was being built therefore sees a sink installed later through
/// `PipelineBuilder::dead_letter`.
#[cfg(feature = "std")]
#[derive(Clone, Default)]
pub(crate) struct DeadLetter {
    /// `None` until a callback is installed; the mutex serialises calls into it.
    inner: std::sync::Arc<std::sync::Mutex<Option<DeadLetterFn>>>,
}
94
#[cfg(feature = "std")]
impl DeadLetter {
    /// Creates a handle with no callback installed; `route` is a no-op until
    /// `install` is called.
    fn new() -> Self {
        Self::default()
    }

    /// Installs (or replaces) the callback shared by every clone of this handle.
    fn install(&self, f: DeadLetterFn) {
        *self.lock_slot() = Some(f);
    }

    /// Sends one failure to the callback, or drops it if none is installed.
    ///
    /// # Errors
    /// Propagates whatever the callback returns (e.g. a sink write error).
    fn route(&self, failure: StageFailure) -> Result<()> {
        match self.lock_slot().as_mut() {
            Some(f) => f(DeadLetterOp::Send(failure)),
            None => Ok(()),
        }
    }

    /// Flushes and then closes the callback's sink, if one is installed.
    ///
    /// `Close` is attempted even when `Flush` fails, so the sink always gets a
    /// chance to release its resources.
    ///
    /// # Errors
    /// The flush error if flushing failed, otherwise the close error.
    fn finish(&self) -> Result<()> {
        let mut guard = self.lock_slot();
        if let Some(f) = guard.as_mut() {
            let flushed = f(DeadLetterOp::Flush);
            // Always close, even after a failed flush (the "finally" step).
            let closed = f(DeadLetterOp::Close);
            return flushed.and(closed);
        }
        Ok(())
    }

    /// Locks the shared callback slot.
    ///
    /// # Panics
    /// If a previous holder of the lock panicked (the mutex is poisoned).
    fn lock_slot(&self) -> std::sync::MutexGuard<'_, Option<DeadLetterFn>> {
        self.inner.lock().expect("dead-letter mutex poisoned")
    }
}
122
/// Zero-sized stand-in used without the `std` feature.
///
/// Dead-letter routing is unavailable here, so failures handled under
/// `ErrorPolicy::DeadLetter` are simply dropped.
#[cfg(not(feature = "std"))]
#[derive(Clone, Default)]
pub(crate) struct DeadLetter;
129
130#[cfg(not(feature = "std"))]
131impl DeadLetter {
132 fn new() -> Self {
133 Self
134 }
135
136 fn route(&self, _failure: StageFailure) -> Result<()> {
137 Ok(())
138 }
139
140 fn finish(&self) -> Result<()> {
141 Ok(())
142 }
143}
144
145impl<'a, U> Emit for StageEmit<'a, U> {
146 type Item = U;
147
148 fn emit(&mut self, item: U) -> core::result::Result<(), EmitError> {
149 if self.cached_err.is_some() {
150 return Err(EmitError::Closed);
151 }
152 match (self.next_fn)(StageOp::Process(item)) {
153 Ok(()) => Ok(()),
154 Err(e) => {
155 self.cached_err = Some(e);
156 Err(EmitError::Closed)
157 }
158 }
159 }
160}
161
162pub struct Pipeline<S>
165where
166 S: Source,
167{
168 pub(crate) source: S,
169 pub(crate) source_id: StageId,
170 pub(crate) stage_fn: BoxedStageFn<S::Item>,
171 pub(crate) dead_letter: DeadLetter,
172}
173
174impl<S> Pipeline<S>
175where
176 S: Source + 'static,
177 S::Item: Send + 'static,
178 S::Error: Send + 'static,
179{
180 pub fn from_source(
182 source: S,
183 ) -> PipelineBuilder<
184 S::Item,
185 S,
186 impl FnOnce(BoxedStageFn<S::Item>) -> BoxedStageFn<S::Item> + Send + 'static,
187 > {
188 PipelineBuilder {
189 source,
190 source_id: StageId::new("source"),
191 finalize: identity_finalize::<S::Item>,
192 error_policy: ErrorPolicy::FailFast,
193 pending_stage_id: None,
194 dead_letter: DeadLetter::new(),
195 _marker: PhantomData,
196 }
197 }
198
199 pub fn run(self) -> Result<RunStats> {
206 SyncDriver::new().run(self)
207 }
208
209 pub fn run_with<D>(self, driver: D) -> Result<RunStats>
225 where
226 D: crate::driver::Driver,
227 S: Send,
228 S::Item: Send,
229 S::Error: Send,
230 {
231 driver.run(self)
232 }
233
234 #[cfg(feature = "std")]
242 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
243 pub fn run_threaded(self) -> Result<RunStats>
244 where
245 S: Send,
246 S::Item: Send,
247 S::Error: Send,
248 {
249 crate::driver::ThreadedDriver::new().run(self)
250 }
251}
252
253impl Pipeline<IterSource<core::iter::Empty<()>>> {
254 pub fn from_iter<II>(
266 iter: II,
267 ) -> PipelineBuilder<
268 II::Item,
269 IterSource<II::IntoIter>,
270 impl FnOnce(BoxedStageFn<II::Item>) -> BoxedStageFn<II::Item> + Send + 'static,
271 >
272 where
273 II: IntoIterator,
274 II::Item: Send + 'static,
275 II::IntoIter: Send + 'static,
276 {
277 Pipeline::from_source(IterSource::new(iter))
278 }
279}
280
281pub struct PipelineBuilder<T, S, Acc>
283where
284 S: Source,
285 Acc: FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static,
286{
287 source: S,
288 source_id: StageId,
289 finalize: Acc,
290 error_policy: ErrorPolicy,
291 pending_stage_id: Option<StageId>,
292 dead_letter: DeadLetter,
293 _marker: PhantomData<fn() -> T>,
294}
295
296fn identity_finalize<T: 'static + Send>(f: BoxedStageFn<T>) -> BoxedStageFn<T> {
297 f
298}
299
300fn handle_stage_error<E: StageError>(
308 policy: ErrorPolicy,
309 stage_id: StageId,
310 err: E,
311 dead_letter: &DeadLetter,
312) -> Result<()> {
313 match policy {
314 ErrorPolicy::FailFast => Err(Error::Stage {
315 stage: stage_id,
316 source: Box::new(err),
317 }),
318 ErrorPolicy::Continue => Ok(()),
319 ErrorPolicy::DeadLetter => dead_letter.route(StageFailure::new(stage_id, Box::new(err))),
320 }
321}
322
323impl<T, S, Acc> PipelineBuilder<T, S, Acc>
324where
325 S: Source + 'static,
326 S::Item: Send + 'static,
327 S::Error: Send + 'static,
328 T: Send + 'static,
329 Acc: FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static,
330{
331 #[must_use]
334 pub fn stage_id<I: Into<StageId>>(mut self, id: I) -> Self {
335 self.pending_stage_id = Some(id.into());
336 self
337 }
338
339 #[must_use]
342 pub fn on_error(mut self, policy: ErrorPolicy) -> Self {
343 self.error_policy = policy;
344 self
345 }
346
347 #[cfg(feature = "std")]
361 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
362 pub fn dead_letter<Sk>(self, sink: Sk) -> Self
363 where
364 Sk: Sink<Item = StageFailure> + Send + 'static,
365 Sk::Error: 'static,
366 {
367 let mut sink = sink;
368 let sink_id = StageId::new("dead_letter");
369 let f: DeadLetterFn = Box::new(move |op| match op {
370 DeadLetterOp::Send(failure) => sink.write(failure).map_err(|e| Error::Sink {
371 stage: sink_id,
372 source: Box::new(e),
373 }),
374 DeadLetterOp::Flush => sink.flush().map_err(|e| Error::Sink {
375 stage: sink_id,
376 source: Box::new(e),
377 }),
378 DeadLetterOp::Close => sink.close().map_err(|e| Error::Sink {
379 stage: sink_id,
380 source: Box::new(e),
381 }),
382 });
383 self.dead_letter.install(f);
384 self
385 }
386
387 pub fn map<U, F>(
389 self,
390 mut f: F,
391 ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
392 where
393 U: Send + 'static,
394 F: FnMut(T) -> U + Send + 'static,
395 {
396 let old_finalize = self.finalize;
397 let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
398 let mut next = next;
399 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
400 StageOp::Process(item) => next(StageOp::Process(f(item))),
401 StageOp::Flush => next(StageOp::Flush),
402 StageOp::Close => next(StageOp::Close),
403 });
404 old_finalize(t_fn)
405 };
406 PipelineBuilder {
407 source: self.source,
408 source_id: self.source_id,
409 finalize: new_finalize,
410 error_policy: self.error_policy,
411 pending_stage_id: None,
412 dead_letter: self.dead_letter,
413 _marker: PhantomData,
414 }
415 }
416
417 pub fn filter<F>(
420 self,
421 mut pred: F,
422 ) -> PipelineBuilder<T, S, impl FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static>
423 where
424 F: FnMut(&T) -> bool + Send + 'static,
425 {
426 let old_finalize = self.finalize;
427 let new_finalize = move |next: BoxedStageFn<T>| -> BoxedStageFn<S::Item> {
428 let mut next = next;
429 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
430 StageOp::Process(item) => {
431 if pred(&item) {
432 next(StageOp::Process(item))
433 } else {
434 Ok(())
435 }
436 }
437 StageOp::Flush => next(StageOp::Flush),
438 StageOp::Close => next(StageOp::Close),
439 });
440 old_finalize(t_fn)
441 };
442 PipelineBuilder {
443 source: self.source,
444 source_id: self.source_id,
445 finalize: new_finalize,
446 error_policy: self.error_policy,
447 pending_stage_id: None,
448 dead_letter: self.dead_letter,
449 _marker: PhantomData,
450 }
451 }
452
453 pub fn filter_map<U, F>(
455 self,
456 mut f: F,
457 ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
458 where
459 U: Send + 'static,
460 F: FnMut(T) -> Option<U> + Send + 'static,
461 {
462 let old_finalize = self.finalize;
463 let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
464 let mut next = next;
465 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
466 StageOp::Process(item) => match f(item) {
467 Some(out) => next(StageOp::Process(out)),
468 None => Ok(()),
469 },
470 StageOp::Flush => next(StageOp::Flush),
471 StageOp::Close => next(StageOp::Close),
472 });
473 old_finalize(t_fn)
474 };
475 PipelineBuilder {
476 source: self.source,
477 source_id: self.source_id,
478 finalize: new_finalize,
479 error_policy: self.error_policy,
480 pending_stage_id: None,
481 dead_letter: self.dead_letter,
482 _marker: PhantomData,
483 }
484 }
485
486 pub fn flat_map<U, F, II>(
488 self,
489 mut f: F,
490 ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
491 where
492 U: Send + 'static,
493 II: IntoIterator<Item = U>,
494 F: FnMut(T) -> II + Send + 'static,
495 {
496 let old_finalize = self.finalize;
497 let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
498 let mut next = next;
499 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
500 StageOp::Process(item) => {
501 for out in f(item) {
502 next(StageOp::Process(out))?;
503 }
504 Ok(())
505 }
506 StageOp::Flush => next(StageOp::Flush),
507 StageOp::Close => next(StageOp::Close),
508 });
509 old_finalize(t_fn)
510 };
511 PipelineBuilder {
512 source: self.source,
513 source_id: self.source_id,
514 finalize: new_finalize,
515 error_policy: self.error_policy,
516 pending_stage_id: None,
517 dead_letter: self.dead_letter,
518 _marker: PhantomData,
519 }
520 }
521
522 pub fn inspect<F>(
524 self,
525 mut f: F,
526 ) -> PipelineBuilder<T, S, impl FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static>
527 where
528 F: FnMut(&T) + Send + 'static,
529 {
530 let old_finalize = self.finalize;
531 let new_finalize = move |next: BoxedStageFn<T>| -> BoxedStageFn<S::Item> {
532 let mut next = next;
533 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
534 StageOp::Process(item) => {
535 f(&item);
536 next(StageOp::Process(item))
537 }
538 StageOp::Flush => next(StageOp::Flush),
539 StageOp::Close => next(StageOp::Close),
540 });
541 old_finalize(t_fn)
542 };
543 PipelineBuilder {
544 source: self.source,
545 source_id: self.source_id,
546 finalize: new_finalize,
547 error_policy: self.error_policy,
548 pending_stage_id: None,
549 dead_letter: self.dead_letter,
550 _marker: PhantomData,
551 }
552 }
553
554 pub fn try_map<U, F, E>(
556 self,
557 mut f: F,
558 ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
559 where
560 U: Send + 'static,
561 E: StageError,
562 F: FnMut(T) -> core::result::Result<U, E> + Send + 'static,
563 {
564 let stage_id = self.pending_stage_id.unwrap_or(StageId::new("try_map"));
565 let policy = self.error_policy;
566 let old_finalize = self.finalize;
567 let dead_letter = self.dead_letter.clone();
568 let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
569 let mut next = next;
570 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
571 StageOp::Process(item) => match f(item) {
572 Ok(out) => next(StageOp::Process(out)),
573 Err(e) => handle_stage_error(policy, stage_id, e, &dead_letter),
574 },
575 StageOp::Flush => next(StageOp::Flush),
576 StageOp::Close => next(StageOp::Close),
577 });
578 old_finalize(t_fn)
579 };
580 PipelineBuilder {
581 source: self.source,
582 source_id: self.source_id,
583 finalize: new_finalize,
584 error_policy: self.error_policy,
585 pending_stage_id: None,
586 dead_letter: self.dead_letter,
587 _marker: PhantomData,
588 }
589 }
590
591 pub fn stage<St>(
593 self,
594 mut stage: St,
595 ) -> PipelineBuilder<
596 St::Output,
597 S,
598 impl FnOnce(BoxedStageFn<St::Output>) -> BoxedStageFn<S::Item> + Send + 'static,
599 >
600 where
601 St: Stage<Input = T> + Send + 'static,
602 St::Output: Send + 'static,
603 St::Error: 'static,
604 {
605 let stage_id = self.pending_stage_id.unwrap_or(StageId::new("stage"));
606 let policy = self.error_policy;
607 let old_finalize = self.finalize;
608 let dead_letter = self.dead_letter.clone();
609 let new_finalize = move |next: BoxedStageFn<St::Output>| -> BoxedStageFn<S::Item> {
610 let mut next = next;
611 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
612 StageOp::Process(item) => {
613 let mut adapter = StageEmit {
614 next_fn: &mut next,
615 cached_err: None,
616 };
617 let stage_result = stage.process(item, &mut adapter);
618 if let Some(err) = adapter.cached_err {
619 return Err(err);
620 }
621 match stage_result {
622 Ok(()) => Ok(()),
623 Err(e) => handle_stage_error(policy, stage_id, e, &dead_letter),
624 }
625 }
626 StageOp::Flush => {
627 let mut adapter = StageEmit {
628 next_fn: &mut next,
629 cached_err: None,
630 };
631 let stage_result = stage.flush(&mut adapter);
632 if let Some(err) = adapter.cached_err {
633 return Err(err);
634 }
635 match stage_result {
636 Ok(()) => next(StageOp::Flush),
637 Err(e) => handle_stage_error(policy, stage_id, e, &dead_letter),
638 }
639 }
640 StageOp::Close => next(StageOp::Close),
641 });
642 old_finalize(t_fn)
643 };
644 PipelineBuilder {
645 source: self.source,
646 source_id: self.source_id,
647 finalize: new_finalize,
648 error_policy: self.error_policy,
649 pending_stage_id: None,
650 dead_letter: self.dead_letter,
651 _marker: PhantomData,
652 }
653 }
654
655 pub fn batch(
665 mut self,
666 policy: BatchPolicy,
667 ) -> PipelineBuilder<
668 Batch<T>,
669 S,
670 impl FnOnce(BoxedStageFn<Batch<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
671 > {
672 assert!(
673 policy.has_trigger(),
674 "BatchPolicy must have at least one trigger configured"
675 );
676 assert!(
677 policy.bytes_limit().is_none(),
678 "BatchPolicy::max_bytes requires PipelineBuilder::batch_bytes (T: ByteSize)"
679 );
680 let id = self
681 .pending_stage_id
682 .take()
683 .unwrap_or(StageId::new("batch"));
684 self.stage_id(id).stage(BatchStage::<T>::new(policy))
685 }
686
687 pub fn batch_bytes(
694 mut self,
695 policy: BatchPolicy,
696 ) -> PipelineBuilder<
697 Batch<T>,
698 S,
699 impl FnOnce(BoxedStageFn<Batch<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
700 >
701 where
702 T: ByteSize,
703 {
704 assert!(
705 policy.has_trigger(),
706 "BatchPolicy must have at least one trigger configured"
707 );
708 let id = self
709 .pending_stage_id
710 .take()
711 .unwrap_or(StageId::new("batch"));
712 self.stage_id(id).stage(BatchStageBytes::<T>::new(policy))
713 }
714
715 #[cfg(feature = "std")]
724 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
725 pub fn window(
726 mut self,
727 policy: crate::window::WindowPolicy,
728 ) -> PipelineBuilder<
729 crate::window::Window<T>,
730 S,
731 impl FnOnce(BoxedStageFn<crate::window::Window<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
732 >
733 where
734 T: Clone,
735 {
736 let id = self
737 .pending_stage_id
738 .take()
739 .unwrap_or(StageId::new("window"));
740 self.stage_id(id).stage(
741 crate::window::WindowStage::<T, crate::window::SystemClock>::new(
742 policy,
743 crate::window::SystemClock,
744 ),
745 )
746 }
747
748 #[cfg(feature = "std")]
752 #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
753 pub fn window_with<C>(
754 mut self,
755 policy: crate::window::WindowPolicy,
756 clock: C,
757 ) -> PipelineBuilder<
758 crate::window::Window<T>,
759 S,
760 impl FnOnce(BoxedStageFn<crate::window::Window<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
761 >
762 where
763 T: Clone,
764 C: crate::window::Clock + 'static,
765 {
766 let id = self
767 .pending_stage_id
768 .take()
769 .unwrap_or(StageId::new("window"));
770 self.stage_id(id)
771 .stage(crate::window::WindowStage::<T, C>::new(policy, clock))
772 }
773
774 pub fn sink<Sk>(self, sink: Sk) -> Pipeline<S>
776 where
777 Sk: Sink<Item = T> + Send + 'static,
778 Sk::Error: 'static,
779 {
780 let mut sink = sink;
781 let sink_id = self.pending_stage_id.unwrap_or(StageId::new("sink"));
782 let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
783 StageOp::Process(item) => sink.write(item).map_err(|e| Error::Sink {
784 stage: sink_id,
785 source: Box::new(e),
786 }),
787 StageOp::Flush => sink.flush().map_err(|e| Error::Sink {
788 stage: sink_id,
789 source: Box::new(e),
790 }),
791 StageOp::Close => sink.close().map_err(|e| Error::Sink {
792 stage: sink_id,
793 source: Box::new(e),
794 }),
795 });
796 let item_fn: BoxedStageFn<S::Item> = (self.finalize)(t_fn);
797 Pipeline {
798 source: self.source,
799 source_id: self.source_id,
800 stage_fn: item_fn,
801 dead_letter: self.dead_letter,
802 }
803 }
804}
805
806pub(crate) fn run_sync<S>(mut pipeline: Pipeline<S>) -> Result<RunStats>
812where
813 S: Source + 'static,
814 S::Item: 'static,
815 S::Error: 'static,
816{
817 #[cfg(feature = "std")]
818 let start = std::time::Instant::now();
819
820 let mut stats = RunStats::default();
821
822 loop {
823 match pipeline.source.pull() {
824 Ok(Some(item)) => {
825 stats.items_in = stats.items_in.saturating_add(1);
826 (pipeline.stage_fn)(StageOp::Process(item))?;
827 }
828 Ok(None) => break,
829 Err(e) => {
830 return Err(Error::Source {
831 stage: pipeline.source_id,
832 source: Box::new(e),
833 });
834 }
835 }
836 }
837
838 (pipeline.stage_fn)(StageOp::Flush)?;
839 (pipeline.stage_fn)(StageOp::Close)?;
840 let _ = pipeline.source.close();
841
842 pipeline.dead_letter.finish()?;
843
844 #[cfg(feature = "std")]
845 {
846 stats.duration = start.elapsed();
847 }
848
849 Ok(stats)
850}