1use std::collections::HashMap;
7
8use actionqueue_core::ids::TaskId;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use worldinterface_core::flowspec::{FlowSpec, NodeType};
12use worldinterface_core::id::{FlowRunId, NodeId};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum TaskType {
18 Coordinator,
19 Step,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct CoordinatorPayload {
26 pub task_type: TaskType,
28 pub flow_spec: FlowSpec,
30 pub flow_run_id: FlowRunId,
32 pub node_task_map: HashMap<NodeId, TaskId>,
34 pub dependencies: HashMap<TaskId, Vec<TaskId>>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct StepPayload {
42 pub task_type: TaskType,
44 pub flow_run_id: FlowRunId,
46 pub node_id: NodeId,
48 pub node_type: NodeType,
50 pub flow_params: Option<Value>,
52}
53
54pub const PAYLOAD_CONTENT_TYPE: &str = "application/json";
56
57#[cfg(test)]
58mod tests {
59 use serde_json::json;
60 use worldinterface_core::flowspec::*;
61
62 use super::*;
63
64 fn sample_flow_spec() -> FlowSpec {
65 let id = NodeId::new();
66 FlowSpec {
67 id: None,
68 name: Some("test".into()),
69 nodes: vec![Node {
70 id,
71 label: None,
72 node_type: NodeType::Connector(ConnectorNode {
73 connector: "http.request".into(),
74 params: json!({"url": "https://example.com"}),
75 idempotency_config: None,
76 }),
77 }],
78 edges: vec![],
79 params: Some(json!({"timeout": 30})),
80 }
81 }
82
83 #[test]
84 fn coordinator_payload_roundtrip() {
85 let spec = sample_flow_spec();
86 let payload = CoordinatorPayload {
87 task_type: TaskType::Coordinator,
88 flow_spec: spec.clone(),
89 flow_run_id: FlowRunId::new(),
90 node_task_map: HashMap::new(),
91 dependencies: HashMap::new(),
92 };
93 let json = serde_json::to_string(&payload).unwrap();
94 let back: CoordinatorPayload = serde_json::from_str(&json).unwrap();
95 assert_eq!(back.task_type, TaskType::Coordinator);
96 assert_eq!(back.flow_spec, spec);
97 }
98
99 #[test]
100 fn step_payload_roundtrip() {
101 let node_id = NodeId::new();
102 let node_type = NodeType::Connector(ConnectorNode {
103 connector: "http.request".into(),
104 params: json!({}),
105 idempotency_config: None,
106 });
107 let payload = StepPayload {
108 task_type: TaskType::Step,
109 flow_run_id: FlowRunId::new(),
110 node_id,
111 node_type: node_type.clone(),
112 flow_params: Some(json!({"key": "value"})),
113 };
114 let json = serde_json::to_string(&payload).unwrap();
115 let back: StepPayload = serde_json::from_str(&json).unwrap();
116 assert_eq!(back.task_type, TaskType::Step);
117 assert_eq!(back.node_id, node_id);
118 assert_eq!(back.node_type, node_type);
119 assert_eq!(back.flow_params, Some(json!({"key": "value"})));
120 }
121
122 #[test]
123 fn coordinator_payload_contains_task_type() {
124 let payload = CoordinatorPayload {
125 task_type: TaskType::Coordinator,
126 flow_spec: sample_flow_spec(),
127 flow_run_id: FlowRunId::new(),
128 node_task_map: HashMap::new(),
129 dependencies: HashMap::new(),
130 };
131 let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
132 assert_eq!(value["task_type"], "coordinator");
133 }
134
135 #[test]
136 fn step_payload_contains_task_type() {
137 let payload = StepPayload {
138 task_type: TaskType::Step,
139 flow_run_id: FlowRunId::new(),
140 node_id: NodeId::new(),
141 node_type: NodeType::Transform(TransformNode {
142 transform: TransformType::Identity,
143 input: json!({}),
144 }),
145 flow_params: None,
146 };
147 let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
148 assert_eq!(value["task_type"], "step");
149 }
150
151 #[test]
152 fn step_payload_contains_node_type() {
153 let payload = StepPayload {
155 task_type: TaskType::Step,
156 flow_run_id: FlowRunId::new(),
157 node_id: NodeId::new(),
158 node_type: NodeType::Connector(ConnectorNode {
159 connector: "test".into(),
160 params: json!({}),
161 idempotency_config: None,
162 }),
163 flow_params: None,
164 };
165 let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
166 assert!(value["node_type"]["connector"].is_object());
167
168 let payload = StepPayload {
170 task_type: TaskType::Step,
171 flow_run_id: FlowRunId::new(),
172 node_id: NodeId::new(),
173 node_type: NodeType::Transform(TransformNode {
174 transform: TransformType::Identity,
175 input: json!({}),
176 }),
177 flow_params: None,
178 };
179 let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
180 assert!(value["node_type"]["transform"].is_object());
181
182 let target = NodeId::new();
184 let payload = StepPayload {
185 task_type: TaskType::Step,
186 flow_run_id: FlowRunId::new(),
187 node_id: NodeId::new(),
188 node_type: NodeType::Branch(BranchNode {
189 condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
190 then_edge: target,
191 else_edge: None,
192 }),
193 flow_params: None,
194 };
195 let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
196 assert!(value["node_type"]["branch"].is_object());
197 }
198
199 #[test]
200 fn coordinator_payload_contains_full_spec() {
201 let spec = sample_flow_spec();
202 let payload = CoordinatorPayload {
203 task_type: TaskType::Coordinator,
204 flow_spec: spec.clone(),
205 flow_run_id: FlowRunId::new(),
206 node_task_map: HashMap::new(),
207 dependencies: HashMap::new(),
208 };
209 let json = serde_json::to_string(&payload).unwrap();
210 let back: CoordinatorPayload = serde_json::from_str(&json).unwrap();
211 assert_eq!(back.flow_spec, spec);
212 }
213}