Skip to main content

acts_next/event/
message.rs

1use crate::{TaskState, Vars, data, utils};
2use core::fmt;
3use serde::{Deserialize, Serialize};
4use std::str::FromStr;
5
6#[derive(
7    Serialize,
8    Deserialize,
9    Debug,
10    Clone,
11    Copy,
12    Default,
13    PartialEq,
14    strum::AsRefStr,
15    strum::EnumString,
16)]
17#[serde(rename_all = "snake_case")]
18#[strum(serialize_all = "snake_case")]
19pub enum MessageState {
20    #[default]
21    None,
22    Created,
23    Completed,
24    Submitted,
25    Backed,
26    Cancelled,
27    Aborted,
28    Skipped,
29    Error,
30    Removed,
31}
32
33#[derive(Default, Serialize, Deserialize, Clone, Debug)]
34pub struct Model {
35    /// workflow id
36    pub id: String,
37
38    /// workflow tag
39    pub tag: String,
40
41    /// workflow name
42    pub name: String,
43}
44
45#[derive(Default, Serialize, Deserialize, Clone, Debug)]
46pub struct Message {
47    /// message id
48    pub id: String,
49
50    /// task id
51    pub tid: String,
52
53    /// node name or action name
54    pub name: String,
55
56    /// task action state
57    pub state: MessageState,
58
59    /// message type
60    /// workflow | step | branch | act
61    pub r#type: String,
62
63    pub model: Model,
64
65    /// process id
66    pub pid: String,
67
68    /// node id
69    pub nid: String,
70
71    /// model id
72    pub mid: String,
73
74    /// node id or act
75    /// if the key is empty, just using nid as the key
76    pub key: String,
77
78    /// used package name
79    pub uses: String,
80
81    /// from the task inputs
82    pub inputs: Vars,
83
84    /// set the outputs vars when complete the action
85    pub outputs: Vars,
86
87    /// tag to distinguish different message
88    /// it is from node tag or group tag
89    pub tag: String,
90
91    /// task start time in million second
92    pub start_time: i64,
93
94    /// task end time in million second
95    pub end_time: i64,
96
97    /// record the message retry times
98    pub retry_times: i32,
99}
100
101impl Message {
102    pub fn state(&self) -> MessageState {
103        self.state
104    }
105
106    pub fn is_key(&self, key: &str) -> bool {
107        self.key == key
108    }
109
110    pub fn is_uses(&self, uses: &str) -> bool {
111        self.uses == uses
112    }
113
114    pub fn is_irq(&self) -> bool {
115        self.uses == "acts.core.irq"
116    }
117
118    pub fn is_msg(&self) -> bool {
119        self.uses == "acts.core.msg"
120    }
121
122    pub fn is_state(&self, state: MessageState) -> bool {
123        self.state == state
124    }
125
126    pub fn is_type(&self, t: &str) -> bool {
127        self.r#type == t
128    }
129
130    pub fn is_tag(&self, tag: &str) -> bool {
131        self.tag == tag
132    }
133
134    pub fn type_of(&self, mtype: &str) -> Option<&Self> {
135        if self.r#type == mtype {
136            return Some(self);
137        }
138        None
139    }
140
141    pub fn tag_of(&self, tag: &str) -> Option<&Self> {
142        if tag == self.tag {
143            return Some(self);
144        }
145
146        None
147    }
148
149    pub fn key_of(&self, key: &str) -> Option<&Self> {
150        if key == self.key {
151            return Some(self);
152        }
153
154        None
155    }
156
157    /// workflow cost in million seconds
158    pub fn cost(&self) -> i64 {
159        if self.state().is_completed() {
160            return self.end_time - self.start_time;
161        }
162
163        0
164    }
165
166    pub fn into(&self, emit_id: &str, pat: &str) -> data::Message {
167        let value = self.clone();
168        data::Message {
169            id: value.id,
170            tid: value.tid,
171            name: value.name,
172            state: value.state,
173            r#type: value.r#type,
174            model: serde_json::to_string(&value.model).unwrap(),
175            pid: value.pid,
176            nid: value.nid,
177            mid: value.mid,
178            key: value.key,
179            uses: value.uses,
180            inputs: value.inputs.to_string(),
181            outputs: value.outputs.to_string(),
182            tag: value.tag,
183            start_time: value.start_time,
184            end_time: value.end_time,
185            chan_id: emit_id.to_string(),
186            chan_pattern: pat.to_string(),
187            create_time: utils::time::time_millis(),
188            update_time: 0,
189            retry_times: 0,
190            timestamp: utils::time::timestamp(),
191            status: data::MessageStatus::Created,
192        }
193    }
194}
195
196impl MessageState {
197    pub fn is_completed(&self) -> bool {
198        matches!(
199            self,
200            MessageState::Completed
201                | MessageState::Cancelled
202                | MessageState::Submitted
203                | MessageState::Backed
204                | MessageState::Error
205                | MessageState::Skipped
206                | MessageState::Aborted
207                | MessageState::Removed
208        )
209    }
210}
211
212impl fmt::Display for MessageState {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
214        let s: String = self.into();
215        f.write_str(&s)
216    }
217}
218
219impl From<TaskState> for MessageState {
220    fn from(state: TaskState) -> Self {
221        match state {
222            TaskState::None => MessageState::None,
223            TaskState::Ready | TaskState::Pending | TaskState::Running | TaskState::Interrupt => {
224                MessageState::Created
225            }
226            TaskState::Completed => MessageState::Completed,
227            TaskState::Submitted => MessageState::Submitted,
228            TaskState::Backed => MessageState::Backed,
229            TaskState::Cancelled => MessageState::Cancelled,
230            TaskState::Error => MessageState::Error,
231            TaskState::Aborted => MessageState::Aborted,
232            TaskState::Skipped => MessageState::Skipped,
233            TaskState::Removed => MessageState::Removed,
234        }
235    }
236}
237
238impl From<MessageState> for String {
239    fn from(state: MessageState) -> Self {
240        state.as_ref().to_string()
241    }
242}
243
244impl From<data::Message> for Message {
245    fn from(v: data::Message) -> Self {
246        Self {
247            id: v.id,
248            tid: v.tid,
249            name: v.name,
250            state: v.state,
251            r#type: v.r#type,
252            model: serde_json::from_str(&v.model).unwrap_or_default(),
253            pid: v.pid,
254            nid: v.nid,
255            mid: v.mid,
256            key: v.key,
257            uses: v.uses,
258            inputs: serde_json::from_str(&v.inputs).unwrap_or_default(),
259            outputs: serde_json::from_str(&v.outputs).unwrap_or_default(),
260            tag: v.tag,
261            start_time: v.start_time,
262            end_time: v.end_time,
263            retry_times: v.retry_times,
264        }
265    }
266}
267
268impl From<String> for MessageState {
269    fn from(str: String) -> Self {
270        Self::from_str(str.as_ref()).unwrap_or_default()
271    }
272}
273
274impl From<&MessageState> for String {
275    fn from(state: &MessageState) -> Self {
276        state.as_ref().to_string()
277    }
278}