1use async_recursion::async_recursion;
2use async_trait::async_trait;
3use derive_new::new;
4use std::{collections::VecDeque, fmt};
5use thiserror::Error;
6
7#[derive(Error, Debug)]
8pub enum PipelineError {
9 #[error("Invalid context: {0}")]
10 InvalidContext(String),
11
12 #[error("Internal: {0}")]
13 Internal(String),
14}
15
16#[must_use]
18#[derive(new)]
19pub enum PlugResult<Ctx> {
20 Continue(Ctx),
21 #[allow(dead_code)]
22 Terminate(Ctx),
23 NewPipe {
24 ctx: Ctx,
25 plugs: Vec<Box<dyn Plug<Ctx>>>,
26 },
27 Err {
28 ctx: Ctx,
29 err: PipelineError,
30 },
31}
32
33#[async_trait]
35pub trait Plug<Ctx>: fmt::Display + Send + Sync + 'static {
36 async fn call(&self, ctx: Ctx) -> PlugResult<Ctx>;
37}
38
39#[derive(Default)]
41pub struct Pipeline<Ctx> {
42 plugs: VecDeque<Box<dyn Plug<Ctx>>>,
43 executed: Vec<String>,
44}
45
46#[must_use]
48#[derive(new)]
49pub struct PipelineResponse<Ctx> {
50 pub ctx: Ctx,
51 pub executed: Vec<String>,
52 pub err: Option<PipelineError>,
53}
54
55impl<Ctx> Pipeline<Ctx>
56where
57 Ctx: Send + Sync + 'static,
58{
59 pub fn new(plugs: Vec<Box<dyn Plug<Ctx>>>, executed: Option<Vec<String>>) -> Self {
61 Self {
62 plugs: plugs.into(),
63 executed: executed.unwrap_or_default(),
64 }
65 }
66
67 #[async_recursion]
69 pub async fn execute(mut self, ctx: Ctx) -> PipelineResponse<Ctx> {
70 let mut c = ctx;
71 while let Some(plug) = self.plugs.pop_front() {
72 self.add_execution_log(plug.as_ref());
73 match plug.call(c).await {
74 PlugResult::Continue(ctx) => c = ctx,
75 PlugResult::Terminate(ctx) => {
76 return PipelineResponse::new(ctx, self.executed, None)
77 }
78 PlugResult::NewPipe { ctx, plugs } => {
79 let pipeline = Self::new(plugs, Some(self.executed.clone()));
80 return pipeline.execute(ctx).await;
81 }
82 PlugResult::Err { ctx, err } => {
83 return PipelineResponse::new(ctx, self.executed, Some(err))
84 }
85 }
86 }
87
88 PipelineResponse::new(c, self.executed, None)
89 }
90
91 fn add_execution_log(&mut self, plug: &dyn Plug<Ctx>) {
92 self.executed.push(plug.to_string());
93 }
94}
95
/// Unwraps a `Result` inside a plug, or returns early from the enclosing
/// function with a `PlugResult::Err`.
///
/// * `$ctx` – identifier of the context value; it is moved into the returned
///   `PlugResult::Err` on failure.
/// * `$exp` – expression evaluating to a `Result`; its error only needs a
///   `to_string()` method.
///
/// On `Ok(v)` the macro evaluates to `v`. On `Err(e)` the enclosing function
/// returns `PlugResult::Err` whose error is `PipelineError::Internal` holding
/// `e.to_string()`.
///
/// Paths go through `$crate` so the expansion resolves no matter what name
/// the calling crate gives this dependency (and also inside this crate).
#[macro_export]
macro_rules! try_with {
    ($ctx:ident, $exp:expr) => {
        match $exp {
            Ok(v) => v,
            Err(e) => {
                return $crate::PlugResult::Err {
                    ctx: $ctx,
                    err: $crate::PipelineError::Internal(e.to_string()),
                }
            }
        }
    };
}
110
/// Moves the value out of `$ctx.$name` (via `.take()`), or returns early from
/// the enclosing function when the field is empty.
///
/// * `$plug` – identifier of a `Display` value (typically the plug itself)
///   used to prefix the error message.
/// * `$ctx` – identifier of the context value.
/// * `$name` – field of the context; the macro calls `.take()` followed by
///   `.ok_or_else(..)` on it, so it is expected to be an `Option`.
///
/// The failure is built as `PipelineError::InvalidContext` and then handed to
/// `try_with!`, which stringifies it and reports it wrapped in
/// `PipelineError::Internal`.
///
/// Paths go through `$crate` so the expansion resolves no matter what name
/// the calling crate gives this dependency.
#[macro_export]
macro_rules! ctx_take {
    ($plug:ident, $ctx:ident, $name:ident) => {
        $crate::try_with!(
            $ctx,
            $ctx.$name.take().ok_or_else(|| {
                $crate::PipelineError::InvalidContext(format!(
                    "{}: Cannot take {} for ctx",
                    $plug,
                    stringify!($name)
                ))
            })
        )
    };
}
126
/// Borrows the value inside `$ctx.$name` (via `.as_ref()`), or returns early
/// from the enclosing function when the field is empty.
///
/// * `$plug` – identifier of a `Display` value (typically the plug itself)
///   used to prefix the error message.
/// * `$ctx` – identifier of the context value.
/// * `$name` – field of the context; the macro calls `.as_ref()` followed by
///   `.ok_or_else(..)` on it, so it is expected to be an `Option`.
///
/// The failure is built as `PipelineError::InvalidContext` and then handed to
/// `try_with!`, which stringifies it and reports it wrapped in
/// `PipelineError::Internal`. The message wording ("Cannot take") is the same
/// text `ctx_take!` emits.
///
/// Paths go through `$crate` so the expansion resolves no matter what name
/// the calling crate gives this dependency.
#[macro_export]
macro_rules! ctx_ref {
    ($plug:ident, $ctx:ident, $name:ident) => {
        $crate::try_with!(
            $ctx,
            $ctx.$name.as_ref().ok_or_else(|| {
                $crate::PipelineError::InvalidContext(format!(
                    "{}: Cannot take {} for ctx",
                    $plug,
                    stringify!($name)
                ))
            })
        )
    };
}
142
/// Mutably borrows the value inside `$ctx.$name` (via `.as_mut()`), or
/// returns early from the enclosing function when the field is empty.
///
/// * `$plug` – identifier of a `Display` value (typically the plug itself)
///   used to prefix the error message.
/// * `$ctx` – identifier of the context value.
/// * `$name` – field of the context; the macro calls `.as_mut()` followed by
///   `.ok_or_else(..)` on it, so it is expected to be an `Option`.
///
/// The failure is built as `PipelineError::InvalidContext` and then handed to
/// `try_with!`, which stringifies it and reports it wrapped in
/// `PipelineError::Internal`. The message wording ("Cannot take") is the same
/// text `ctx_take!` emits.
///
/// Paths go through `$crate` so the expansion resolves no matter what name
/// the calling crate gives this dependency.
#[macro_export]
macro_rules! ctx_mut {
    ($plug:ident, $ctx:ident, $name:ident) => {
        $crate::try_with!(
            $ctx,
            $ctx.$name.as_mut().ok_or_else(|| {
                $crate::PipelineError::InvalidContext(format!(
                    "{}: Cannot take {} for ctx",
                    $plug,
                    stringify!($name)
                ))
            })
        )
    };
}