fama/
pipeline.rs

1use async_trait::async_trait;
2use busybody::Resolver;
3use futures::future::Future;
4use std::marker::PhantomData;
5
6use crate::{PipeContent, content::PipeState};
7
8/// The pipes manager
9#[derive(Clone)]
10pub struct Pipeline<T: Send + Sync + 'static> {
11    phantom: PhantomData<T>,
12    pipe_content: PipeContent,
13    went_through: bool,
14}
15
16impl<T: Clone + Send + Sync + 'static> Pipeline<T> {
17    /// Accepts the pipeline content/input.
18    /// This is the beginning of the pipeline
19    pub async fn pass(content: T) -> Self {
20        let pipe_content = PipeContent::new(content).await;
21
22        Self {
23            pipe_content,
24            phantom: PhantomData,
25            went_through: false,
26        }
27    }
28
29    pub async fn pass_content(self, content: T) -> Self {
30        self.container().set_type(content).await;
31        self
32    }
33
34    /// Accepts a closure or function as a pipe.
35    /// The closure can accept zero or more arguments.
36    /// Unlike a struct pipe, a closure does not have to use a tuple
37    /// for multiple arguments. Arguments can be up to 17
38    pub async fn through_fn<H, Args, O>(mut self, mut handler: H) -> Self
39    where
40        H: PipeFnHandler<Args, O>,
41        Args: busybody::Resolver + 'static,
42    {
43        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
44            let args = Args::resolve(self.container()).await;
45            handler.pipe_fn_handle(args).await;
46            self.went_through = true;
47        } else {
48            self.went_through = false;
49        }
50
51        self
52    }
53
54    /// Accepts a closure or function as a pipe.
55    /// The closure can accept zero or more arguments.
56    /// Unlike a struct pipe, a closure does not have to use a tuple
57    /// for multiple arguments. Arguments can be up to 17
58    /// Closure must return a boolean. `False` will stop the pipe flow
59    pub async fn next_fn<H, Args>(mut self, mut handler: H) -> Self
60    where
61        H: PipeFnHandler<Args, bool>,
62        Args: busybody::Resolver + 'static,
63    {
64        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
65            let args = Args::resolve(self.container()).await;
66            if !handler.pipe_fn_handle(args).await {
67                self.container().set(PipeState::Stop).await;
68            }
69            self.went_through = true;
70        } else {
71            self.went_through = false;
72        }
73
74        self
75    }
76
77    /// Stores the result from the pipe handler
78    pub async fn store_fn<H, Args, O: Clone + Send + Sync + 'static>(
79        mut self,
80        mut handler: H,
81    ) -> Self
82    where
83        H: PipeFnHandler<Args, O>,
84        Args: busybody::Resolver + 'static,
85    {
86        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
87            let args = Args::resolve(self.container()).await;
88            self.container()
89                .set_type(handler.pipe_fn_handle(args).await)
90                .await;
91            self.went_through = true;
92        } else {
93            self.went_through = false;
94        }
95
96        self
97    }
98
99    // Stores Option<T> returned by the handler
100    // If option is `none` the pipe flow is stopped
101    pub async fn some_fn<H, Args, O: Clone + Send + Sync + 'static>(
102        mut self,
103        mut handler: H,
104    ) -> Self
105    where
106        H: PipeFnHandler<Args, Option<O>>,
107        Args: busybody::Resolver + 'static,
108    {
109        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
110            let args = Args::resolve(self.container()).await;
111            let option = handler.pipe_fn_handle(args).await;
112
113            if option.is_none() {
114                self.container().set(PipeState::Stop).await;
115            }
116
117            self.container().set_type(option).await;
118            self.went_through = true;
119        } else {
120            self.went_through = false;
121        }
122
123        self
124    }
125
126    // Stores Result<T, E> returned by the handler
127    // If result is `err` the pipe flow is stopped
128    pub async fn ok_fn<
129        H,
130        Args,
131        O: Clone + Send + Sync + 'static,
132        E: Clone + Send + Sync + 'static,
133    >(
134        mut self,
135        mut handler: H,
136    ) -> Self
137    where
138        H: PipeFnHandler<Args, Result<O, E>>,
139        Args: busybody::Resolver + 'static,
140    {
141        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
142            let args = Args::resolve(self.container()).await;
143            let result = handler.pipe_fn_handle(args).await;
144
145            if result.is_err() {
146                self.container().set(PipeState::Stop).await;
147            }
148
149            self.container().set_type(result).await;
150            self.went_through = true;
151        } else {
152            self.went_through = false;
153        }
154
155        self
156    }
157
158    /// Accepts an instance of a struct that implements `fama::FamaPipe`
159    /// The returned result will be store for the next pipe handlers
160    pub async fn through<H, Args, O>(mut self, handler: H) -> Self
161    where
162        H: FamaPipe<Args, O>,
163        Args: busybody::Resolver + 'static,
164    {
165        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
166            let args = Args::resolve(self.container()).await;
167            handler.receive_pipe_content(args).await;
168            self.went_through = true;
169        } else {
170            self.went_through = false;
171        }
172
173        self
174    }
175
176    /// Accepts an instance of a struct that implements `fama::FamaPipe`
177    /// Must return a boolean. `False` will halt the flow
178    pub async fn next<H, Args>(mut self, handler: H) -> Self
179    where
180        H: FamaPipe<Args, bool>,
181        Args: busybody::Resolver + 'static,
182    {
183        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
184            let args = Args::resolve(self.container()).await;
185            if !handler.receive_pipe_content(args).await {
186                self.container().set(PipeState::Stop).await;
187            }
188            self.went_through = true;
189        } else {
190            self.went_through = false;
191        }
192
193        self
194    }
195    pub async fn store<H, Args, O: Clone + Send + Sync + 'static>(mut self, handler: H) -> Self
196    where
197        H: FamaPipe<Args, O>,
198        Args: busybody::Resolver + 'static,
199    {
200        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
201            let args = Args::resolve(self.container()).await;
202            self.container()
203                .set_type(handler.receive_pipe_content(args).await)
204                .await;
205            self.went_through = true;
206        } else {
207            self.went_through = false;
208        }
209
210        self
211    }
212
213    // Stores Option<T> returned by the handler
214    // If option is `none` the pipe flow is stopped
215    pub async fn some<H, Args, O: Clone + Send + Sync + 'static>(mut self, handler: H) -> Self
216    where
217        H: FamaPipe<Args, Option<O>>,
218        Args: Resolver + 'static,
219    {
220        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
221            let args = Args::resolve(self.container()).await;
222            let option = handler.receive_pipe_content(args).await;
223
224            if option.is_none() {
225                self.container().set(PipeState::Stop).await;
226            }
227
228            self.container().set_type(option).await;
229            self.went_through = true;
230        } else {
231            self.went_through = false;
232        }
233
234        self
235    }
236
237    // Stores Result<T, E> returned by the handler
238    // If result is `err` the pipe flow is stopped
239    pub async fn ok<H, Args, O: Clone + Send + Sync + 'static, E: Clone + Send + Sync + 'static>(
240        mut self,
241        handler: H,
242    ) -> Self
243    where
244        H: FamaPipe<Args, Result<O, E>>,
245        Args: busybody::Resolver + 'static,
246    {
247        if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
248            let args = Args::resolve(self.container()).await;
249            let result = handler.receive_pipe_content(args).await;
250
251            if result.is_err() {
252                self.container().set(PipeState::Stop).await;
253            }
254
255            self.container().set_type(result).await;
256            self.went_through = true;
257        } else {
258            self.went_through = false;
259        }
260
261        self
262    }
263
264    /// Returns the passed variable
265    pub async fn deliver(&self) -> T {
266        self.try_to_deliver().await.unwrap()
267    }
268
269    /// Returns the passed variable wrapped in an `Option<T>`
270    pub async fn try_to_deliver(&self) -> Option<T> {
271        self.container().get_type().await
272    }
273
274    /// Returns a different type that may have been set
275    /// by one of the pipes
276    pub async fn deliver_as<R: Clone + 'static>(&self) -> R {
277        self.try_deliver_as().await.unwrap()
278    }
279
280    /// Returns a different type that may have been set
281    /// by one of the pipes. The returned type will be wrapped
282    /// in an `Option<T>`
283    pub async fn try_deliver_as<R: Clone + 'static>(&self) -> Option<R> {
284        self.container().get_type().await
285    }
286
287    /// Returns true if the content went through all the registered pipes
288    pub fn confirm(&self) -> bool {
289        self.went_through
290    }
291
292    fn container(&self) -> &busybody::ServiceContainer {
293        self.pipe_content.container()
294    }
295}
296
297#[async_trait]
298pub trait FamaPipe<Args, O> {
299    /// Where a pipe logic resides
300    async fn receive_pipe_content(&self, args: Args) -> O;
301
302    /// Wraps the type in a Box
303    fn to_pipe(self) -> Box<Self>
304    where
305        Self: Sized,
306    {
307        Box::new(self)
308    }
309}
310
311pub trait PipeFnHandler<Args, O>: Send + Sync + 'static {
312    type Future: Future<Output = O> + Send;
313
314    fn pipe_fn_handle(&mut self, args: Args) -> Self::Future;
315}
316
317impl<Func, Fut, O> PipeFnHandler<(), O> for Func
318where
319    Func: Send + Sync + FnMut() -> Fut + Send + Sync + 'static,
320    Fut: Future<Output = O> + Send,
321{
322    type Future = Fut;
323    fn pipe_fn_handle(&mut self, _: ()) -> Self::Future {
324        (self)()
325    }
326}
327
328impl<Func, Arg1, Fut, O> PipeFnHandler<(Arg1,), O> for Func
329where
330    Func: Send + Sync + FnMut(Arg1) -> Fut + Send + Sync + 'static,
331    Fut: Future<Output = O> + Send,
332{
333    type Future = Fut;
334    fn pipe_fn_handle(&mut self, (c,): (Arg1,)) -> Self::Future {
335        (self)(c)
336    }
337}
338
339macro_rules! pipe_func{
340    ($($T: ident),*) => {
341        impl<Func, $($T),+, Fut, O> PipeFnHandler <($($T),+), O> for Func
342         where Func: FnMut($($T),+) -> Fut + Send + Sync + 'static,
343         Fut: Future<Output = O> + Send,
344        {
345            type Future = Fut;
346
347            #[allow(non_snake_case)]
348            fn pipe_fn_handle(&mut self, ($($T),+): ($($T),+)) -> Self::Future {
349                (self)($($T),+)
350            }
351        }
352    };
353}
354
355pipe_func! {Arg1, Arg2}
356pipe_func! {Arg1, Arg2, Arg3}
357pipe_func! {Arg1, Arg2, Arg3, Arg4}
358pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5}
359pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6}
360pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7}
361pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8}
362pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9}
363pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10}
364pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11}
365pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12}
366pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13}
367pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14}
368pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14, Arg15}
369pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14, Arg15, Arg16}
370pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14, Arg15, Arg16, Arg17}
371
372#[cfg(test)]
373mod test {
374    use super::*;
375
376    struct AddOne;
377    #[async_trait]
378    impl FamaPipe<i32, i32> for AddOne {
379        async fn receive_pipe_content(&self, num: i32) -> i32 {
380            num + 1
381        }
382    }
383
384    struct AddTwo;
385    #[async_trait]
386    impl FamaPipe<i32, i32> for AddTwo {
387        async fn receive_pipe_content(&self, num: i32) -> i32 {
388            num + 2
389        }
390    }
391
392    struct StoreAddOne;
393    #[async_trait]
394    impl FamaPipe<(i32, PipeContent), ()> for StoreAddOne {
395        async fn receive_pipe_content(&self, (num, pipe): (i32, PipeContent)) {
396            pipe.store(num + 1).await;
397        }
398    }
399
400    struct StoreAddTwo;
401    #[async_trait]
402    impl FamaPipe<(i32, PipeContent), ()> for StoreAddTwo {
403        async fn receive_pipe_content(&self, (num, pipe): (i32, PipeContent)) {
404            pipe.store(num + 2).await;
405        }
406    }
407
408    struct ValidateCount;
409    #[async_trait]
410    impl FamaPipe<i32, bool> for ValidateCount {
411        async fn receive_pipe_content(&self, num: i32) -> bool {
412            num >= 6
413        }
414    }
415
416    #[tokio::test]
417    async fn test_through() {
418        let result = Pipeline::pass(0)
419            .await
420            .through(StoreAddOne)
421            .await
422            .through(StoreAddOne)
423            .await
424            .through(StoreAddTwo)
425            .await
426            .through(StoreAddTwo)
427            .await
428            .deliver()
429            .await;
430
431        assert_eq!(result, 6);
432    }
433
434    #[tokio::test]
435    async fn test_store() {
436        let result = Pipeline::pass(0)
437            .await
438            .store(AddOne)
439            .await
440            .store(AddOne)
441            .await
442            .store(AddTwo)
443            .await
444            .store(AddTwo)
445            .await
446            .deliver()
447            .await;
448
449        assert_eq!(result, 6);
450    }
451
452    #[tokio::test]
453    async fn test_next() {
454        let result = Pipeline::pass(0)
455            .await
456            .store(AddOne)
457            .await
458            .store(AddOne)
459            .await
460            .store(AddTwo)
461            .await
462            .next(ValidateCount)
463            .await
464            .store(AddTwo)
465            .await
466            .deliver()
467            .await;
468
469        assert_eq!(result, 4);
470
471        let result = Pipeline::pass(0)
472            .await
473            .store(AddOne)
474            .await
475            .store(AddOne)
476            .await
477            .store(AddTwo)
478            .await
479            .store(AddTwo)
480            .await
481            .store(AddTwo)
482            .await
483            .next(ValidateCount)
484            .await
485            .store(AddTwo)
486            .await
487            .deliver()
488            .await;
489
490        assert_eq!(result, 10);
491    }
492
493    #[tokio::test]
494    async fn test_through_fn1() {
495        let result: bool = Pipeline::pass(33)
496            .await
497            .through_fn(|num: i32, pipe: PipeContent| async move {
498                pipe.store(num + 2).await;
499            })
500            .await
501            .through_fn(|num: i32, pipe: PipeContent| async move {
502                pipe.store(num == 35).await;
503            })
504            .await
505            .deliver_as()
506            .await;
507
508        assert_eq!(result, true);
509    }
510
511    #[tokio::test]
512    async fn test_next_fn() {
513        let result: bool = Pipeline::pass(33)
514            .await
515            .next_fn(|num: i32, pipe: PipeContent| async move {
516                pipe.store(num + 2).await;
517                true
518            })
519            .await
520            .next_fn(|num: i32| async move { num != 35 })
521            .await
522            .next_fn(|num: i32| async move { num == 35 })
523            .await
524            .confirm();
525
526        assert_eq!(result, false);
527    }
528
529    #[tokio::test]
530    async fn test_store_fn() {
531        let total = Pipeline::pass(0)
532            .await
533            .store_fn(|num: i32| async move { num + 1 })
534            .await
535            .store_fn(|num: i32| async move { num + 4 })
536            .await
537            .store_fn(|num: i32| async move { num * 5 })
538            .await
539            .deliver()
540            .await;
541
542        assert_eq!(total, 25);
543    }
544
545    #[tokio::test]
546    async fn test_store2_fn() {
547        let total = Pipeline::<i32>::pass(0)
548            .await
549            .store_fn(|num: i32| async move { num + 1 })
550            .await
551            .store_fn(|num: i32| async move { num + 4 })
552            .await
553            .store_fn(|num: i32| async move { num * 5 })
554            .await
555            .deliver()
556            .await;
557
558        assert_eq!(total, 25);
559    }
560
561    #[tokio::test]
562    async fn test_some_flow_fn() {
563        let result1 = Pipeline::pass(0)
564            .await
565            .some_fn(|n: i32| async move { if n > 10 { Some(n) } else { None } })
566            .await
567            .deliver_as::<Option<i32>>()
568            .await;
569
570        assert_eq!(result1.is_some(), false);
571
572        let result2 = Pipeline::pass(100)
573            .await
574            .some_fn(|n: i32| async move { if n > 10 { Some(n) } else { None } })
575            .await
576            .deliver_as::<Option<i32>>()
577            .await;
578
579        assert_eq!(result2.is_some(), true);
580    }
581
582    #[tokio::test]
583    async fn test_some_flow() {
584        struct SomeI32;
585        #[async_trait::async_trait]
586        impl FamaPipe<i32, Option<i32>> for SomeI32 {
587            async fn receive_pipe_content(&self, n: i32) -> Option<i32> {
588                if n > 10 { Some(n) } else { None }
589            }
590        }
591        let result1 = Pipeline::pass(0)
592            .await
593            .some(SomeI32)
594            .await
595            .deliver_as::<Option<i32>>()
596            .await;
597
598        assert_eq!(result1.is_some(), false);
599
600        let result2 = Pipeline::pass(100)
601            .await
602            .some(SomeI32)
603            .await
604            .deliver_as::<Option<i32>>()
605            .await;
606
607        assert_eq!(result2.is_some(), true);
608    }
609
610    #[tokio::test]
611    async fn test_result_flow_fn() {
612        let result1 = Pipeline::pass(0)
613            .await
614            .ok_fn(|n: i32| async move { if n > 10 { Ok::<i32, ()>(n) } else { Err(()) } })
615            .await
616            .deliver_as::<Result<i32, ()>>()
617            .await;
618
619        assert_eq!(result1.is_err(), true);
620
621        let result2 = Pipeline::pass(100)
622            .await
623            .ok_fn(|n: i32| async move { if n > 10 { Ok::<i32, ()>(n) } else { Err(()) } })
624            .await
625            .deliver_as::<Result<i32, ()>>()
626            .await;
627
628        assert_eq!(result2.is_ok(), true);
629    }
630
631    #[tokio::test]
632    async fn test_result_flow() {
633        struct SomeI32;
634        #[async_trait::async_trait]
635        impl FamaPipe<i32, Result<i32, ()>> for SomeI32 {
636            async fn receive_pipe_content(&self, n: i32) -> Result<i32, ()> {
637                if n > 10 { Ok(n) } else { Err(()) }
638            }
639        }
640        let result1 = Pipeline::pass(0)
641            .await
642            .ok(SomeI32)
643            .await
644            .deliver_as::<Result<i32, ()>>()
645            .await;
646
647        assert_eq!(result1.is_err(), true);
648
649        let result2 = Pipeline::pass(100)
650            .await
651            .ok(SomeI32)
652            .await
653            .deliver_as::<Result<i32, ()>>()
654            .await;
655
656        assert_eq!(result2.is_ok(), true);
657    }
658
659    #[tokio::test]
660    async fn test_deliver_as() {
661        let result: bool = Pipeline::pass(0)
662            .await
663            .store_fn(|num: i32| async move { num + 1 })
664            .await
665            .store_fn(|num: i32| async move { num + 4 })
666            .await
667            .store_fn(|num: i32| async move { num * 5 })
668            .await
669            .store_fn(|num: i32| async move { num == 25 })
670            .await
671            .deliver_as()
672            .await;
673
674        assert_eq!(result, true);
675    }
676}