1pub mod descriptor;
12pub mod flowspec;
13pub mod id;
14pub mod metrics;
15pub mod receipt;
16
17#[cfg(test)]
18mod tests {
19 use proptest::prelude::*;
20 use serde_json::json;
21
22 use crate::flowspec::*;
23 use crate::id::*;
24
25 fn sample_linear_flow() -> FlowSpec {
26 let ids: Vec<NodeId> = (0..3).map(|_| NodeId::new()).collect();
27 FlowSpec {
28 id: Some(FlowId::new()),
29 name: Some("test-pipeline".into()),
30 nodes: vec![
31 Node {
32 id: ids[0],
33 label: Some("Fetch data".into()),
34 node_type: NodeType::Connector(ConnectorNode {
35 connector: "http.request".into(),
36 params: json!({"url": "https://example.com", "method": "GET"}),
37 idempotency_config: None,
38 }),
39 },
40 Node {
41 id: ids[1],
42 label: Some("Transform".into()),
43 node_type: NodeType::Transform(TransformNode {
44 transform: TransformType::FieldMapping(FieldMappingSpec {
45 mappings: vec![FieldMapping {
46 from: "body.data".into(),
47 to: "result".into(),
48 }],
49 }),
50 input: json!({}),
51 }),
52 },
53 Node {
54 id: ids[2],
55 label: Some("Save".into()),
56 node_type: NodeType::Connector(ConnectorNode {
57 connector: "fs.write".into(),
58 params: json!({"path": "/tmp/out.json"}),
59 idempotency_config: Some(IdempotencyConfig {
60 strategy: IdempotencyStrategy::RunId,
61 }),
62 }),
63 },
64 ],
65 edges: vec![
66 Edge { from: ids[0], to: ids[1], condition: None },
67 Edge { from: ids[1], to: ids[2], condition: None },
68 ],
69 params: Some(json!({"timeout": 30})),
70 }
71 }
72
73 fn sample_branch_flow() -> FlowSpec {
74 let ids: Vec<NodeId> = (0..4).map(|_| NodeId::new()).collect();
75 FlowSpec {
76 id: None,
77 name: None,
78 nodes: vec![
79 Node {
80 id: ids[0],
81 label: None,
82 node_type: NodeType::Connector(ConnectorNode {
83 connector: "http.request".into(),
84 params: json!({}),
85 idempotency_config: None,
86 }),
87 },
88 Node {
89 id: ids[1],
90 label: None,
91 node_type: NodeType::Branch(BranchNode {
92 condition: BranchCondition::Equals {
93 left: ParamRef::NodeOutput { node_id: ids[0], path: "status".into() },
94 right: json!(200),
95 },
96 then_edge: ids[2],
97 else_edge: Some(ids[3]),
98 }),
99 },
100 Node {
101 id: ids[2],
102 label: None,
103 node_type: NodeType::Transform(TransformNode {
104 transform: TransformType::Identity,
105 input: json!({}),
106 }),
107 },
108 Node {
109 id: ids[3],
110 label: None,
111 node_type: NodeType::Connector(ConnectorNode {
112 connector: "delay".into(),
113 params: json!({"duration_ms": 1000}),
114 idempotency_config: None,
115 }),
116 },
117 ],
118 edges: vec![
119 Edge { from: ids[0], to: ids[1], condition: None },
120 Edge { from: ids[1], to: ids[2], condition: Some(EdgeCondition::BranchTrue) },
121 Edge { from: ids[1], to: ids[3], condition: Some(EdgeCondition::BranchFalse) },
122 ],
123 params: None,
124 }
125 }
126
/// The fully populated linear fixture must be unchanged after a trip through
/// pretty-printed JSON.
#[test]
fn flowspec_json_roundtrip_full() {
    let original = sample_linear_flow();

    let encoded = serde_json::to_string_pretty(&original).unwrap();
    let decoded: FlowSpec = serde_json::from_str(&encoded).unwrap();

    assert_eq!(original, decoded);
}
134
/// The branching fixture (branch node plus conditional edges) must be
/// unchanged after a trip through pretty-printed JSON.
#[test]
fn flowspec_json_roundtrip_branch() {
    let original = sample_branch_flow();

    let encoded = serde_json::to_string_pretty(&original).unwrap();
    let decoded: FlowSpec = serde_json::from_str(&encoded).unwrap();

    assert_eq!(original, decoded);
}
142
/// The linear fixture must be unchanged after being written to YAML and read
/// back.
#[test]
fn flowspec_yaml_roundtrip() {
    let original = sample_linear_flow();

    let encoded = serde_yaml::to_string(&original).unwrap();
    let decoded: FlowSpec = serde_yaml::from_str(&encoded).unwrap();

    assert_eq!(original, decoded);
}
150
/// The smallest spec used here, one `delay` connector node with no edges and
/// no optional fields, must round-trip through compact JSON.
#[test]
fn flowspec_minimal_single_node() {
    let only_node = Node {
        id: NodeId::new(),
        label: None,
        node_type: NodeType::Connector(ConnectorNode {
            connector: "delay".into(),
            params: json!({}),
            idempotency_config: None,
        }),
    };
    let original = FlowSpec {
        id: None,
        name: None,
        nodes: vec![only_node],
        edges: Vec::new(),
        params: None,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: FlowSpec = serde_json::from_str(&encoded).unwrap();

    assert_eq!(original, decoded);
}
173
/// A flow built without `id` or `name` must not emit those keys at all when
/// converted to a JSON value.
#[test]
fn flowspec_ephemeral_has_no_id() {
    let flow = sample_branch_flow();
    assert!(flow.id.is_none());
    assert!(flow.name.is_none());

    // Inspect the top-level JSON object to see which keys were emitted.
    let encoded: serde_json::Value = serde_json::to_value(&flow).unwrap();
    let fields = encoded.as_object().unwrap();
    assert!(fields.get("id").is_none());
    assert!(fields.get("name").is_none());
}
187
/// The linear fixture is the "named" counterpart of the branch fixture: both
/// its `id` and its `name` are populated.
#[test]
fn flowspec_named_has_id() {
    let flow = sample_linear_flow();

    assert!(flow.id.is_some());
    assert!(flow.name.is_some());
}
194
/// Optional fields left as `None` (`id`, `name`, `params`, and a node's
/// `label`) must still be `None` after a JSON round-trip.
#[test]
fn flowspec_optional_fields_none_survive_roundtrip() {
    let bare_node = Node {
        id: NodeId::new(),
        label: None,
        node_type: NodeType::Connector(ConnectorNode {
            connector: "delay".into(),
            params: json!({}),
            idempotency_config: None,
        }),
    };
    let original = FlowSpec {
        id: None,
        name: None,
        nodes: vec![bare_node],
        edges: Vec::new(),
        params: None,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: FlowSpec = serde_json::from_str(&encoded).unwrap();

    assert!(decoded.id.is_none());
    assert!(decoded.name.is_none());
    assert!(decoded.params.is_none());
    assert!(decoded.nodes[0].label.is_none());
}
220
/// Deserialization must tolerate keys the schema does not know about, both at
/// the top level (`also_unknown`) and inside a node (`future_field`).
#[test]
fn flowspec_ignores_unknown_fields() {
    let payload = r#"{
        "nodes": [{
            "id": "00000000-0000-0000-0000-000000000001",
            "node_type": {"connector": {"connector": "delay", "params": {}}},
            "future_field": "should be ignored"
        }],
        "edges": [],
        "also_unknown": true
    }"#;

    let parsed: FlowSpec = serde_json::from_str(payload).unwrap();

    assert_eq!(parsed.nodes.len(), 1);
}
236
237 fn linear_flow(n: usize) -> FlowSpec {
241 let ids: Vec<NodeId> = (0..n).map(|_| NodeId::new()).collect();
242 let nodes = ids
243 .iter()
244 .map(|&id| Node {
245 id,
246 label: None,
247 node_type: NodeType::Connector(ConnectorNode {
248 connector: "test".into(),
249 params: json!({}),
250 idempotency_config: None,
251 }),
252 })
253 .collect();
254 let edges =
255 ids.windows(2).map(|w| Edge { from: w[0], to: w[1], condition: None }).collect();
256 FlowSpec { id: None, name: None, nodes, edges, params: None }
257 }
258
259 proptest! {
260 #[test]
261 fn validation_is_deterministic(n in 1usize..=20) {
262 let spec = linear_flow(n);
263 let r1 = spec.validate();
264 let r2 = spec.validate();
265 prop_assert!(r1.is_ok(), "first call failed");
266 prop_assert!(r2.is_ok(), "second call failed");
267 }
268
269 #[test]
270 fn valid_spec_survives_json_roundtrip(n in 1usize..=20) {
271 let spec = linear_flow(n);
272 prop_assert!(spec.validate().is_ok());
273 let json_str = serde_json::to_string(&spec).unwrap();
274 let back: FlowSpec = serde_json::from_str(&json_str).unwrap();
275 prop_assert_eq!(&spec, &back);
276 prop_assert!(back.validate().is_ok());
277 }
278
279 #[test]
280 fn generated_specs_have_unique_node_ids(n in 1usize..=20) {
281 let spec = linear_flow(n);
282 let result = spec.validate();
283 prop_assert!(result.is_ok(), "linear flow with {} nodes should be valid", n);
284 }
285 }
286}