Skip to main content

static_conduit/
lib.rs

1//! # Conduit
2//!
3//! `conduit` is a type-safe, zero-cost pipeline engine designed for structured
4//! data transformation.
5//!
6//! It uses a recursive, static-dispatch architecture to ensure that the
7//! entire transformation chain is validated at compile-time with no
8//! runtime overhead.
9
/// Failure reported by a pipeline stage.
///
/// The variant tells the caller (and policies such as a retry wrapper) whether
/// running the same operation again could help.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineError {
    /// An error that might be resolved by retrying the operation (e.g., a network timeout).
    Recoverable(String),
    /// An error that cannot be resolved by retrying (e.g., a validation failure).
    Permanent(String),
}

impl std::fmt::Display for PipelineError {
    /// Writes a short, human-readable description that includes the stage's message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PipelineError::Recoverable(msg) => write!(f, "recoverable pipeline error: {}", msg),
            PipelineError::Permanent(msg) => write!(f, "permanent pipeline error: {}", msg),
        }
    }
}

// Implementing the standard error trait lets callers box the error or convert
// it with `?` into `Box<dyn std::error::Error>`.
impl std::error::Error for PipelineError {}
18
19/// The core trait for all transformation logic.
20///
21/// Any struct implementing `Step` can be plugged into a [`Pipeline`].
22pub trait Step {
23    /// The type of data this stage accepts.
24    type Input;
25    /// The type of data this stage produces.
26    type Output;
27
28    /// Executes the logic for this specific stage.
29    fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError>;
30    /// Decorates this step with a policy
31    fn with<P>(self, policy: P) -> P::Decorated
32    where
33        P: Policy<Self>,
34        Self: Sized,
35    {
36        policy.apply(self)
37    }
38}
39/// Policy trait, decorates a step with some extra behavior, such as retrying or logging
40///
41/// Any struct implementing `Policy` can be plugged into a [`Pipeline`].
42pub trait Policy<S: Step> {
43    type Decorated: Step<Input = S::Input, Output = S::Output>;
44    fn apply(self, step: S) -> Self::Decorated;
45}
/// A sealed chain of stages that is ready to process data.
///
/// Instances are obtained by starting with [`Pipeline::builder`] and finishing
/// with [`PipelineBuilder::build`].
pub struct Pipeline<S> {
    /// The composed stage chain executed by the pipeline.
    steps: S,
}
52
53impl Pipeline<()> {
54    /// Starts the construction of a new pipeline.
55    ///
56    /// To ensure type safety, the user must specify the initial input type:
57    /// `Pipeline::builder::<i32>()`
58    pub fn builder<I>() -> PipelineBuilder<NoOp<I>> {
59        PipelineBuilder {
60            start_node: NoOp::new(),
61        }
62    }
63}
64
65impl<S> Pipeline<S>
66where
67    S: Step,
68{
69    /// Feeds data into the start of the pipeline and returns the final result.
70    ///
71    /// This method executes all stages sequentially. Execution stops upon completion
72    /// or on PipelineError
73    pub fn run(&self, input: S::Input) -> Result<S::Output, PipelineError> {
74        self.steps.execute(input)
75    }
76}
77
78impl<S> Pipeline<S>
79where
80    S: Step + 'static,
81{
82    /// Erases the concrete type of the pipeline, returning a boxed trait object.
83    /// This is useful for storing different pipelines in a single collection as long as they share the same
84    /// Input and Output types. The matching types can be bypassed by creating an
85    /// Enum Wrapper of known Stages.
86    pub fn into_boxed(self) -> Box<dyn Step<Input = S::Input, Output = S::Output>> {
87        Box::new(self.steps)
88    }
89}
90
/// Incrementally assembles [`Step`] implementations into a linear chain.
///
/// Each added stage wraps the chain built so far; [`PipelineBuilder::build`]
/// turns the result into a runnable [`Pipeline`].
pub struct PipelineBuilder<S> {
    /// The chain accumulated so far.
    start_node: S,
}
95
96impl<S> PipelineBuilder<S>
97where
98    S: Step,
99{
100    /// Appends a new stage to the end of the current pipeline.
101    pub fn add_stage<A>(self, action: A) -> PipelineBuilder<PipelineStep<S, A>>
102    where
103        A: Step<Input = S::Output>,
104    {
105        let step = PipelineStep {
106            current_step: self.start_node,
107            next_step: action,
108        };
109        PipelineBuilder { start_node: step }
110    }
111
112    /// Appends a closure-based transformation to the pipeline.
113    pub fn add_map<F, O>(
114        self,
115        f: F,
116    ) -> PipelineBuilder<PipelineStep<S, ClosureStep<F, S::Output, O>>>
117    where
118        F: Fn(S::Output) -> Result<O, PipelineError>,
119    {
120        let wrapper = ClosureStep::new(f);
121        self.add_stage(wrapper)
122    }
123
124    /// Seals the pipeline and returns a runnable [`Pipeline`] instance.
125    ///
126    /// This finalizes the internal recursive structure and adds a termination node.
127    pub fn build(self) -> Pipeline<impl Step<Input = S::Input, Output = S::Output>> {
128        let final_chain = PipelineStep {
129            current_step: self.start_node,
130            next_step: NoOp::new(),
131        };
132
133        Pipeline { steps: final_chain }
134    }
135}
136
/// Internal link node that joins two stages of a chain.
#[doc(hidden)]
pub struct PipelineStep<Current, Next> {
    /// The stage (or sub-chain) that receives the input first.
    current_step: Current,
    /// The stage that receives `current_step`'s output.
    next_step: Next,
}
143
144impl<Current, Next> Step for PipelineStep<Current, Next>
145where
146    Current: Step,
147    Next: Step<Input = Current::Output>,
148{
149    type Input = Current::Input;
150    type Output = Next::Output;
151    fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
152        let res = self.current_step.execute(input)?;
153        self.next_step.execute(res)
154    }
155}
156#[doc(hidden)]
157pub struct ClosureStep<F, I, O> {
158    closure: F,
159    _market: std::marker::PhantomData<(I, O)>,
160}
161
162impl<F, I, O> ClosureStep<F, I, O>
163where
164    F: Fn(I) -> Result<O, PipelineError>,
165{
166    fn new(closure: F) -> Self {
167        ClosureStep {
168            closure,
169            _market: std::marker::PhantomData,
170        }
171    }
172}
173
174impl<F, I, O> Step for ClosureStep<F, I, O>
175where
176    F: Fn(I) -> Result<O, PipelineError>,
177{
178    type Input = I;
179    type Output = O;
180    fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
181        (self.closure)(input)
182    }
183}
184
/// An internal marker stage that terminates or starts a pipeline chain.
///
/// It stores no data; the type parameter only records which value type the
/// stage passes through.
#[doc(hidden)]
pub struct NoOp<T> {
    /// Records `T` without owning a value of it.
    ///
    /// `fn() -> T` is used instead of `T` so the marker stays covariant in `T`
    /// while `NoOp<T>` is `Send`/`Sync` regardless of `T`; the stage never
    /// holds a `T`.
    _marker: std::marker::PhantomData<fn() -> T>,
}
190
191impl<T> NoOp<T> {
192    fn new() -> Self {
193        Self {
194            _marker: std::marker::PhantomData,
195        }
196    }
197}
198
199impl<T> Step for NoOp<T> {
200    type Input = T;
201    type Output = T;
202
203    fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
204        Ok(input)
205    }
206}
207
/// Policy settings for re-running a [`Step`] after recoverable errors.
///
/// Create one with [`Retry::times`] and attach it with [`Step::with`].
pub struct Retry {
    /// Number of retries requested through [`Retry::times`].
    max_retries: usize,
}
212
213impl Retry {
214    /// Configures a policy to retry a step a specific number of times.
215    pub fn times(n: usize) -> Self {
216        Self { max_retries: n }
217    }
218}
219
220impl<S: Step> Policy<S> for Retry
221where
222    S::Input: Clone,
223{
224    type Decorated = RetryStep<S>;
225    fn apply(self, step: S) -> Self::Decorated {
226        RetryStep {
227            max_retries: 1 + self.max_retries,
228            inner: step,
229        }
230    }
231}
232
/// Internal step wrapper that carries out the retry behaviour for the step it
/// decorates.
#[doc(hidden)]
pub struct RetryStep<S> {
    /// The decorated step.
    inner: S,
    /// Upper bound on how many times `inner` is executed per call.
    max_retries: usize,
}
239
240impl<S> Step for RetryStep<S>
241where
242    S: Step,
243    S::Input: Clone,
244{
245    type Input = S::Input;
246    type Output = S::Output;
247
248    fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
249        let mut last_err = None;
250        for _ in 0..self.max_retries {
251            match self.inner.execute(input.clone()) {
252                Ok(output) => return Ok(output),
253                Err(PipelineError::Permanent(e)) => return Err(PipelineError::Permanent(e)),
254                Err(PipelineError::Recoverable(e)) => {
255                    last_err = Some(PipelineError::Recoverable(e))
256                }
257            }
258        }
259        Err(last_err.unwrap_or_else(|| {
260            PipelineError::Permanent("Retry logic exhausted with no attempts".to_string())
261        }))
262    }
263}
264pub mod prelude {
265    pub use crate::{Pipeline, PipelineBuilder, PipelineError, Policy, Retry, Step};
266}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // ---- Shared fixtures -------------------------------------------------

    /// Doubles its input.
    struct MultiplyByTwo;
    impl Step for MultiplyByTwo {
        type Input = i32;
        type Output = i32;
        fn execute(&self, input: i32) -> Result<i32, PipelineError> {
            Ok(input * 2)
        }
    }

    /// Subtracts ten from its input.
    struct SubtractTen;
    impl Step for SubtractTen {
        type Input = i32;
        type Output = i32;
        fn execute(&self, input: i32) -> Result<i32, PipelineError> {
            Ok(input - 10)
        }
    }

    #[derive(Debug, PartialEq)]
    struct RawUser {
        username: String,
        access_level: u8,
    }

    #[derive(Debug, PartialEq)]
    struct ProcessedUser {
        id: usize,
        display_name: String,
    }

    /// Trims and lower-cases the user name.
    struct SanitizeName;
    impl Step for SanitizeName {
        type Input = RawUser;
        type Output = String;
        fn execute(&self, input: RawUser) -> Result<String, PipelineError> {
            Ok(input.username.trim().to_lowercase())
        }
    }

    /// Builds a profile with a fixed id from a sanitized name.
    struct CreateProfile;
    impl Step for CreateProfile {
        type Input = String;
        type Output = ProcessedUser;
        fn execute(&self, input: String) -> Result<ProcessedUser, PipelineError> {
            Ok(ProcessedUser {
                id: 101,
                display_name: format!("User: {}", input),
            })
        }
    }

    /// Reports whether the profile id is positive.
    struct ValidateId;
    impl Step for ValidateId {
        type Input = ProcessedUser;
        type Output = bool;
        fn execute(&self, input: ProcessedUser) -> Result<bool, PipelineError> {
            Ok(input.id > 0)
        }
    }

    /// Fails recoverably on its first two calls, then returns the input
    /// unchanged. The shared counter records every call.
    struct FlakyStep(Arc<AtomicUsize>);
    impl Step for FlakyStep {
        type Input = i32;
        type Output = i32;
        fn execute(&self, input: i32) -> Result<i32, PipelineError> {
            let count = self.0.fetch_add(1, Ordering::SeqCst);
            if count < 2 {
                Err(PipelineError::Recoverable("fail".into()))
            } else {
                Ok(input)
            }
        }
    }

    /// Policy that counts how often the decorated step is executed.
    struct MockLogger(Arc<AtomicUsize>);
    impl<S: Step> Policy<S> for MockLogger {
        type Decorated = MockLoggerStep<S>;
        fn apply(self, step: S) -> Self::Decorated {
            MockLoggerStep {
                inner: step,
                counter: self.0,
            }
        }
    }

    struct MockLoggerStep<S> {
        inner: S,
        counter: Arc<AtomicUsize>,
    }
    impl<S: Step> Step for MockLoggerStep<S> {
        type Input = S::Input;
        type Output = S::Output;
        fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
            self.counter.fetch_add(1, Ordering::SeqCst);
            self.inner.execute(input)
        }
    }

    // ---- Basic pipelines -------------------------------------------------

    #[test]
    fn test_math_pipeline() {
        let pipe = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .add_stage(SubtractTen)
            .build();

        assert_eq!(pipe.run(20).unwrap(), 30);
    }

    #[test]
    fn test_heterogeneous_pipeline_vec() {
        let pipe_a = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .add_stage(SubtractTen)
            .build()
            .into_boxed();

        let pipe_b = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .build()
            .into_boxed();

        let pipeline_registry: Vec<Box<dyn Step<Input = i32, Output = i32>>> = vec![pipe_a, pipe_b];
        let results: Vec<i32> = pipeline_registry
            .iter()
            .map(|p| p.execute(20).unwrap())
            .collect();
        assert_eq!(results, vec![30, 40]);
    }

    #[test]
    fn test_recoverable_error_flow() {
        struct FailStage;
        impl Step for FailStage {
            type Input = i32;
            type Output = i32;
            fn execute(&self, _: i32) -> Result<i32, PipelineError> {
                Err(PipelineError::Recoverable("Temporary glitch".to_string()))
            }
        }

        let pipe = Pipeline::builder::<i32>().add_stage(FailStage).build();

        match pipe.run(10) {
            Err(PipelineError::Recoverable(msg)) => assert_eq!(msg, "Temporary glitch"),
            _ => panic!("Expected a recoverable error"),
        }
    }

    #[test]
    fn test_transformation_chain() {
        let user_pipe = Pipeline::builder::<RawUser>()
            .add_stage(SanitizeName)
            .add_stage(CreateProfile)
            .add_stage(ValidateId)
            .build();

        let input = RawUser {
            username: "  GUEST_USER  ".to_string(),
            access_level: 1,
        };
        assert!(user_pipe.run(input).unwrap());
    }

    #[test]
    fn test_closure_only_pipeline() {
        let pipe = Pipeline::builder::<i32>()
            .add_map(|x| Ok(x + 5))
            .add_map(|x| Ok(x.to_string()))
            .build();

        let result = pipe.run(10).unwrap();
        assert_eq!(result, "15");
    }

    #[test]
    fn test_mixed_struct_and_closure_pipeline() {
        let pipe = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .add_stage(SubtractTen)
            .add_map(|x| {
                if x < 0 {
                    Ok(format!("Negative: {}", x))
                } else {
                    Ok(format!("Positive: {}", x))
                }
            })
            .build();

        assert_eq!(pipe.run(5).unwrap(), "Positive: 0");
        assert_eq!(pipe.run(2).unwrap(), "Negative: -6");
    }

    // ---- Retry policy ----------------------------------------------------

    #[test]
    fn test_retry_logic_success_after_flaking() {
        // Fails twice, then returns `input + 1`; the counter is shared with
        // the test body through the Arc.
        struct FlakyIncrement(Arc<AtomicUsize>);
        impl Step for FlakyIncrement {
            type Input = i32;
            type Output = i32;
            fn execute(&self, input: i32) -> Result<i32, PipelineError> {
                let attempts = self.0.fetch_add(1, Ordering::SeqCst);
                if attempts < 2 {
                    Err(PipelineError::Recoverable("Flaky".to_string()))
                } else {
                    Ok(input + 1)
                }
            }
        }

        let counter = Arc::new(AtomicUsize::new(0));
        let pipe = Pipeline::builder::<i32>()
            // Retry 2 times means 3 total attempts
            .add_stage(FlakyIncrement(counter.clone()).with(Retry::times(2)))
            .build();

        let res = pipe.run(10).unwrap();
        assert_eq!(res, 11);
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn test_retry_logic_exhaustion() {
        struct AlwaysFail;
        impl Step for AlwaysFail {
            type Input = i32;
            type Output = i32;
            fn execute(&self, _: i32) -> Result<i32, PipelineError> {
                Err(PipelineError::Recoverable("Persistent Glitch".to_string()))
            }
        }

        let pipe = Pipeline::builder::<i32>()
            .add_stage(AlwaysFail.with(Retry::times(2)))
            .build();

        match pipe.run(10) {
            Err(PipelineError::Recoverable(e)) => assert_eq!(e, "Persistent Glitch"),
            _ => panic!("Expected recoverable error after exhaustion"),
        }
    }

    #[test]
    fn test_retry_logic_stops_on_permanent() {
        struct PermanentFail(Arc<AtomicUsize>);
        impl Step for PermanentFail {
            type Input = i32;
            type Output = i32;
            fn execute(&self, _: i32) -> Result<i32, PipelineError> {
                self.0.fetch_add(1, Ordering::SeqCst);
                Err(PipelineError::Permanent("Fatal".to_string()))
            }
        }

        let counter = Arc::new(AtomicUsize::new(0));
        let pipe = Pipeline::builder::<i32>()
            .add_stage(PermanentFail(counter.clone()).with(Retry::times(10)))
            .build();

        // The permanent error must reach the caller unchanged...
        match pipe.run(10) {
            Err(PipelineError::Permanent(msg)) => assert_eq!(msg, "Fatal"),
            _ => panic!("Expected the permanent error to be propagated"),
        }
        // ...and even with 10 retries, it should stop after the 1st attempt
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    // ---- Policy ordering -------------------------------------------------

    #[test]
    fn test_policy_order_logger_outside_retry() {
        let step_counter = Arc::new(AtomicUsize::new(0));
        let logger_counter = Arc::new(AtomicUsize::new(0));

        // TEST: [Logger [Retry [Step]]]
        // The logger wraps the entire Retry block.
        let pipe = Pipeline::builder::<i32>()
            .add_stage(
                FlakyStep(step_counter.clone())
                    .with(Retry::times(2)) // Inner layer
                    .with(MockLogger(logger_counter.clone())), // Outer layer
            )
            .build();

        let res = pipe.run(10).unwrap();

        // The step was attempted 3 times (2 fails + 1 success)
        assert_eq!(step_counter.load(Ordering::SeqCst), 3);
        // The Logger was only called ONCE because it sits outside the retry loop
        assert_eq!(logger_counter.load(Ordering::SeqCst), 1);
        assert_eq!(res, 10);
    }

    #[test]
    fn test_policy_order_logger_inside_retry() {
        let step_counter = Arc::new(AtomicUsize::new(0));
        let logger_counter = Arc::new(AtomicUsize::new(0));

        // TEST: [Retry [Logger [Step]]]
        // The Retry loop wraps the Logger.
        let pipe = Pipeline::builder::<i32>()
            .add_stage(
                FlakyStep(step_counter.clone())
                    .with(MockLogger(logger_counter.clone())) // Inner layer
                    .with(Retry::times(2)), // Outer layer
            )
            .build();

        let res = pipe.run(10).unwrap();

        // The step was attempted 3 times
        assert_eq!(step_counter.load(Ordering::SeqCst), 3);
        // The Logger was also called 3 times because it is INSIDE the retry loop
        assert_eq!(logger_counter.load(Ordering::SeqCst), 3);
        assert_eq!(res, 10);
    }
}