1use crate::{data, utils, TaskState, Vars};
2use core::fmt;
3use serde::{Deserialize, Serialize};
4use std::str::FromStr;
5
/// State carried by a [`Message`].
///
/// serde and strum both use `snake_case` names for the variants, so for
/// example `Completed` is serialized and parsed as `"completed"`.
#[derive(
    Serialize,
    Deserialize,
    Debug,
    Clone,
    Copy,
    Default,
    PartialEq,
    strum::AsRefStr,
    strum::EnumString,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MessageState {
    /// Default state. It mirrors `TaskState::None` and is also the fallback
    /// used by `From<String>` when the text is not a known state name.
    #[default]
    None,
    /// Produced for tasks that are ready, pending, running or interrupted
    /// (see the `From<TaskState>` conversion). Not counted by `is_completed`.
    Created,
    /// Mirrors `TaskState::Completed`.
    Completed,
    /// Mirrors `TaskState::Submitted`.
    Submitted,
    /// Mirrors `TaskState::Backed`.
    Backed,
    /// Mirrors `TaskState::Cancelled`.
    Cancelled,
    /// Mirrors `TaskState::Aborted`.
    Aborted,
    /// Mirrors `TaskState::Skipped`.
    Skipped,
    /// Mirrors `TaskState::Error`.
    Error,
    /// Mirrors `TaskState::Removed`.
    Removed,
}
32
/// Model information embedded in a [`Message`].
///
/// `Message::into` stores this value as a JSON string, and
/// `From<data::Message>` parses it back.
#[derive(Default, Serialize, Deserialize, Clone, Debug)]
pub struct Model {
    /// Model identifier (presumably the workflow model id; confirm).
    pub id: String,

    /// Model tag (assumed to be a user-defined label; TODO confirm).
    pub tag: String,

    /// Model name (presumably a human-readable name; confirm).
    pub name: String,
}
44
/// A message that can be converted to and from the `data::Message` record
/// type (see `Message::into` and `From<data::Message>`).
///
/// NOTE(review): the field meanings below that are marked "presumably" are
/// inferred from the field names only; confirm against the producers.
#[derive(Default, Serialize, Deserialize, Clone, Debug)]
pub struct Message {
    /// Message identifier.
    pub id: String,

    /// Presumably the id of the task this message belongs to (TODO confirm).
    pub tid: String,

    /// Name attached to the message (presumably the node or action name; confirm).
    pub name: String,

    /// Current state of the message; see [`MessageState`].
    pub state: MessageState,

    /// Message type as free-form text, compared by `is_type` and `type_of`.
    /// The set of valid values is not visible in this file.
    pub r#type: String,

    /// Source of the message as free-form text, compared by `is_source`.
    /// NOTE(review): likely the kind of node that emitted it; confirm.
    pub source: String,

    /// Model information; stored as a JSON string on the `data::Message` record.
    pub model: Model,

    /// Presumably the process id (TODO confirm).
    pub pid: String,

    /// Presumably the node id (TODO confirm).
    pub nid: String,

    /// Presumably the model id (TODO confirm).
    pub mid: String,

    /// Message key, compared by `is_key` and `key_of`.
    pub key: String,

    /// Input variables; stored via `to_string()` on the `data::Message` record.
    pub inputs: Vars,

    /// Output variables; stored via `to_string()` on the `data::Message` record.
    pub outputs: Vars,

    /// Tag, compared by `is_tag` and `tag_of`.
    pub tag: String,

    /// Start time; `cost` subtracts it from `end_time`.
    /// NOTE(review): presumably epoch milliseconds; confirm the unit.
    pub start_time: i64,

    /// End time; see `cost`. Presumably the same unit as `start_time`.
    pub end_time: i64,

    /// Retry counter. It is restored from the record by `From<data::Message>`,
    /// while `Message::into` always writes `0` to the record.
    pub retry_times: i32,
}
100
101impl Message {
102 pub fn state(&self) -> MessageState {
103 self.state
104 }
105
106 pub fn is_key(&self, key: &str) -> bool {
107 self.key == key
108 }
109
110 pub fn is_state(&self, state: MessageState) -> bool {
111 self.state == state
112 }
113
114 pub fn is_type(&self, t: &str) -> bool {
115 self.r#type == t
116 }
117
118 pub fn is_source(&self, t: &str) -> bool {
119 self.source == t
120 }
121
122 pub fn is_tag(&self, tag: &str) -> bool {
123 self.tag == tag
124 }
125
126 pub fn type_of(&self, mtype: &str) -> Option<&Self> {
127 if self.r#type == mtype {
128 return Some(self);
129 }
130 None
131 }
132
133 pub fn tag_of(&self, tag: &str) -> Option<&Self> {
134 if tag == self.tag {
135 return Some(self);
136 }
137
138 None
139 }
140
141 pub fn key_of(&self, key: &str) -> Option<&Self> {
142 if key == self.key {
143 return Some(self);
144 }
145
146 None
147 }
148
149 pub fn cost(&self) -> i64 {
151 if self.state().is_completed() {
152 return self.end_time - self.start_time;
153 }
154
155 0
156 }
157
158 pub fn into(&self, emit_id: &str, pat: &str) -> data::Message {
159 let value = self.clone();
160 data::Message {
161 id: value.id,
162 tid: value.tid,
163 name: value.name,
164 state: value.state,
165 r#type: value.r#type,
166 source: value.source,
167 model: serde_json::to_string(&value.model).unwrap(),
168 pid: value.pid,
169 nid: value.nid,
170 mid: value.mid,
171 key: value.key,
172 inputs: value.inputs.to_string(),
173 outputs: value.outputs.to_string(),
174 tag: value.tag,
175 start_time: value.start_time,
176 end_time: value.end_time,
177 chan_id: emit_id.to_string(),
178 chan_pattern: pat.to_string(),
179 create_time: utils::time::time_millis(),
180 update_time: 0,
181 retry_times: 0,
182 timestamp: utils::time::timestamp(),
183 status: data::MessageStatus::Created,
184 }
185 }
186}
187
188impl MessageState {
189 pub fn is_completed(&self) -> bool {
190 matches!(
191 self,
192 MessageState::Completed
193 | MessageState::Cancelled
194 | MessageState::Submitted
195 | MessageState::Backed
196 | MessageState::Error
197 | MessageState::Skipped
198 | MessageState::Aborted
199 | MessageState::Removed
200 )
201 }
202}
203
204impl fmt::Display for MessageState {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
206 let s: String = self.into();
207 f.write_str(&s)
208 }
209}
210
211impl From<TaskState> for MessageState {
212 fn from(state: TaskState) -> Self {
213 match state {
214 TaskState::None => MessageState::None,
215 TaskState::Ready | TaskState::Pending | TaskState::Running | TaskState::Interrupt => {
216 MessageState::Created
217 }
218 TaskState::Completed => MessageState::Completed,
219 TaskState::Submitted => MessageState::Submitted,
220 TaskState::Backed => MessageState::Backed,
221 TaskState::Cancelled => MessageState::Cancelled,
222 TaskState::Error => MessageState::Error,
223 TaskState::Aborted => MessageState::Aborted,
224 TaskState::Skipped => MessageState::Skipped,
225 TaskState::Removed => MessageState::Removed,
226 }
227 }
228}
229
230impl From<MessageState> for String {
231 fn from(state: MessageState) -> Self {
232 state.as_ref().to_string()
233 }
234}
235
236impl From<data::Message> for Message {
237 fn from(v: data::Message) -> Self {
238 Self {
239 id: v.id,
240 tid: v.tid,
241 name: v.name,
242 state: v.state,
243 r#type: v.r#type,
244 source: v.source,
245 model: serde_json::from_str(&v.model).unwrap_or_default(),
246 pid: v.pid,
247 nid: v.nid,
248 mid: v.mid,
249 key: v.key,
250 inputs: serde_json::from_str(&v.inputs).unwrap_or_default(),
251 outputs: serde_json::from_str(&v.outputs).unwrap_or_default(),
252 tag: v.tag,
253 start_time: v.start_time,
254 end_time: v.end_time,
255 retry_times: v.retry_times,
256 }
257 }
258}
259
260impl From<String> for MessageState {
261 fn from(str: String) -> Self {
262 Self::from_str(str.as_ref()).unwrap_or_default()
263 }
264}
265
266impl From<&MessageState> for String {
267 fn from(state: &MessageState) -> Self {
268 state.as_ref().to_string()
269 }
270}