Skip to main content

nps_nop/
frames.rs

1// Copyright 2026 INNO LOTUS PTY LTD
2// SPDX-License-Identifier: Apache-2.0
3
4use nps_core::codec::FrameDict;
5use nps_core::error::{NpsError, NpsResult};
6use nps_core::frames::FrameType;
7use serde_json::{json, Value};
8
9fn get_str<'a>(d: &'a FrameDict, k: &str) -> NpsResult<&'a str> {
10    d.get(k)
11        .and_then(Value::as_str)
12        .ok_or_else(|| NpsError::Frame(format!("missing field: {k}")))
13}
14
15fn opt_str<'a>(d: &'a FrameDict, k: &str) -> Option<&'a str> {
16    d.get(k).and_then(Value::as_str)
17}
18
19fn opt_u64(d: &FrameDict, k: &str) -> Option<u64> {
20    d.get(k).and_then(Value::as_u64)
21}
22
23fn opt_i64(d: &FrameDict, k: &str) -> Option<i64> {
24    d.get(k).and_then(Value::as_i64)
25}
26
27// ── TaskFrame ─────────────────────────────────────────────────────────────────
28
29#[derive(Debug, Clone)]
30pub struct TaskFrame {
31    pub task_id: String,
32    pub dag: Value,
33    pub timeout_ms: Option<u64>,
34    pub callback_url: Option<String>,
35    pub context: Option<Value>,
36    pub priority: Option<String>,
37    pub depth: Option<i64>,
38    pub compensation_policy: Option<String>,
39    /// NOP v0.7 — how long the result is cached; default 3600 s.
40    pub result_ttl_seconds: u64,
41}
42
43impl TaskFrame {
44    pub fn frame_type() -> FrameType {
45        FrameType::Task
46    }
47
48    pub fn to_dict(&self) -> FrameDict {
49        let mut m = serde_json::Map::new();
50        m.insert("task_id".into(), json!(self.task_id));
51        m.insert("dag".into(), self.dag.clone());
52        if let Some(v) = self.timeout_ms {
53            m.insert("timeout_ms".into(), json!(v));
54        }
55        if let Some(v) = &self.callback_url {
56            m.insert("callback_url".into(), json!(v));
57        }
58        if let Some(v) = &self.context {
59            m.insert("context".into(), v.clone());
60        }
61        if let Some(v) = &self.priority {
62            m.insert("priority".into(), json!(v));
63        }
64        if let Some(v) = self.depth {
65            m.insert("depth".into(), json!(v));
66        }
67        if let Some(v) = &self.compensation_policy {
68            m.insert("compensation_policy".into(), json!(v));
69        }
70        if self.result_ttl_seconds != 3600 {
71            m.insert("result_ttl_seconds".into(), json!(self.result_ttl_seconds));
72        }
73        m
74    }
75
76    pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
77        Ok(TaskFrame {
78            task_id: get_str(d, "task_id")?.to_string(),
79            dag: d.get("dag").cloned().unwrap_or(Value::Null),
80            timeout_ms: opt_u64(d, "timeout_ms"),
81            callback_url: opt_str(d, "callback_url").map(str::to_string),
82            context: d.get("context").cloned(),
83            priority: opt_str(d, "priority").map(str::to_string),
84            depth: opt_i64(d, "depth"),
85            compensation_policy: opt_str(d, "compensation_policy").map(str::to_string),
86            result_ttl_seconds: opt_u64(d, "result_ttl_seconds").unwrap_or(3600),
87        })
88    }
89}
90
91// ── DelegateFrame ─────────────────────────────────────────────────────────────
92
93#[derive(Debug, Clone)]
94pub struct DelegateFrame {
95    pub task_id: String,
96    pub subtask_id: String,
97    pub action: String,
98    pub target_nid: String,
99    pub inputs: Option<Value>,
100    pub config: Option<Value>,
101    pub idempotency_key: Option<String>,
102    pub target_cluster_anchor: Option<String>,
103}
104
105impl DelegateFrame {
106    pub fn frame_type() -> FrameType {
107        FrameType::Delegate
108    }
109
110    pub fn to_dict(&self) -> FrameDict {
111        let mut m = serde_json::Map::new();
112        m.insert("task_id".into(), json!(self.task_id));
113        m.insert("subtask_id".into(), json!(self.subtask_id));
114        m.insert("action".into(), json!(self.action));
115        m.insert("target_nid".into(), json!(self.target_nid));
116        if let Some(v) = &self.inputs {
117            m.insert("inputs".into(), v.clone());
118        }
119        if let Some(v) = &self.config {
120            m.insert("config".into(), v.clone());
121        }
122        if let Some(v) = &self.idempotency_key {
123            m.insert("idempotency_key".into(), json!(v));
124        }
125        if let Some(v) = &self.target_cluster_anchor {
126            m.insert("target_cluster_anchor".into(), json!(v));
127        }
128        m
129    }
130
131    pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
132        Ok(DelegateFrame {
133            task_id: get_str(d, "task_id")?.to_string(),
134            subtask_id: get_str(d, "subtask_id")?.to_string(),
135            action: get_str(d, "action")?.to_string(),
136            target_nid: get_str(d, "target_nid")?.to_string(),
137            inputs: d.get("inputs").cloned(),
138            config: d.get("config").cloned(),
139            idempotency_key: opt_str(d, "idempotency_key").map(str::to_string),
140            target_cluster_anchor: opt_str(d, "target_cluster_anchor").map(str::to_string),
141        })
142    }
143}
144
145// ── SyncFrame ─────────────────────────────────────────────────────────────────
146
147#[derive(Debug, Clone)]
148pub struct SyncFrame {
149    pub task_id: String,
150    pub sync_id: String,
151    pub subtask_ids: Vec<String>,
152    pub min_required: i64,
153    pub aggregate: String,
154    pub timeout_ms: Option<u64>,
155}
156
157impl SyncFrame {
158    pub fn frame_type() -> FrameType {
159        FrameType::Sync
160    }
161
162    pub fn to_dict(&self) -> FrameDict {
163        let mut m = serde_json::Map::new();
164        m.insert("task_id".into(), json!(self.task_id));
165        m.insert("sync_id".into(), json!(self.sync_id));
166        m.insert("subtask_ids".into(), json!(self.subtask_ids));
167        m.insert("min_required".into(), json!(self.min_required));
168        m.insert("aggregate".into(), json!(self.aggregate));
169        if let Some(v) = self.timeout_ms {
170            m.insert("timeout_ms".into(), json!(v));
171        }
172        m
173    }
174
175    pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
176        let subtask_ids = d
177            .get("subtask_ids")
178            .and_then(Value::as_array)
179            .map(|a| {
180                a.iter()
181                    .filter_map(Value::as_str)
182                    .map(str::to_string)
183                    .collect()
184            })
185            .unwrap_or_default();
186        Ok(SyncFrame {
187            task_id: get_str(d, "task_id")?.to_string(),
188            sync_id: get_str(d, "sync_id")?.to_string(),
189            subtask_ids,
190            min_required: opt_i64(d, "min_required").unwrap_or(0),
191            aggregate: opt_str(d, "aggregate").unwrap_or("merge").to_string(),
192            timeout_ms: opt_u64(d, "timeout_ms"),
193        })
194    }
195}
196
197// ── AlignStreamFrame ──────────────────────────────────────────────────────────
198
199#[derive(Debug, Clone)]
200pub struct AlignStreamFrame {
201    pub sync_id: String,
202    pub task_id: String,
203    pub subtask_id: String,
204    pub seq: u64,
205    pub is_final: bool,
206    pub source_nid: Option<String>,
207    pub result: Option<Value>,
208    pub error: Option<Value>,
209    pub window_size: Option<u64>,
210    pub ack_seq: Option<u64>,
211    pub nak_seq: Option<u64>,
212}
213
214impl AlignStreamFrame {
215    pub fn frame_type() -> FrameType {
216        FrameType::AlignStream
217    }
218
219    pub fn error_code(&self) -> Option<&str> {
220        self.error
221            .as_ref()
222            .and_then(|e| e.get("error_code"))
223            .and_then(Value::as_str)
224    }
225
226    pub fn error_message(&self) -> Option<&str> {
227        self.error
228            .as_ref()
229            .and_then(|e| e.get("message"))
230            .and_then(Value::as_str)
231    }
232
233    pub fn to_dict(&self) -> FrameDict {
234        let mut m = serde_json::Map::new();
235        m.insert("sync_id".into(), json!(self.sync_id));
236        m.insert("task_id".into(), json!(self.task_id));
237        m.insert("subtask_id".into(), json!(self.subtask_id));
238        m.insert("seq".into(), json!(self.seq));
239        m.insert("is_final".into(), json!(self.is_final));
240        if let Some(v) = &self.source_nid {
241            m.insert("source_nid".into(), json!(v));
242        }
243        if let Some(v) = &self.result {
244            m.insert("result".into(), v.clone());
245        }
246        if let Some(v) = &self.error {
247            m.insert("error".into(), v.clone());
248        }
249        if let Some(v) = self.window_size {
250            m.insert("window_size".into(), json!(v));
251        }
252        if let Some(v) = self.ack_seq {
253            m.insert("ack_seq".into(), json!(v));
254        }
255        if let Some(v) = self.nak_seq {
256            m.insert("nak_seq".into(), json!(v));
257        }
258        m
259    }
260
261    pub fn from_dict(d: &FrameDict) -> NpsResult<Self> {
262        Ok(AlignStreamFrame {
263            sync_id: get_str(d, "sync_id")?.to_string(),
264            task_id: get_str(d, "task_id")?.to_string(),
265            subtask_id: get_str(d, "subtask_id")?.to_string(),
266            seq: opt_u64(d, "seq").unwrap_or(0),
267            is_final: d.get("is_final").and_then(Value::as_bool).unwrap_or(false),
268            source_nid: opt_str(d, "source_nid").map(str::to_string),
269            result: d.get("result").cloned(),
270            error: d.get("error").cloned(),
271            window_size: opt_u64(d, "window_size"),
272            ack_seq: opt_u64(d, "ack_seq"),
273            nak_seq: opt_u64(d, "nak_seq"),
274        })
275    }
276}