Skip to main content

worldinterface_flowspec/
payload.rs

1//! Payload types for Coordinator and Step tasks.
2//!
3//! These are serialized as JSON and stored in AQ `TaskPayload`. The multiplexed
4//! handler (Sprint 4) uses `TaskType` to route execution.
5
6use std::collections::HashMap;
7
8use actionqueue_core::ids::TaskId;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use worldinterface_core::flowspec::{FlowSpec, NodeType};
12use worldinterface_core::id::{FlowRunId, NodeId};
13
14/// Discriminator used by the multiplexed handler to route execution.
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "snake_case")]
17pub enum TaskType {
18    Coordinator,
19    Step,
20}
21
22/// Payload for the Coordinator task. Contains everything needed to
23/// orchestrate a FlowRun.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct CoordinatorPayload {
26    /// Discriminator for the multiplexed handler.
27    pub task_type: TaskType,
28    /// The FlowSpec being executed (immutable after compilation).
29    pub flow_spec: FlowSpec,
30    /// The assigned FlowRunId.
31    pub flow_run_id: FlowRunId,
32    /// Pre-computed NodeId -> TaskId mapping.
33    pub node_task_map: HashMap<NodeId, TaskId>,
34    /// Pre-computed dependency declarations.
35    pub dependencies: HashMap<TaskId, Vec<TaskId>>,
36}
37
38/// Payload for a Step task. Contains everything a Step handler needs to
39/// invoke a connector or execute a transform.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct StepPayload {
42    /// Discriminator for the multiplexed handler.
43    pub task_type: TaskType,
44    /// The FlowRunId this step belongs to.
45    pub flow_run_id: FlowRunId,
46    /// Which node this step executes.
47    pub node_id: NodeId,
48    /// The node's type-specific configuration.
49    pub node_type: NodeType,
50    /// Flow-level parameters (from FlowSpec.params).
51    pub flow_params: Option<Value>,
52}
53
54/// JSON content type used for all payloads.
55pub const PAYLOAD_CONTENT_TYPE: &str = "application/json";
56
57#[cfg(test)]
58mod tests {
59    use serde_json::json;
60    use worldinterface_core::flowspec::*;
61
62    use super::*;
63
64    fn sample_flow_spec() -> FlowSpec {
65        let id = NodeId::new();
66        FlowSpec {
67            id: None,
68            name: Some("test".into()),
69            nodes: vec![Node {
70                id,
71                label: None,
72                node_type: NodeType::Connector(ConnectorNode {
73                    connector: "http.request".into(),
74                    params: json!({"url": "https://example.com"}),
75                    idempotency_config: None,
76                }),
77            }],
78            edges: vec![],
79            params: Some(json!({"timeout": 30})),
80        }
81    }
82
83    #[test]
84    fn coordinator_payload_roundtrip() {
85        let spec = sample_flow_spec();
86        let payload = CoordinatorPayload {
87            task_type: TaskType::Coordinator,
88            flow_spec: spec.clone(),
89            flow_run_id: FlowRunId::new(),
90            node_task_map: HashMap::new(),
91            dependencies: HashMap::new(),
92        };
93        let json = serde_json::to_string(&payload).unwrap();
94        let back: CoordinatorPayload = serde_json::from_str(&json).unwrap();
95        assert_eq!(back.task_type, TaskType::Coordinator);
96        assert_eq!(back.flow_spec, spec);
97    }
98
99    #[test]
100    fn step_payload_roundtrip() {
101        let node_id = NodeId::new();
102        let node_type = NodeType::Connector(ConnectorNode {
103            connector: "http.request".into(),
104            params: json!({}),
105            idempotency_config: None,
106        });
107        let payload = StepPayload {
108            task_type: TaskType::Step,
109            flow_run_id: FlowRunId::new(),
110            node_id,
111            node_type: node_type.clone(),
112            flow_params: Some(json!({"key": "value"})),
113        };
114        let json = serde_json::to_string(&payload).unwrap();
115        let back: StepPayload = serde_json::from_str(&json).unwrap();
116        assert_eq!(back.task_type, TaskType::Step);
117        assert_eq!(back.node_id, node_id);
118        assert_eq!(back.node_type, node_type);
119        assert_eq!(back.flow_params, Some(json!({"key": "value"})));
120    }
121
122    #[test]
123    fn coordinator_payload_contains_task_type() {
124        let payload = CoordinatorPayload {
125            task_type: TaskType::Coordinator,
126            flow_spec: sample_flow_spec(),
127            flow_run_id: FlowRunId::new(),
128            node_task_map: HashMap::new(),
129            dependencies: HashMap::new(),
130        };
131        let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
132        assert_eq!(value["task_type"], "coordinator");
133    }
134
135    #[test]
136    fn step_payload_contains_task_type() {
137        let payload = StepPayload {
138            task_type: TaskType::Step,
139            flow_run_id: FlowRunId::new(),
140            node_id: NodeId::new(),
141            node_type: NodeType::Transform(TransformNode {
142                transform: TransformType::Identity,
143                input: json!({}),
144            }),
145            flow_params: None,
146        };
147        let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
148        assert_eq!(value["task_type"], "step");
149    }
150
151    #[test]
152    fn step_payload_contains_node_type() {
153        // Connector
154        let payload = StepPayload {
155            task_type: TaskType::Step,
156            flow_run_id: FlowRunId::new(),
157            node_id: NodeId::new(),
158            node_type: NodeType::Connector(ConnectorNode {
159                connector: "test".into(),
160                params: json!({}),
161                idempotency_config: None,
162            }),
163            flow_params: None,
164        };
165        let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
166        assert!(value["node_type"]["connector"].is_object());
167
168        // Transform
169        let payload = StepPayload {
170            task_type: TaskType::Step,
171            flow_run_id: FlowRunId::new(),
172            node_id: NodeId::new(),
173            node_type: NodeType::Transform(TransformNode {
174                transform: TransformType::Identity,
175                input: json!({}),
176            }),
177            flow_params: None,
178        };
179        let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
180        assert!(value["node_type"]["transform"].is_object());
181
182        // Branch
183        let target = NodeId::new();
184        let payload = StepPayload {
185            task_type: TaskType::Step,
186            flow_run_id: FlowRunId::new(),
187            node_id: NodeId::new(),
188            node_type: NodeType::Branch(BranchNode {
189                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
190                then_edge: target,
191                else_edge: None,
192            }),
193            flow_params: None,
194        };
195        let value: serde_json::Value = serde_json::to_value(&payload).unwrap();
196        assert!(value["node_type"]["branch"].is_object());
197    }
198
199    #[test]
200    fn coordinator_payload_contains_full_spec() {
201        let spec = sample_flow_spec();
202        let payload = CoordinatorPayload {
203            task_type: TaskType::Coordinator,
204            flow_spec: spec.clone(),
205            flow_run_id: FlowRunId::new(),
206            node_task_map: HashMap::new(),
207            dependencies: HashMap::new(),
208        };
209        let json = serde_json::to_string(&payload).unwrap();
210        let back: CoordinatorPayload = serde_json::from_str(&json).unwrap();
211        assert_eq!(back.flow_spec, spec);
212    }
213}