1use async_trait::async_trait;
2use busybody::Resolver;
3use futures::future::Future;
4use std::marker::PhantomData;
5
6use crate::{PipeContent, content::PipeState};
7
8#[derive(Clone)]
10pub struct Pipeline<T: Send + Sync + 'static> {
11 phantom: PhantomData<T>,
12 pipe_content: PipeContent,
13 went_through: bool,
14}
15
16impl<T: Clone + Send + Sync + 'static> Pipeline<T> {
17 pub async fn pass(content: T) -> Self {
20 let pipe_content = PipeContent::new(content).await;
21
22 Self {
23 pipe_content,
24 phantom: PhantomData,
25 went_through: false,
26 }
27 }
28
29 pub async fn pass_content(self, content: T) -> Self {
30 self.container().set_type(content).await;
31 self
32 }
33
34 pub async fn through_fn<H, Args, O>(mut self, mut handler: H) -> Self
39 where
40 H: PipeFnHandler<Args, O>,
41 Args: busybody::Resolver + 'static,
42 {
43 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
44 let args = Args::resolve(self.container()).await;
45 handler.pipe_fn_handle(args).await;
46 self.went_through = true;
47 } else {
48 self.went_through = false;
49 }
50
51 self
52 }
53
54 pub async fn next_fn<H, Args>(mut self, mut handler: H) -> Self
60 where
61 H: PipeFnHandler<Args, bool>,
62 Args: busybody::Resolver + 'static,
63 {
64 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
65 let args = Args::resolve(self.container()).await;
66 if !handler.pipe_fn_handle(args).await {
67 self.container().set(PipeState::Stop).await;
68 }
69 self.went_through = true;
70 } else {
71 self.went_through = false;
72 }
73
74 self
75 }
76
77 pub async fn store_fn<H, Args, O: Clone + Send + Sync + 'static>(
79 mut self,
80 mut handler: H,
81 ) -> Self
82 where
83 H: PipeFnHandler<Args, O>,
84 Args: busybody::Resolver + 'static,
85 {
86 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
87 let args = Args::resolve(self.container()).await;
88 self.container()
89 .set_type(handler.pipe_fn_handle(args).await)
90 .await;
91 self.went_through = true;
92 } else {
93 self.went_through = false;
94 }
95
96 self
97 }
98
99 pub async fn some_fn<H, Args, O: Clone + Send + Sync + 'static>(
102 mut self,
103 mut handler: H,
104 ) -> Self
105 where
106 H: PipeFnHandler<Args, Option<O>>,
107 Args: busybody::Resolver + 'static,
108 {
109 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
110 let args = Args::resolve(self.container()).await;
111 let option = handler.pipe_fn_handle(args).await;
112
113 if option.is_none() {
114 self.container().set(PipeState::Stop).await;
115 }
116
117 self.container().set_type(option).await;
118 self.went_through = true;
119 } else {
120 self.went_through = false;
121 }
122
123 self
124 }
125
126 pub async fn ok_fn<
129 H,
130 Args,
131 O: Clone + Send + Sync + 'static,
132 E: Clone + Send + Sync + 'static,
133 >(
134 mut self,
135 mut handler: H,
136 ) -> Self
137 where
138 H: PipeFnHandler<Args, Result<O, E>>,
139 Args: busybody::Resolver + 'static,
140 {
141 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
142 let args = Args::resolve(self.container()).await;
143 let result = handler.pipe_fn_handle(args).await;
144
145 if result.is_err() {
146 self.container().set(PipeState::Stop).await;
147 }
148
149 self.container().set_type(result).await;
150 self.went_through = true;
151 } else {
152 self.went_through = false;
153 }
154
155 self
156 }
157
158 pub async fn through<H, Args, O>(mut self, handler: H) -> Self
161 where
162 H: FamaPipe<Args, O>,
163 Args: busybody::Resolver + 'static,
164 {
165 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
166 let args = Args::resolve(self.container()).await;
167 handler.receive_pipe_content(args).await;
168 self.went_through = true;
169 } else {
170 self.went_through = false;
171 }
172
173 self
174 }
175
176 pub async fn next<H, Args>(mut self, handler: H) -> Self
179 where
180 H: FamaPipe<Args, bool>,
181 Args: busybody::Resolver + 'static,
182 {
183 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
184 let args = Args::resolve(self.container()).await;
185 if !handler.receive_pipe_content(args).await {
186 self.container().set(PipeState::Stop).await;
187 }
188 self.went_through = true;
189 } else {
190 self.went_through = false;
191 }
192
193 self
194 }
195 pub async fn store<H, Args, O: Clone + Send + Sync + 'static>(mut self, handler: H) -> Self
196 where
197 H: FamaPipe<Args, O>,
198 Args: busybody::Resolver + 'static,
199 {
200 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
201 let args = Args::resolve(self.container()).await;
202 self.container()
203 .set_type(handler.receive_pipe_content(args).await)
204 .await;
205 self.went_through = true;
206 } else {
207 self.went_through = false;
208 }
209
210 self
211 }
212
213 pub async fn some<H, Args, O: Clone + Send + Sync + 'static>(mut self, handler: H) -> Self
216 where
217 H: FamaPipe<Args, Option<O>>,
218 Args: Resolver + 'static,
219 {
220 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
221 let args = Args::resolve(self.container()).await;
222 let option = handler.receive_pipe_content(args).await;
223
224 if option.is_none() {
225 self.container().set(PipeState::Stop).await;
226 }
227
228 self.container().set_type(option).await;
229 self.went_through = true;
230 } else {
231 self.went_through = false;
232 }
233
234 self
235 }
236
237 pub async fn ok<H, Args, O: Clone + Send + Sync + 'static, E: Clone + Send + Sync + 'static>(
240 mut self,
241 handler: H,
242 ) -> Self
243 where
244 H: FamaPipe<Args, Result<O, E>>,
245 Args: busybody::Resolver + 'static,
246 {
247 if *self.container().get::<PipeState>().await.unwrap() == PipeState::Run {
248 let args = Args::resolve(self.container()).await;
249 let result = handler.receive_pipe_content(args).await;
250
251 if result.is_err() {
252 self.container().set(PipeState::Stop).await;
253 }
254
255 self.container().set_type(result).await;
256 self.went_through = true;
257 } else {
258 self.went_through = false;
259 }
260
261 self
262 }
263
264 pub async fn deliver(&self) -> T {
266 self.try_to_deliver().await.unwrap()
267 }
268
269 pub async fn try_to_deliver(&self) -> Option<T> {
271 self.container().get_type().await
272 }
273
274 pub async fn deliver_as<R: Clone + 'static>(&self) -> R {
277 self.try_deliver_as().await.unwrap()
278 }
279
280 pub async fn try_deliver_as<R: Clone + 'static>(&self) -> Option<R> {
284 self.container().get_type().await
285 }
286
287 pub fn confirm(&self) -> bool {
289 self.went_through
290 }
291
292 fn container(&self) -> &busybody::ServiceContainer {
293 self.pipe_content.container()
294 }
295}
296
297#[async_trait]
298pub trait FamaPipe<Args, O> {
299 async fn receive_pipe_content(&self, args: Args) -> O;
301
302 fn to_pipe(self) -> Box<Self>
304 where
305 Self: Sized,
306 {
307 Box::new(self)
308 }
309}
310
/// A callable pipeline stage: something that turns `Args` into a future
/// yielding `O`.
///
/// Implemented in this file for `FnMut` closures and functions taking zero or
/// more arguments, where `Args` is the tuple of those arguments.
pub trait PipeFnHandler<Args, O>: Send + Sync + 'static {
    /// The future returned by [`pipe_fn_handle`](Self::pipe_fn_handle).
    type Future: Send + Future<Output = O>;

    /// Invokes the handler with `args` and returns the pending computation.
    fn pipe_fn_handle(&mut self, args: Args) -> Self::Future;
}
316
317impl<Func, Fut, O> PipeFnHandler<(), O> for Func
318where
319 Func: Send + Sync + FnMut() -> Fut + Send + Sync + 'static,
320 Fut: Future<Output = O> + Send,
321{
322 type Future = Fut;
323 fn pipe_fn_handle(&mut self, _: ()) -> Self::Future {
324 (self)()
325 }
326}
327
328impl<Func, Arg1, Fut, O> PipeFnHandler<(Arg1,), O> for Func
329where
330 Func: Send + Sync + FnMut(Arg1) -> Fut + Send + Sync + 'static,
331 Fut: Future<Output = O> + Send,
332{
333 type Future = Fut;
334 fn pipe_fn_handle(&mut self, (c,): (Arg1,)) -> Self::Future {
335 (self)(c)
336 }
337}
338
339macro_rules! pipe_func{
340 ($($T: ident),*) => {
341 impl<Func, $($T),+, Fut, O> PipeFnHandler <($($T),+), O> for Func
342 where Func: FnMut($($T),+) -> Fut + Send + Sync + 'static,
343 Fut: Future<Output = O> + Send,
344 {
345 type Future = Fut;
346
347 #[allow(non_snake_case)]
348 fn pipe_fn_handle(&mut self, ($($T),+): ($($T),+)) -> Self::Future {
349 (self)($($T),+)
350 }
351 }
352 };
353}
354
355pipe_func! {Arg1, Arg2}
356pipe_func! {Arg1, Arg2, Arg3}
357pipe_func! {Arg1, Arg2, Arg3, Arg4}
358pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5}
359pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6}
360pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7}
361pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8}
362pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9}
363pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10}
364pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11}
365pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12}
366pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13}
367pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14}
368pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14, Arg15}
369pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14, Arg15, Arg16}
370pipe_func! {Arg1, Arg2, Arg3, Arg4, Arg5, Arg6, Arg7, Arg8, Arg9, Arg10, Arg11, Arg12, Arg13, Arg14, Arg15, Arg16, Arg17}
371
#[cfg(test)]
mod test {
    use super::*;

    /// Returns the resolved number plus one.
    struct AddOne;
    #[async_trait]
    impl FamaPipe<i32, i32> for AddOne {
        async fn receive_pipe_content(&self, num: i32) -> i32 {
            num + 1
        }
    }

    /// Returns the resolved number plus two.
    struct AddTwo;
    #[async_trait]
    impl FamaPipe<i32, i32> for AddTwo {
        async fn receive_pipe_content(&self, num: i32) -> i32 {
            num + 2
        }
    }

    /// Writes the resolved number plus one back through the `PipeContent`.
    struct StoreAddOne;
    #[async_trait]
    impl FamaPipe<(i32, PipeContent), ()> for StoreAddOne {
        async fn receive_pipe_content(&self, (num, pipe): (i32, PipeContent)) {
            pipe.store(num + 1).await;
        }
    }

    /// Writes the resolved number plus two back through the `PipeContent`.
    struct StoreAddTwo;
    #[async_trait]
    impl FamaPipe<(i32, PipeContent), ()> for StoreAddTwo {
        async fn receive_pipe_content(&self, (num, pipe): (i32, PipeContent)) {
            pipe.store(num + 2).await;
        }
    }

    /// Lets the pipeline continue only when the number is at least six.
    struct ValidateCount;
    #[async_trait]
    impl FamaPipe<i32, bool> for ValidateCount {
        async fn receive_pipe_content(&self, num: i32) -> bool {
            num >= 6
        }
    }

    #[tokio::test]
    async fn test_through() {
        let result = Pipeline::pass(0)
            .await
            .through(StoreAddOne)
            .await
            .through(StoreAddOne)
            .await
            .through(StoreAddTwo)
            .await
            .through(StoreAddTwo)
            .await
            .deliver()
            .await;

        assert_eq!(result, 6);
    }

    #[tokio::test]
    async fn test_store() {
        let result = Pipeline::pass(0)
            .await
            .store(AddOne)
            .await
            .store(AddOne)
            .await
            .store(AddTwo)
            .await
            .store(AddTwo)
            .await
            .deliver()
            .await;

        assert_eq!(result, 6);
    }

    #[tokio::test]
    async fn test_next() {
        // The count is 4 when validated, so the pipeline stops and the final
        // `store` is skipped.
        let result = Pipeline::pass(0)
            .await
            .store(AddOne)
            .await
            .store(AddOne)
            .await
            .store(AddTwo)
            .await
            .next(ValidateCount)
            .await
            .store(AddTwo)
            .await
            .deliver()
            .await;

        assert_eq!(result, 4);

        // The count is 8 when validated, so the final `store` still runs.
        let result = Pipeline::pass(0)
            .await
            .store(AddOne)
            .await
            .store(AddOne)
            .await
            .store(AddTwo)
            .await
            .store(AddTwo)
            .await
            .store(AddTwo)
            .await
            .next(ValidateCount)
            .await
            .store(AddTwo)
            .await
            .deliver()
            .await;

        assert_eq!(result, 10);
    }

    #[tokio::test]
    async fn test_through_fn1() {
        let result: bool = Pipeline::pass(33)
            .await
            .through_fn(|num: i32, pipe: PipeContent| async move {
                pipe.store(num + 2).await;
            })
            .await
            .through_fn(|num: i32, pipe: PipeContent| async move {
                pipe.store(num == 35).await;
            })
            .await
            .deliver_as()
            .await;

        assert!(result);
    }

    #[tokio::test]
    async fn test_next_fn() {
        // The second stage returns `false`, so the third one must be skipped.
        let result: bool = Pipeline::pass(33)
            .await
            .next_fn(|num: i32, pipe: PipeContent| async move {
                pipe.store(num + 2).await;
                true
            })
            .await
            .next_fn(|num: i32| async move { num != 35 })
            .await
            .next_fn(|num: i32| async move { num == 35 })
            .await
            .confirm();

        assert!(!result);
    }

    #[tokio::test]
    async fn test_store_fn() {
        let total = Pipeline::pass(0)
            .await
            .store_fn(|num: i32| async move { num + 1 })
            .await
            .store_fn(|num: i32| async move { num + 4 })
            .await
            .store_fn(|num: i32| async move { num * 5 })
            .await
            .deliver()
            .await;

        assert_eq!(total, 25);
    }

    #[tokio::test]
    async fn test_store2_fn() {
        // Same as `test_store_fn`, but with the content type spelled out.
        let total = Pipeline::<i32>::pass(0)
            .await
            .store_fn(|num: i32| async move { num + 1 })
            .await
            .store_fn(|num: i32| async move { num + 4 })
            .await
            .store_fn(|num: i32| async move { num * 5 })
            .await
            .deliver()
            .await;

        assert_eq!(total, 25);
    }

    #[tokio::test]
    async fn test_some_flow_fn() {
        let result1 = Pipeline::pass(0)
            .await
            .some_fn(|n: i32| async move { if n > 10 { Some(n) } else { None } })
            .await
            .deliver_as::<Option<i32>>()
            .await;

        assert!(result1.is_none());

        let result2 = Pipeline::pass(100)
            .await
            .some_fn(|n: i32| async move { if n > 10 { Some(n) } else { None } })
            .await
            .deliver_as::<Option<i32>>()
            .await;

        assert!(result2.is_some());
    }

    #[tokio::test]
    async fn test_some_flow() {
        struct SomeI32;
        #[async_trait::async_trait]
        impl FamaPipe<i32, Option<i32>> for SomeI32 {
            async fn receive_pipe_content(&self, n: i32) -> Option<i32> {
                if n > 10 { Some(n) } else { None }
            }
        }

        let result1 = Pipeline::pass(0)
            .await
            .some(SomeI32)
            .await
            .deliver_as::<Option<i32>>()
            .await;

        assert!(result1.is_none());

        let result2 = Pipeline::pass(100)
            .await
            .some(SomeI32)
            .await
            .deliver_as::<Option<i32>>()
            .await;

        assert!(result2.is_some());
    }

    #[tokio::test]
    async fn test_result_flow_fn() {
        let result1 = Pipeline::pass(0)
            .await
            .ok_fn(|n: i32| async move { if n > 10 { Ok::<i32, ()>(n) } else { Err(()) } })
            .await
            .deliver_as::<Result<i32, ()>>()
            .await;

        assert!(result1.is_err());

        let result2 = Pipeline::pass(100)
            .await
            .ok_fn(|n: i32| async move { if n > 10 { Ok::<i32, ()>(n) } else { Err(()) } })
            .await
            .deliver_as::<Result<i32, ()>>()
            .await;

        assert!(result2.is_ok());
    }

    #[tokio::test]
    async fn test_result_flow() {
        struct SomeI32;
        #[async_trait::async_trait]
        impl FamaPipe<i32, Result<i32, ()>> for SomeI32 {
            async fn receive_pipe_content(&self, n: i32) -> Result<i32, ()> {
                if n > 10 { Ok(n) } else { Err(()) }
            }
        }

        let result1 = Pipeline::pass(0)
            .await
            .ok(SomeI32)
            .await
            .deliver_as::<Result<i32, ()>>()
            .await;

        assert!(result1.is_err());

        let result2 = Pipeline::pass(100)
            .await
            .ok(SomeI32)
            .await
            .deliver_as::<Result<i32, ()>>()
            .await;

        assert!(result2.is_ok());
    }

    #[tokio::test]
    async fn test_deliver_as() {
        let result: bool = Pipeline::pass(0)
            .await
            .store_fn(|num: i32| async move { num + 1 })
            .await
            .store_fn(|num: i32| async move { num + 4 })
            .await
            .store_fn(|num: i32| async move { num * 5 })
            .await
            .store_fn(|num: i32| async move { num == 25 })
            .await
            .deliver_as()
            .await;

        assert!(result);
    }
}