1use crate::Rush;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6#[derive(Debug)]
7pub struct MulMsg {
8 pub result: anyhow::Result<bool>,
9 pub rule_name: String,
10}
11impl MulMsg {
12 pub fn new(result: anyhow::Result<bool>, rule_name: String) -> Self {
13 Self { result, rule_name }
14 }
15}
16
17#[derive(Debug)]
18pub struct MultiRush {
19 rush: Arc<Rush>,
20}
21impl MultiRush {
22 pub async fn multi_flow<Obj: Serialize, Out: for<'de> Deserialize<'de>>(
23 &self,
24 obj: Obj,
25 ) -> anyhow::Result<Out> {
26 let obj: Value = serde_json::to_value(obj)?;
27 let (send, mut recv) = mpsc::channel(self.rush.nodes.len());
28 let obj = Arc::new(obj);
29 for (k, _) in self.rush.nodes.iter() {
30 let rh = self.rush.clone();
31 let rule_name = k.to_string();
32 let obj = obj.clone();
33 let send = send.clone();
34 tokio::spawn(async move {
35 let cs = if let Some(i) = rh.nodes.get(rule_name.as_str()) {
36 i
37 } else {
38 return; };
40 for i in cs.iter() {
41 match i.when(rh.functions.share(), &obj) {
42 Ok(b) => {
43 if !b {
44 if let Err(e) =
45 send.send(MulMsg::new(Ok(b), rule_name.clone())).await
46 {
47 println!("rush.multi_flow false recv is close:{}", e);
48 }
49 return;
50 }
51 }
52 Err(e) => {
53 if let Err(e) = send.send(MulMsg::new(Err(e), rule_name.clone())).await
54 {
55 println!("rush.multi_flow error recv is close:{}", e);
56 }
57 return;
58 }
59 }
60 }
61 if let Err(e) = send.send(MulMsg::new(Ok(true), rule_name)).await {
62 println!("rush.multi_flow over recv is close:{}", e);
63 }
64 });
65 }
66 let mut rules = vec![];
67 for _ in 0..self.rush.nodes.len() {
68 if let Some(i) = recv.recv().await {
69 if i.result? {
70 rules.push(i.rule_name);
71 }
72 } else {
73 println!("rush.multi_flow should is not null");
74 break;
75 }
76 }
77 drop(send);
78 let val = self.rush.execute(&obj, rules)?;
79 let val = serde_json::from_value(val)?;
80 Ok(val)
81 }
82}
83
84impl From<Rush> for MultiRush {
85 fn from(value: Rush) -> Self {
86 Self {
87 rush: Arc::new(value),
88 }
89 }
90}