Skip to main content

pureflow_workflow/
lib.rs

1//! External workflow definitions and validation entrypoints.
2//!
3//! ## Fragment: workflow-structural-boundary
4//!
//! This crate owns the static workflow graph shape. It validates structural
//! facts that must be true before any runtime can reason about execution:
//! nodes are uniquely named, ports are uniquely named within a node, edges
//! connect declared output ports to declared input ports, and the graph is
//! acyclic unless it is built with `WorkflowGraph::with_cycles_allowed`.
//! Runtime concerns such as scheduling policy, payload compatibility,
//! cancellation, and backpressure are intentionally left to later layers.
11//!
12//! ## Fragment: workflow-validation-scope
13//!
14//! The validation rules stop at structural honesty on purpose. A graph can be
15//! structurally valid and still semantically wrong for a later runtime or data
16//! model. Keeping that line clear prevents the workflow crate from accumulating
17//! scheduling, typing, or capability policy that belongs elsewhere.
18//!
19//! ## Fragment: workflow-deterministic-errors
20//!
21//! Validation uses ordered maps and sets so duplicate detection and missing-edge
22//! errors are reported deterministically. The graphs are small enough that this
23//! tradeoff favors stable diagnostics and test output over marginal hash-table
24//! speed.
25
26use std::collections::{BTreeMap, BTreeSet};
27use std::error::Error;
28use std::fmt;
29use std::num::NonZeroUsize;
30
31use pureflow_types::{IdentifierError, NodeId, PortId, WorkflowId};
32
/// Whether a port sits on the receiving or the emitting side of its node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortDirection {
    /// The port consumes data or control arriving from an upstream node.
    Input,
    /// The port produces data or control for a downstream node.
    Output,
}

impl PortDirection {
    /// Lowercase word for this direction, as embedded in validation messages.
    const fn label(self) -> &'static str {
        // Kept as an exhaustive match so a new variant forces a label choice.
        match self {
            PortDirection::Output => "output",
            PortDirection::Input => "input",
        }
    }
}
50
/// Identifies the end of an edge that a validation error refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeEndpointRole {
    /// The end the edge starts from (upstream).
    Source,
    /// The end the edge arrives at (downstream).
    Target,
}

impl EdgeEndpointRole {
    /// Lowercase word for this role, as embedded in validation messages.
    const fn label(self) -> &'static str {
        // Kept as an exhaustive match so a new variant forces a label choice.
        match self {
            EdgeEndpointRole::Target => "target",
            EdgeEndpointRole::Source => "source",
        }
    }
}
68
69/// Error returned when a workflow graph is structurally invalid.
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub enum WorkflowValidationError {
72    /// Two nodes in the graph used the same identifier.
73    DuplicateNode {
74        /// Duplicated node identifier.
75        node_id: NodeId,
76    },
77    /// A node declared the same port identifier more than once.
78    DuplicatePort {
79        /// Node that owns the duplicated port.
80        node_id: NodeId,
81        /// Duplicated port identifier.
82        port_id: PortId,
83    },
84    /// An edge referenced a node that is not declared in the graph.
85    UnknownNode {
86        /// Zero-based index of the invalid edge.
87        edge_index: usize,
88        /// Endpoint role that referenced the missing node.
89        endpoint: EdgeEndpointRole,
90        /// Missing node identifier.
91        node_id: NodeId,
92    },
93    /// An edge referenced a port that is not declared for the required direction.
94    UnknownPort {
95        /// Zero-based index of the invalid edge.
96        edge_index: usize,
97        /// Endpoint role that referenced the missing port.
98        endpoint: EdgeEndpointRole,
99        /// Node that should own the port.
100        node_id: NodeId,
101        /// Missing port identifier.
102        port_id: PortId,
103        /// Direction required by this endpoint.
104        expected: PortDirection,
105    },
106    /// The graph contains a directed cycle.
107    CycleDetected {
108        /// One detected cycle, reported in traversal order.
109        cycle: Vec<NodeId>,
110    },
111}
112
113impl fmt::Display for WorkflowValidationError {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        match self {
116            Self::DuplicateNode { node_id } => {
117                write!(f, "workflow graph contains duplicate node `{node_id}`")
118            }
119            Self::DuplicatePort { node_id, port_id } => {
120                write!(f, "node `{node_id}` contains duplicate port `{port_id}`")
121            }
122            Self::UnknownNode {
123                edge_index,
124                endpoint,
125                node_id,
126            } => write!(
127                f,
128                "edge {edge_index} {} references unknown node `{node_id}`",
129                endpoint.label()
130            ),
131            Self::UnknownPort {
132                edge_index,
133                endpoint,
134                node_id,
135                port_id,
136                expected,
137            } => write!(
138                f,
139                "edge {edge_index} {} references unknown {} port `{port_id}` on node `{node_id}`",
140                endpoint.label(),
141                expected.label()
142            ),
143            Self::CycleDetected { cycle } => {
144                write!(f, "workflow graph contains a cycle involving")?;
145                for node_id in cycle {
146                    write!(f, " `{node_id}`")?;
147                }
148                Ok(())
149            }
150        }
151    }
152}
153
154impl Error for WorkflowValidationError {}
155
156/// Static endpoint for one side of a workflow edge.
157#[derive(Debug, Clone, PartialEq, Eq)]
158pub struct EdgeEndpoint {
159    node_id: NodeId,
160    port_id: PortId,
161}
162
163impl EdgeEndpoint {
164    /// Create an edge endpoint from a node and port identifier.
165    #[must_use]
166    pub const fn new(node_id: NodeId, port_id: PortId) -> Self {
167        Self { node_id, port_id }
168    }
169
170    /// Node referenced by this endpoint.
171    #[must_use]
172    pub const fn node_id(&self) -> &NodeId {
173        &self.node_id
174    }
175
176    /// Port referenced by this endpoint.
177    #[must_use]
178    pub const fn port_id(&self) -> &PortId {
179        &self.port_id
180    }
181}
182
/// How the capacity of a workflow edge is chosen.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeCapacity {
    /// Defer to whatever default the engine supplies.
    Default,
    /// Use this specific bounded, non-zero capacity.
    Explicit(NonZeroUsize),
}

impl EdgeCapacity {
    /// Turn the policy into a concrete capacity.
    ///
    /// `default` is returned for [`EdgeCapacity::Default`]; an explicit
    /// capacity always wins over it.
    #[must_use]
    pub const fn resolve(self, default: NonZeroUsize) -> NonZeroUsize {
        match self {
            EdgeCapacity::Explicit(chosen) => chosen,
            EdgeCapacity::Default => default,
        }
    }
}
202
203/// Directed connection from one output port to one input port.
204#[derive(Debug, Clone, PartialEq, Eq)]
205pub struct EdgeDefinition {
206    source: EdgeEndpoint,
207    target: EdgeEndpoint,
208    capacity: EdgeCapacity,
209}
210
211impl EdgeDefinition {
212    /// Create an edge from an upstream endpoint to a downstream endpoint.
213    #[must_use]
214    pub const fn new(source: EdgeEndpoint, target: EdgeEndpoint) -> Self {
215        Self {
216            source,
217            target,
218            capacity: EdgeCapacity::Default,
219        }
220    }
221
222    /// Create an edge with an explicit bounded capacity.
223    #[must_use]
224    pub const fn with_capacity(
225        source: EdgeEndpoint,
226        target: EdgeEndpoint,
227        capacity: NonZeroUsize,
228    ) -> Self {
229        Self {
230            source,
231            target,
232            capacity: EdgeCapacity::Explicit(capacity),
233        }
234    }
235
236    /// Upstream output endpoint.
237    #[must_use]
238    pub const fn source(&self) -> &EdgeEndpoint {
239        &self.source
240    }
241
242    /// Downstream input endpoint.
243    #[must_use]
244    pub const fn target(&self) -> &EdgeEndpoint {
245        &self.target
246    }
247
248    /// Capacity policy for this edge.
249    #[must_use]
250    pub const fn capacity(&self) -> EdgeCapacity {
251        self.capacity
252    }
253}
254
255/// Static node declaration and its input/output port topology.
256#[derive(Debug, Clone, PartialEq, Eq)]
257pub struct NodeDefinition {
258    id: NodeId,
259    input_ports: Vec<PortId>,
260    output_ports: Vec<PortId>,
261}
262
263impl NodeDefinition {
264    /// Create a node with declared input and output ports.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if a port identifier is repeated within this node,
269    /// including reuse across input and output directions.
270    pub fn new(
271        id: NodeId,
272        input_ports: impl Into<Vec<PortId>>,
273        output_ports: impl Into<Vec<PortId>>,
274    ) -> Result<Self, WorkflowValidationError> {
275        let input_ports: Vec<PortId> = input_ports.into();
276        let output_ports: Vec<PortId> = output_ports.into();
277        reject_duplicate_ports(&id, &input_ports, &output_ports)?;
278
279        Ok(Self {
280            id,
281            input_ports,
282            output_ports,
283        })
284    }
285
286    /// Node identifier.
287    #[must_use]
288    pub const fn id(&self) -> &NodeId {
289        &self.id
290    }
291
292    /// Declared input ports.
293    #[must_use]
294    pub fn input_ports(&self) -> &[PortId] {
295        &self.input_ports
296    }
297
298    /// Declared output ports.
299    #[must_use]
300    pub fn output_ports(&self) -> &[PortId] {
301        &self.output_ports
302    }
303}
304
305/// Validated graph-level workflow structure.
306#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct WorkflowGraph {
308    nodes: Vec<NodeDefinition>,
309    edges: Vec<EdgeDefinition>,
310}
311
312impl WorkflowGraph {
313    /// Create and validate a workflow graph.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error when nodes or ports are duplicated, when an edge
318    /// references an undeclared node or the wrong port direction, or when the
319    /// graph contains a directed cycle.
320    pub fn new(
321        nodes: impl Into<Vec<NodeDefinition>>,
322        edges: impl Into<Vec<EdgeDefinition>>,
323    ) -> Result<Self, WorkflowValidationError> {
324        Self::build(nodes, edges, false)
325    }
326
327    /// Create and validate a workflow graph while allowing cycles.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error when nodes or ports are duplicated, or when an edge
332    /// references an undeclared node or the wrong port direction.
333    pub fn with_cycles_allowed(
334        nodes: impl Into<Vec<NodeDefinition>>,
335        edges: impl Into<Vec<EdgeDefinition>>,
336    ) -> Result<Self, WorkflowValidationError> {
337        Self::build(nodes, edges, true)
338    }
339
340    /// Create an empty graph with no nodes or edges.
341    #[must_use]
342    pub const fn empty() -> Self {
343        Self {
344            nodes: Vec::new(),
345            edges: Vec::new(),
346        }
347    }
348
349    /// Declared nodes in stable workflow order.
350    #[must_use]
351    pub fn nodes(&self) -> &[NodeDefinition] {
352        &self.nodes
353    }
354
355    /// Declared edges in stable workflow order.
356    #[must_use]
357    pub fn edges(&self) -> &[EdgeDefinition] {
358        &self.edges
359    }
360
361    /// Return a deterministic topological order for the nodes in this graph.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error when the graph is structurally invalid or contains a
366    /// directed cycle.
367    pub fn topological_order(&self) -> Result<Vec<NodeId>, WorkflowValidationError> {
368        let topology: GraphTopology = GraphTopology::from_graph(&self.nodes, &self.edges)?;
369        topology.topological_order()
370    }
371
372    fn build(
373        nodes: impl Into<Vec<NodeDefinition>>,
374        edges: impl Into<Vec<EdgeDefinition>>,
375        allow_cycles: bool,
376    ) -> Result<Self, WorkflowValidationError> {
377        let graph: Self = Self {
378            nodes: nodes.into(),
379            edges: edges.into(),
380        };
381        graph.validate(allow_cycles)?;
382        Ok(graph)
383    }
384
385    fn validate(&self, allow_cycles: bool) -> Result<(), WorkflowValidationError> {
386        let topology: GraphTopology = GraphTopology::from_graph(&self.nodes, &self.edges)?;
387        if !allow_cycles {
388            topology.topological_order()?;
389        }
390        Ok(())
391    }
392}
393
394/// Parsed workflow definition independent of runtime execution.
395#[derive(Debug, Clone, PartialEq, Eq)]
396pub struct WorkflowDefinition {
397    id: WorkflowId,
398    graph: WorkflowGraph,
399}
400
401impl WorkflowDefinition {
402    /// Create a workflow definition from an already validated graph.
403    #[must_use]
404    pub const fn new(id: WorkflowId, graph: WorkflowGraph) -> Self {
405        Self { id, graph }
406    }
407
408    /// Create a workflow definition from raw graph parts.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error when the graph is structurally invalid.
413    pub fn from_parts(
414        id: WorkflowId,
415        nodes: impl Into<Vec<NodeDefinition>>,
416        edges: impl Into<Vec<EdgeDefinition>>,
417    ) -> Result<Self, WorkflowValidationError> {
418        let graph: WorkflowGraph = WorkflowGraph::new(nodes, edges)?;
419        Ok(Self::new(id, graph))
420    }
421
422    /// Create a placeholder workflow with no nodes.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if the workflow identifier is invalid.
427    pub fn empty(name: impl Into<String>) -> Result<Self, IdentifierError> {
428        Ok(Self::new(WorkflowId::new(name)?, WorkflowGraph::empty()))
429    }
430
431    /// Workflow identifier.
432    #[must_use]
433    pub const fn id(&self) -> &WorkflowId {
434        &self.id
435    }
436
437    /// Validated workflow graph.
438    #[must_use]
439    pub const fn graph(&self) -> &WorkflowGraph {
440        &self.graph
441    }
442
443    /// Declared nodes in stable workflow order.
444    #[must_use]
445    pub fn nodes(&self) -> &[NodeDefinition] {
446        self.graph.nodes()
447    }
448
449    /// Declared edges in stable workflow order.
450    #[must_use]
451    pub fn edges(&self) -> &[EdgeDefinition] {
452        self.graph.edges()
453    }
454}
455
456struct GraphTopology {
457    node_ids: Vec<NodeId>,
458    inputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>>,
459    outputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>>,
460    outgoing_by_node: BTreeMap<NodeId, BTreeSet<NodeId>>,
461    indegree_by_node: BTreeMap<NodeId, usize>,
462}
463
464impl GraphTopology {
465    fn from_graph(
466        nodes: &[NodeDefinition],
467        edges: &[EdgeDefinition],
468    ) -> Result<Self, WorkflowValidationError> {
469        reject_duplicate_nodes(nodes)?;
470
471        let mut inputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>> = BTreeMap::new();
472        let mut outputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>> = BTreeMap::new();
473        let mut outgoing_by_node: BTreeMap<NodeId, BTreeSet<NodeId>> = BTreeMap::new();
474        let mut indegree_by_node: BTreeMap<NodeId, usize> = BTreeMap::new();
475        let mut node_ids: Vec<NodeId> = Vec::with_capacity(nodes.len());
476
477        for node in nodes {
478            let node_id: NodeId = node.id().clone();
479            node_ids.push(node_id.clone());
480            inputs_by_node.insert(
481                node_id.clone(),
482                node.input_ports().iter().cloned().collect(),
483            );
484            outputs_by_node.insert(
485                node_id.clone(),
486                node.output_ports().iter().cloned().collect(),
487            );
488            outgoing_by_node.insert(node_id.clone(), BTreeSet::new());
489            indegree_by_node.insert(node_id, 0);
490        }
491
492        let mut topology: Self = Self {
493            node_ids,
494            inputs_by_node,
495            outputs_by_node,
496            outgoing_by_node,
497            indegree_by_node,
498        };
499
500        for (edge_index, edge) in edges.iter().enumerate() {
501            topology.validate_endpoint(
502                edge_index,
503                EdgeEndpointRole::Source,
504                edge.source(),
505                PortDirection::Output,
506            )?;
507            topology.validate_endpoint(
508                edge_index,
509                EdgeEndpointRole::Target,
510                edge.target(),
511                PortDirection::Input,
512            )?;
513
514            let Some(outgoing): Option<&mut BTreeSet<NodeId>> =
515                topology.outgoing_by_node.get_mut(edge.source().node_id())
516            else {
517                return Err(WorkflowValidationError::UnknownNode {
518                    edge_index,
519                    endpoint: EdgeEndpointRole::Source,
520                    node_id: edge.source().node_id().clone(),
521                });
522            };
523            outgoing.insert(edge.target().node_id().clone());
524
525            let Some(indegree): Option<&mut usize> =
526                topology.indegree_by_node.get_mut(edge.target().node_id())
527            else {
528                return Err(WorkflowValidationError::UnknownNode {
529                    edge_index,
530                    endpoint: EdgeEndpointRole::Target,
531                    node_id: edge.target().node_id().clone(),
532                });
533            };
534            *indegree += 1;
535        }
536
537        Ok(topology)
538    }
539
540    fn validate_endpoint(
541        &self,
542        edge_index: usize,
543        endpoint: EdgeEndpointRole,
544        edge_endpoint: &EdgeEndpoint,
545        expected: PortDirection,
546    ) -> Result<(), WorkflowValidationError> {
547        let ports_by_node: &BTreeMap<NodeId, BTreeSet<PortId>> = match expected {
548            PortDirection::Input => &self.inputs_by_node,
549            PortDirection::Output => &self.outputs_by_node,
550        };
551
552        let ports: &BTreeSet<PortId> =
553            ports_by_node.get(edge_endpoint.node_id()).ok_or_else(|| {
554                WorkflowValidationError::UnknownNode {
555                    edge_index,
556                    endpoint,
557                    node_id: edge_endpoint.node_id().clone(),
558                }
559            })?;
560
561        if !ports.contains(edge_endpoint.port_id()) {
562            return Err(WorkflowValidationError::UnknownPort {
563                edge_index,
564                endpoint,
565                node_id: edge_endpoint.node_id().clone(),
566                port_id: edge_endpoint.port_id().clone(),
567                expected,
568            });
569        }
570
571        Ok(())
572    }
573
574    fn topological_order(&self) -> Result<Vec<NodeId>, WorkflowValidationError> {
575        let mut indegree_by_node: BTreeMap<NodeId, usize> = self.indegree_by_node.clone();
576        let mut ready: BTreeSet<NodeId> = indegree_by_node
577            .iter()
578            .filter_map(|(node_id, indegree): (&NodeId, &usize)| {
579                (*indegree == 0).then_some(node_id.clone())
580            })
581            .collect();
582        let mut order: Vec<NodeId> = Vec::with_capacity(indegree_by_node.len());
583
584        while let Some(node_id) = ready.pop_first() {
585            order.push(node_id.clone());
586
587            let Some(children): Option<&BTreeSet<NodeId>> = self.outgoing_by_node.get(&node_id)
588            else {
589                continue;
590            };
591
592            for child in children {
593                let Some(indegree): Option<&mut usize> = indegree_by_node.get_mut(child) else {
594                    continue;
595                };
596                *indegree -= 1;
597                if *indegree == 0 {
598                    ready.insert(child.clone());
599                }
600            }
601        }
602
603        if order.len() == self.node_ids.len() {
604            return Ok(order);
605        }
606
607        let remaining: BTreeSet<NodeId> = self
608            .node_ids
609            .iter()
610            .filter(|node_id: &&NodeId| !order.contains(node_id))
611            .cloned()
612            .collect();
613        let cycle: Vec<NodeId> = self.find_cycle(&remaining);
614        Err(WorkflowValidationError::CycleDetected { cycle })
615    }
616
617    fn find_cycle(&self, remaining: &BTreeSet<NodeId>) -> Vec<NodeId> {
618        #[derive(Clone, Copy, PartialEq, Eq)]
619        enum VisitState {
620            Visiting,
621            Visited,
622        }
623
624        fn dfs(
625            node_id: &NodeId,
626            topology: &GraphTopology,
627            remaining: &BTreeSet<NodeId>,
628            states: &mut BTreeMap<NodeId, VisitState>,
629            stack: &mut Vec<NodeId>,
630        ) -> Option<Vec<NodeId>> {
631            states.insert(node_id.clone(), VisitState::Visiting);
632            stack.push(node_id.clone());
633
634            let Some(children): Option<&BTreeSet<NodeId>> = topology.outgoing_by_node.get(node_id)
635            else {
636                stack.pop();
637                states.insert(node_id.clone(), VisitState::Visited);
638                return None;
639            };
640
641            for child in children {
642                if !remaining.contains(child) {
643                    continue;
644                }
645
646                match states.get(child) {
647                    Some(VisitState::Visiting) => {
648                        if let Some(cycle) = cycle_from_stack(stack, child) {
649                            return Some(cycle);
650                        }
651                    }
652                    Some(VisitState::Visited) => {}
653                    None => {
654                        if let Some(cycle) = dfs(child, topology, remaining, states, stack) {
655                            return Some(cycle);
656                        }
657                    }
658                }
659            }
660
661            stack.pop();
662            states.insert(node_id.clone(), VisitState::Visited);
663            None
664        }
665
666        fn cycle_from_stack(stack: &[NodeId], child: &NodeId) -> Option<Vec<NodeId>> {
667            let start_index: usize = stack.iter().position(|entry: &NodeId| entry == child)?;
668            let mut cycle: Vec<NodeId> = stack.iter().skip(start_index).cloned().collect();
669            cycle.push(child.clone());
670            Some(cycle)
671        }
672
673        let mut states: BTreeMap<NodeId, VisitState> = BTreeMap::new();
674        let mut stack: Vec<NodeId> = Vec::new();
675
676        for node_id in &self.node_ids {
677            if !remaining.contains(node_id) || states.contains_key(node_id) {
678                continue;
679            }
680
681            if let Some(cycle) = dfs(node_id, self, remaining, &mut states, &mut stack) {
682                return cycle;
683            }
684        }
685
686        remaining.iter().cloned().collect()
687    }
688}
689
690fn reject_duplicate_nodes(nodes: &[NodeDefinition]) -> Result<(), WorkflowValidationError> {
691    let mut seen: BTreeSet<NodeId> = BTreeSet::new();
692
693    for node in nodes {
694        if !seen.insert(node.id().clone()) {
695            return Err(WorkflowValidationError::DuplicateNode {
696                node_id: node.id().clone(),
697            });
698        }
699    }
700
701    Ok(())
702}
703
704fn reject_duplicate_ports(
705    node_id: &NodeId,
706    input_ports: &[PortId],
707    output_ports: &[PortId],
708) -> Result<(), WorkflowValidationError> {
709    let mut seen: BTreeSet<PortId> = BTreeSet::new();
710
711    for port_id in input_ports.iter().chain(output_ports) {
712        if !seen.insert(port_id.clone()) {
713            return Err(WorkflowValidationError::DuplicatePort {
714                node_id: node_id.clone(),
715                port_id: port_id.clone(),
716            });
717        }
718    }
719
720    Ok(())
721}
722
723#[cfg(test)]
724mod tests {
725    use super::*;
726    use pureflow_types::IdentifierKind;
727    use proptest::{collection::hash_set, prelude::*};
728    use quickcheck::{Arbitrary as QuickArbitrary, Gen, QuickCheck};
729    use std::num::NonZeroUsize;
730    use std::panic::{self, AssertUnwindSafe};
731
732    fn valid_identifier_strategy() -> impl Strategy<Value = String> {
733        prop::collection::vec(
734            any::<char>().prop_filter(
735                "identifier characters must not be whitespace or control",
736                |ch| !ch.is_whitespace() && !ch.is_control(),
737            ),
738            1..16,
739        )
740        .prop_map(|chars: Vec<char>| chars.into_iter().collect())
741    }
742
743    fn workflow_id(value: &str) -> WorkflowId {
744        WorkflowId::new(value).expect("valid workflow id")
745    }
746
747    fn node_id(value: &str) -> NodeId {
748        NodeId::new(value).expect("valid node id")
749    }
750
751    fn port_id(value: &str) -> PortId {
752        PortId::new(value).expect("valid port id")
753    }
754
755    fn endpoint(node: &str, port: &str) -> EdgeEndpoint {
756        EdgeEndpoint::new(node_id(node), port_id(port))
757    }
758
759    #[derive(Debug, Clone)]
760    struct GeneratedValidGraph {
761        nodes: Vec<NodeDefinition>,
762        edges: Vec<EdgeDefinition>,
763    }
764
765    impl QuickArbitrary for GeneratedValidGraph {
766        fn arbitrary(g: &mut Gen) -> Self {
767            let node_count = generated_count(g, 1, 6);
768            let nodes: Vec<NodeDefinition> = (0..node_count)
769                .map(|index| generated_routable_node(index))
770                .collect();
771            let mut edges = Vec::new();
772
773            for source in 0..node_count {
774                for target in (source + 1)..node_count {
775                    if generated_bool(g) {
776                        edges.push(generated_edge(source, target));
777                    }
778                }
779            }
780
781            Self { nodes, edges }
782        }
783    }
784
785    #[derive(Debug, Clone)]
786    struct SmallNodeCount(usize);
787
788    impl QuickArbitrary for SmallNodeCount {
789        fn arbitrary(g: &mut Gen) -> Self {
790            Self(generated_count(g, 1, 6))
791        }
792    }
793
794    #[derive(Debug, Clone)]
795    struct GeneratedValidationCase {
796        scenario: ValidationScenario,
797    }
798
799    #[derive(Debug, Clone)]
800    enum ValidationScenario {
801        DuplicatePort {
802            node_id: NodeId,
803            port_id: PortId,
804        },
805        Graph {
806            nodes: Vec<NodeDefinition>,
807            edges: Vec<EdgeDefinition>,
808            expected: ExpectedGraphResult,
809        },
810    }
811
812    #[derive(Debug, Clone, Copy)]
813    enum ExpectedGraphResult {
814        Ok,
815        DuplicateNode,
816        UnknownNode(EdgeEndpointRole),
817        UnknownPort(EdgeEndpointRole, PortDirection),
818        CycleDetected,
819    }
820
821    impl QuickArbitrary for GeneratedValidationCase {
822        fn arbitrary(g: &mut Gen) -> Self {
823            let scenario = match generated_u8(g) % 8 {
824                0 => {
825                    let graph = GeneratedValidGraph::arbitrary(g);
826                    ValidationScenario::Graph {
827                        nodes: graph.nodes,
828                        edges: graph.edges,
829                        expected: ExpectedGraphResult::Ok,
830                    }
831                }
832                1 => ValidationScenario::Graph {
833                    nodes: vec![generated_empty_node(0), generated_empty_node(0)],
834                    edges: Vec::new(),
835                    expected: ExpectedGraphResult::DuplicateNode,
836                },
837                2 => ValidationScenario::DuplicatePort {
838                    node_id: generated_node_id(0),
839                    port_id: port_id("dup"),
840                },
841                3 => ValidationScenario::Graph {
842                    nodes: vec![generated_sink_node(0)],
843                    edges: vec![EdgeDefinition::new(
844                        EdgeEndpoint::new(node_id("missing_source"), port_id("out")),
845                        EdgeEndpoint::new(generated_node_id(0), port_id("in")),
846                    )],
847                    expected: ExpectedGraphResult::UnknownNode(EdgeEndpointRole::Source),
848                },
849                4 => ValidationScenario::Graph {
850                    nodes: vec![generated_source_node(0)],
851                    edges: vec![EdgeDefinition::new(
852                        EdgeEndpoint::new(generated_node_id(0), port_id("out")),
853                        EdgeEndpoint::new(node_id("missing_target"), port_id("in")),
854                    )],
855                    expected: ExpectedGraphResult::UnknownNode(EdgeEndpointRole::Target),
856                },
857                5 => ValidationScenario::Graph {
858                    nodes: vec![generated_sink_node(0), generated_sink_node(1)],
859                    edges: vec![EdgeDefinition::new(
860                        EdgeEndpoint::new(generated_node_id(0), port_id("in")),
861                        EdgeEndpoint::new(generated_node_id(1), port_id("in")),
862                    )],
863                    expected: ExpectedGraphResult::UnknownPort(
864                        EdgeEndpointRole::Source,
865                        PortDirection::Output,
866                    ),
867                },
868                6 => ValidationScenario::Graph {
869                    nodes: vec![generated_source_node(0), generated_source_node(1)],
870                    edges: vec![EdgeDefinition::new(
871                        EdgeEndpoint::new(generated_node_id(0), port_id("out")),
872                        EdgeEndpoint::new(generated_node_id(1), port_id("out")),
873                    )],
874                    expected: ExpectedGraphResult::UnknownPort(
875                        EdgeEndpointRole::Target,
876                        PortDirection::Input,
877                    ),
878                },
879                _ => {
880                    let (nodes, edges) = generated_cycle_graph(g);
881                    ValidationScenario::Graph {
882                        nodes,
883                        edges,
884                        expected: ExpectedGraphResult::CycleDetected,
885                    }
886                }
887            };
888
889            Self { scenario }
890        }
891    }
892
893    fn generated_count(g: &mut Gen, min: usize, max_exclusive: usize) -> usize {
894        min + (generated_usize(g) % (max_exclusive - min))
895    }
896
897    fn generated_bool(g: &mut Gen) -> bool {
898        <bool as QuickArbitrary>::arbitrary(g)
899    }
900
901    fn generated_u8(g: &mut Gen) -> u8 {
902        <u8 as QuickArbitrary>::arbitrary(g)
903    }
904
905    fn generated_usize(g: &mut Gen) -> usize {
906        <usize as QuickArbitrary>::arbitrary(g)
907    }
908
909    fn generated_node_id(index: usize) -> NodeId {
910        node_id(&format!("node_{index}"))
911    }
912
913    fn generated_routable_node(index: usize) -> NodeDefinition {
914        NodeDefinition::new(generated_node_id(index), [port_id("in")], [port_id("out")])
915            .expect("generated routable node is valid")
916    }
917
918    fn generated_source_node(index: usize) -> NodeDefinition {
919        NodeDefinition::new(
920            generated_node_id(index),
921            Vec::<PortId>::new(),
922            [port_id("out")],
923        )
924        .expect("generated source node is valid")
925    }
926
927    fn generated_sink_node(index: usize) -> NodeDefinition {
928        NodeDefinition::new(
929            generated_node_id(index),
930            [port_id("in")],
931            Vec::<PortId>::new(),
932        )
933        .expect("generated sink node is valid")
934    }
935
936    fn generated_empty_node(index: usize) -> NodeDefinition {
937        NodeDefinition::new(
938            generated_node_id(index),
939            Vec::<PortId>::new(),
940            Vec::<PortId>::new(),
941        )
942        .expect("generated empty node is valid")
943    }
944
945    fn generated_edge(source: usize, target: usize) -> EdgeDefinition {
946        EdgeDefinition::new(
947            EdgeEndpoint::new(generated_node_id(source), port_id("out")),
948            EdgeEndpoint::new(generated_node_id(target), port_id("in")),
949        )
950    }
951
952    fn generated_cycle_graph(g: &mut Gen) -> (Vec<NodeDefinition>, Vec<EdgeDefinition>) {
953        let node_count = generated_count(g, 2, 7);
954        let nodes = (0..node_count).map(generated_routable_node).collect();
955        let edges = (0..node_count)
956            .map(|source| generated_edge(source, (source + 1) % node_count))
957            .collect();
958
959        (nodes, edges)
960    }
961
962    fn generated_fan_out_graph(target_count: usize) -> (Vec<NodeDefinition>, Vec<EdgeDefinition>) {
963        let mut nodes = vec![generated_source_node(0)];
964        let mut edges = Vec::new();
965
966        for target in 1..=target_count {
967            nodes.push(generated_sink_node(target));
968            edges.push(generated_edge(0, target));
969        }
970
971        (nodes, edges)
972    }
973
974    fn generated_fan_in_graph(source_count: usize) -> (Vec<NodeDefinition>, Vec<EdgeDefinition>) {
975        let sink_index = source_count;
976        let mut nodes = Vec::new();
977        let mut edges = Vec::new();
978
979        for source in 0..source_count {
980            nodes.push(generated_source_node(source));
981            edges.push(generated_edge(source, sink_index));
982        }
983
984        nodes.push(generated_sink_node(sink_index));
985        (nodes, edges)
986    }
987
988    fn validate_generated_case(case: &GeneratedValidationCase) -> bool {
989        match &case.scenario {
990            ValidationScenario::DuplicatePort { node_id, port_id } => matches!(
991                NodeDefinition::new(node_id.clone(), [port_id.clone()], [port_id.clone()]),
992                Err(WorkflowValidationError::DuplicatePort { .. })
993            ),
994            ValidationScenario::Graph {
995                nodes,
996                edges,
997                expected,
998            } => graph_result_matches(WorkflowGraph::new(nodes.clone(), edges.clone()), *expected),
999        }
1000    }
1001
1002    fn graph_result_matches(
1003        result: Result<WorkflowGraph, WorkflowValidationError>,
1004        expected: ExpectedGraphResult,
1005    ) -> bool {
1006        match (result, expected) {
1007            (Ok(_), ExpectedGraphResult::Ok) => true,
1008            (
1009                Err(WorkflowValidationError::DuplicateNode { .. }),
1010                ExpectedGraphResult::DuplicateNode,
1011            ) => true,
1012            (
1013                Err(WorkflowValidationError::UnknownNode { endpoint, .. }),
1014                ExpectedGraphResult::UnknownNode(expected_endpoint),
1015            ) => endpoint == expected_endpoint,
1016            (
1017                Err(WorkflowValidationError::UnknownPort {
1018                    endpoint, expected, ..
1019                }),
1020                ExpectedGraphResult::UnknownPort(expected_endpoint, expected_direction),
1021            ) => endpoint == expected_endpoint && expected == expected_direction,
1022            (
1023                Err(WorkflowValidationError::CycleDetected { cycle }),
1024                ExpectedGraphResult::CycleDetected,
1025            ) => !cycle.is_empty(),
1026            _ => false,
1027        }
1028    }
1029
1030    #[test]
1031    fn empty_workflow_uses_valid_identifier() {
1032        let workflow = WorkflowDefinition::empty("pureflow-scaffold").expect("valid id");
1033
1034        assert_eq!(workflow.id().as_str(), "pureflow-scaffold");
1035        assert!(workflow.nodes().is_empty());
1036        assert!(workflow.edges().is_empty());
1037    }
1038
1039    #[test]
1040    fn empty_workflow_rejects_invalid_identifier() {
1041        let err = WorkflowDefinition::empty("bad workflow").expect_err("whitespace must fail");
1042        assert_eq!(
1043            err,
1044            IdentifierError::Whitespace {
1045                kind: IdentifierKind::Workflow
1046            }
1047        );
1048    }
1049
1050    #[test]
1051    fn valid_workflow_represents_nodes_ports_and_edges() {
1052        let producer = NodeDefinition::new(
1053            node_id("producer"),
1054            Vec::<PortId>::new(),
1055            [port_id("records")],
1056        )
1057        .expect("valid producer");
1058        let consumer = NodeDefinition::new(
1059            node_id("consumer"),
1060            [port_id("records")],
1061            Vec::<PortId>::new(),
1062        )
1063        .expect("valid consumer");
1064        let edge = EdgeDefinition::new(
1065            endpoint("producer", "records"),
1066            endpoint("consumer", "records"),
1067        );
1068
1069        let workflow =
1070            WorkflowDefinition::from_parts(workflow_id("ingest"), [producer, consumer], [edge])
1071                .expect("valid graph");
1072
1073        assert_eq!(workflow.id().as_str(), "ingest");
1074        assert_eq!(workflow.nodes().len(), 2);
1075        assert_eq!(workflow.edges().len(), 1);
1076    }
1077
1078    #[test]
1079    fn edge_capacity_defaults_to_engine_default_policy() {
1080        let edge = EdgeDefinition::new(endpoint("producer", "records"), endpoint("consumer", "in"));
1081
1082        assert_eq!(edge.capacity(), EdgeCapacity::Default);
1083        assert_eq!(
1084            edge.capacity()
1085                .resolve(NonZeroUsize::new(7).expect("nonzero")),
1086            NonZeroUsize::new(7).expect("nonzero")
1087        );
1088    }
1089
1090    #[test]
1091    fn edge_capacity_round_trips_explicit_value() {
1092        let capacity: NonZeroUsize = NonZeroUsize::new(3).expect("nonzero");
1093        let edge = EdgeDefinition::with_capacity(
1094            endpoint("producer", "records"),
1095            endpoint("consumer", "in"),
1096            capacity,
1097        );
1098
1099        assert_eq!(edge.capacity(), EdgeCapacity::Explicit(capacity));
1100        assert_eq!(
1101            edge.capacity()
1102                .resolve(NonZeroUsize::new(7).expect("nonzero")),
1103            capacity
1104        );
1105    }
1106
1107    #[test]
1108    fn topological_order_returns_sources_before_sinks() {
1109        let producer =
1110            NodeDefinition::new(node_id("producer"), Vec::<PortId>::new(), [port_id("out")])
1111                .expect("valid producer");
1112        let consumer =
1113            NodeDefinition::new(node_id("consumer"), [port_id("in")], Vec::<PortId>::new())
1114                .expect("valid consumer");
1115        let edge = EdgeDefinition::new(endpoint("producer", "out"), endpoint("consumer", "in"));
1116        let graph = WorkflowGraph::new([producer, consumer], [edge]).expect("valid graph");
1117
1118        assert_eq!(
1119            graph
1120                .topological_order()
1121                .expect("acyclic graph should order"),
1122            vec![node_id("producer"), node_id("consumer")]
1123        );
1124    }
1125
1126    #[test]
1127    fn workflow_graph_rejects_cycles_by_default() {
1128        let first = NodeDefinition::new(node_id("first"), [port_id("in")], [port_id("out")])
1129            .expect("valid first node");
1130        let second = NodeDefinition::new(node_id("second"), [port_id("in")], [port_id("out")])
1131            .expect("valid second node");
1132        let edges = [
1133            EdgeDefinition::new(endpoint("first", "out"), endpoint("second", "in")),
1134            EdgeDefinition::new(endpoint("second", "out"), endpoint("first", "in")),
1135        ];
1136
1137        let err = WorkflowGraph::new([first, second], edges).expect_err("cycle must fail");
1138
1139        assert!(
1140            matches!(err, WorkflowValidationError::CycleDetected { cycle } if cycle.contains(&node_id("first")) && cycle.contains(&node_id("second")))
1141        );
1142    }
1143
1144    #[test]
1145    fn workflow_graph_with_cycles_allowed_keeps_ordering_diagnostics_available() {
1146        let first = NodeDefinition::new(node_id("first"), [port_id("in")], [port_id("out")])
1147            .expect("valid first node");
1148        let second = NodeDefinition::new(node_id("second"), [port_id("in")], [port_id("out")])
1149            .expect("valid second node");
1150        let edges = [
1151            EdgeDefinition::new(endpoint("first", "out"), endpoint("second", "in")),
1152            EdgeDefinition::new(endpoint("second", "out"), endpoint("first", "in")),
1153        ];
1154
1155        let graph = WorkflowGraph::with_cycles_allowed([first, second], edges)
1156            .expect("cycle-allowed graph should build");
1157
1158        let err = graph
1159            .topological_order()
1160            .expect_err("cycle should still be reported by ordering");
1161        assert!(matches!(err, WorkflowValidationError::CycleDetected { .. }));
1162    }
1163
1164    #[test]
1165    fn duplicate_nodes_are_rejected() {
1166        let first =
1167            NodeDefinition::new(node_id("step"), Vec::<PortId>::new(), Vec::<PortId>::new())
1168                .expect("valid node");
1169        let second =
1170            NodeDefinition::new(node_id("step"), Vec::<PortId>::new(), Vec::<PortId>::new())
1171                .expect("valid node");
1172
1173        let err = WorkflowGraph::new([first, second], Vec::<EdgeDefinition>::new())
1174            .expect_err("duplicate nodes must fail");
1175
1176        assert_eq!(
1177            err,
1178            WorkflowValidationError::DuplicateNode {
1179                node_id: node_id("step")
1180            }
1181        );
1182    }
1183
1184    #[test]
1185    fn duplicate_ports_on_one_node_are_rejected() {
1186        let err = NodeDefinition::new(node_id("step"), [port_id("value")], [port_id("value")])
1187            .expect_err("duplicate ports must fail");
1188
1189        assert_eq!(
1190            err,
1191            WorkflowValidationError::DuplicatePort {
1192                node_id: node_id("step"),
1193                port_id: port_id("value")
1194            }
1195        );
1196    }
1197
1198    #[test]
1199    fn edge_source_must_reference_existing_node() {
1200        let consumer = NodeDefinition::new(
1201            node_id("consumer"),
1202            [port_id("records")],
1203            Vec::<PortId>::new(),
1204        )
1205        .expect("valid consumer");
1206        let edge = EdgeDefinition::new(
1207            endpoint("missing", "records"),
1208            endpoint("consumer", "records"),
1209        );
1210
1211        let err = WorkflowGraph::new([consumer], [edge]).expect_err("missing source must fail");
1212
1213        assert_eq!(
1214            err,
1215            WorkflowValidationError::UnknownNode {
1216                edge_index: 0,
1217                endpoint: EdgeEndpointRole::Source,
1218                node_id: node_id("missing")
1219            }
1220        );
1221    }
1222
1223    #[test]
1224    fn edge_source_must_reference_output_port() {
1225        let producer = NodeDefinition::new(
1226            node_id("producer"),
1227            [port_id("records")],
1228            Vec::<PortId>::new(),
1229        )
1230        .expect("valid producer");
1231        let consumer = NodeDefinition::new(
1232            node_id("consumer"),
1233            [port_id("records")],
1234            Vec::<PortId>::new(),
1235        )
1236        .expect("valid consumer");
1237        let edge = EdgeDefinition::new(
1238            endpoint("producer", "records"),
1239            endpoint("consumer", "records"),
1240        );
1241
1242        let err = WorkflowGraph::new([producer, consumer], [edge])
1243            .expect_err("input source port must fail");
1244
1245        assert_eq!(
1246            err,
1247            WorkflowValidationError::UnknownPort {
1248                edge_index: 0,
1249                endpoint: EdgeEndpointRole::Source,
1250                node_id: node_id("producer"),
1251                port_id: port_id("records"),
1252                expected: PortDirection::Output
1253            }
1254        );
1255    }
1256
1257    #[test]
1258    fn edge_target_must_reference_input_port() {
1259        let producer = NodeDefinition::new(
1260            node_id("producer"),
1261            Vec::<PortId>::new(),
1262            [port_id("records")],
1263        )
1264        .expect("valid producer");
1265        let consumer = NodeDefinition::new(
1266            node_id("consumer"),
1267            Vec::<PortId>::new(),
1268            [port_id("records")],
1269        )
1270        .expect("valid consumer");
1271        let edge = EdgeDefinition::new(
1272            endpoint("producer", "records"),
1273            endpoint("consumer", "records"),
1274        );
1275
1276        let err = WorkflowGraph::new([producer, consumer], [edge])
1277            .expect_err("output target port must fail");
1278
1279        assert_eq!(
1280            err,
1281            WorkflowValidationError::UnknownPort {
1282                edge_index: 0,
1283                endpoint: EdgeEndpointRole::Target,
1284                node_id: node_id("consumer"),
1285                port_id: port_id("records"),
1286                expected: PortDirection::Input
1287            }
1288        );
1289    }
1290
1291    #[test]
1292    fn generated_acyclic_graphs_with_disconnected_nodes_validate() {
1293        fn property(graph: GeneratedValidGraph) -> bool {
1294            WorkflowGraph::new(graph.nodes, graph.edges).is_ok()
1295        }
1296
1297        QuickCheck::new()
1298            .tests(128)
1299            .quickcheck(property as fn(GeneratedValidGraph) -> bool);
1300    }
1301
1302    #[test]
1303    fn generated_validation_cases_return_consistent_error_variants_without_panicking() {
1304        fn property(case: GeneratedValidationCase) -> bool {
1305            panic::catch_unwind(AssertUnwindSafe(|| validate_generated_case(&case)))
1306                .unwrap_or(false)
1307        }
1308
1309        QuickCheck::new()
1310            .tests(128)
1311            .quickcheck(property as fn(GeneratedValidationCase) -> bool);
1312    }
1313
1314    #[test]
1315    fn generated_fan_out_topologies_validate() {
1316        fn property(count: SmallNodeCount) -> bool {
1317            let (nodes, edges) = generated_fan_out_graph(count.0);
1318
1319            WorkflowGraph::new(nodes, edges).is_ok()
1320        }
1321
1322        QuickCheck::new()
1323            .tests(128)
1324            .quickcheck(property as fn(SmallNodeCount) -> bool);
1325    }
1326
1327    #[test]
1328    fn generated_fan_in_topologies_validate() {
1329        fn property(count: SmallNodeCount) -> bool {
1330            let (nodes, edges) = generated_fan_in_graph(count.0);
1331
1332            WorkflowGraph::new(nodes, edges).is_ok()
1333        }
1334
1335        QuickCheck::new()
1336            .tests(128)
1337            .quickcheck(property as fn(SmallNodeCount) -> bool);
1338    }
1339
1340    fn build_linear_workflow(node_names: &[String]) -> WorkflowDefinition {
1341        let mut nodes: Vec<NodeDefinition> = Vec::new();
1342        let mut edges: Vec<EdgeDefinition> = Vec::new();
1343
1344        for (index, node_name) in node_names.iter().enumerate() {
1345            let mut input_ports: Vec<PortId> = Vec::new();
1346            let mut output_ports: Vec<PortId> = Vec::new();
1347
1348            if index > 0 {
1349                input_ports.push(port_id("in"));
1350            }
1351
1352            if index + 1 < node_names.len() {
1353                output_ports.push(port_id("out"));
1354            }
1355
1356            nodes.push(
1357                NodeDefinition::new(node_id(node_name), input_ports, output_ports)
1358                    .expect("linear workflow nodes must be valid"),
1359            );
1360        }
1361
1362        for edge in node_names.windows(2) {
1363            edges.push(EdgeDefinition::new(
1364                endpoint(&edge[0], "out"),
1365                endpoint(&edge[1], "in"),
1366            ));
1367        }
1368
1369        WorkflowDefinition::from_parts(workflow_id("flow"), nodes, edges)
1370            .expect("linear workflow must be valid")
1371    }
1372
1373    proptest! {
1374        #[test]
1375        fn linear_workflows_with_unique_valid_node_ids_validate(
1376            node_names in hash_set(valid_identifier_strategy(), 1..6)
1377        ) {
1378            let mut node_names: Vec<String> = node_names.into_iter().collect();
1379            node_names.sort();
1380
1381            let workflow: WorkflowDefinition = build_linear_workflow(&node_names);
1382
1383            prop_assert_eq!(workflow.nodes().len(), node_names.len());
1384            prop_assert_eq!(workflow.edges().len(), node_names.len().saturating_sub(1));
1385        }
1386    }
1387}