acts_next/export/
channel.rs

1use crate::{scheduler::Runtime, utils, Event, Message};
2use std::sync::Arc;
3use tracing::{debug, error, info};
4
/// Options that select which messages a channel receives.
///
/// `type`, `state`, `tag` and `key` are glob patterns. A message is delivered only when all
/// four filters accept it. The `Default` implementation sets every filter to `*`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChannelOptions {
    /// Channel id. Handlers are registered on, and removed from, the emitter under this id.
    pub id: String,

    /// Whether the client needs to ack the messages it receives.
    pub ack: bool,

    /// Glob pattern matched against the message type,
    /// e.g. `{workflow,step,branch,req,msg}`.
    pub r#type: String,
    /// Glob pattern matched against the message state,
    /// e.g. `{created,completed}`.
    pub state: String,
    /// Glob pattern matched against the message tag or the model tag (either one may match),
    /// e.g. `*tag1*`.
    pub tag: String,
    /// Glob pattern matched against the message key,
    /// e.g. `key1*`.
    pub key: String,
}
25
26impl Default for ChannelOptions {
27    fn default() -> Self {
28        Self {
29            id: utils::shortid(),
30            ack: false,
31            r#type: "*".to_string(),
32            state: "*".to_string(),
33            tag: "*".to_string(),
34            key: "*".to_string(),
35        }
36    }
37}
38
39impl ChannelOptions {
40    pub fn pattern(&self) -> String {
41        format!("{}:{}:{}:{}", self.r#type, self.state, self.tag, self.key)
42    }
43}
44
/// Exported wrapper around the runtime's event emitter.
///
/// A `Channel` registers handlers on the emitter under `chan_id` and forwards to them only the
/// events accepted by its four glob matchers.
pub struct Channel {
    /// Runtime that provides the emitter the handlers are registered on and the cache/store
    /// that messages are written to.
    runtime: Arc<Runtime>,
    /// Copied from `ChannelOptions::ack`; passed to `store_if` for every matching event.
    ack: bool,
    /// Copied from `ChannelOptions::id`; the key used to register and remove handlers.
    chan_id: String,
    /// The `type:state:tag:key` string from `ChannelOptions::pattern`, passed to `store_if`
    /// together with the channel id.
    pattern: String,
    /// Compiled matchers, in the order (type, state, tag, key).
    glob: (
        globset::GlobMatcher,
        globset::GlobMatcher,
        globset::GlobMatcher,
        globset::GlobMatcher,
    ),
}
59
60impl Channel {
61    pub fn new(rt: &Arc<Runtime>) -> Self {
62        Self::channel(rt, &ChannelOptions::default())
63    }
64
65    /// create a emit channel to receive message
66    /// if the message is not received by client, the engine will re-send at the next time interval
67    #[allow(clippy::self_named_constructors)]
68    pub fn channel(rt: &Arc<Runtime>, options: &ChannelOptions) -> Self {
69        debug!("channel: {options:?}");
70        let pat_type = globset::Glob::new(&options.r#type)
71            .unwrap()
72            .compile_matcher();
73        let pat_state = globset::Glob::new(&options.state)
74            .unwrap()
75            .compile_matcher();
76        let pat_tag = globset::Glob::new(&options.tag).unwrap().compile_matcher();
77        let pat_key = globset::Glob::new(&options.key).unwrap().compile_matcher();
78
79        Self {
80            runtime: rt.clone(),
81            ack: options.ack,
82            chan_id: options.id.clone(),
83            pattern: options.pattern(),
84            glob: (pat_type, pat_state, pat_tag, pat_key),
85        }
86    }
87
88    ///  Receive act message
89    ///
90    /// Example
91    /// ```rust,no_run
92    /// use acts_next::{Engine, Act, Workflow, Vars, Message};
93    ///
94    /// #[tokio::main]
95    /// async fn main() {
96    ///     let engine = Engine::new();
97    ///     let workflow = Workflow::new().with_id("m1").with_step(|step| {
98    ///             step.with_id("step1").with_act(Act::new().with_act("irq").with_key("act1"))
99    ///     });
100    ///
101    ///     engine.channel().on_message(move |e| {
102    ///         if e.r#type == "irq" {
103    ///             println!("act message: state={} inputs={:?} outputs={:?}", e.state, e.inputs, e.outputs);
104    ///         }
105    ///     });
106    ///     let exec = engine.executor();
107    ///     exec.model().deploy(&workflow).expect("fail to deploy workflow");
108    ///     let mut vars = Vars::new();
109    ///     vars.insert("pid".into(), "w1".into());
110    ///     exec.proc().start(
111    ///        &workflow.id,
112    ///        &vars,
113    ///    );
114    /// }
115    /// ```
116    pub fn on_message(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
117        let glob = self.glob.clone();
118        let runtime = self.runtime.clone();
119        let ack = self.ack;
120        let chan_id = self.chan_id.clone();
121        let pattern = self.pattern.clone();
122        self.runtime.emitter().on_message(&self.chan_id, move |e| {
123            info!("on_message: chan={} {e:?}", chan_id);
124            if is_match(&glob, e) {
125                store_if(&runtime, ack, &chan_id, &pattern, e);
126                f(e);
127            }
128        });
129    }
130
131    pub fn on_start(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
132        let glob = self.glob.clone();
133        let runtime = self.runtime.clone();
134        let ack = self.ack;
135        let chan_id = self.chan_id.clone();
136        let pattern = self.pattern.clone();
137        self.runtime.emitter().on_start(&self.chan_id, move |e| {
138            if is_match(&glob, e) {
139                store_if(&runtime, ack, &chan_id, &pattern, e);
140                f(e);
141            }
142        });
143    }
144
145    pub fn on_complete(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
146        let glob = self.glob.clone();
147        let runtime = self.runtime.clone();
148        let ack = self.ack;
149        let chan_id = self.chan_id.clone();
150        let pattern = self.pattern.clone();
151        self.runtime.emitter().on_complete(&self.chan_id, move |e| {
152            if is_match(&glob, e) {
153                store_if(&runtime, ack, &chan_id, &pattern, e);
154                f(e);
155            }
156        });
157    }
158
159    pub fn on_error(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
160        let glob = self.glob.clone();
161        let runtime = self.runtime.clone();
162        let ack = self.ack;
163        let chan_id = self.chan_id.clone();
164        let pattern = self.pattern.clone();
165        self.runtime.emitter().on_error(&self.chan_id, move |e| {
166            if is_match(&glob, e) {
167                store_if(&runtime, ack, &chan_id, &pattern, e);
168                f(e);
169            }
170        });
171    }
172
173    pub fn close(&self) {
174        self.runtime.emitter().remove(&self.chan_id);
175    }
176}
177
178fn store_if(runtime: &Arc<Runtime>, ack: bool, chan_id: &str, pattern: &str, message: &Message) {
179    if ack && !chan_id.is_empty() && message.retry_times == 0 {
180        println!("store: {message:?}");
181        let msg = message.into(chan_id, pattern);
182        runtime
183            .cache()
184            .store()
185            .base()
186            .messages()
187            .create(&msg)
188            .unwrap_or_else(|err| {
189                error!("channel.store_if_emit_id: {}", err.to_string());
190                eprintln!("channel.store_if_emit_id: {}", err);
191                false
192            });
193    }
194}
195
196fn is_match(
197    glob: &(
198        globset::GlobMatcher,
199        globset::GlobMatcher,
200        globset::GlobMatcher,
201        globset::GlobMatcher,
202    ),
203    e: &Event<Message>,
204) -> bool {
205    let (pat_type, pat_state, pat_tag, pat_key) = glob;
206    pat_type.is_match(&e.r#type)
207        && pat_state.is_match(e.state.as_ref())
208        && (pat_tag.is_match(&e.tag) || pat_tag.is_match(&e.model.tag))
209        && pat_key.is_match(&e.key)
210}