Skip to main content

fama/
pipeline_builder.rs

1use std::sync::Arc;
2
3use futures::future::BoxFuture;
4use tokio::sync::RwLock;
5
6use crate::{Pipeline, pipeline::PipeFnHandler};
7
8type PipeList<T> =
9    Arc<RwLock<Vec<Box<dyn FnMut(Pipeline<T>) -> BoxFuture<'static, Pipeline<T>> + Send + Sync>>>>;
10
11/// PipelineBuilder provides flexibility and extensibility to your pipelines
12///
13/// Pipes/function can be appended to your type pipeline from other places in your code or even
14/// across crates.
15///
16/// ```rust
17///# #![allow(dead_code)]
18///# use fama::PipelineBuilder;
19///
20/// #[derive(Default, Clone)]
21/// struct MeaningOfLife(i32);
22///
23///
24///
25/// #[tokio::main]
26/// async fn main() {
27///
28///    let builder = PipelineBuilder::<MeaningOfLife>::new();
29///
30///    builder.register(|pipeline| {
31///       Box::pin(async {
32///         pipeline.store_fn(|mut instance: MeaningOfLife| async {
33///             instance.0 = 42;
34///            instance
35///         }).await
36///      })
37///    }).await;
38///  
39///    let life = builder.build(MeaningOfLife::default()).await.deliver().await;
40///    assert_eq!(life.0, 42);
41/// }
42///
43/// ```
44/// You can implement the PipelineBuilderTrait for your type as well
45///
46///
47/// ```rust
48///# #![allow(dead_code)]
49///# use fama::{PipelineBuilder, PipelineBuilderTrait};
50///
51/// #[derive(Default, Clone)]
52/// struct MeaningOfLife(i32);
53///
54///
55/// #[fama::async_trait]
56/// impl PipelineBuilderTrait for MeaningOfLife {
57///
58///    // we are overriding the default implementation of this method in order
59///    // to append our pipe
60///    async fn setup_pipeline_builder(builder: PipelineBuilder<Self>) -> PipelineBuilder<Self> {
61///       builder.register(|pipeline| {
62///          Box::pin(async {
63///               pipeline.store_fn(|mut instance: MeaningOfLife| async {
64///                    instance.0 = 42;
65///                    instance
66///              }).await
67///          })
68///        }).await;
69///
70///        builder
71///    }
72///    
73/// }
74///
75///
76/// #[tokio::main]
77/// async fn main() {
78///
79///    let new_life = MeaningOfLife(0);
80///
81///    // Register/append a pipe/function to the pipeline
82///    MeaningOfLife::pipeline_builder().await
83///      .register(|pipeline| {
84///           Box::pin(async {
85///              pipeline.store_fn(|mut instance: MeaningOfLife| async {
86///                  if instance.0 == 0  {
87///                      instance.0 = 42 ;
88///                   } else {
89///                      instance.0 = instance.0 * 2;
90///                   }
91///                   instance
92///              }).await
93///          })
94///       }).await;
95///  
96///    let life = new_life.pipeline().await.deliver().await;
97///    assert_eq!(life.0, 84);
98/// }
99///
100/// ```
101#[derive(Clone)]
102pub struct PipelineBuilder<T: Clone + Send + Sync + 'static> {
103    pipes: PipeList<T>,
104}
105
106impl<T: Clone + Send + Sync + 'static> PipelineBuilder<T> {
107    pub fn new() -> Self {
108        futures::executor::block_on(async {
109            let (_, instance) = Self::initial().await;
110            instance
111        })
112    }
113
114    pub async fn initial() -> (bool, Self) {
115        if let Some(instance) = busybody::helpers::service_container()
116            .get_type::<Self>()
117            .await
118        {
119            (false, instance)
120        } else {
121            let instance = Self {
122                pipes: Arc::default(),
123            };
124            busybody::helpers::service_container()
125                .set_type(instance.clone())
126                .await;
127            (true, instance)
128        }
129    }
130
131    pub async fn register<F>(&self, callback: F) -> &Self
132    where
133        F: FnMut(Pipeline<T>) -> BoxFuture<'static, Pipeline<T>> + Send + Sync + 'static,
134    {
135        let mut lock = self.pipes.write().await;
136        lock.push(Box::new(callback));
137
138        self
139    }
140
141    pub async fn build(&self, content: T) -> Pipeline<T> {
142        let mut pipeline = Pipeline::pass(content).await;
143        let mut lock = self.pipes.write().await;
144        for pipe in lock.iter_mut() {
145            pipeline = pipe.pipe_fn_handle((pipeline,)).await;
146        }
147
148        pipeline
149    }
150}
151
152impl<T: Clone + Send + Sync + 'static> Default for PipelineBuilder<T> {
153    fn default() -> Self {
154        Self {
155            pipes: Default::default(),
156        }
157    }
158}
159
160#[busybody::async_trait]
161pub trait PipelineBuilderTrait: Clone + Send + Sync {
162    /// Will be called the first time an instance of the builder is instantiated
163    /// Use this method to prepend pipes
164    async fn setup_pipeline_builder(builder: PipelineBuilder<Self>) -> PipelineBuilder<Self> {
165        builder
166    }
167
168    /// Returns the pipe builder instance for this type
169    async fn pipeline_builder() -> PipelineBuilder<Self> {
170        let (is_initial, builder) = PipelineBuilder::<Self>::initial().await;
171        if is_initial {
172            return Self::setup_pipeline_builder(builder).await;
173        }
174
175        builder
176    }
177
178    /// Pass the current instance of this type through the pipeline
179    async fn pipeline(self) -> Pipeline<Self> {
180        Self::pipeline_builder().await.build(self).await
181    }
182}
183
#[cfg(test)]
mod test {
    use super::*;

    /// Fixture type that is pushed through the pipeline in the test below.
    #[derive(Debug, Clone, Default)]
    struct NewUser {
        id: Option<String>,
        role: Option<Vec<String>>,
    }

    #[crate::async_trait]
    impl busybody::Resolver for NewUser {
        /// Resolves to the stored `NewUser`, or to a default one when the
        /// container has none.
        async fn resolve(container: &busybody::ServiceContainer) -> Self {
            let stored: Option<Self> = container.get_type().await;
            stored.unwrap_or_default()
        }
    }

    #[crate::async_trait]
    impl PipelineBuilderTrait for NewUser {
        // Overriding this hook is optional; here it registers a pipe that
        // fills in the id and role.
        async fn setup_pipeline_builder(builder: PipelineBuilder<Self>) -> PipelineBuilder<Self> {
            builder
                .register(|pipeline| {
                    Box::pin(async {
                        let filled = pipeline.store_fn(|mut user: NewUser| async {
                            user.id = Some(format!("USR-{}", 200));
                            user.role = Some(vec!["Teacher".to_string()]);

                            user
                        });
                        filled.await
                    })
                })
                .await;
            builder
        }
    }

    /// A builder obtained via `new()` and one obtained via the trait are
    /// expected to share one pipe list, so a pipe registered through either
    /// handle affects pipelines built from both.
    #[tokio::test]
    async fn test_multiple_instances() {
        let direct = PipelineBuilder::<NewUser>::new();
        let via_trait = NewUser::pipeline_builder().await;

        // Register through the second handle only.
        via_trait
            .register(|pipeline| {
                Box::pin(async {
                    let renamed = pipeline.store_fn(|mut user: NewUser| async {
                        user.id = Some("changed".to_string());
                        user
                    });
                    renamed.await
                })
            })
            .await;

        // Build once through each entry point and compare the outcome.
        let user_a = direct.build(NewUser::default()).await.deliver().await;
        let user_b = NewUser::default().pipeline().await.deliver().await;

        assert_eq!(user_a.id, user_b.id);
    }
}