fama/pipeline_builder.rs
1use std::sync::Arc;
2
3use futures::future::BoxFuture;
4use tokio::sync::RwLock;
5
6use crate::{Pipeline, pipeline::PipeFnHandler};
7
8type PipeList<T> =
9 Arc<RwLock<Vec<Box<dyn FnMut(Pipeline<T>) -> BoxFuture<'static, Pipeline<T>> + Send + Sync>>>>;
10
11/// PipelineBuilder provides flexibility and extensibility to your pipelines
12///
13/// Pipes/function can be appended to your type pipeline from other places in your code or even
14/// across crates.
15///
16/// ```rust
17///# #![allow(dead_code)]
18///# use fama::PipelineBuilder;
19///
20/// #[derive(Default, Clone)]
21/// struct MeaningOfLife(i32);
22///
23///
24///
25/// #[tokio::main]
26/// async fn main() {
27///
28/// let builder = PipelineBuilder::<MeaningOfLife>::new();
29///
30/// builder.register(|pipeline| {
31/// Box::pin(async {
32/// pipeline.store_fn(|mut instance: MeaningOfLife| async {
33/// instance.0 = 42;
34/// instance
35/// }).await
36/// })
37/// }).await;
38///
39/// let life = builder.build(MeaningOfLife::default()).await.deliver().await;
40/// assert_eq!(life.0, 42);
41/// }
42///
43/// ```
44/// You can implement the PipelineBuilderTrait for your type as well
45///
46///
47/// ```rust
48///# #![allow(dead_code)]
49///# use fama::{PipelineBuilder, PipelineBuilderTrait};
50///
51/// #[derive(Default, Clone)]
52/// struct MeaningOfLife(i32);
53///
54///
55/// #[fama::async_trait]
56/// impl PipelineBuilderTrait for MeaningOfLife {
57///
58/// // we are overriding the default implementation of this method in order
59/// // to append our pipe
60/// async fn setup_pipeline_builder(builder: PipelineBuilder<Self>) -> PipelineBuilder<Self> {
61/// builder.register(|pipeline| {
62/// Box::pin(async {
63/// pipeline.store_fn(|mut instance: MeaningOfLife| async {
64/// instance.0 = 42;
65/// instance
66/// }).await
67/// })
68/// }).await;
69///
70/// builder
71/// }
72///
73/// }
74///
75///
76/// #[tokio::main]
77/// async fn main() {
78///
79/// let new_life = MeaningOfLife(0);
80///
81/// // Register/append a pipe/function to the pipeline
82/// MeaningOfLife::pipeline_builder().await
83/// .register(|pipeline| {
84/// Box::pin(async {
85/// pipeline.store_fn(|mut instance: MeaningOfLife| async {
86/// if instance.0 == 0 {
87/// instance.0 = 42 ;
88/// } else {
89/// instance.0 = instance.0 * 2;
90/// }
91/// instance
92/// }).await
93/// })
94/// }).await;
95///
96/// let life = new_life.pipeline().await.deliver().await;
97/// assert_eq!(life.0, 84);
98/// }
99///
100/// ```
101#[derive(Clone)]
102pub struct PipelineBuilder<T: Clone + Send + Sync + 'static> {
103 pipes: PipeList<T>,
104}
105
106impl<T: Clone + Send + Sync + 'static> PipelineBuilder<T> {
107 pub fn new() -> Self {
108 futures::executor::block_on(async {
109 let (_, instance) = Self::initial().await;
110 instance
111 })
112 }
113
114 pub async fn initial() -> (bool, Self) {
115 if let Some(instance) = busybody::helpers::service_container()
116 .get_type::<Self>()
117 .await
118 {
119 (false, instance)
120 } else {
121 let instance = Self {
122 pipes: Arc::default(),
123 };
124 busybody::helpers::service_container()
125 .set_type(instance.clone())
126 .await;
127 (true, instance)
128 }
129 }
130
131 pub async fn register<F>(&self, callback: F) -> &Self
132 where
133 F: FnMut(Pipeline<T>) -> BoxFuture<'static, Pipeline<T>> + Send + Sync + 'static,
134 {
135 let mut lock = self.pipes.write().await;
136 lock.push(Box::new(callback));
137
138 self
139 }
140
141 pub async fn build(&self, content: T) -> Pipeline<T> {
142 let mut pipeline = Pipeline::pass(content).await;
143 let mut lock = self.pipes.write().await;
144 for pipe in lock.iter_mut() {
145 pipeline = pipe.pipe_fn_handle((pipeline,)).await;
146 }
147
148 pipeline
149 }
150}
151
152impl<T: Clone + Send + Sync + 'static> Default for PipelineBuilder<T> {
153 fn default() -> Self {
154 Self {
155 pipes: Default::default(),
156 }
157 }
158}
159
160#[busybody::async_trait]
161pub trait PipelineBuilderTrait: Clone + Send + Sync {
162 /// Will be called the first time an instance of the builder is instantiated
163 /// Use this method to prepend pipes
164 async fn setup_pipeline_builder(builder: PipelineBuilder<Self>) -> PipelineBuilder<Self> {
165 builder
166 }
167
168 /// Returns the pipe builder instance for this type
169 async fn pipeline_builder() -> PipelineBuilder<Self> {
170 let (is_initial, builder) = PipelineBuilder::<Self>::initial().await;
171 if is_initial {
172 return Self::setup_pipeline_builder(builder).await;
173 }
174
175 builder
176 }
177
178 /// Pass the current instance of this type through the pipeline
179 async fn pipeline(self) -> Pipeline<Self> {
180 Self::pipeline_builder().await.build(self).await
181 }
182}
183
#[cfg(test)]
mod test {
    use super::*;

    /// Fixture type that is sent through the pipeline in the tests below.
    #[derive(Debug, Clone, Default)]
    struct NewUser {
        id: Option<String>,
        role: Option<Vec<String>>,
    }

    #[crate::async_trait]
    impl busybody::Resolver for NewUser {
        /// Resolves to the instance held by the container, or to the default value.
        async fn resolve(container: &busybody::ServiceContainer) -> Self {
            container.get_type().await.unwrap_or_default()
        }
    }

    #[crate::async_trait]
    impl PipelineBuilderTrait for NewUser {
        // Overriding this hook is optional; here it registers a pipe that fills in
        // the id and the role.
        async fn setup_pipeline_builder(builder: PipelineBuilder<Self>) -> PipelineBuilder<Self> {
            builder
                .register(|pipeline| {
                    Box::pin(async {
                        pipeline
                            .store_fn(|mut user: NewUser| async {
                                user.role = Some(vec![String::from("Teacher")]);
                                user.id = Some(format!("USR-{}", 200));

                                user
                            })
                            .await
                    })
                })
                .await;

            builder
        }
    }

    /// A builder obtained through `PipelineBuilder::new` and one obtained through the
    /// trait must produce the same result, even when a pipe is registered on only one
    /// of them.
    #[tokio::test]
    async fn test_multiple_instances() {
        let direct_builder = PipelineBuilder::<NewUser>::new();
        let trait_builder = NewUser::pipeline_builder().await;

        // Register on the trait-provided builder only.
        trait_builder
            .register(|pipeline| {
                Box::pin(async {
                    pipeline
                        .store_fn(|mut user: NewUser| async {
                            user.id = Some("changed".to_string());
                            user
                        })
                        .await
                })
            })
            .await;

        let from_direct = direct_builder
            .build(NewUser::default())
            .await
            .deliver()
            .await;
        let from_trait = NewUser::default().pipeline().await.deliver().await;

        assert_eq!(from_direct.id, from_trait.id);
    }
}
246}