simple_pipeline/
lib.rs

1use async_recursion::async_recursion;
2use async_trait::async_trait;
3use derive_new::new;
4use std::{collections::VecDeque, fmt};
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum PipelineError {
9    #[error("Invalid context: {0}")]
10    InvalidContext(String),
11
12    #[error("Internal: {0}")]
13    Internal(String),
14}
15
16/// execution result for the plug
17#[must_use]
18#[derive(new)]
19pub enum PlugResult<Ctx> {
20    Continue(Ctx),
21    #[allow(dead_code)]
22    Terminate(Ctx),
23    NewPipe {
24        ctx: Ctx,
25        plugs: Vec<Box<dyn Plug<Ctx>>>,
26    },
27    Err {
28        ctx: Ctx,
29        err: PipelineError,
30    },
31}
32
33/// plug trait that the building blocks in a pipeline shall implement
34#[async_trait]
35pub trait Plug<Ctx>: fmt::Display + Send + Sync + 'static {
36    async fn call(&self, ctx: Ctx) -> PlugResult<Ctx>;
37}
38
39/// A sequentially executed pipeline
40#[derive(Default)]
41pub struct Pipeline<Ctx> {
42    plugs: VecDeque<Box<dyn Plug<Ctx>>>,
43    executed: Vec<String>,
44}
45
46/// Pipeline response
47#[must_use]
48#[derive(new)]
49pub struct PipelineResponse<Ctx> {
50    pub ctx: Ctx,
51    pub executed: Vec<String>,
52    pub err: Option<PipelineError>,
53}
54
55impl<Ctx> Pipeline<Ctx>
56where
57    Ctx: Send + Sync + 'static,
58{
59    /// create a new pipeline
60    pub fn new(plugs: Vec<Box<dyn Plug<Ctx>>>, executed: Option<Vec<String>>) -> Self {
61        Self {
62            plugs: plugs.into(),
63            executed: executed.unwrap_or_default(),
64        }
65    }
66
67    /// execute the entire pipeline sequentially and run to completion
68    #[async_recursion]
69    pub async fn execute(mut self, ctx: Ctx) -> PipelineResponse<Ctx> {
70        let mut c = ctx;
71        while let Some(plug) = self.plugs.pop_front() {
72            self.add_execution_log(plug.as_ref());
73            match plug.call(c).await {
74                PlugResult::Continue(ctx) => c = ctx,
75                PlugResult::Terminate(ctx) => {
76                    return PipelineResponse::new(ctx, self.executed, None)
77                }
78                PlugResult::NewPipe { ctx, plugs } => {
79                    let pipeline = Self::new(plugs, Some(self.executed.clone()));
80                    return pipeline.execute(ctx).await;
81                }
82                PlugResult::Err { ctx, err } => {
83                    return PipelineResponse::new(ctx, self.executed, Some(err))
84                }
85            }
86        }
87
88        PipelineResponse::new(c, self.executed, None)
89    }
90
91    fn add_execution_log(&mut self, plug: &dyn Plug<Ctx>) {
92        self.executed.push(plug.to_string());
93    }
94}
95
96#[macro_export]
97macro_rules! try_with {
98    ($ctx:ident, $exp:expr) => {
99        match $exp {
100            Ok(v) => v,
101            Err(e) => {
102                return simple_pipeline::PlugResult::Err {
103                    ctx: $ctx,
104                    err: simple_pipeline::PipelineError::Internal(e.to_string()),
105                }
106            }
107        }
108    };
109}
110
111#[macro_export]
112macro_rules! ctx_take {
113    ($plug:ident, $ctx:ident, $name:ident) => {
114        simple_pipeline::try_with!(
115            $ctx,
116            $ctx.$name.take().ok_or_else(|| {
117                simple_pipeline::PipelineError::InvalidContext(format!(
118                    "{}: Cannot take {} for ctx",
119                    $plug.to_string(),
120                    stringify!($name)
121                ))
122            })
123        )
124    };
125}
126
127#[macro_export]
128macro_rules! ctx_ref {
129    ($plug:ident, $ctx:ident, $name:ident) => {
130        simple_pipeline::try_with!(
131            $ctx,
132            $ctx.$name.as_ref().ok_or_else(|| {
133                simple_pipeline::PipelineError::InvalidContext(format!(
134                    "{}: Cannot take {} for ctx",
135                    $plug.to_string(),
136                    stringify!($name)
137                ))
138            })
139        )
140    };
141}
142
143#[macro_export]
144macro_rules! ctx_mut {
145    ($plug:ident, $ctx:ident, $name:ident) => {
146        simple_pipeline::try_with!(
147            $ctx,
148            $ctx.$name.as_mut().ok_or_else(|| {
149                simple_pipeline::PipelineError::InvalidContext(format!(
150                    "{}: Cannot take {} for ctx",
151                    $plug.to_string(),
152                    stringify!($name)
153                ))
154            })
155        )
156    };
157}