rush_core/
task_pool.rs

1use crate::Rush;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6#[derive(Debug)]
7pub struct MulMsg {
8    pub result: anyhow::Result<bool>,
9    pub rule_name: String,
10}
11impl MulMsg {
12    pub fn new(result: anyhow::Result<bool>, rule_name: String) -> Self {
13        Self { result, rule_name }
14    }
15}
16
17#[derive(Debug)]
18pub struct MultiRush {
19    rush: Arc<Rush>,
20}
21impl MultiRush {
22    pub async fn multi_flow<Obj: Serialize, Out: for<'de> Deserialize<'de>>(
23        &self,
24        obj: Obj,
25    ) -> anyhow::Result<Out> {
26        let obj: Value = serde_json::to_value(obj)?;
27        let (send, mut recv) = mpsc::channel(self.rush.nodes.len());
28        let obj = Arc::new(obj);
29        for (k, _) in self.rush.nodes.iter() {
30            let rh = self.rush.clone();
31            let rule_name = k.to_string();
32            let obj = obj.clone();
33            let send = send.clone();
34            tokio::spawn(async move {
35                let cs = if let Some(i) = rh.nodes.get(rule_name.as_str()) {
36                    i
37                } else {
38                    return; //not to here
39                };
40                for i in cs.iter() {
41                    match i.when(rh.functions.share(), &obj) {
42                        Ok(b) => {
43                            if !b {
44                                if let Err(e) =
45                                    send.send(MulMsg::new(Ok(b), rule_name.clone())).await
46                                {
47                                    println!("rush.multi_flow false recv is close:{}", e);
48                                }
49                                return;
50                            }
51                        }
52                        Err(e) => {
53                            if let Err(e) = send.send(MulMsg::new(Err(e), rule_name.clone())).await
54                            {
55                                println!("rush.multi_flow  error recv is close:{}", e);
56                            }
57                            return;
58                        }
59                    }
60                }
61                if let Err(e) = send.send(MulMsg::new(Ok(true), rule_name)).await {
62                    println!("rush.multi_flow over recv is close:{}", e);
63                }
64            });
65        }
66        let mut rules = vec![];
67        for _ in 0..self.rush.nodes.len() {
68            if let Some(i) = recv.recv().await {
69                if i.result? {
70                    rules.push(i.rule_name);
71                }
72            } else {
73                println!("rush.multi_flow should is not null");
74                break;
75            }
76        }
77        drop(send);
78        let val = self.rush.execute(&obj, rules)?;
79        let val = serde_json::from_value(val)?;
80        Ok(val)
81    }
82}
83
84impl From<Rush> for MultiRush {
85    fn from(value: Rush) -> Self {
86        Self {
87            rush: Arc::new(value),
88        }
89    }
90}