Skip to main content

pipe_io/
pipeline.rs

1//! [`Pipeline`], [`PipelineBuilder`], and the synchronous run loop.
2//!
3//! The builder accumulates a closure that, given a sink-side
4//! [`BoxedStageFn`] handling `T`, returns a source-side
5//! [`BoxedStageFn`] handling `S::Item`. Each chained builder method
6//! wraps the next closure in one more layer. At `.sink()`, the
7//! accumulated closure is invoked with a sink-terminated closure to
8//! materialize the full chain.
9
10// The builder methods return `impl FnOnce(...) -> ...` types. Clippy
11// flags these as "type_complexity", but they are inherent to the
12// type-state builder pattern - the only alternative is a `Box<dyn
13// FnOnce>` per stage, which costs an extra allocation per call.
14//
15// `from_iter` is a deliberate alias matching the iterator-construction
16// idiom; the standard `FromIterator::from_iter` signature does not fit
17// the source-builder shape.
18#![allow(clippy::type_complexity, clippy::should_implement_trait)]
19
20use alloc::boxed::Box;
21use core::marker::PhantomData;
22
23use crate::batch::{Batch, BatchPolicy, BatchStage, BatchStageBytes, ByteSize};
24use crate::driver::{RunStats, SyncDriver};
25use crate::emit::{Emit, EmitError};
26use crate::error::{Error, ErrorPolicy, Result, StageError, StageFailure};
27use crate::sink::Sink;
28use crate::source::{IterSource, Source};
29use crate::stage::Stage;
30use crate::stage_id::StageId;
31
32/// Item operation handed to every stage closure inside a built
33/// pipeline. The driver invokes the chain with `Process(item)` for
34/// each source item, then `Flush`, then `Close`.
35///
36/// This is an implementation detail of the builder type machinery.
37/// It must be `pub` because it appears in the `impl Trait` return
38/// types of the builder methods; consumers do not interact with it.
39#[doc(hidden)]
40pub enum StageOp<T> {
41    Process(T),
42    Flush,
43    Close,
44}
45
46/// Type-erased per-stage closure. The whole chain collapses into a
47/// nested set of these at `.sink()` time.
48///
49/// Implementation detail; `pub` only because it appears in the
50/// `impl Trait` return types of the builder methods.
51#[doc(hidden)]
52pub type BoxedStageFn<T> = Box<dyn FnMut(StageOp<T>) -> Result<()> + Send + 'static>;
53
54/// Internal [`Emit`] adapter that forwards into the next stage's
55/// [`BoxedStageFn`]. Caches any downstream error so the driver can
56/// surface it even when the upstream stage swallows
57/// [`EmitError::Closed`].
58struct StageEmit<'a, U> {
59    next_fn: &'a mut BoxedStageFn<U>,
60    cached_err: Option<Error>,
61}
62
63// ---------------------------------------------------------------------
64// Dead-letter routing
65// ---------------------------------------------------------------------
66
67/// Operations handed to the dead-letter sink closure.
68#[doc(hidden)]
69#[cfg(feature = "std")]
70pub enum DeadLetterOp {
71    /// Route a stage failure.
72    Send(StageFailure),
73    /// Flush the dead-letter sink.
74    Flush,
75    /// Close the dead-letter sink.
76    Close,
77}
78
79/// Boxed dead-letter sink closure.
80#[doc(hidden)]
81#[cfg(feature = "std")]
82pub type DeadLetterFn = Box<dyn FnMut(DeadLetterOp) -> Result<()> + Send + 'static>;
83
84/// Shared, cloneable handle to the dead-letter sink. Stages capture
85/// this at build time and route through it when their `ErrorPolicy`
86/// is [`ErrorPolicy::DeadLetter`]. The sink is installed (or replaced)
87/// when [`PipelineBuilder::dead_letter`] is called; if no sink is
88/// installed by `run` time, `DeadLetter` routes are silent drops.
89#[cfg(feature = "std")]
90#[derive(Clone, Default)]
91pub(crate) struct DeadLetter {
92    inner: std::sync::Arc<std::sync::Mutex<Option<DeadLetterFn>>>,
93}
94
95#[cfg(feature = "std")]
96impl DeadLetter {
97    fn new() -> Self {
98        Self::default()
99    }
100
101    fn install(&self, f: DeadLetterFn) {
102        *self.inner.lock().expect("dead-letter mutex poisoned") = Some(f);
103    }
104
105    fn route(&self, failure: StageFailure) -> Result<()> {
106        let mut guard = self.inner.lock().expect("dead-letter mutex poisoned");
107        match guard.as_mut() {
108            Some(f) => f(DeadLetterOp::Send(failure)),
109            None => Ok(()),
110        }
111    }
112
113    fn finish(&self) -> Result<()> {
114        let mut guard = self.inner.lock().expect("dead-letter mutex poisoned");
115        if let Some(f) = guard.as_mut() {
116            f(DeadLetterOp::Flush)?;
117            f(DeadLetterOp::Close)?;
118        }
119        Ok(())
120    }
121}
122
123/// No-std fallback. Always behaves as if no dead-letter sink is
124/// installed; routes are silent drops. `ErrorPolicy::DeadLetter`
125/// under no_std is therefore equivalent to `ErrorPolicy::Continue`.
126#[cfg(not(feature = "std"))]
127#[derive(Clone, Default)]
128pub(crate) struct DeadLetter;
129
130#[cfg(not(feature = "std"))]
131impl DeadLetter {
132    fn new() -> Self {
133        Self
134    }
135
136    fn route(&self, _failure: StageFailure) -> Result<()> {
137        Ok(())
138    }
139
140    fn finish(&self) -> Result<()> {
141        Ok(())
142    }
143}
144
145impl<'a, U> Emit for StageEmit<'a, U> {
146    type Item = U;
147
148    fn emit(&mut self, item: U) -> core::result::Result<(), EmitError> {
149        if self.cached_err.is_some() {
150            return Err(EmitError::Closed);
151        }
152        match (self.next_fn)(StageOp::Process(item)) {
153            Ok(()) => Ok(()),
154            Err(e) => {
155                self.cached_err = Some(e);
156                Err(EmitError::Closed)
157            }
158        }
159    }
160}
161
162/// A built pipeline. Run with [`Pipeline::run`] (sync) or
163/// [`Pipeline::run_threaded`] (std).
164pub struct Pipeline<S>
165where
166    S: Source,
167{
168    pub(crate) source: S,
169    pub(crate) source_id: StageId,
170    pub(crate) stage_fn: BoxedStageFn<S::Item>,
171    pub(crate) dead_letter: DeadLetter,
172}
173
174impl<S> Pipeline<S>
175where
176    S: Source + 'static,
177    S::Item: Send + 'static,
178    S::Error: Send + 'static,
179{
180    /// Start a new pipeline from a [`Source`].
181    pub fn from_source(
182        source: S,
183    ) -> PipelineBuilder<
184        S::Item,
185        S,
186        impl FnOnce(BoxedStageFn<S::Item>) -> BoxedStageFn<S::Item> + Send + 'static,
187    > {
188        PipelineBuilder {
189            source,
190            source_id: StageId::new("source"),
191            finalize: identity_finalize::<S::Item>,
192            error_policy: ErrorPolicy::FailFast,
193            pending_stage_id: None,
194            dead_letter: DeadLetter::new(),
195            _marker: PhantomData,
196        }
197    }
198
199    /// Run the pipeline to completion on the calling thread.
200    ///
201    /// # Errors
202    ///
203    /// Returns the first error produced by the source, any stage, or
204    /// the sink.
205    pub fn run(self) -> Result<RunStats> {
206        SyncDriver::new().run(self)
207    }
208
209    /// Run the pipeline with an explicit [`crate::driver::Driver`].
210    ///
211    /// Use this for built-in drivers when you want to be explicit
212    /// (`pipeline.run_with(ThreadedDriver::new())`) or for custom
213    /// executors (your own [`crate::driver::Driver`] impl).
214    ///
215    /// The [`crate::driver::Driver`] trait carries `Send` bounds on
216    /// the source and its item/error types; if your pipeline cannot
217    /// satisfy `Send`, call [`crate::driver::SyncDriver::run`]
218    /// directly instead (its inherent method has looser bounds).
219    ///
220    /// # Errors
221    ///
222    /// Returns the first error produced by the source, any stage, or
223    /// the sink.
224    pub fn run_with<D>(self, driver: D) -> Result<RunStats>
225    where
226        D: crate::driver::Driver,
227        S: Send,
228        S::Item: Send,
229        S::Error: Send,
230    {
231        driver.run(self)
232    }
233
234    /// Run the pipeline to completion on a spawned thread.
235    ///
236    /// # Errors
237    ///
238    /// Returns the first error produced by the source, any stage, or
239    /// the sink. Returns [`Error::Cancelled`] if the worker thread
240    /// panics.
241    #[cfg(feature = "std")]
242    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
243    pub fn run_threaded(self) -> Result<RunStats>
244    where
245        S: Send,
246        S::Item: Send,
247        S::Error: Send,
248    {
249        crate::driver::ThreadedDriver::new().run(self)
250    }
251}
252
253impl Pipeline<IterSource<core::iter::Empty<()>>> {
254    /// Start a new pipeline from any [`IntoIterator`].
255    ///
256    /// # Example
257    ///
258    /// ```
259    /// use pipe_io::{Pipeline, sink::VecSink};
260    /// let sink = VecSink::<i32>::new();
261    /// let handle = sink.handle();
262    /// Pipeline::from_iter(0..3).sink(sink).run().unwrap();
263    /// assert_eq!(handle.take(), vec![0, 1, 2]);
264    /// ```
265    pub fn from_iter<II>(
266        iter: II,
267    ) -> PipelineBuilder<
268        II::Item,
269        IterSource<II::IntoIter>,
270        impl FnOnce(BoxedStageFn<II::Item>) -> BoxedStageFn<II::Item> + Send + 'static,
271    >
272    where
273        II: IntoIterator,
274        II::Item: Send + 'static,
275        II::IntoIter: Send + 'static,
276    {
277        Pipeline::from_source(IterSource::new(iter))
278    }
279}
280
281/// Typed builder. Each transformer changes the carrier type `T`.
282pub struct PipelineBuilder<T, S, Acc>
283where
284    S: Source,
285    Acc: FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static,
286{
287    source: S,
288    source_id: StageId,
289    finalize: Acc,
290    error_policy: ErrorPolicy,
291    pending_stage_id: Option<StageId>,
292    dead_letter: DeadLetter,
293    _marker: PhantomData<fn() -> T>,
294}
295
296fn identity_finalize<T: 'static + Send>(f: BoxedStageFn<T>) -> BoxedStageFn<T> {
297    f
298}
299
300/// Resolve a stage error according to the active [`ErrorPolicy`].
301///
302/// Used by both `try_map` and `.stage()` to keep error handling
303/// consistent. `FailFast` returns the wrapped error; `Continue`
304/// swallows it; `DeadLetter` routes a [`StageFailure`] through the
305/// shared [`DeadLetter`] handle (or drops silently if no sink is
306/// installed).
307fn handle_stage_error<E: StageError>(
308    policy: ErrorPolicy,
309    stage_id: StageId,
310    err: E,
311    dead_letter: &DeadLetter,
312) -> Result<()> {
313    match policy {
314        ErrorPolicy::FailFast => Err(Error::Stage {
315            stage: stage_id,
316            source: Box::new(err),
317        }),
318        ErrorPolicy::Continue => Ok(()),
319        ErrorPolicy::DeadLetter => dead_letter.route(StageFailure::new(stage_id, Box::new(err))),
320    }
321}
322
323impl<T, S, Acc> PipelineBuilder<T, S, Acc>
324where
325    S: Source + 'static,
326    S::Item: Send + 'static,
327    S::Error: Send + 'static,
328    T: Send + 'static,
329    Acc: FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static,
330{
331    /// Label the next stage with a [`StageId`]. Applies to the next
332    /// `.map` / `.filter` / `.batch` / `.sink` / etc. call.
333    #[must_use]
334    pub fn stage_id<I: Into<StageId>>(mut self, id: I) -> Self {
335        self.pending_stage_id = Some(id.into());
336        self
337    }
338
339    /// Set the [`ErrorPolicy`] applied to stages added after this
340    /// call (until overridden).
341    #[must_use]
342    pub fn on_error(mut self, policy: ErrorPolicy) -> Self {
343        self.error_policy = policy;
344        self
345    }
346
347    /// Install a dead-letter sink that receives [`StageFailure`]
348    /// records produced by stages whose [`ErrorPolicy`] is
349    /// [`ErrorPolicy::DeadLetter`].
350    ///
351    /// The sink can be installed before or after the failing stages;
352    /// stages capture a shared handle at build time and resolve the
353    /// installed sink at run time. If `run` is called and no
354    /// dead-letter sink has been installed, `DeadLetter` failures
355    /// silently drop (same as [`ErrorPolicy::Continue`]).
356    ///
357    /// Calling `.dead_letter` more than once replaces the previous
358    /// sink. Errors raised by the dead-letter sink itself bubble up
359    /// from `run` as [`crate::Error::Sink`].
360    #[cfg(feature = "std")]
361    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
362    pub fn dead_letter<Sk>(self, sink: Sk) -> Self
363    where
364        Sk: Sink<Item = StageFailure> + Send + 'static,
365        Sk::Error: 'static,
366    {
367        let mut sink = sink;
368        let sink_id = StageId::new("dead_letter");
369        let f: DeadLetterFn = Box::new(move |op| match op {
370            DeadLetterOp::Send(failure) => sink.write(failure).map_err(|e| Error::Sink {
371                stage: sink_id,
372                source: Box::new(e),
373            }),
374            DeadLetterOp::Flush => sink.flush().map_err(|e| Error::Sink {
375                stage: sink_id,
376                source: Box::new(e),
377            }),
378            DeadLetterOp::Close => sink.close().map_err(|e| Error::Sink {
379                stage: sink_id,
380                source: Box::new(e),
381            }),
382        });
383        self.dead_letter.install(f);
384        self
385    }
386
387    /// Apply a 1:1 transform.
388    pub fn map<U, F>(
389        self,
390        mut f: F,
391    ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
392    where
393        U: Send + 'static,
394        F: FnMut(T) -> U + Send + 'static,
395    {
396        let old_finalize = self.finalize;
397        let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
398            let mut next = next;
399            let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
400                StageOp::Process(item) => next(StageOp::Process(f(item))),
401                StageOp::Flush => next(StageOp::Flush),
402                StageOp::Close => next(StageOp::Close),
403            });
404            old_finalize(t_fn)
405        };
406        PipelineBuilder {
407            source: self.source,
408            source_id: self.source_id,
409            finalize: new_finalize,
410            error_policy: self.error_policy,
411            pending_stage_id: None,
412            dead_letter: self.dead_letter,
413            _marker: PhantomData,
414        }
415    }
416
417    /// Apply a predicate. Items for which `pred` returns `true` pass
418    /// through; the rest are dropped.
419    pub fn filter<F>(
420        self,
421        mut pred: F,
422    ) -> PipelineBuilder<T, S, impl FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static>
423    where
424        F: FnMut(&T) -> bool + Send + 'static,
425    {
426        let old_finalize = self.finalize;
427        let new_finalize = move |next: BoxedStageFn<T>| -> BoxedStageFn<S::Item> {
428            let mut next = next;
429            let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
430                StageOp::Process(item) => {
431                    if pred(&item) {
432                        next(StageOp::Process(item))
433                    } else {
434                        Ok(())
435                    }
436                }
437                StageOp::Flush => next(StageOp::Flush),
438                StageOp::Close => next(StageOp::Close),
439            });
440            old_finalize(t_fn)
441        };
442        PipelineBuilder {
443            source: self.source,
444            source_id: self.source_id,
445            finalize: new_finalize,
446            error_policy: self.error_policy,
447            pending_stage_id: None,
448            dead_letter: self.dead_letter,
449            _marker: PhantomData,
450        }
451    }
452
453    /// Map and filter in one step.
454    pub fn filter_map<U, F>(
455        self,
456        mut f: F,
457    ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
458    where
459        U: Send + 'static,
460        F: FnMut(T) -> Option<U> + Send + 'static,
461    {
462        let old_finalize = self.finalize;
463        let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
464            let mut next = next;
465            let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
466                StageOp::Process(item) => match f(item) {
467                    Some(out) => next(StageOp::Process(out)),
468                    None => Ok(()),
469                },
470                StageOp::Flush => next(StageOp::Flush),
471                StageOp::Close => next(StageOp::Close),
472            });
473            old_finalize(t_fn)
474        };
475        PipelineBuilder {
476            source: self.source,
477            source_id: self.source_id,
478            finalize: new_finalize,
479            error_policy: self.error_policy,
480            pending_stage_id: None,
481            dead_letter: self.dead_letter,
482            _marker: PhantomData,
483        }
484    }
485
486    /// Emit zero or more items per input.
487    pub fn flat_map<U, F, II>(
488        self,
489        mut f: F,
490    ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
491    where
492        U: Send + 'static,
493        II: IntoIterator<Item = U>,
494        F: FnMut(T) -> II + Send + 'static,
495    {
496        let old_finalize = self.finalize;
497        let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
498            let mut next = next;
499            let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
500                StageOp::Process(item) => {
501                    for out in f(item) {
502                        next(StageOp::Process(out))?;
503                    }
504                    Ok(())
505                }
506                StageOp::Flush => next(StageOp::Flush),
507                StageOp::Close => next(StageOp::Close),
508            });
509            old_finalize(t_fn)
510        };
511        PipelineBuilder {
512            source: self.source,
513            source_id: self.source_id,
514            finalize: new_finalize,
515            error_policy: self.error_policy,
516            pending_stage_id: None,
517            dead_letter: self.dead_letter,
518            _marker: PhantomData,
519        }
520    }
521
522    /// Observe items without modifying the stream.
523    pub fn inspect<F>(
524        self,
525        mut f: F,
526    ) -> PipelineBuilder<T, S, impl FnOnce(BoxedStageFn<T>) -> BoxedStageFn<S::Item> + Send + 'static>
527    where
528        F: FnMut(&T) + Send + 'static,
529    {
530        let old_finalize = self.finalize;
531        let new_finalize = move |next: BoxedStageFn<T>| -> BoxedStageFn<S::Item> {
532            let mut next = next;
533            let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
534                StageOp::Process(item) => {
535                    f(&item);
536                    next(StageOp::Process(item))
537                }
538                StageOp::Flush => next(StageOp::Flush),
539                StageOp::Close => next(StageOp::Close),
540            });
541            old_finalize(t_fn)
542        };
543        PipelineBuilder {
544            source: self.source,
545            source_id: self.source_id,
546            finalize: new_finalize,
547            error_policy: self.error_policy,
548            pending_stage_id: None,
549            dead_letter: self.dead_letter,
550            _marker: PhantomData,
551        }
552    }
553
554    /// Fallible 1:1 transform. Honors the active [`ErrorPolicy`].
555    pub fn try_map<U, F, E>(
556        self,
557        mut f: F,
558    ) -> PipelineBuilder<U, S, impl FnOnce(BoxedStageFn<U>) -> BoxedStageFn<S::Item> + Send + 'static>
559    where
560        U: Send + 'static,
561        E: StageError,
562        F: FnMut(T) -> core::result::Result<U, E> + Send + 'static,
563    {
564        let stage_id = self.pending_stage_id.unwrap_or(StageId::new("try_map"));
565        let policy = self.error_policy;
566        let old_finalize = self.finalize;
567        let dead_letter = self.dead_letter.clone();
568        let new_finalize = move |next: BoxedStageFn<U>| -> BoxedStageFn<S::Item> {
569            let mut next = next;
570            let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
571                StageOp::Process(item) => match f(item) {
572                    Ok(out) => next(StageOp::Process(out)),
573                    Err(e) => handle_stage_error(policy, stage_id, e, &dead_letter),
574                },
575                StageOp::Flush => next(StageOp::Flush),
576                StageOp::Close => next(StageOp::Close),
577            });
578            old_finalize(t_fn)
579        };
580        PipelineBuilder {
581            source: self.source,
582            source_id: self.source_id,
583            finalize: new_finalize,
584            error_policy: self.error_policy,
585            pending_stage_id: None,
586            dead_letter: self.dead_letter,
587            _marker: PhantomData,
588        }
589    }
590
591    /// Plug in a custom [`Stage`].
592    pub fn stage<St>(
593        self,
594        mut stage: St,
595    ) -> PipelineBuilder<
596        St::Output,
597        S,
598        impl FnOnce(BoxedStageFn<St::Output>) -> BoxedStageFn<S::Item> + Send + 'static,
599    >
600    where
601        St: Stage<Input = T> + Send + 'static,
602        St::Output: Send + 'static,
603        St::Error: 'static,
604    {
605        let stage_id = self.pending_stage_id.unwrap_or(StageId::new("stage"));
606        let policy = self.error_policy;
607        let old_finalize = self.finalize;
608        let dead_letter = self.dead_letter.clone();
609        let new_finalize = move |next: BoxedStageFn<St::Output>| -> BoxedStageFn<S::Item> {
610            let mut next = next;
611            let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
612                StageOp::Process(item) => {
613                    let mut adapter = StageEmit {
614                        next_fn: &mut next,
615                        cached_err: None,
616                    };
617                    let stage_result = stage.process(item, &mut adapter);
618                    if let Some(err) = adapter.cached_err {
619                        return Err(err);
620                    }
621                    match stage_result {
622                        Ok(()) => Ok(()),
623                        Err(e) => handle_stage_error(policy, stage_id, e, &dead_letter),
624                    }
625                }
626                StageOp::Flush => {
627                    let mut adapter = StageEmit {
628                        next_fn: &mut next,
629                        cached_err: None,
630                    };
631                    let stage_result = stage.flush(&mut adapter);
632                    if let Some(err) = adapter.cached_err {
633                        return Err(err);
634                    }
635                    match stage_result {
636                        Ok(()) => next(StageOp::Flush),
637                        Err(e) => handle_stage_error(policy, stage_id, e, &dead_letter),
638                    }
639                }
640                StageOp::Close => next(StageOp::Close),
641            });
642            old_finalize(t_fn)
643        };
644        PipelineBuilder {
645            source: self.source,
646            source_id: self.source_id,
647            finalize: new_finalize,
648            error_policy: self.error_policy,
649            pending_stage_id: None,
650            dead_letter: self.dead_letter,
651            _marker: PhantomData,
652        }
653    }
654
655    /// Group items into [`Batch<T>`] according to `policy`. The policy
656    /// must have at least one trigger configured ([`BatchPolicy::max_items`]
657    /// or [`BatchPolicy::max_age`]).
658    ///
659    /// # Panics
660    ///
661    /// Panics if `policy` has no configured trigger or has a
662    /// `max_bytes` trigger without `T: ByteSize` (use
663    /// [`PipelineBuilder::batch_bytes`] for byte-aware batching).
664    pub fn batch(
665        mut self,
666        policy: BatchPolicy,
667    ) -> PipelineBuilder<
668        Batch<T>,
669        S,
670        impl FnOnce(BoxedStageFn<Batch<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
671    > {
672        assert!(
673            policy.has_trigger(),
674            "BatchPolicy must have at least one trigger configured"
675        );
676        assert!(
677            policy.bytes_limit().is_none(),
678            "BatchPolicy::max_bytes requires PipelineBuilder::batch_bytes (T: ByteSize)"
679        );
680        let id = self
681            .pending_stage_id
682            .take()
683            .unwrap_or(StageId::new("batch"));
684        self.stage_id(id).stage(BatchStage::<T>::new(policy))
685    }
686
687    /// Byte-aware batching. Required when `policy` has a
688    /// [`BatchPolicy::max_bytes`] trigger.
689    ///
690    /// # Panics
691    ///
692    /// Panics if `policy` has no configured trigger.
693    pub fn batch_bytes(
694        mut self,
695        policy: BatchPolicy,
696    ) -> PipelineBuilder<
697        Batch<T>,
698        S,
699        impl FnOnce(BoxedStageFn<Batch<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
700    >
701    where
702        T: ByteSize,
703    {
704        assert!(
705            policy.has_trigger(),
706            "BatchPolicy must have at least one trigger configured"
707        );
708        let id = self
709            .pending_stage_id
710            .take()
711            .unwrap_or(StageId::new("batch"));
712        self.stage_id(id).stage(BatchStageBytes::<T>::new(policy))
713    }
714
715    /// Install a windowing stage using the default
716    /// [`crate::SystemClock`]. The carrier type changes from `T` to
717    /// [`crate::Window<T>`] after the call.
718    ///
719    /// `T: Clone` is required because sliding windows duplicate items
720    /// across overlapping windows. Consumers with non-Clone types and
721    /// tumbling semantics can use `.batch()` with `BatchPolicy::max_age`
722    /// as a substitute.
723    #[cfg(feature = "std")]
724    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
725    pub fn window(
726        mut self,
727        policy: crate::window::WindowPolicy,
728    ) -> PipelineBuilder<
729        crate::window::Window<T>,
730        S,
731        impl FnOnce(BoxedStageFn<crate::window::Window<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
732    >
733    where
734        T: Clone,
735    {
736        let id = self
737            .pending_stage_id
738            .take()
739            .unwrap_or(StageId::new("window"));
740        self.stage_id(id).stage(
741            crate::window::WindowStage::<T, crate::window::SystemClock>::new(
742                policy,
743                crate::window::SystemClock,
744            ),
745        )
746    }
747
748    /// Install a windowing stage with a user-supplied [`crate::Clock`].
749    /// Useful for deterministic tests and for hosts that have their
750    /// own monotonic time source.
751    #[cfg(feature = "std")]
752    #[cfg_attr(docsrs, doc(cfg(feature = "std")))]
753    pub fn window_with<C>(
754        mut self,
755        policy: crate::window::WindowPolicy,
756        clock: C,
757    ) -> PipelineBuilder<
758        crate::window::Window<T>,
759        S,
760        impl FnOnce(BoxedStageFn<crate::window::Window<T>>) -> BoxedStageFn<S::Item> + Send + 'static,
761    >
762    where
763        T: Clone,
764        C: crate::window::Clock + 'static,
765    {
766        let id = self
767            .pending_stage_id
768            .take()
769            .unwrap_or(StageId::new("window"));
770        self.stage_id(id)
771            .stage(crate::window::WindowStage::<T, C>::new(policy, clock))
772    }
773
774    /// Terminate the pipeline with a [`Sink`].
775    pub fn sink<Sk>(self, sink: Sk) -> Pipeline<S>
776    where
777        Sk: Sink<Item = T> + Send + 'static,
778        Sk::Error: 'static,
779    {
780        let mut sink = sink;
781        let sink_id = self.pending_stage_id.unwrap_or(StageId::new("sink"));
782        let t_fn: BoxedStageFn<T> = Box::new(move |op| match op {
783            StageOp::Process(item) => sink.write(item).map_err(|e| Error::Sink {
784                stage: sink_id,
785                source: Box::new(e),
786            }),
787            StageOp::Flush => sink.flush().map_err(|e| Error::Sink {
788                stage: sink_id,
789                source: Box::new(e),
790            }),
791            StageOp::Close => sink.close().map_err(|e| Error::Sink {
792                stage: sink_id,
793                source: Box::new(e),
794            }),
795        });
796        let item_fn: BoxedStageFn<S::Item> = (self.finalize)(t_fn);
797        Pipeline {
798            source: self.source,
799            source_id: self.source_id,
800            stage_fn: item_fn,
801            dead_letter: self.dead_letter,
802        }
803    }
804}
805
806// ---------------------------------------------------------------------
807// Synchronous run loop. Used by both SyncDriver and ThreadedDriver
808// (the latter just wraps this in a spawned thread).
809// ---------------------------------------------------------------------
810
811pub(crate) fn run_sync<S>(mut pipeline: Pipeline<S>) -> Result<RunStats>
812where
813    S: Source + 'static,
814    S::Item: 'static,
815    S::Error: 'static,
816{
817    #[cfg(feature = "std")]
818    let start = std::time::Instant::now();
819
820    let mut stats = RunStats::default();
821
822    loop {
823        match pipeline.source.pull() {
824            Ok(Some(item)) => {
825                stats.items_in = stats.items_in.saturating_add(1);
826                (pipeline.stage_fn)(StageOp::Process(item))?;
827            }
828            Ok(None) => break,
829            Err(e) => {
830                return Err(Error::Source {
831                    stage: pipeline.source_id,
832                    source: Box::new(e),
833                });
834            }
835        }
836    }
837
838    (pipeline.stage_fn)(StageOp::Flush)?;
839    (pipeline.stage_fn)(StageOp::Close)?;
840    let _ = pipeline.source.close();
841
842    pipeline.dead_letter.finish()?;
843
844    #[cfg(feature = "std")]
845    {
846        stats.duration = start.elapsed();
847    }
848
849    Ok(stats)
850}