1use std::collections::HashMap;
8
9use actionqueue_core::ids::TaskId;
10use actionqueue_core::task::constraints::TaskConstraints;
11use actionqueue_core::task::metadata::TaskMetadata;
12use actionqueue_core::task::run_policy::RunPolicy;
13use actionqueue_core::task::safety::SafetyLevel;
14use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
15use worldinterface_core::flowspec::topo;
16use worldinterface_core::flowspec::{FlowSpec, NodeType};
17use worldinterface_core::id::{FlowRunId, NodeId};
18
19use crate::config::CompilerConfig;
20use crate::error::CompilationError;
21use crate::id::{derive_coordinator_task_id, derive_task_id};
22use crate::payload::{CoordinatorPayload, StepPayload, TaskType, PAYLOAD_CONTENT_TYPE};
23
24#[derive(Debug, Clone)]
26pub struct CompilationResult {
27 pub coordinator: TaskSpec,
29 pub steps: Vec<TaskSpec>,
31 pub dependencies: HashMap<TaskId, Vec<TaskId>>,
33 pub node_task_map: HashMap<NodeId, TaskId>,
35 pub flow_run_id: FlowRunId,
37}
38
39pub fn compile(spec: &FlowSpec) -> Result<CompilationResult, CompilationError> {
43 compile_with_config(spec, &CompilerConfig::default(), FlowRunId::new())
44}
45
46pub fn compile_with_config(
51 spec: &FlowSpec,
52 config: &CompilerConfig,
53 flow_run_id: FlowRunId,
54) -> Result<CompilationResult, CompilationError> {
55 spec.validate()?;
57
58 if spec.nodes.is_empty() {
59 return Err(CompilationError::EmptyFlowSpec);
60 }
61
62 let topo_order = topo::topological_sort(spec).expect(
64 "topological_sort failed after successful validation (cycle should have been caught by \
65 validate)",
66 );
67
68 let node_task_map: HashMap<NodeId, TaskId> =
70 topo_order.iter().map(|&node_id| (node_id, derive_task_id(flow_run_id, node_id))).collect();
71
72 let mut dependencies: HashMap<TaskId, Vec<TaskId>> = HashMap::new();
74 for edge in &spec.edges {
75 if let (Some(&to_task), Some(&from_task)) =
76 (node_task_map.get(&edge.to), node_task_map.get(&edge.from))
77 {
78 dependencies.entry(to_task).or_default().push(from_task);
79 }
80 }
81
82 let node_index: HashMap<NodeId, &worldinterface_core::flowspec::Node> =
84 spec.nodes.iter().map(|n| (n.id, n)).collect();
85
86 let coordinator_task_id = derive_coordinator_task_id(flow_run_id);
88
89 let coordinator_payload = CoordinatorPayload {
90 task_type: TaskType::Coordinator,
91 flow_spec: spec.clone(),
92 flow_run_id,
93 node_task_map: node_task_map.clone(),
94 dependencies: dependencies.clone(),
95 };
96 let coordinator_payload_bytes = serde_json::to_vec(&coordinator_payload)?;
97
98 let mut coordinator_constraints = TaskConstraints::default();
99 coordinator_constraints.set_safety_level(SafetyLevel::Pure);
100 if let Some(timeout) = config.coordinator_timeout_secs {
101 coordinator_constraints.set_timeout_secs(Some(timeout)).map_err(|e| {
102 CompilationError::TaskSpecFailed {
103 node_id: None,
104 source: actionqueue_core::task::task_spec::TaskSpecError::InvalidConstraints(e),
105 }
106 })?;
107 }
108
109 let coordinator_metadata = TaskMetadata::new(
110 vec!["wi".into(), "coordinator".into()],
111 0,
112 spec.name.as_ref().map(|n| format!("Coordinator for flow: {n}")),
113 );
114
115 let coordinator = TaskSpec::new(
116 coordinator_task_id,
117 TaskPayload::with_content_type(coordinator_payload_bytes, PAYLOAD_CONTENT_TYPE),
118 RunPolicy::Once,
119 coordinator_constraints,
120 coordinator_metadata,
121 )
122 .map_err(|e| CompilationError::TaskSpecFailed { node_id: None, source: e })?;
123
124 let mut steps = Vec::with_capacity(topo_order.len());
126
127 for &node_id in &topo_order {
128 let node = node_index[&node_id];
129 let task_id = node_task_map[&node_id];
130
131 let step_payload = StepPayload {
132 task_type: TaskType::Step,
133 flow_run_id,
134 node_id,
135 node_type: node.node_type.clone(),
136 flow_params: spec.params.clone(),
137 };
138 let step_payload_bytes = serde_json::to_vec(&step_payload)?;
139
140 let safety_level = match &node.node_type {
141 NodeType::Connector(_) => SafetyLevel::Idempotent,
142 NodeType::Transform(_) => SafetyLevel::Pure,
143 NodeType::Branch(_) => SafetyLevel::Pure,
144 };
145
146 let mut step_constraints = TaskConstraints::new(
147 config.default_step_max_attempts,
148 config.default_step_timeout_secs,
149 None,
150 )
151 .map_err(|e| CompilationError::TaskSpecFailed {
152 node_id: Some(node_id),
153 source: actionqueue_core::task::task_spec::TaskSpecError::InvalidConstraints(e),
154 })?;
155 step_constraints.set_safety_level(safety_level);
156
157 let step_metadata =
158 TaskMetadata::new(vec!["wi".into(), "step".into()], 0, node.label.clone());
159
160 let step = TaskSpec::new(
161 task_id,
162 TaskPayload::with_content_type(step_payload_bytes, PAYLOAD_CONTENT_TYPE),
163 RunPolicy::Once,
164 step_constraints,
165 step_metadata,
166 )
167 .map_err(|e| CompilationError::TaskSpecFailed { node_id: Some(node_id), source: e })?
168 .with_parent(coordinator_task_id);
169
170 steps.push(step);
171 }
172
173 Ok(CompilationResult { coordinator, steps, dependencies, node_task_map, flow_run_id })
174}
175
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use proptest::prelude::*;
    use serde_json::json;
    use worldinterface_core::flowspec::*;

    use super::*;

    // --- Fixture helpers ---

    /// Builds a connector node whose label and connector name are both `name`.
    fn connector_node(id: NodeId, name: &str) -> Node {
        Node {
            id,
            label: Some(name.into()),
            node_type: NodeType::Connector(ConnectorNode {
                connector: name.into(),
                params: json!({}),
                idempotency_config: None,
            }),
        }
    }

    /// Builds an unlabeled identity-transform node.
    fn transform_node(id: NodeId) -> Node {
        Node {
            id,
            label: None,
            node_type: NodeType::Transform(TransformNode {
                transform: TransformType::Identity,
                input: json!({}),
            }),
        }
    }

    /// Builds an unlabeled branch node that tests for the existence of the `flag` flow param.
    fn branch_node_helper(id: NodeId, then_edge: NodeId, else_edge: Option<NodeId>) -> Node {
        Node {
            id,
            label: None,
            node_type: NodeType::Branch(BranchNode {
                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
                then_edge,
                else_edge,
            }),
        }
    }

    /// Builds an unconditional edge.
    fn edge(from: NodeId, to: NodeId) -> Edge {
        Edge { from, to, condition: None }
    }

    /// Builds an edge carrying a branch condition.
    fn branch_edge(from: NodeId, to: NodeId, condition: EdgeCondition) -> Edge {
        Edge { from, to, condition: Some(condition) }
    }

    /// Generates `n` fresh node ids.
    fn make_ids(n: usize) -> Vec<NodeId> {
        (0..n).map(|_| NodeId::new()).collect()
    }

    /// Builds an anonymous spec with no flow params.
    fn make_spec(nodes: Vec<Node>, edges: Vec<Edge>) -> FlowSpec {
        FlowSpec { id: None, name: None, nodes, edges, params: None }
    }

    /// Builds a spec named `test-flow` with flow params `{"timeout": 30}`.
    fn make_spec_with_params(nodes: Vec<Node>, edges: Vec<Edge>) -> FlowSpec {
        FlowSpec {
            id: None,
            name: Some("test-flow".into()),
            nodes,
            edges,
            params: Some(json!({"timeout": 30})),
        }
    }

    /// Returns `true` when the task dependency map contains no cycle of any length.
    ///
    /// Repeatedly removes every task none of whose dependencies are still unresolved.
    /// Tasks that never appear as a key have no dependencies and count as resolved.
    /// If a round removes nothing while tasks remain, those tasks form a cycle.
    fn dependency_graph_is_acyclic(dependencies: &HashMap<TaskId, Vec<TaskId>>) -> bool {
        let mut unresolved: HashSet<TaskId> = dependencies.keys().copied().collect();
        loop {
            let ready: Vec<TaskId> = unresolved
                .iter()
                .copied()
                .filter(|task| dependencies[task].iter().all(|dep| !unresolved.contains(dep)))
                .collect();
            if ready.is_empty() {
                return unresolved.is_empty();
            }
            for task in ready {
                unresolved.remove(&task);
            }
        }
    }

    // --- Linear flows ---

    #[test]
    fn compile_single_node() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "delay")], vec![]);
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 1);
        assert!(result.dependencies.is_empty());
    }

    #[test]
    fn compile_linear_three_nodes() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 3);

        let task_a = result.node_task_map[&ids[0]];
        let task_b = result.node_task_map[&ids[1]];
        let task_c = result.node_task_map[&ids[2]];

        // b depends on a, c depends on b, and the root has no entry at all.
        assert!(result.dependencies[&task_b].contains(&task_a));
        assert!(result.dependencies[&task_c].contains(&task_b));
        assert!(!result.dependencies.contains_key(&task_a));
    }

    #[test]
    fn compile_linear_five_nodes() {
        let ids = make_ids(5);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
                connector_node(ids[3], "d"),
                connector_node(ids[4], "e"),
            ],
            vec![
                edge(ids[0], ids[1]),
                edge(ids[1], ids[2]),
                edge(ids[2], ids[3]),
                edge(ids[3], ids[4]),
            ],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 5);

        // Every node after the first depends on its immediate predecessor.
        for pair in ids.windows(2) {
            let prev = result.node_task_map[&pair[0]];
            let task = result.node_task_map[&pair[1]];
            assert!(result.dependencies[&task].contains(&prev));
        }
    }

    #[test]
    fn compile_step_has_parent() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        let coord_id = result.coordinator.id();
        for step in &result.steps {
            assert_eq!(step.parent_task_id(), Some(coord_id));
        }
    }

    #[test]
    fn compile_step_task_ids_unique() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let mut task_ids: Vec<TaskId> = result.steps.iter().map(|s| s.id()).collect();
        task_ids.push(result.coordinator.id());
        task_ids.sort_by_key(|id| *id.as_uuid());
        task_ids.dedup_by_key(|id| *id.as_uuid());
        // 3 steps + 1 coordinator must all survive deduplication.
        assert_eq!(task_ids.len(), 4);
    }

    #[test]
    fn compile_step_order_is_topological() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let step_task_ids: Vec<TaskId> = result.steps.iter().map(|s| s.id()).collect();
        // A linear chain has exactly one topological order: a, b, c.
        assert_eq!(step_task_ids[0], result.node_task_map[&ids[0]]);
        assert_eq!(step_task_ids[1], result.node_task_map[&ids[1]]);
        assert_eq!(step_task_ids[2], result.node_task_map[&ids[2]]);
    }

    // --- Branching flows ---

    #[test]
    fn compile_branch_simple() {
        let ids = make_ids(4);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                branch_node_helper(ids[1], ids[2], Some(ids[3])),
                connector_node(ids[2], "b"),
                connector_node(ids[3], "c"),
            ],
            vec![
                edge(ids[0], ids[1]),
                branch_edge(ids[1], ids[2], EdgeCondition::BranchTrue),
                branch_edge(ids[1], ids[3], EdgeCondition::BranchFalse),
            ],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 4);

        let task_branch = result.node_task_map[&ids[1]];
        let task_b = result.node_task_map[&ids[2]];
        let task_c = result.node_task_map[&ids[3]];

        // Both branch targets depend on the branch node.
        assert!(result.dependencies[&task_b].contains(&task_branch));
        assert!(result.dependencies[&task_c].contains(&task_branch));
    }

    #[test]
    fn compile_branch_then_only() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                branch_node_helper(ids[1], ids[2], None),
                connector_node(ids[2], "b"),
            ],
            vec![edge(ids[0], ids[1]), branch_edge(ids[1], ids[2], EdgeCondition::BranchTrue)],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 3);

        let task_branch = result.node_task_map[&ids[1]];
        let task_b = result.node_task_map[&ids[2]];
        assert!(result.dependencies[&task_b].contains(&task_branch));
    }

    #[test]
    fn compile_branch_with_downstream() {
        let ids = make_ids(5);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                branch_node_helper(ids[1], ids[2], Some(ids[3])),
                connector_node(ids[2], "b"),
                connector_node(ids[3], "c"),
                connector_node(ids[4], "d"),
            ],
            vec![
                edge(ids[0], ids[1]),
                branch_edge(ids[1], ids[2], EdgeCondition::BranchTrue),
                branch_edge(ids[1], ids[3], EdgeCondition::BranchFalse),
                edge(ids[2], ids[4]),
                edge(ids[3], ids[4]),
            ],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 5);

        let task_b = result.node_task_map[&ids[2]];
        let task_c = result.node_task_map[&ids[3]];
        let task_d = result.node_task_map[&ids[4]];

        // The merge node depends on both branch arms.
        let d_deps = &result.dependencies[&task_d];
        assert!(d_deps.contains(&task_b));
        assert!(d_deps.contains(&task_c));
    }

    // --- Payload round-trips ---

    #[test]
    fn compiled_coordinator_payload_deserializes() {
        let ids = make_ids(2);
        let spec = make_spec_with_params(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        let payload_bytes = result.coordinator.payload();
        let payload: CoordinatorPayload = serde_json::from_slice(payload_bytes).unwrap();
        assert_eq!(payload.task_type, TaskType::Coordinator);
        assert_eq!(payload.flow_spec, spec);
        assert_eq!(payload.flow_run_id, result.flow_run_id);
    }

    #[test]
    fn compiled_step_payloads_deserialize() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        for step in &result.steps {
            let payload: StepPayload = serde_json::from_slice(step.payload()).unwrap();
            assert_eq!(payload.task_type, TaskType::Step);
            assert_eq!(payload.flow_run_id, result.flow_run_id);
            // Each step payload must refer to a node that was actually compiled.
            assert!(result.node_task_map.contains_key(&payload.node_id));
        }
    }

    #[test]
    fn compiled_step_flow_params_present() {
        let id = NodeId::new();
        let spec = make_spec_with_params(vec![connector_node(id, "a")], vec![]);
        let result = compile(&spec).unwrap();
        let payload: StepPayload = serde_json::from_slice(result.steps[0].payload()).unwrap();
        assert_eq!(payload.flow_params, Some(json!({"timeout": 30})));
    }

    // --- Task spec properties ---

    #[test]
    fn compiled_task_specs_have_valid_ids() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        assert!(!result.coordinator.id().is_nil());
        for step in &result.steps {
            assert!(!step.id().is_nil());
        }
    }

    #[test]
    fn compiled_task_specs_have_correct_run_policy() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let result = compile(&spec).unwrap();
        assert_eq!(*result.coordinator.run_policy(), RunPolicy::Once);
        for step in &result.steps {
            assert_eq!(*step.run_policy(), RunPolicy::Once);
        }
    }

    #[test]
    fn compiled_coordinator_has_no_parent() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let result = compile(&spec).unwrap();
        assert_eq!(result.coordinator.parent_task_id(), None);
    }

    #[test]
    fn compiled_steps_have_parent() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let coord_id = result.coordinator.id();
        for step in &result.steps {
            assert_eq!(step.parent_task_id(), Some(coord_id));
        }
    }

    #[test]
    fn compiled_step_safety_levels() {
        let ids = make_ids(4);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "http.request"),
                transform_node(ids[1]),
                branch_node_helper(ids[2], ids[3], None),
                connector_node(ids[3], "fs.write"),
            ],
            vec![
                edge(ids[0], ids[1]),
                edge(ids[1], ids[2]),
                branch_edge(ids[2], ids[3], EdgeCondition::BranchTrue),
            ],
        );
        let result = compile(&spec).unwrap();

        // Index the steps by the node id recorded in their payload.
        let step_map: HashMap<NodeId, &TaskSpec> = result
            .steps
            .iter()
            .map(|s| {
                let payload: StepPayload = serde_json::from_slice(s.payload()).unwrap();
                (payload.node_id, s)
            })
            .collect();

        // Connectors are idempotent; transforms and branches are pure.
        assert_eq!(step_map[&ids[0]].constraints().safety_level(), SafetyLevel::Idempotent);
        assert_eq!(step_map[&ids[1]].constraints().safety_level(), SafetyLevel::Pure);
        assert_eq!(step_map[&ids[2]].constraints().safety_level(), SafetyLevel::Pure);
        assert_eq!(step_map[&ids[3]].constraints().safety_level(), SafetyLevel::Idempotent);
    }

    #[test]
    fn compiled_step_constraints_match_config() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let config = CompilerConfig {
            default_step_timeout_secs: Some(60),
            default_step_max_attempts: 5,
            coordinator_timeout_secs: Some(3600),
        };
        let result = compile_with_config(&spec, &config, FlowRunId::new()).unwrap();

        let step = &result.steps[0];
        assert_eq!(step.constraints().timeout_secs(), Some(60));
        assert_eq!(step.constraints().max_attempts(), 5);

        assert_eq!(result.coordinator.constraints().timeout_secs(), Some(3600));
    }

    // --- Rejection of invalid specs ---

    #[test]
    fn compile_rejects_invalid_spec() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            // a -> b -> a forms a cycle.
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[0])],
        );
        let err = compile(&spec).unwrap_err();
        assert!(matches!(err, CompilationError::ValidationFailed(_)));
    }

    #[test]
    fn compile_rejects_empty_spec() {
        let spec = make_spec(vec![], vec![]);
        let err = compile(&spec).unwrap_err();
        // Validation runs before the compiler's own empty-spec guard, so the
        // expected error here is the validation error.
        assert!(matches!(err, CompilationError::ValidationFailed(_)));
    }

    #[test]
    fn compile_rejects_dangling_edge() {
        let id = NodeId::new();
        let phantom = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![edge(id, phantom)]);
        let err = compile(&spec).unwrap_err();
        assert!(matches!(err, CompilationError::ValidationFailed(_)));
    }

    // --- Consistency of the returned maps ---

    #[test]
    fn node_task_map_has_all_nodes() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        for &id in &ids {
            assert!(result.node_task_map.contains_key(&id));
        }
    }

    #[test]
    fn node_task_map_values_match_step_ids() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let step_ids: HashSet<TaskId> = result.steps.iter().map(|s| s.id()).collect();
        for task_id in result.node_task_map.values() {
            assert!(step_ids.contains(task_id));
        }
    }

    #[test]
    fn dependency_map_uses_task_ids_from_node_task_map() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let valid_ids: HashSet<TaskId> = result.node_task_map.values().copied().collect();
        for (task_id, deps) in &result.dependencies {
            assert!(valid_ids.contains(task_id));
            for dep in deps {
                assert!(valid_ids.contains(dep));
            }
        }
    }

    // --- Configuration ---

    #[test]
    fn custom_config_applied() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let config = CompilerConfig {
            default_step_timeout_secs: Some(120),
            default_step_max_attempts: 7,
            coordinator_timeout_secs: Some(7200),
        };
        let result = compile_with_config(&spec, &config, FlowRunId::new()).unwrap();
        assert_eq!(result.steps[0].constraints().timeout_secs(), Some(120));
        assert_eq!(result.steps[0].constraints().max_attempts(), 7);
        assert_eq!(result.coordinator.constraints().timeout_secs(), Some(7200));
    }

    #[test]
    fn default_config_produces_valid_specs() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        result.coordinator.validate().unwrap();
        for step in &result.steps {
            step.validate().unwrap();
        }
    }

    // --- Property tests ---

    /// Builds a linear chain of `n` connector nodes, each linked to the next.
    fn linear_flow(n: usize) -> FlowSpec {
        let ids = make_ids(n);
        let nodes = ids.iter().map(|&id| connector_node(id, "test")).collect();
        let edges = ids.windows(2).map(|w| edge(w[0], w[1])).collect();
        make_spec(nodes, edges)
    }

    proptest! {
        #[test]
        fn compilation_is_deterministic(n in 1usize..=20) {
            let spec = linear_flow(n);
            let flow_run_id = FlowRunId::new();
            let config = CompilerConfig::default();
            let r1 = compile_with_config(&spec, &config, flow_run_id).unwrap();
            let r2 = compile_with_config(&spec, &config, flow_run_id).unwrap();
            prop_assert_eq!(r1.coordinator.id(), r2.coordinator.id());
            prop_assert_eq!(r1.flow_run_id, r2.flow_run_id);
            prop_assert_eq!(&r1.node_task_map, &r2.node_task_map);
            prop_assert_eq!(&r1.dependencies, &r2.dependencies);
            // `zip` stops at the shorter side, so compare lengths explicitly first.
            prop_assert_eq!(r1.steps.len(), r2.steps.len());
            for (s1, s2) in r1.steps.iter().zip(r2.steps.iter()) {
                prop_assert_eq!(s1.id(), s2.id());
            }
        }

        #[test]
        fn compiled_dependencies_are_acyclic(n in 1usize..=20) {
            let spec = linear_flow(n);
            let result = compile(&spec).unwrap();
            for (task_id, deps) in &result.dependencies {
                prop_assert!(!deps.contains(task_id), "task depends on itself");
            }
            // Self-loops are only the shortest cycle; also rule out longer ones.
            prop_assert!(
                dependency_graph_is_acyclic(&result.dependencies),
                "dependency graph contains a cycle"
            );
        }

        #[test]
        fn step_count_equals_node_count(n in 1usize..=20) {
            let spec = linear_flow(n);
            let result = compile(&spec).unwrap();
            prop_assert_eq!(result.steps.len(), spec.nodes.len());
        }
    }
}