1use nps_core::codec::FrameDict;
5use nps_core::error::{NpsError, NpsResult};
6use nps_core::frames::FrameType;
7use serde_json::{json, Value};
8
9fn get_str<'a>(d: &'a FrameDict, k: &str) -> NpsResult<&'a str> {
10 d.get(k)
11 .and_then(Value::as_str)
12 .ok_or_else(|| NpsError::Frame(format!("missing field: {k}")))
13}
14
15fn opt_str<'a>(d: &'a FrameDict, k: &str) -> Option<&'a str> {
16 d.get(k).and_then(Value::as_str)
17}
18
19fn opt_u64(d: &FrameDict, k: &str) -> Option<u64> {
20 d.get(k).and_then(Value::as_u64)
21}
22
23fn opt_i64(d: &FrameDict, k: &str) -> Option<i64> {
24 d.get(k).and_then(Value::as_i64)
25}
26
27#[derive(Debug, Clone)]
30pub struct TaskFrame {
31 pub task_id: String,
32 pub dag: Value,
33 pub timeout_ms: Option<u64>,
34 pub callback_url: Option<String>,
35 pub context: Option<Value>,
36 pub priority: Option<String>,
37 pub depth: Option<i64>,
38 pub compensation_policy: Option<String>,
39 pub result_ttl_seconds: u64,
41}
42
43impl TaskFrame {
44 pub fn frame_type() -> FrameType {
45 FrameType::Task
46 }
47
48 pub fn to_dict(&self) -> FrameDict {
49 let mut m = serde_json::Map::new();
50 m.insert("task_id".into(), json!(self.task_id));
51 m.insert("dag".into(), self.dag.clone());
52 if let Some(v) = self.timeout_ms {
53 m.insert("timeout_ms".into(), json!(v));
54 }
55 if let Some(v) = &self.callback_url {
56 m.insert("callback_url".into(), json!(v));
57 }
58 if let Some(v) = &self.context {
59 m.insert("context".into(), v.clone());
60 }
61 if let Some(v) = &self.priority {
62 m.insert("priority".into(), json!(v));
63 }
64 if let Some(v) = self.depth {
65 m.insert("depth".into(), json!(v));
66 }
67 if let Some(v) = &self.compensation_policy {
68 m.insert("compensation_policy".into(), json!(v));
69 }
70 if self.result_ttl_seconds != 3600 {
71 m.insert("result_ttl_seconds".into(), json!(self.result_ttl_seconds));
72 }
73 m
74 }
75
76 pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
77 Ok(TaskFrame {
78 task_id: get_str(d, "task_id")?.to_string(),
79 dag: d.get("dag").cloned().unwrap_or(Value::Null),
80 timeout_ms: opt_u64(d, "timeout_ms"),
81 callback_url: opt_str(d, "callback_url").map(str::to_string),
82 context: d.get("context").cloned(),
83 priority: opt_str(d, "priority").map(str::to_string),
84 depth: opt_i64(d, "depth"),
85 compensation_policy: opt_str(d, "compensation_policy").map(str::to_string),
86 result_ttl_seconds: opt_u64(d, "result_ttl_seconds").unwrap_or(3600),
87 })
88 }
89}
90
91#[derive(Debug, Clone)]
94pub struct DelegateFrame {
95 pub task_id: String,
96 pub subtask_id: String,
97 pub action: String,
98 pub target_nid: String,
99 pub inputs: Option<Value>,
100 pub config: Option<Value>,
101 pub idempotency_key: Option<String>,
102 pub target_cluster_anchor: Option<String>,
103}
104
105impl DelegateFrame {
106 pub fn frame_type() -> FrameType {
107 FrameType::Delegate
108 }
109
110 pub fn to_dict(&self) -> FrameDict {
111 let mut m = serde_json::Map::new();
112 m.insert("task_id".into(), json!(self.task_id));
113 m.insert("subtask_id".into(), json!(self.subtask_id));
114 m.insert("action".into(), json!(self.action));
115 m.insert("target_nid".into(), json!(self.target_nid));
116 if let Some(v) = &self.inputs {
117 m.insert("inputs".into(), v.clone());
118 }
119 if let Some(v) = &self.config {
120 m.insert("config".into(), v.clone());
121 }
122 if let Some(v) = &self.idempotency_key {
123 m.insert("idempotency_key".into(), json!(v));
124 }
125 if let Some(v) = &self.target_cluster_anchor {
126 m.insert("target_cluster_anchor".into(), json!(v));
127 }
128 m
129 }
130
131 pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
132 Ok(DelegateFrame {
133 task_id: get_str(d, "task_id")?.to_string(),
134 subtask_id: get_str(d, "subtask_id")?.to_string(),
135 action: get_str(d, "action")?.to_string(),
136 target_nid: get_str(d, "target_nid")?.to_string(),
137 inputs: d.get("inputs").cloned(),
138 config: d.get("config").cloned(),
139 idempotency_key: opt_str(d, "idempotency_key").map(str::to_string),
140 target_cluster_anchor: opt_str(d, "target_cluster_anchor").map(str::to_string),
141 })
142 }
143}
144
145#[derive(Debug, Clone)]
148pub struct SyncFrame {
149 pub task_id: String,
150 pub sync_id: String,
151 pub subtask_ids: Vec<String>,
152 pub min_required: i64,
153 pub aggregate: String,
154 pub timeout_ms: Option<u64>,
155}
156
157impl SyncFrame {
158 pub fn frame_type() -> FrameType {
159 FrameType::Sync
160 }
161
162 pub fn to_dict(&self) -> FrameDict {
163 let mut m = serde_json::Map::new();
164 m.insert("task_id".into(), json!(self.task_id));
165 m.insert("sync_id".into(), json!(self.sync_id));
166 m.insert("subtask_ids".into(), json!(self.subtask_ids));
167 m.insert("min_required".into(), json!(self.min_required));
168 m.insert("aggregate".into(), json!(self.aggregate));
169 if let Some(v) = self.timeout_ms {
170 m.insert("timeout_ms".into(), json!(v));
171 }
172 m
173 }
174
175 pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
176 let subtask_ids = d
177 .get("subtask_ids")
178 .and_then(Value::as_array)
179 .map(|a| {
180 a.iter()
181 .filter_map(Value::as_str)
182 .map(str::to_string)
183 .collect()
184 })
185 .unwrap_or_default();
186 Ok(SyncFrame {
187 task_id: get_str(d, "task_id")?.to_string(),
188 sync_id: get_str(d, "sync_id")?.to_string(),
189 subtask_ids,
190 min_required: opt_i64(d, "min_required").unwrap_or(0),
191 aggregate: opt_str(d, "aggregate").unwrap_or("merge").to_string(),
192 timeout_ms: opt_u64(d, "timeout_ms"),
193 })
194 }
195}
196
197#[derive(Debug, Clone)]
200pub struct AlignStreamFrame {
201 pub sync_id: String,
202 pub task_id: String,
203 pub subtask_id: String,
204 pub seq: u64,
205 pub is_final: bool,
206 pub source_nid: Option<String>,
207 pub result: Option<Value>,
208 pub error: Option<Value>,
209 pub window_size: Option<u64>,
210 pub ack_seq: Option<u64>,
211 pub nak_seq: Option<u64>,
212}
213
214impl AlignStreamFrame {
215 pub fn frame_type() -> FrameType {
216 FrameType::AlignStream
217 }
218
219 pub fn error_code(&self) -> Option<&str> {
220 self.error
221 .as_ref()
222 .and_then(|e| e.get("error_code"))
223 .and_then(Value::as_str)
224 }
225
226 pub fn error_message(&self) -> Option<&str> {
227 self.error
228 .as_ref()
229 .and_then(|e| e.get("message"))
230 .and_then(Value::as_str)
231 }
232
233 pub fn to_dict(&self) -> FrameDict {
234 let mut m = serde_json::Map::new();
235 m.insert("sync_id".into(), json!(self.sync_id));
236 m.insert("task_id".into(), json!(self.task_id));
237 m.insert("subtask_id".into(), json!(self.subtask_id));
238 m.insert("seq".into(), json!(self.seq));
239 m.insert("is_final".into(), json!(self.is_final));
240 if let Some(v) = &self.source_nid {
241 m.insert("source_nid".into(), json!(v));
242 }
243 if let Some(v) = &self.result {
244 m.insert("result".into(), v.clone());
245 }
246 if let Some(v) = &self.error {
247 m.insert("error".into(), v.clone());
248 }
249 if let Some(v) = self.window_size {
250 m.insert("window_size".into(), json!(v));
251 }
252 if let Some(v) = self.ack_seq {
253 m.insert("ack_seq".into(), json!(v));
254 }
255 if let Some(v) = self.nak_seq {
256 m.insert("nak_seq".into(), json!(v));
257 }
258 m
259 }
260
261 pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
262 Ok(AlignStreamFrame {
263 sync_id: get_str(d, "sync_id")?.to_string(),
264 task_id: get_str(d, "task_id")?.to_string(),
265 subtask_id: get_str(d, "subtask_id")?.to_string(),
266 seq: opt_u64(d, "seq").unwrap_or(0),
267 is_final: d.get("is_final").and_then(Value::as_bool).unwrap_or(false),
268 source_nid: opt_str(d, "source_nid").map(str::to_string),
269 result: d.get("result").cloned(),
270 error: d.get("error").cloned(),
271 window_size: opt_u64(d, "window_size"),
272 ack_seq: opt_u64(d, "ack_seq"),
273 nak_seq: opt_u64(d, "nak_seq"),
274 })
275 }
276}