#[derive(Debug)]
pub enum PipelineError {
Recoverable(String),
Permanent(String),
}
pub trait Step {
type Input;
type Output;
fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError>;
fn with<P>(self, policy: P) -> P::Decorated
where
P: Policy<Self>,
Self: Sized,
{
policy.apply(self)
}
}
pub trait Policy<S: Step> {
type Decorated: Step<Input = S::Input, Output = S::Output>;
fn apply(self, step: S) -> Self::Decorated;
}
pub struct Pipeline<S> {
steps: S,
}
impl Pipeline<()> {
pub fn builder<I>() -> PipelineBuilder<NoOp<I>> {
PipelineBuilder {
start_node: NoOp::new(),
}
}
}
impl<S> Pipeline<S>
where
S: Step,
{
pub fn run(&self, input: S::Input) -> Result<S::Output, PipelineError> {
self.steps.execute(input)
}
}
impl<S> Pipeline<S>
where
S: Step + 'static,
{
pub fn into_boxed(self) -> Box<dyn Step<Input = S::Input, Output = S::Output>> {
Box::new(self.steps)
}
}
pub struct PipelineBuilder<S> {
start_node: S,
}
impl<S> PipelineBuilder<S>
where
S: Step,
{
pub fn add_stage<A>(self, action: A) -> PipelineBuilder<PipelineStep<S, A>>
where
A: Step<Input = S::Output>,
{
let step = PipelineStep {
current_step: self.start_node,
next_step: action,
};
PipelineBuilder { start_node: step }
}
pub fn add_map<F, O>(
self,
f: F,
) -> PipelineBuilder<PipelineStep<S, ClosureStep<F, S::Output, O>>>
where
F: Fn(S::Output) -> Result<O, PipelineError>,
{
let wrapper = ClosureStep::new(f);
self.add_stage(wrapper)
}
pub fn build(self) -> Pipeline<impl Step<Input = S::Input, Output = S::Output>> {
let final_chain = PipelineStep {
current_step: self.start_node,
next_step: NoOp::new(),
};
Pipeline { steps: final_chain }
}
}
#[doc(hidden)]
pub struct PipelineStep<Current, Next> {
current_step: Current,
next_step: Next,
}
impl<Current, Next> Step for PipelineStep<Current, Next>
where
Current: Step,
Next: Step<Input = Current::Output>,
{
type Input = Current::Input;
type Output = Next::Output;
fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
let res = self.current_step.execute(input)?;
self.next_step.execute(res)
}
}
#[doc(hidden)]
pub struct ClosureStep<F, I, O> {
closure: F,
_market: std::marker::PhantomData<(I, O)>,
}
impl<F, I, O> ClosureStep<F, I, O>
where
F: Fn(I) -> Result<O, PipelineError>,
{
fn new(closure: F) -> Self {
ClosureStep {
closure,
_market: std::marker::PhantomData,
}
}
}
impl<F, I, O> Step for ClosureStep<F, I, O>
where
F: Fn(I) -> Result<O, PipelineError>,
{
type Input = I;
type Output = O;
fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
(self.closure)(input)
}
}
#[doc(hidden)]
pub struct NoOp<T> {
_marker: std::marker::PhantomData<T>,
}
impl<T> NoOp<T> {
fn new() -> Self {
Self {
_marker: std::marker::PhantomData,
}
}
}
impl<T> Step for NoOp<T> {
type Input = T;
type Output = T;
fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
Ok(input)
}
}
pub struct Retry {
max_retries: usize,
}
impl Retry {
pub fn times(n: usize) -> Self {
Self { max_retries: n }
}
}
impl<S: Step> Policy<S> for Retry
where
S::Input: Clone,
{
type Decorated = RetryStep<S>;
fn apply(self, step: S) -> Self::Decorated {
RetryStep {
max_retries: 1 + self.max_retries,
inner: step,
}
}
}
#[doc(hidden)]
pub struct RetryStep<S> {
inner: S,
max_retries: usize,
}
impl<S> Step for RetryStep<S>
where
S: Step,
S::Input: Clone,
{
type Input = S::Input;
type Output = S::Output;
fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
let mut last_err = None;
for _ in 0..self.max_retries {
match self.inner.execute(input.clone()) {
Ok(output) => return Ok(output),
Err(PipelineError::Permanent(e)) => return Err(PipelineError::Permanent(e)),
Err(PipelineError::Recoverable(e)) => {
last_err = Some(PipelineError::Recoverable(e))
}
}
}
Err(last_err.unwrap_or_else(|| {
PipelineError::Permanent("Retry logic exhausted with no attempts".to_string())
}))
}
}
pub mod prelude {
pub use crate::{Pipeline, PipelineBuilder, PipelineError, Policy, Retry, Step};
}
#[cfg(test)]
mod tests {
use super::*;
struct MultiplyByTwo;
struct SubtractTen;
impl Step for MultiplyByTwo {
type Input = i32;
type Output = i32;
fn execute(&self, input: i32) -> Result<i32, PipelineError> {
Ok(input * 2)
}
}
impl Step for SubtractTen {
type Input = i32;
type Output = i32;
fn execute(&self, input: i32) -> Result<i32, PipelineError> {
Ok(input - 10)
}
}
#[derive(Debug, PartialEq)]
struct RawUser {
username: String,
access_level: u8,
}
#[derive(Debug, PartialEq)]
struct ProcessedUser {
id: usize,
display_name: String,
}
struct SanitizeName;
impl Step for SanitizeName {
type Input = RawUser;
type Output = String;
fn execute(&self, input: RawUser) -> Result<String, PipelineError> {
Ok(input.username.trim().to_lowercase())
}
}
struct CreateProfile;
impl Step for CreateProfile {
type Input = String;
type Output = ProcessedUser;
fn execute(&self, input: String) -> Result<ProcessedUser, PipelineError> {
Ok(ProcessedUser {
id: 101,
display_name: format!("User: {}", input),
})
}
}
struct ValidateId;
impl Step for ValidateId {
type Input = ProcessedUser;
type Output = bool;
fn execute(&self, input: ProcessedUser) -> Result<bool, PipelineError> {
Ok(input.id > 0)
}
}
#[test]
fn test_math_pipeline() {
let pipe = Pipeline::builder::<i32>()
.add_stage(MultiplyByTwo)
.add_stage(SubtractTen)
.build();
assert_eq!(pipe.run(20).unwrap(), 30);
}
#[test]
fn test_heterogeneous_pipeline_vec() {
let pipe_a = Pipeline::builder::<i32>()
.add_stage(MultiplyByTwo)
.add_stage(SubtractTen)
.build()
.into_boxed();
let pipe_b = Pipeline::builder::<i32>()
.add_stage(MultiplyByTwo)
.build()
.into_boxed();
let pipeline_registry: Vec<Box<dyn Step<Input = i32, Output = i32>>> = vec![pipe_a, pipe_b];
let results: Vec<i32> = pipeline_registry
.iter()
.map(|p| p.execute(20).unwrap())
.collect();
assert_eq!(results, vec![30, 40]);
}
#[test]
fn test_recoverable_error_flow() {
struct FailStage;
impl Step for FailStage {
type Input = i32;
type Output = i32;
fn execute(&self, _: i32) -> Result<i32, PipelineError> {
Err(PipelineError::Recoverable("Temporary glitch".to_string()))
}
}
let pipe = Pipeline::builder::<i32>().add_stage(FailStage).build();
let result = pipe.run(10);
match result {
Err(PipelineError::Recoverable(msg)) => assert_eq!(msg, "Temporary glitch"),
_ => panic!("Expected a recoverable error"),
}
}
#[test]
fn test_transformation_chain() {
let user_pipe = Pipeline::builder::<RawUser>()
.add_stage(SanitizeName)
.add_stage(CreateProfile)
.add_stage(ValidateId)
.build();
let input = RawUser {
username: " GUEST_USER ".to_string(),
access_level: 1,
};
assert!(user_pipe.run(input).unwrap());
}
#[test]
fn test_closure_only_pipeline() {
let pipe = Pipeline::builder::<i32>()
.add_map(|x| Ok(x + 5))
.add_map(|x| Ok(x.to_string()))
.build();
let result = pipe.run(10).unwrap();
assert_eq!(result, "15");
}
#[test]
fn test_mixed_struct_and_closure_pipeline() {
let pipe = Pipeline::builder::<i32>()
.add_stage(MultiplyByTwo)
.add_stage(SubtractTen)
.add_map(|x| {
if x < 0 {
Ok(format!("Negative: {}", x))
} else {
Ok(format!("Positive: {}", x))
}
})
.build();
assert_eq!(pipe.run(5).unwrap(), "Positive: 0");
assert_eq!(pipe.run(2).unwrap(), "Negative: -6");
}
#[test]
fn test_retry_logic_success_after_flaking() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
struct FlakyStep(Arc<AtomicUsize>);
impl Step for FlakyStep {
type Input = i32;
type Output = i32;
fn execute(&self, input: i32) -> Result<i32, PipelineError> {
let attempts = self.0.fetch_add(1, Ordering::SeqCst);
if attempts < 2 {
Err(PipelineError::Recoverable("Flaky".to_string()))
} else {
Ok(input + 1)
}
}
}
let counter = Arc::new(AtomicUsize::new(0));
let pipe = Pipeline::builder::<i32>()
.add_stage(FlakyStep(counter.clone()).with(Retry::times(2)))
.build();
let res = pipe.run(10).unwrap();
assert_eq!(res, 11);
assert_eq!(counter.load(Ordering::SeqCst), 3);
}
#[test]
fn test_retry_logic_exhaustion() {
struct AlwaysFail;
impl Step for AlwaysFail {
type Input = i32;
type Output = i32;
fn execute(&self, _: i32) -> Result<i32, PipelineError> {
Err(PipelineError::Recoverable("Persistent Glitch".to_string()))
}
}
let pipe = Pipeline::builder::<i32>()
.add_stage(AlwaysFail.with(Retry::times(2)))
.build();
match pipe.run(10) {
Err(PipelineError::Recoverable(e)) => assert_eq!(e, "Persistent Glitch"),
_ => panic!("Expected recoverable error after exhaustion"),
}
}
#[test]
fn test_retry_logic_stops_on_permanent() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
struct PermanentFail(Arc<AtomicUsize>);
impl Step for PermanentFail {
type Input = i32;
type Output = i32;
fn execute(&self, _: i32) -> Result<i32, PipelineError> {
self.0.fetch_add(1, Ordering::SeqCst);
Err(PipelineError::Permanent("Fatal".to_string()))
}
}
let counter = Arc::new(AtomicUsize::new(0));
let pipe = Pipeline::builder::<i32>()
.add_stage(PermanentFail(counter.clone()).with(Retry::times(10)))
.build();
let _ = pipe.run(10);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[test]
fn test_policy_order_logger_outside_retry() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let step_counter = Arc::new(AtomicUsize::new(0));
let logger_counter = Arc::new(AtomicUsize::new(0));
struct FlakyStep(Arc<AtomicUsize>);
impl Step for FlakyStep {
type Input = i32;
type Output = i32;
fn execute(&self, input: i32) -> Result<i32, PipelineError> {
let count = self.0.fetch_add(1, Ordering::SeqCst);
if count < 2 {
Err(PipelineError::Recoverable("fail".into()))
} else {
Ok(input)
}
}
}
struct MockLogger(Arc<AtomicUsize>);
impl<S: Step> Policy<S> for MockLogger {
type Decorated = MockLoggerStep<S>;
fn apply(self, step: S) -> Self::Decorated {
MockLoggerStep {
inner: step,
counter: self.0,
}
}
}
struct MockLoggerStep<S> {
inner: S,
counter: Arc<AtomicUsize>,
}
impl<S: Step> Step for MockLoggerStep<S> {
type Input = S::Input;
type Output = S::Output;
fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
self.counter.fetch_add(1, Ordering::SeqCst);
self.inner.execute(input)
}
}
let pipe = Pipeline::builder::<i32>()
.add_stage(
FlakyStep(step_counter.clone())
.with(Retry::times(2)) .with(MockLogger(logger_counter.clone())), )
.build();
let res = pipe.run(10).unwrap();
assert_eq!(step_counter.load(Ordering::SeqCst), 3);
assert_eq!(logger_counter.load(Ordering::SeqCst), 1);
assert_eq!(res, 10);
}
#[test]
fn test_policy_order_logger_inside_retry() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let step_counter = Arc::new(AtomicUsize::new(0));
let logger_counter = Arc::new(AtomicUsize::new(0));
struct FlakyStep(Arc<AtomicUsize>);
impl Step for FlakyStep {
type Input = i32;
type Output = i32;
fn execute(&self, input: i32) -> Result<i32, PipelineError> {
let count = self.0.fetch_add(1, Ordering::SeqCst);
if count < 2 {
Err(PipelineError::Recoverable("fail".into()))
} else {
Ok(input)
}
}
}
struct MockLogger(Arc<AtomicUsize>);
impl<S: Step> Policy<S> for MockLogger {
type Decorated = MockLoggerStep<S>;
fn apply(self, step: S) -> Self::Decorated {
MockLoggerStep {
inner: step,
counter: self.0,
}
}
}
struct MockLoggerStep<S> {
inner: S,
counter: Arc<AtomicUsize>,
}
impl<S: Step> Step for MockLoggerStep<S> {
type Input = S::Input;
type Output = S::Output;
fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
self.counter.fetch_add(1, Ordering::SeqCst);
self.inner.execute(input)
}
}
let pipe = Pipeline::builder::<i32>()
.add_stage(
FlakyStep(step_counter.clone())
.with(MockLogger(logger_counter.clone())) .with(Retry::times(2)), )
.build();
let res = pipe.run(10).unwrap();
assert_eq!(step_counter.load(Ordering::SeqCst), 3);
assert_eq!(logger_counter.load(Ordering::SeqCst), 3);
assert_eq!(res, 10);
}
}