acts/event/
message.rs

1use crate::{data, utils, TaskState, Vars};
2use core::fmt;
3use serde::{Deserialize, Serialize};
4use std::str::FromStr;
5
6#[derive(
7    Serialize,
8    Deserialize,
9    Debug,
10    Clone,
11    Copy,
12    Default,
13    PartialEq,
14    strum::AsRefStr,
15    strum::EnumString,
16)]
17#[serde(rename_all = "snake_case")]
18#[strum(serialize_all = "snake_case")]
19pub enum MessageState {
20    #[default]
21    None,
22    Created,
23    Completed,
24    Submitted,
25    Backed,
26    Cancelled,
27    Aborted,
28    Skipped,
29    Error,
30    Removed,
31}
32
33#[derive(Default, Serialize, Deserialize, Clone, Debug)]
34pub struct Model {
35    /// workflow id
36    pub id: String,
37
38    /// workflow tag
39    pub tag: String,
40
41    /// workflow name
42    pub name: String,
43}
44
45#[derive(Default, Serialize, Deserialize, Clone, Debug)]
46pub struct Message {
47    /// message id
48    pub id: String,
49
50    /// task id
51    pub tid: String,
52
53    /// node name or action name
54    pub name: String,
55
56    /// task action state
57    pub state: MessageState,
58
59    /// message type
60    /// workflow | step | branch | msg | irq
61    pub r#type: String,
62
63    // node kind
64    pub source: String,
65
66    pub model: Model,
67
68    /// process id
69    pub pid: String,
70
71    /// node id
72    pub nid: String,
73
74    /// model id
75    pub mid: String,
76
77    /// node id or act key
78    /// if the key is empty, just using nid as the key
79    pub key: String,
80
81    /// from the task inputs
82    pub inputs: Vars,
83
84    /// set the outputs vars when complete the action
85    pub outputs: Vars,
86
87    /// tag to distinguish different message
88    /// it is from node tag or group tag
89    pub tag: String,
90
91    /// task start time in million second
92    pub start_time: i64,
93
94    /// task end time in million second
95    pub end_time: i64,
96
97    /// record the message retry times
98    pub retry_times: i32,
99}
100
101impl Message {
102    pub fn state(&self) -> MessageState {
103        self.state
104    }
105
106    pub fn is_key(&self, key: &str) -> bool {
107        self.key == key
108    }
109
110    pub fn is_state(&self, state: MessageState) -> bool {
111        self.state == state
112    }
113
114    pub fn is_type(&self, t: &str) -> bool {
115        self.r#type == t
116    }
117
118    pub fn is_source(&self, t: &str) -> bool {
119        self.source == t
120    }
121
122    pub fn is_tag(&self, tag: &str) -> bool {
123        self.tag == tag
124    }
125
126    pub fn type_of(&self, mtype: &str) -> Option<&Self> {
127        if self.r#type == mtype {
128            return Some(self);
129        }
130        None
131    }
132
133    pub fn tag_of(&self, tag: &str) -> Option<&Self> {
134        if tag == self.tag {
135            return Some(self);
136        }
137
138        None
139    }
140
141    pub fn key_of(&self, key: &str) -> Option<&Self> {
142        if key == self.key {
143            return Some(self);
144        }
145
146        None
147    }
148
149    /// workflow cost in million seconds
150    pub fn cost(&self) -> i64 {
151        if self.state().is_completed() {
152            return self.end_time - self.start_time;
153        }
154
155        0
156    }
157
158    pub fn into(&self, emit_id: &str, pat: &str) -> data::Message {
159        let value = self.clone();
160        data::Message {
161            id: value.id,
162            tid: value.tid,
163            name: value.name,
164            state: value.state,
165            r#type: value.r#type,
166            source: value.source,
167            model: serde_json::to_string(&value.model).unwrap(),
168            pid: value.pid,
169            nid: value.nid,
170            mid: value.mid,
171            key: value.key,
172            inputs: value.inputs.to_string(),
173            outputs: value.outputs.to_string(),
174            tag: value.tag,
175            start_time: value.start_time,
176            end_time: value.end_time,
177            chan_id: emit_id.to_string(),
178            chan_pattern: pat.to_string(),
179            create_time: utils::time::time_millis(),
180            update_time: 0,
181            retry_times: 0,
182            timestamp: utils::time::timestamp(),
183            status: data::MessageStatus::Created,
184        }
185    }
186}
187
188impl MessageState {
189    pub fn is_completed(&self) -> bool {
190        matches!(
191            self,
192            MessageState::Completed
193                | MessageState::Cancelled
194                | MessageState::Submitted
195                | MessageState::Backed
196                | MessageState::Error
197                | MessageState::Skipped
198                | MessageState::Aborted
199                | MessageState::Removed
200        )
201    }
202}
203
204impl fmt::Display for MessageState {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
206        let s: String = self.into();
207        f.write_str(&s)
208    }
209}
210
211impl From<TaskState> for MessageState {
212    fn from(state: TaskState) -> Self {
213        match state {
214            TaskState::None => MessageState::None,
215            TaskState::Ready | TaskState::Pending | TaskState::Running | TaskState::Interrupt => {
216                MessageState::Created
217            }
218            TaskState::Completed => MessageState::Completed,
219            TaskState::Submitted => MessageState::Submitted,
220            TaskState::Backed => MessageState::Backed,
221            TaskState::Cancelled => MessageState::Cancelled,
222            TaskState::Error => MessageState::Error,
223            TaskState::Aborted => MessageState::Aborted,
224            TaskState::Skipped => MessageState::Skipped,
225            TaskState::Removed => MessageState::Removed,
226        }
227    }
228}
229
230impl From<MessageState> for String {
231    fn from(state: MessageState) -> Self {
232        state.as_ref().to_string()
233    }
234}
235
236impl From<data::Message> for Message {
237    fn from(v: data::Message) -> Self {
238        Self {
239            id: v.id,
240            tid: v.tid,
241            name: v.name,
242            state: v.state,
243            r#type: v.r#type,
244            source: v.source,
245            model: serde_json::from_str(&v.model).unwrap_or_default(),
246            pid: v.pid,
247            nid: v.nid,
248            mid: v.mid,
249            key: v.key,
250            inputs: serde_json::from_str(&v.inputs).unwrap_or_default(),
251            outputs: serde_json::from_str(&v.outputs).unwrap_or_default(),
252            tag: v.tag,
253            start_time: v.start_time,
254            end_time: v.end_time,
255            retry_times: v.retry_times,
256        }
257    }
258}
259
260impl From<String> for MessageState {
261    fn from(str: String) -> Self {
262        Self::from_str(str.as_ref()).unwrap_or_default()
263    }
264}
265
266impl From<&MessageState> for String {
267    fn from(state: &MessageState) -> Self {
268        state.as_ref().to_string()
269    }
270}