// acts_next/export/channel.rs

use crate::{Event, Message, scheduler::Runtime, utils};
use std::sync::Arc;
use tracing::{debug, error, info};
/// Filter and delivery options used to create a [`Channel`].
///
/// Every pattern field is a glob expression; the `Default` implementation
/// sets each of them to `"*"`.
#[derive(Debug, Clone)]
pub struct ChannelOptions {
    /// Channel id. The channel registers its handlers on the runtime emitter
    /// under this id and removes them by the same id when it is closed.
    pub id: String,

    /// Whether the messages of this channel need to be acked.
    pub ack: bool,

    /// Glob pattern matched against the message type,
    /// e.g. `{workflow,step,branch,req,msg}`.
    pub r#type: String,
    /// Glob pattern matched against the message state,
    /// e.g. `{created,completed}`.
    pub state: String,
    /// Glob pattern matched against the message tag or the model tag,
    /// e.g. `*tag1*`.
    pub tag: String,
    /// Glob pattern matched against the message key,
    /// e.g. `key1*`.
    pub key: String,

    /// Glob pattern matched against the message `uses` value.
    pub uses: String,
}
29impl Default for ChannelOptions {
30    fn default() -> Self {
31        Self {
32            id: utils::shortid(),
33            ack: false,
34            r#type: "*".to_string(),
35            state: "*".to_string(),
36            tag: "*".to_string(),
37            key: "*".to_string(),
38            uses: "*".to_string(),
39        }
40    }
41}
43impl ChannelOptions {
44    pub fn pattern(&self) -> String {
45        format!("{}:{}:{}:{}", self.r#type, self.state, self.tag, self.key)
46    }
47}
49/// Just a export struct for the event::Emitter
50///
51pub struct Channel {
52    runtime: Arc<Runtime>,
53    ack: bool,
54    chan_id: String,
55    pattern: String,
56    glob: (
57        globset::GlobMatcher,
58        globset::GlobMatcher,
59        globset::GlobMatcher,
60        globset::GlobMatcher,
61        globset::GlobMatcher,
62    ),
63}
65impl Channel {
66    pub fn new(rt: &Arc<Runtime>) -> Self {
67        Self::channel(rt, &ChannelOptions::default())
68    }
69
70    /// create a emit channel to receive message
71    /// if the message is not received by client, the engine will re-send at the next time interval
72    #[allow(clippy::self_named_constructors)]
73    pub fn channel(rt: &Arc<Runtime>, options: &ChannelOptions) -> Self {
74        debug!("channel: {options:?}");
75        let pat_type = globset::Glob::new(&options.r#type)
76            .unwrap()
77            .compile_matcher();
78        let pat_state = globset::Glob::new(&options.state)
79            .unwrap()
80            .compile_matcher();
81        let pat_tag = globset::Glob::new(&options.tag).unwrap().compile_matcher();
82        let pat_key = globset::Glob::new(&options.key).unwrap().compile_matcher();
83        let pat_uses = globset::Glob::new(&options.uses).unwrap().compile_matcher();
84        Self {
85            runtime: rt.clone(),
86            ack: options.ack,
87            chan_id: options.id.clone(),
88            pattern: options.pattern(),
89            glob: (pat_type, pat_state, pat_tag, pat_key, pat_uses),
90        }
91    }
92
93    ///  Receive act message
94    ///
95    /// Example
96    /// ```rust,no_run
97    /// use acts::{Engine, Act, Workflow, Vars, Message};
98    ///
99    /// #[tokio::main]
100    /// async fn main() {
101    ///     let engine = Engine::new().start();
102    ///     let workflow = Workflow::new().with_id("m1").with_step(|step| {
103    ///             step.with_id("step1").with_act(Act::irq(|act| act.with_key("act1")))
104    ///     });
105    ///
106    ///     engine.channel().on_message(move |e| {
107    ///         if e.is_irq() {
108    ///             println!("act message: state={} inputs={:?} outputs={:?}", e.state, e.inputs, e.outputs);
109    ///         }
110    ///     });
111    ///     let exec = engine.executor();
112    ///     exec.model().deploy(&workflow).expect("fail to deploy workflow");
113    ///     let mut vars = Vars::new();
114    ///     vars.insert("pid".into(), "w1".into());
115    ///     exec.proc().start(
116    ///        &workflow.id,
117    ///        &vars,
118    ///    );
119    /// }
120    /// ```
121    pub fn on_message(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
122        let glob = self.glob.clone();
123        let runtime = self.runtime.clone();
124        let ack = self.ack;
125        let chan_id = self.chan_id.clone();
126        let pattern = self.pattern.clone();
127        self.runtime.emitter().on_message(&self.chan_id, move |e| {
128            info!("on_message: chan={} {e:?}", chan_id);
129            if is_match(&glob, e) {
130                store_if(&runtime, ack, &chan_id, &pattern, e);
131                f(e);
132            }
133        });
134    }
135
136    pub fn on_start(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
137        let glob = self.glob.clone();
138        let runtime = self.runtime.clone();
139        let ack = self.ack;
140        let chan_id = self.chan_id.clone();
141        let pattern = self.pattern.clone();
142        self.runtime.emitter().on_start(&self.chan_id, move |e| {
143            if is_match(&glob, e) {
144                store_if(&runtime, ack, &chan_id, &pattern, e);
145                f(e);
146            }
147        });
148    }
149
150    pub fn on_complete(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
151        let glob = self.glob.clone();
152        let runtime = self.runtime.clone();
153        let ack = self.ack;
154        let chan_id = self.chan_id.clone();
155        let pattern = self.pattern.clone();
156        self.runtime.emitter().on_complete(&self.chan_id, move |e| {
157            if is_match(&glob, e) {
158                store_if(&runtime, ack, &chan_id, &pattern, e);
159                f(e);
160            }
161        });
162    }
163
164    pub fn on_error(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
165        let glob = self.glob.clone();
166        let runtime = self.runtime.clone();
167        let ack = self.ack;
168        let chan_id = self.chan_id.clone();
169        let pattern = self.pattern.clone();
170        self.runtime.emitter().on_error(&self.chan_id, move |e| {
171            if is_match(&glob, e) {
172                store_if(&runtime, ack, &chan_id, &pattern, e);
173                f(e);
174            }
175        });
176    }
177
178    pub fn close(&self) {
179        self.runtime.emitter().remove(&self.chan_id);
180    }
181}
183fn store_if(runtime: &Arc<Runtime>, ack: bool, chan_id: &str, pattern: &str, message: &Message) {
184    if ack && !chan_id.is_empty() && message.retry_times == 0 {
185        info!("store: {message:?}");
186        let msg = message.into(chan_id, pattern);
187        runtime
188            .cache()
189            .store()
190            .messages()
191            .create(&msg)
192            .unwrap_or_else(|err| {
193                error!("channel.store_if_emit_id: {}", err.to_string());
194                eprintln!("channel.store_if_emit_id: {}", err);
195                false
196            });
197    }
198}
200fn is_match(
201    glob: &(
202        globset::GlobMatcher,
203        globset::GlobMatcher,
204        globset::GlobMatcher,
205        globset::GlobMatcher,
206        globset::GlobMatcher,
207    ),
208    e: &Event<Message>,
209) -> bool {
210    let (pat_type, pat_state, pat_tag, pat_key, pat_uses) = glob;
211    pat_type.is_match(&e.r#type)
212        && pat_state.is_match(e.state.as_ref())
213        && (pat_tag.is_match(&e.tag) || pat_tag.is_match(&e.model.tag))
214        && pat_key.is_match(&e.key)
215        && pat_uses.is_match(&e.uses)
216}