Skip to main content

worldinterface_flowspec/
compile.rs

1//! FlowSpec -> ActionQueue DAG compiler.
2//!
3//! The compiler is a pure function: `FlowSpec -> CompilationResult`. It does not
4//! execute anything. The Coordinator handler (Sprint 4) submits the compiled steps
5//! to AQ at runtime.
6
7use std::collections::HashMap;
8
9use actionqueue_core::ids::TaskId;
10use actionqueue_core::task::constraints::TaskConstraints;
11use actionqueue_core::task::metadata::TaskMetadata;
12use actionqueue_core::task::run_policy::RunPolicy;
13use actionqueue_core::task::safety::SafetyLevel;
14use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
15use worldinterface_core::flowspec::topo;
16use worldinterface_core::flowspec::{FlowSpec, NodeType};
17use worldinterface_core::id::{FlowRunId, NodeId};
18
19use crate::config::CompilerConfig;
20use crate::error::CompilationError;
21use crate::id::{derive_coordinator_task_id, derive_task_id};
22use crate::payload::{CoordinatorPayload, StepPayload, TaskType, PAYLOAD_CONTENT_TYPE};
23
24/// The complete output of compiling a FlowSpec.
25#[derive(Debug, Clone)]
26pub struct CompilationResult {
27    /// The Coordinator task spec (root of the AQ hierarchy).
28    pub coordinator: TaskSpec,
29    /// Step task specs, one per FlowSpec node, in topological order.
30    pub steps: Vec<TaskSpec>,
31    /// Dependency declarations: step TaskId -> Vec of prerequisite TaskIds.
32    pub dependencies: HashMap<TaskId, Vec<TaskId>>,
33    /// Mapping from FlowSpec NodeId -> AQ TaskId for each step.
34    pub node_task_map: HashMap<NodeId, TaskId>,
35    /// The FlowRunId assigned to this compilation.
36    pub flow_run_id: FlowRunId,
37}
38
39/// Compile a FlowSpec into an ActionQueue task hierarchy.
40///
41/// Uses `CompilerConfig::default()` and generates a fresh `FlowRunId`.
42pub fn compile(spec: &FlowSpec) -> Result<CompilationResult, CompilationError> {
43    compile_with_config(spec, &CompilerConfig::default(), FlowRunId::new())
44}
45
46/// Compile a FlowSpec with explicit configuration and FlowRunId.
47///
48/// The explicit `flow_run_id` parameter enables deterministic compilation
49/// for testing and crash-resume scenarios.
50pub fn compile_with_config(
51    spec: &FlowSpec,
52    config: &CompilerConfig,
53    flow_run_id: FlowRunId,
54) -> Result<CompilationResult, CompilationError> {
55    // 1. Validate
56    spec.validate()?;
57
58    if spec.nodes.is_empty() {
59        return Err(CompilationError::EmptyFlowSpec);
60    }
61
62    // 2. Compute topological order
63    let topo_order = topo::topological_sort(spec).expect(
64        "topological_sort failed after successful validation (cycle should have been caught by \
65         validate)",
66    );
67
68    // 3. Build NodeId -> TaskId mapping (deterministic)
69    let node_task_map: HashMap<NodeId, TaskId> =
70        topo_order.iter().map(|&node_id| (node_id, derive_task_id(flow_run_id, node_id))).collect();
71
72    // 4. Build dependency map from edges
73    let mut dependencies: HashMap<TaskId, Vec<TaskId>> = HashMap::new();
74    for edge in &spec.edges {
75        if let (Some(&to_task), Some(&from_task)) =
76            (node_task_map.get(&edge.to), node_task_map.get(&edge.from))
77        {
78            dependencies.entry(to_task).or_default().push(from_task);
79        }
80    }
81
82    // 5. Build node index for quick lookup
83    let node_index: HashMap<NodeId, &worldinterface_core::flowspec::Node> =
84        spec.nodes.iter().map(|n| (n.id, n)).collect();
85
86    // 6. Generate Coordinator TaskSpec
87    let coordinator_task_id = derive_coordinator_task_id(flow_run_id);
88
89    let coordinator_payload = CoordinatorPayload {
90        task_type: TaskType::Coordinator,
91        flow_spec: spec.clone(),
92        flow_run_id,
93        node_task_map: node_task_map.clone(),
94        dependencies: dependencies.clone(),
95    };
96    let coordinator_payload_bytes = serde_json::to_vec(&coordinator_payload)?;
97
98    let mut coordinator_constraints = TaskConstraints::default();
99    coordinator_constraints.set_safety_level(SafetyLevel::Pure);
100    if let Some(timeout) = config.coordinator_timeout_secs {
101        coordinator_constraints.set_timeout_secs(Some(timeout)).map_err(|e| {
102            CompilationError::TaskSpecFailed {
103                node_id: None,
104                source: actionqueue_core::task::task_spec::TaskSpecError::InvalidConstraints(e),
105            }
106        })?;
107    }
108
109    let coordinator_metadata = TaskMetadata::new(
110        vec!["wi".into(), "coordinator".into()],
111        0,
112        spec.name.as_ref().map(|n| format!("Coordinator for flow: {n}")),
113    );
114
115    let coordinator = TaskSpec::new(
116        coordinator_task_id,
117        TaskPayload::with_content_type(coordinator_payload_bytes, PAYLOAD_CONTENT_TYPE),
118        RunPolicy::Once,
119        coordinator_constraints,
120        coordinator_metadata,
121    )
122    .map_err(|e| CompilationError::TaskSpecFailed { node_id: None, source: e })?;
123
124    // 7. Generate Step TaskSpecs in topological order
125    let mut steps = Vec::with_capacity(topo_order.len());
126
127    for &node_id in &topo_order {
128        let node = node_index[&node_id];
129        let task_id = node_task_map[&node_id];
130
131        let step_payload = StepPayload {
132            task_type: TaskType::Step,
133            flow_run_id,
134            node_id,
135            node_type: node.node_type.clone(),
136            flow_params: spec.params.clone(),
137        };
138        let step_payload_bytes = serde_json::to_vec(&step_payload)?;
139
140        let safety_level = match &node.node_type {
141            NodeType::Connector(_) => SafetyLevel::Idempotent,
142            NodeType::Transform(_) => SafetyLevel::Pure,
143            NodeType::Branch(_) => SafetyLevel::Pure,
144        };
145
146        let mut step_constraints = TaskConstraints::new(
147            config.default_step_max_attempts,
148            config.default_step_timeout_secs,
149            None,
150        )
151        .map_err(|e| CompilationError::TaskSpecFailed {
152            node_id: Some(node_id),
153            source: actionqueue_core::task::task_spec::TaskSpecError::InvalidConstraints(e),
154        })?;
155        step_constraints.set_safety_level(safety_level);
156
157        let step_metadata =
158            TaskMetadata::new(vec!["wi".into(), "step".into()], 0, node.label.clone());
159
160        let step = TaskSpec::new(
161            task_id,
162            TaskPayload::with_content_type(step_payload_bytes, PAYLOAD_CONTENT_TYPE),
163            RunPolicy::Once,
164            step_constraints,
165            step_metadata,
166        )
167        .map_err(|e| CompilationError::TaskSpecFailed { node_id: Some(node_id), source: e })?
168        .with_parent(coordinator_task_id);
169
170        steps.push(step);
171    }
172
173    Ok(CompilationResult { coordinator, steps, dependencies, node_task_map, flow_run_id })
174}
175
#[cfg(test)]
mod tests {
    //! Unit and property tests for the FlowSpec compiler.

    use std::collections::HashSet;

    use proptest::prelude::*;
    use serde_json::json;
    use worldinterface_core::flowspec::*;

    use super::*;

    /// Connector node whose label and connector name are both `name`.
    fn connector_node(id: NodeId, name: &str) -> Node {
        Node {
            id,
            label: Some(name.into()),
            node_type: NodeType::Connector(ConnectorNode {
                connector: name.into(),
                params: json!({}),
                idempotency_config: None,
            }),
        }
    }

    /// Unlabelled identity-transform node.
    fn transform_node(id: NodeId) -> Node {
        Node {
            id,
            label: None,
            node_type: NodeType::Transform(TransformNode {
                transform: TransformType::Identity,
                input: json!({}),
            }),
        }
    }

    /// Unlabelled branch node that tests for the existence of flow param `flag`.
    fn branch_node_helper(id: NodeId, then_edge: NodeId, else_edge: Option<NodeId>) -> Node {
        Node {
            id,
            label: None,
            node_type: NodeType::Branch(BranchNode {
                condition: BranchCondition::Exists(ParamRef::FlowParam { path: "flag".into() }),
                then_edge,
                else_edge,
            }),
        }
    }

    /// Unconditional edge.
    fn edge(from: NodeId, to: NodeId) -> Edge {
        Edge { from, to, condition: None }
    }

    /// Edge carrying a branch condition.
    fn branch_edge(from: NodeId, to: NodeId, condition: EdgeCondition) -> Edge {
        Edge { from, to, condition: Some(condition) }
    }

    /// `n` fresh node ids.
    fn make_ids(n: usize) -> Vec<NodeId> {
        (0..n).map(|_| NodeId::new()).collect()
    }

    /// Anonymous spec without flow params.
    fn make_spec(nodes: Vec<Node>, edges: Vec<Edge>) -> FlowSpec {
        FlowSpec { id: None, name: None, nodes, edges, params: None }
    }

    /// Spec named `test-flow` with flow params `{"timeout": 30}`.
    fn make_spec_with_params(nodes: Vec<Node>, edges: Vec<Edge>) -> FlowSpec {
        FlowSpec {
            id: None,
            name: Some("test-flow".into()),
            nodes,
            edges,
            params: Some(json!({"timeout": 30})),
        }
    }

    // --- T-3: Compiler - Linear Flow ---

    #[test]
    fn compile_single_node() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "delay")], vec![]);
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 1);
        assert!(result.dependencies.is_empty());
    }

    #[test]
    fn compile_linear_three_nodes() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 3);

        let task_a = result.node_task_map[&ids[0]];
        let task_b = result.node_task_map[&ids[1]];
        let task_c = result.node_task_map[&ids[2]];

        // B depends on A
        assert!(result.dependencies[&task_b].contains(&task_a));
        // C depends on B
        assert!(result.dependencies[&task_c].contains(&task_b));
        // A has no dependencies
        assert!(!result.dependencies.contains_key(&task_a));
    }

    #[test]
    fn compile_linear_five_nodes() {
        let ids = make_ids(5);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
                connector_node(ids[3], "d"),
                connector_node(ids[4], "e"),
            ],
            vec![
                edge(ids[0], ids[1]),
                edge(ids[1], ids[2]),
                edge(ids[2], ids[3]),
                edge(ids[3], ids[4]),
            ],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 5);

        // Every node after the first depends on its immediate predecessor.
        for pair in ids.windows(2) {
            let prev = result.node_task_map[&pair[0]];
            let task = result.node_task_map[&pair[1]];
            assert!(result.dependencies[&task].contains(&prev));
        }
    }

    #[test]
    fn compile_step_has_parent() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        let coord_id = result.coordinator.id();
        for step in &result.steps {
            assert_eq!(step.parent_task_id(), Some(coord_id));
        }
    }

    #[test]
    fn compile_step_task_ids_unique() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        // A set drops duplicates, so its size equals the number of distinct ids.
        let unique_ids: HashSet<TaskId> = result
            .steps
            .iter()
            .map(|s| s.id())
            .chain(std::iter::once(result.coordinator.id()))
            .collect();
        assert_eq!(unique_ids.len(), 4); // 3 steps + 1 coordinator
    }

    #[test]
    fn compile_step_order_is_topological() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let step_task_ids: Vec<TaskId> = result.steps.iter().map(|s| s.id()).collect();
        // Step 0 should be task for ids[0], etc.
        assert_eq!(step_task_ids[0], result.node_task_map[&ids[0]]);
        assert_eq!(step_task_ids[1], result.node_task_map[&ids[1]]);
        assert_eq!(step_task_ids[2], result.node_task_map[&ids[2]]);
    }

    // --- T-4: Compiler - Branch Flow ---

    #[test]
    fn compile_branch_simple() {
        let ids = make_ids(4);
        // A -> Branch -> B | C
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                branch_node_helper(ids[1], ids[2], Some(ids[3])),
                connector_node(ids[2], "b"),
                connector_node(ids[3], "c"),
            ],
            vec![
                edge(ids[0], ids[1]),
                branch_edge(ids[1], ids[2], EdgeCondition::BranchTrue),
                branch_edge(ids[1], ids[3], EdgeCondition::BranchFalse),
            ],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 4);

        let task_branch = result.node_task_map[&ids[1]];
        let task_b = result.node_task_map[&ids[2]];
        let task_c = result.node_task_map[&ids[3]];

        // Both B and C depend on Branch
        assert!(result.dependencies[&task_b].contains(&task_branch));
        assert!(result.dependencies[&task_c].contains(&task_branch));
    }

    #[test]
    fn compile_branch_then_only() {
        let ids = make_ids(3);
        // A -> Branch -> B (no else)
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                branch_node_helper(ids[1], ids[2], None),
                connector_node(ids[2], "b"),
            ],
            vec![edge(ids[0], ids[1]), branch_edge(ids[1], ids[2], EdgeCondition::BranchTrue)],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 3);

        let task_branch = result.node_task_map[&ids[1]];
        let task_b = result.node_task_map[&ids[2]];
        assert!(result.dependencies[&task_b].contains(&task_branch));
    }

    #[test]
    fn compile_branch_with_downstream() {
        let ids = make_ids(5);
        // A -> Branch -> B | C, B -> D, C -> D
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                branch_node_helper(ids[1], ids[2], Some(ids[3])),
                connector_node(ids[2], "b"),
                connector_node(ids[3], "c"),
                connector_node(ids[4], "d"),
            ],
            vec![
                edge(ids[0], ids[1]),
                branch_edge(ids[1], ids[2], EdgeCondition::BranchTrue),
                branch_edge(ids[1], ids[3], EdgeCondition::BranchFalse),
                edge(ids[2], ids[4]),
                edge(ids[3], ids[4]),
            ],
        );
        let result = compile(&spec).unwrap();
        assert_eq!(result.steps.len(), 5);

        let task_b = result.node_task_map[&ids[2]];
        let task_c = result.node_task_map[&ids[3]];
        let task_d = result.node_task_map[&ids[4]];

        // D depends on both B and C
        let d_deps = &result.dependencies[&task_d];
        assert!(d_deps.contains(&task_b));
        assert!(d_deps.contains(&task_c));
    }

    // --- T-6: Payload Inspection ---

    #[test]
    fn compiled_coordinator_payload_deserializes() {
        let ids = make_ids(2);
        let spec = make_spec_with_params(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        let payload_bytes = result.coordinator.payload();
        let payload: CoordinatorPayload = serde_json::from_slice(payload_bytes).unwrap();
        assert_eq!(payload.task_type, TaskType::Coordinator);
        assert_eq!(payload.flow_spec, spec);
        assert_eq!(payload.flow_run_id, result.flow_run_id);
    }

    #[test]
    fn compiled_step_payloads_deserialize() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        for step in &result.steps {
            let payload: StepPayload = serde_json::from_slice(step.payload()).unwrap();
            assert_eq!(payload.task_type, TaskType::Step);
            assert_eq!(payload.flow_run_id, result.flow_run_id);
            // node_id should be in the node_task_map
            assert!(result.node_task_map.contains_key(&payload.node_id));
        }
    }

    #[test]
    fn compiled_step_flow_params_present() {
        let id = NodeId::new();
        let spec = make_spec_with_params(vec![connector_node(id, "a")], vec![]);
        let result = compile(&spec).unwrap();
        let payload: StepPayload = serde_json::from_slice(result.steps[0].payload()).unwrap();
        assert_eq!(payload.flow_params, Some(json!({"timeout": 30})));
    }

    // --- T-7: TaskSpec Validity ---

    #[test]
    fn compiled_task_specs_have_valid_ids() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1])],
        );
        let result = compile(&spec).unwrap();
        assert!(!result.coordinator.id().is_nil());
        for step in &result.steps {
            assert!(!step.id().is_nil());
        }
    }

    #[test]
    fn compiled_task_specs_have_correct_run_policy() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let result = compile(&spec).unwrap();
        assert_eq!(*result.coordinator.run_policy(), RunPolicy::Once);
        for step in &result.steps {
            assert_eq!(*step.run_policy(), RunPolicy::Once);
        }
    }

    #[test]
    fn compiled_coordinator_has_no_parent() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let result = compile(&spec).unwrap();
        assert_eq!(result.coordinator.parent_task_id(), None);
    }

    #[test]
    fn compiled_steps_have_parent() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let coord_id = result.coordinator.id();
        for step in &result.steps {
            assert_eq!(step.parent_task_id(), Some(coord_id));
        }
    }

    #[test]
    fn compiled_step_safety_levels() {
        let ids = make_ids(4);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "http.request"),
                transform_node(ids[1]),
                branch_node_helper(ids[2], ids[3], None),
                connector_node(ids[3], "fs.write"),
            ],
            vec![
                edge(ids[0], ids[1]),
                edge(ids[1], ids[2]),
                branch_edge(ids[2], ids[3], EdgeCondition::BranchTrue),
            ],
        );
        let result = compile(&spec).unwrap();

        // Build a map of node_id -> step for easy lookup
        let step_map: HashMap<NodeId, &TaskSpec> = result
            .steps
            .iter()
            .map(|s| {
                let payload: StepPayload = serde_json::from_slice(s.payload()).unwrap();
                (payload.node_id, s)
            })
            .collect();

        // Connector -> Idempotent
        assert_eq!(step_map[&ids[0]].constraints().safety_level(), SafetyLevel::Idempotent);
        // Transform -> Pure
        assert_eq!(step_map[&ids[1]].constraints().safety_level(), SafetyLevel::Pure);
        // Branch -> Pure
        assert_eq!(step_map[&ids[2]].constraints().safety_level(), SafetyLevel::Pure);
        // Connector -> Idempotent
        assert_eq!(step_map[&ids[3]].constraints().safety_level(), SafetyLevel::Idempotent);
    }

    #[test]
    fn compiled_step_constraints_match_config() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let config = CompilerConfig {
            default_step_timeout_secs: Some(60),
            default_step_max_attempts: 5,
            coordinator_timeout_secs: Some(3600),
        };
        let result = compile_with_config(&spec, &config, FlowRunId::new()).unwrap();

        let step = &result.steps[0];
        assert_eq!(step.constraints().timeout_secs(), Some(60));
        assert_eq!(step.constraints().max_attempts(), 5);

        assert_eq!(result.coordinator.constraints().timeout_secs(), Some(3600));
    }

    // --- T-8: Error Cases ---

    #[test]
    fn compile_rejects_invalid_spec() {
        let ids = make_ids(2);
        let spec = make_spec(
            vec![connector_node(ids[0], "a"), connector_node(ids[1], "b")],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[0])], // cycle
        );
        let err = compile(&spec).unwrap_err();
        assert!(matches!(err, CompilationError::ValidationFailed(_)));
    }

    #[test]
    fn compile_rejects_empty_spec() {
        let spec = make_spec(vec![], vec![]);
        let err = compile(&spec).unwrap_err();
        // Empty spec is caught by validation (NoNodes rule)
        assert!(matches!(err, CompilationError::ValidationFailed(_)));
    }

    #[test]
    fn compile_rejects_dangling_edge() {
        let id = NodeId::new();
        let phantom = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![edge(id, phantom)]);
        let err = compile(&spec).unwrap_err();
        assert!(matches!(err, CompilationError::ValidationFailed(_)));
    }

    // --- T-9: Node-Task Mapping ---

    #[test]
    fn node_task_map_has_all_nodes() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        for &id in &ids {
            assert!(result.node_task_map.contains_key(&id));
        }
    }

    #[test]
    fn node_task_map_values_match_step_ids() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let step_ids: HashSet<TaskId> = result.steps.iter().map(|s| s.id()).collect();
        for task_id in result.node_task_map.values() {
            assert!(step_ids.contains(task_id));
        }
    }

    #[test]
    fn dependency_map_uses_task_ids_from_node_task_map() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        let valid_ids: HashSet<TaskId> = result.node_task_map.values().copied().collect();
        for (task_id, deps) in &result.dependencies {
            assert!(valid_ids.contains(task_id));
            for dep in deps {
                assert!(valid_ids.contains(dep));
            }
        }
    }

    // --- T-10: Configuration ---

    #[test]
    fn custom_config_applied() {
        let id = NodeId::new();
        let spec = make_spec(vec![connector_node(id, "a")], vec![]);
        let config = CompilerConfig {
            default_step_timeout_secs: Some(120),
            default_step_max_attempts: 7,
            coordinator_timeout_secs: Some(7200),
        };
        let result = compile_with_config(&spec, &config, FlowRunId::new()).unwrap();
        assert_eq!(result.steps[0].constraints().timeout_secs(), Some(120));
        assert_eq!(result.steps[0].constraints().max_attempts(), 7);
        assert_eq!(result.coordinator.constraints().timeout_secs(), Some(7200));
    }

    #[test]
    fn default_config_produces_valid_specs() {
        let ids = make_ids(3);
        let spec = make_spec(
            vec![
                connector_node(ids[0], "a"),
                connector_node(ids[1], "b"),
                connector_node(ids[2], "c"),
            ],
            vec![edge(ids[0], ids[1]), edge(ids[1], ids[2])],
        );
        let result = compile(&spec).unwrap();
        result.coordinator.validate().unwrap();
        for step in &result.steps {
            step.validate().unwrap();
        }
    }

    // --- T-11: Property-Based Tests ---

    /// Chain of `n` connector nodes, each linked to the next.
    fn linear_flow(n: usize) -> FlowSpec {
        let ids: Vec<NodeId> = (0..n).map(|_| NodeId::new()).collect();
        let nodes = ids.iter().map(|&id| connector_node(id, "test")).collect();
        let edges = ids.windows(2).map(|w| edge(w[0], w[1])).collect();
        FlowSpec { id: None, name: None, nodes, edges, params: None }
    }

    proptest! {
        #[test]
        fn compilation_is_deterministic(n in 1usize..=20) {
            let spec = linear_flow(n);
            let flow_run_id = FlowRunId::new();
            let config = CompilerConfig::default();
            let r1 = compile_with_config(&spec, &config, flow_run_id).unwrap();
            let r2 = compile_with_config(&spec, &config, flow_run_id).unwrap();
            // Same TaskIds
            prop_assert_eq!(r1.coordinator.id(), r2.coordinator.id());
            prop_assert_eq!(r1.flow_run_id, r2.flow_run_id);
            // `zip` stops at the shorter side, so pin the lengths first.
            prop_assert_eq!(r1.steps.len(), r2.steps.len());
            for (s1, s2) in r1.steps.iter().zip(r2.steps.iter()) {
                prop_assert_eq!(s1.id(), s2.id());
            }
            // Same mappings and dependency declarations
            prop_assert_eq!(&r1.node_task_map, &r2.node_task_map);
            prop_assert_eq!(&r1.dependencies, &r2.dependencies);
        }

        #[test]
        fn compiled_dependencies_are_acyclic(n in 1usize..=20) {
            let spec = linear_flow(n);
            let result = compile(&spec).unwrap();
            // Verify no self-dependencies
            for (task_id, deps) in &result.dependencies {
                prop_assert!(!deps.contains(task_id), "task depends on itself");
            }
            // If every prerequisite is emitted before the step that needs it,
            // the emitted order is a valid topological order, which can only
            // exist when the dependency graph has no cycle.
            let mut emitted: HashSet<TaskId> = HashSet::new();
            for step in &result.steps {
                let task_id = step.id();
                if let Some(deps) = result.dependencies.get(&task_id) {
                    for dep in deps {
                        prop_assert!(
                            emitted.contains(dep),
                            "prerequisite is not emitted before its dependent"
                        );
                    }
                }
                emitted.insert(task_id);
            }
        }

        #[test]
        fn step_count_equals_node_count(n in 1usize..=20) {
            let spec = linear_flow(n);
            let result = compile(&spec).unwrap();
            prop_assert_eq!(result.steps.len(), spec.nodes.len());
        }
    }
}