/// Error returned by a pipeline step.
///
/// The variant tells the caller whether running the same step again could
/// help: retrying logic in this crate re-runs a step after `Recoverable` and
/// gives up immediately after `Permanent`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PipelineError {
    /// A transient failure; executing the step again may succeed.
    Recoverable(String),
    /// A failure that is not expected to go away on retry.
    Permanent(String),
}

impl std::fmt::Display for PipelineError {
    /// Writes a short human-readable description that includes the variant's message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PipelineError::Recoverable(msg) => write!(f, "recoverable pipeline error: {}", msg),
            PipelineError::Permanent(msg) => write!(f, "permanent pipeline error: {}", msg),
        }
    }
}

// Lets the error be used with `?` into `Box<dyn Error>` and similar error plumbing.
impl std::error::Error for PipelineError {}
18
19pub trait Step {
23 type Input;
25 type Output;
27
28 fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError>;
30 fn with<P>(self, policy: P) -> P::Decorated
32 where
33 P: Policy<Self>,
34 Self: Sized,
35 {
36 policy.apply(self)
37 }
38}
39pub trait Policy<S: Step> {
43 type Decorated: Step<Input = S::Input, Output = S::Output>;
44 fn apply(self, step: S) -> Self::Decorated;
45}
/// An executable chain of steps, stored as one composed step value.
pub struct Pipeline<S> {
    /// The composed step that the pipeline's methods delegate to.
    steps: S,
}
52
53impl Pipeline<()> {
54 pub fn builder<I>() -> PipelineBuilder<NoOp<I>> {
59 PipelineBuilder {
60 start_node: NoOp::new(),
61 }
62 }
63}
64
65impl<S> Pipeline<S>
66where
67 S: Step,
68{
69 pub fn run(&self, input: S::Input) -> Result<S::Output, PipelineError> {
74 self.steps.execute(input)
75 }
76}
77
78impl<S> Pipeline<S>
79where
80 S: Step + 'static,
81{
82 pub fn into_boxed(self) -> Box<dyn Step<Input = S::Input, Output = S::Output>> {
87 Box::new(self.steps)
88 }
89}
90
/// Incrementally assembles a chain of steps before it is turned into a pipeline.
pub struct PipelineBuilder<S> {
    /// Everything added so far, composed into a single step value.
    start_node: S,
}
95
96impl<S> PipelineBuilder<S>
97where
98 S: Step,
99{
100 pub fn add_stage<A>(self, action: A) -> PipelineBuilder<PipelineStep<S, A>>
102 where
103 A: Step<Input = S::Output>,
104 {
105 let step = PipelineStep {
106 current_step: self.start_node,
107 next_step: action,
108 };
109 PipelineBuilder { start_node: step }
110 }
111
112 pub fn add_map<F, O>(
114 self,
115 f: F,
116 ) -> PipelineBuilder<PipelineStep<S, ClosureStep<F, S::Output, O>>>
117 where
118 F: Fn(S::Output) -> Result<O, PipelineError>,
119 {
120 let wrapper = ClosureStep::new(f);
121 self.add_stage(wrapper)
122 }
123
124 pub fn build(self) -> Pipeline<impl Step<Input = S::Input, Output = S::Output>> {
128 let final_chain = PipelineStep {
129 current_step: self.start_node,
130 next_step: NoOp::new(),
131 };
132
133 Pipeline { steps: final_chain }
134 }
135}
136
/// Two steps joined into one: the first step's output becomes the second step's input.
#[doc(hidden)]
pub struct PipelineStep<Current, Next> {
    /// Step executed first.
    current_step: Current,
    /// Step executed on the first step's output.
    next_step: Next,
}
143
144impl<Current, Next> Step for PipelineStep<Current, Next>
145where
146 Current: Step,
147 Next: Step<Input = Current::Output>,
148{
149 type Input = Current::Input;
150 type Output = Next::Output;
151 fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
152 let res = self.current_step.execute(input)?;
153 self.next_step.execute(res)
154 }
155}
156#[doc(hidden)]
157pub struct ClosureStep<F, I, O> {
158 closure: F,
159 _market: std::marker::PhantomData<(I, O)>,
160}
161
162impl<F, I, O> ClosureStep<F, I, O>
163where
164 F: Fn(I) -> Result<O, PipelineError>,
165{
166 fn new(closure: F) -> Self {
167 ClosureStep {
168 closure,
169 _market: std::marker::PhantomData,
170 }
171 }
172}
173
174impl<F, I, O> Step for ClosureStep<F, I, O>
175where
176 F: Fn(I) -> Result<O, PipelineError>,
177{
178 type Input = I;
179 type Output = O;
180 fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
181 (self.closure)(input)
182 }
183}
184
/// Zero-sized placeholder that is parameterised by a value type `T` without storing one.
#[doc(hidden)]
pub struct NoOp<T> {
    // No `T` is ever stored, so the marker is `fn() -> T` rather than `T`:
    // the type stays covariant in `T` but is `Send`/`Sync` for every `T`,
    // instead of inheriting `!Send`/`!Sync` from a value it never holds.
    _marker: std::marker::PhantomData<fn() -> T>,
}
190
191impl<T> NoOp<T> {
192 fn new() -> Self {
193 Self {
194 _marker: std::marker::PhantomData,
195 }
196 }
197}
198
199impl<T> Step for NoOp<T> {
200 type Input = T;
201 type Output = T;
202
203 fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
204 Ok(input)
205 }
206}
207
/// Retry policy configuration: how many times a step may be re-run after its first attempt.
///
/// The type is a plain `usize` wrapper, so it is `Copy` and one value can be
/// reused for several steps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Retry {
    /// Number of retries allowed in addition to the initial attempt.
    max_retries: usize,
}
212
213impl Retry {
214 pub fn times(n: usize) -> Self {
216 Self { max_retries: n }
217 }
218}
219
220impl<S: Step> Policy<S> for Retry
221where
222 S::Input: Clone,
223{
224 type Decorated = RetryStep<S>;
225 fn apply(self, step: S) -> Self::Decorated {
226 RetryStep {
227 max_retries: 1 + self.max_retries,
228 inner: step,
229 }
230 }
231}
232
/// A step wrapper that carries an attempt budget for re-running the wrapped step.
#[doc(hidden)]
pub struct RetryStep<S> {
    /// The wrapped step.
    inner: S,
    /// Upper bound on how many times `inner` is executed per call. Despite the
    /// name, this counts the initial attempt as well as the retries.
    max_retries: usize,
}
239
240impl<S> Step for RetryStep<S>
241where
242 S: Step,
243 S::Input: Clone,
244{
245 type Input = S::Input;
246 type Output = S::Output;
247
248 fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
249 let mut last_err = None;
250 for _ in 0..self.max_retries {
251 match self.inner.execute(input.clone()) {
252 Ok(output) => return Ok(output),
253 Err(PipelineError::Permanent(e)) => return Err(PipelineError::Permanent(e)),
254 Err(PipelineError::Recoverable(e)) => {
255 last_err = Some(PipelineError::Recoverable(e))
256 }
257 }
258 }
259 Err(last_err.unwrap_or_else(|| {
260 PipelineError::Permanent("Retry logic exhausted with no attempts".to_string())
261 }))
262 }
263}
264pub mod prelude {
265 pub use crate::{Pipeline, PipelineBuilder, PipelineError, Policy, Retry, Step};
266}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Returns a fresh shared call counter starting at zero.
    fn new_counter() -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(0))
    }

    /// Doubles its input.
    struct MultiplyByTwo;

    impl Step for MultiplyByTwo {
        type Input = i32;
        type Output = i32;
        fn execute(&self, input: i32) -> Result<i32, PipelineError> {
            Ok(input * 2)
        }
    }

    /// Subtracts ten from its input.
    struct SubtractTen;

    impl Step for SubtractTen {
        type Input = i32;
        type Output = i32;
        fn execute(&self, input: i32) -> Result<i32, PipelineError> {
            Ok(input - 10)
        }
    }

    #[derive(Debug, PartialEq)]
    struct RawUser {
        username: String,
        // No step in these tests reads this field; it only gives the input
        // type more than one field.
        #[allow(dead_code)]
        access_level: u8,
    }

    #[derive(Debug, PartialEq)]
    struct ProcessedUser {
        id: usize,
        display_name: String,
    }

    /// Builds the raw user fed into the transformation-chain test.
    fn guest_user() -> RawUser {
        RawUser {
            username: " GUEST_USER ".to_string(),
            access_level: 1,
        }
    }

    /// Trims and lowercases the username.
    struct SanitizeName;

    impl Step for SanitizeName {
        type Input = RawUser;
        type Output = String;
        fn execute(&self, input: RawUser) -> Result<String, PipelineError> {
            Ok(input.username.trim().to_lowercase())
        }
    }

    /// Turns a sanitized name into a profile with a fixed id.
    struct CreateProfile;

    impl Step for CreateProfile {
        type Input = String;
        type Output = ProcessedUser;
        fn execute(&self, input: String) -> Result<ProcessedUser, PipelineError> {
            Ok(ProcessedUser {
                id: 101,
                display_name: format!("User: {}", input),
            })
        }
    }

    /// Reports whether the profile id is positive.
    struct ValidateId;

    impl Step for ValidateId {
        type Input = ProcessedUser;
        type Output = bool;
        fn execute(&self, input: ProcessedUser) -> Result<bool, PipelineError> {
            Ok(input.id > 0)
        }
    }

    /// Fails with a recoverable error on its first two calls, then passes the
    /// input through. Every call increments the shared counter.
    struct FlakyStep(Arc<AtomicUsize>);

    impl Step for FlakyStep {
        type Input = i32;
        type Output = i32;
        fn execute(&self, input: i32) -> Result<i32, PipelineError> {
            let count = self.0.fetch_add(1, Ordering::SeqCst);
            if count < 2 {
                Err(PipelineError::Recoverable("fail".into()))
            } else {
                Ok(input)
            }
        }
    }

    /// Policy that counts how often the step it wraps is executed.
    struct MockLogger(Arc<AtomicUsize>);

    impl<S: Step> Policy<S> for MockLogger {
        type Decorated = MockLoggerStep<S>;
        fn apply(self, step: S) -> Self::Decorated {
            MockLoggerStep {
                inner: step,
                counter: self.0,
            }
        }
    }

    struct MockLoggerStep<S> {
        inner: S,
        counter: Arc<AtomicUsize>,
    }

    impl<S: Step> Step for MockLoggerStep<S> {
        type Input = S::Input;
        type Output = S::Output;
        fn execute(&self, input: Self::Input) -> Result<Self::Output, PipelineError> {
            self.counter.fetch_add(1, Ordering::SeqCst);
            self.inner.execute(input)
        }
    }

    // ---------------------------------------------------------------------
    // Pipeline construction and execution
    // ---------------------------------------------------------------------

    #[test]
    fn test_math_pipeline() {
        let pipe = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .add_stage(SubtractTen)
            .build();

        assert_eq!(pipe.run(20).unwrap(), 30);
    }

    #[test]
    fn test_heterogeneous_pipeline_vec() {
        let pipe_a = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .add_stage(SubtractTen)
            .build()
            .into_boxed();

        let pipe_b = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .build()
            .into_boxed();

        // Differently shaped pipelines share one collection once boxed.
        let pipeline_registry: Vec<Box<dyn Step<Input = i32, Output = i32>>> =
            vec![pipe_a, pipe_b];
        let results: Vec<i32> = pipeline_registry
            .iter()
            .map(|p| p.execute(20).unwrap())
            .collect();
        assert_eq!(results, vec![30, 40]);
    }

    #[test]
    fn test_recoverable_error_flow() {
        struct FailStage;
        impl Step for FailStage {
            type Input = i32;
            type Output = i32;
            fn execute(&self, _: i32) -> Result<i32, PipelineError> {
                Err(PipelineError::Recoverable("Temporary glitch".to_string()))
            }
        }

        let pipe = Pipeline::builder::<i32>().add_stage(FailStage).build();

        match pipe.run(10) {
            Err(PipelineError::Recoverable(msg)) => assert_eq!(msg, "Temporary glitch"),
            _ => panic!("Expected a recoverable error"),
        }
    }

    #[test]
    fn test_transformation_chain() {
        // Check the intermediate value first, so a wrong profile cannot hide
        // behind the final boolean.
        let profile_pipe = Pipeline::builder::<RawUser>()
            .add_stage(SanitizeName)
            .add_stage(CreateProfile)
            .build();
        assert_eq!(
            profile_pipe.run(guest_user()).unwrap(),
            ProcessedUser {
                id: 101,
                display_name: "User: guest_user".to_string(),
            }
        );

        let user_pipe = Pipeline::builder::<RawUser>()
            .add_stage(SanitizeName)
            .add_stage(CreateProfile)
            .add_stage(ValidateId)
            .build();
        assert!(user_pipe.run(guest_user()).unwrap());
    }

    #[test]
    fn test_closure_only_pipeline() {
        let pipe = Pipeline::builder::<i32>()
            .add_map(|x| Ok(x + 5))
            .add_map(|x| Ok(x.to_string()))
            .build();

        let result = pipe.run(10).unwrap();
        assert_eq!(result, "15");
    }

    #[test]
    fn test_mixed_struct_and_closure_pipeline() {
        let pipe = Pipeline::builder::<i32>()
            .add_stage(MultiplyByTwo)
            .add_stage(SubtractTen)
            .add_map(|x| {
                if x < 0 {
                    Ok(format!("Negative: {}", x))
                } else {
                    Ok(format!("Positive: {}", x))
                }
            })
            .build();

        assert_eq!(pipe.run(5).unwrap(), "Positive: 0");
        assert_eq!(pipe.run(2).unwrap(), "Negative: -6");
    }

    // ---------------------------------------------------------------------
    // Retry policy
    // ---------------------------------------------------------------------

    #[test]
    fn test_retry_logic_success_after_flaking() {
        /// Like `FlakyStep`, but returns `input + 1` so the test can tell the
        /// step's output apart from the pipeline input.
        struct FlakyIncrement(Arc<AtomicUsize>);
        impl Step for FlakyIncrement {
            type Input = i32;
            type Output = i32;
            fn execute(&self, input: i32) -> Result<i32, PipelineError> {
                let attempts = self.0.fetch_add(1, Ordering::SeqCst);
                if attempts < 2 {
                    Err(PipelineError::Recoverable("Flaky".to_string()))
                } else {
                    Ok(input + 1)
                }
            }
        }

        let counter = new_counter();
        let pipe = Pipeline::builder::<i32>()
            .add_stage(FlakyIncrement(Arc::clone(&counter)).with(Retry::times(2)))
            .build();

        let res = pipe.run(10).unwrap();
        assert_eq!(res, 11);
        // One initial attempt plus two retries.
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn test_retry_logic_exhaustion() {
        struct AlwaysFail;
        impl Step for AlwaysFail {
            type Input = i32;
            type Output = i32;
            fn execute(&self, _: i32) -> Result<i32, PipelineError> {
                Err(PipelineError::Recoverable("Persistent Glitch".to_string()))
            }
        }

        let pipe = Pipeline::builder::<i32>()
            .add_stage(AlwaysFail.with(Retry::times(2)))
            .build();

        match pipe.run(10) {
            Err(PipelineError::Recoverable(e)) => assert_eq!(e, "Persistent Glitch"),
            _ => panic!("Expected recoverable error after exhaustion"),
        }
    }

    #[test]
    fn test_retry_logic_stops_on_permanent() {
        struct PermanentFail(Arc<AtomicUsize>);
        impl Step for PermanentFail {
            type Input = i32;
            type Output = i32;
            fn execute(&self, _: i32) -> Result<i32, PipelineError> {
                self.0.fetch_add(1, Ordering::SeqCst);
                Err(PipelineError::Permanent("Fatal".to_string()))
            }
        }

        let counter = new_counter();
        let pipe = Pipeline::builder::<i32>()
            .add_stage(PermanentFail(Arc::clone(&counter)).with(Retry::times(10)))
            .build();

        // The permanent error must surface unchanged, after exactly one attempt.
        match pipe.run(10) {
            Err(PipelineError::Permanent(msg)) => assert_eq!(msg, "Fatal"),
            other => panic!("Expected the permanent error to surface, got {:?}", other),
        }
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    // ---------------------------------------------------------------------
    // Policy composition order
    // ---------------------------------------------------------------------

    #[test]
    fn test_policy_order_logger_outside_retry() {
        let step_counter = new_counter();
        let logger_counter = new_counter();

        // Retry is applied first, so the logger wraps the whole retry loop.
        let pipe = Pipeline::builder::<i32>()
            .add_stage(
                FlakyStep(Arc::clone(&step_counter))
                    .with(Retry::times(2))
                    .with(MockLogger(Arc::clone(&logger_counter))),
            )
            .build();

        let res = pipe.run(10).unwrap();

        assert_eq!(step_counter.load(Ordering::SeqCst), 3);
        // The logger sees a single call, however many attempts happen inside.
        assert_eq!(logger_counter.load(Ordering::SeqCst), 1);
        assert_eq!(res, 10);
    }

    #[test]
    fn test_policy_order_logger_inside_retry() {
        let step_counter = new_counter();
        let logger_counter = new_counter();

        // The logger is applied first, so every retry attempt goes through it.
        let pipe = Pipeline::builder::<i32>()
            .add_stage(
                FlakyStep(Arc::clone(&step_counter))
                    .with(MockLogger(Arc::clone(&logger_counter)))
                    .with(Retry::times(2)),
            )
            .build();

        let res = pipe.run(10).unwrap();

        assert_eq!(step_counter.load(Ordering::SeqCst), 3);
        assert_eq!(logger_counter.load(Ordering::SeqCst), 3);
        assert_eq!(res, 10);
    }
}