1use std::collections::{BTreeMap, BTreeSet};
27use std::error::Error;
28use std::fmt;
29use std::num::NonZeroUsize;
30
31use pureflow_types::{IdentifierError, NodeId, PortId, WorkflowId};
32
/// Direction of a port relative to the node that declares it.
///
/// Edge validation requires an edge's source endpoint to name an `Output`
/// port and its target endpoint to name an `Input` port.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortDirection {
    /// A port that an edge's target endpoint may reference.
    Input,
    /// A port that an edge's source endpoint may reference.
    Output,
}

impl PortDirection {
    /// Lowercase word used when rendering validation error messages.
    const fn label(self) -> &'static str {
        match self {
            Self::Output => "output",
            Self::Input => "input",
        }
    }
}
50
/// Which end of an edge a validation error refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeEndpointRole {
    /// The edge's source endpoint (expected to name an output port).
    Source,
    /// The edge's target endpoint (expected to name an input port).
    Target,
}

impl EdgeEndpointRole {
    /// Lowercase word used when rendering validation error messages.
    const fn label(self) -> &'static str {
        match self {
            Self::Target => "target",
            Self::Source => "source",
        }
    }
}
68
69#[derive(Debug, Clone, PartialEq, Eq)]
71pub enum WorkflowValidationError {
72 DuplicateNode {
74 node_id: NodeId,
76 },
77 DuplicatePort {
79 node_id: NodeId,
81 port_id: PortId,
83 },
84 UnknownNode {
86 edge_index: usize,
88 endpoint: EdgeEndpointRole,
90 node_id: NodeId,
92 },
93 UnknownPort {
95 edge_index: usize,
97 endpoint: EdgeEndpointRole,
99 node_id: NodeId,
101 port_id: PortId,
103 expected: PortDirection,
105 },
106 CycleDetected {
108 cycle: Vec<NodeId>,
110 },
111}
112
113impl fmt::Display for WorkflowValidationError {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 match self {
116 Self::DuplicateNode { node_id } => {
117 write!(f, "workflow graph contains duplicate node `{node_id}`")
118 }
119 Self::DuplicatePort { node_id, port_id } => {
120 write!(f, "node `{node_id}` contains duplicate port `{port_id}`")
121 }
122 Self::UnknownNode {
123 edge_index,
124 endpoint,
125 node_id,
126 } => write!(
127 f,
128 "edge {edge_index} {} references unknown node `{node_id}`",
129 endpoint.label()
130 ),
131 Self::UnknownPort {
132 edge_index,
133 endpoint,
134 node_id,
135 port_id,
136 expected,
137 } => write!(
138 f,
139 "edge {edge_index} {} references unknown {} port `{port_id}` on node `{node_id}`",
140 endpoint.label(),
141 expected.label()
142 ),
143 Self::CycleDetected { cycle } => {
144 write!(f, "workflow graph contains a cycle involving")?;
145 for node_id in cycle {
146 write!(f, " `{node_id}`")?;
147 }
148 Ok(())
149 }
150 }
151 }
152}
153
154impl Error for WorkflowValidationError {}
155
156#[derive(Debug, Clone, PartialEq, Eq)]
158pub struct EdgeEndpoint {
159 node_id: NodeId,
160 port_id: PortId,
161}
162
163impl EdgeEndpoint {
164 #[must_use]
166 pub const fn new(node_id: NodeId, port_id: PortId) -> Self {
167 Self { node_id, port_id }
168 }
169
170 #[must_use]
172 pub const fn node_id(&self) -> &NodeId {
173 &self.node_id
174 }
175
176 #[must_use]
178 pub const fn port_id(&self) -> &PortId {
179 &self.port_id
180 }
181}
182
/// Capacity setting for an edge: either defer to a caller-supplied default
/// or use an explicit non-zero value.
// NOTE(review): presumably this bounds the runtime channel/buffer for the
// edge; confirm against the engine that consumes it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EdgeCapacity {
    /// No explicit capacity; [`EdgeCapacity::resolve`] returns its `default`.
    Default,
    /// A specific capacity that overrides any default.
    Explicit(NonZeroUsize),
}

impl EdgeCapacity {
    /// Return the explicit capacity if one was set, otherwise `default`.
    #[must_use]
    pub const fn resolve(self, default: NonZeroUsize) -> NonZeroUsize {
        match self {
            Self::Explicit(requested) => requested,
            Self::Default => default,
        }
    }
}
202
203#[derive(Debug, Clone, PartialEq, Eq)]
205pub struct EdgeDefinition {
206 source: EdgeEndpoint,
207 target: EdgeEndpoint,
208 capacity: EdgeCapacity,
209}
210
211impl EdgeDefinition {
212 #[must_use]
214 pub const fn new(source: EdgeEndpoint, target: EdgeEndpoint) -> Self {
215 Self {
216 source,
217 target,
218 capacity: EdgeCapacity::Default,
219 }
220 }
221
222 #[must_use]
224 pub const fn with_capacity(
225 source: EdgeEndpoint,
226 target: EdgeEndpoint,
227 capacity: NonZeroUsize,
228 ) -> Self {
229 Self {
230 source,
231 target,
232 capacity: EdgeCapacity::Explicit(capacity),
233 }
234 }
235
236 #[must_use]
238 pub const fn source(&self) -> &EdgeEndpoint {
239 &self.source
240 }
241
242 #[must_use]
244 pub const fn target(&self) -> &EdgeEndpoint {
245 &self.target
246 }
247
248 #[must_use]
250 pub const fn capacity(&self) -> EdgeCapacity {
251 self.capacity
252 }
253}
254
255#[derive(Debug, Clone, PartialEq, Eq)]
257pub struct NodeDefinition {
258 id: NodeId,
259 input_ports: Vec<PortId>,
260 output_ports: Vec<PortId>,
261}
262
263impl NodeDefinition {
264 pub fn new(
271 id: NodeId,
272 input_ports: impl Into<Vec<PortId>>,
273 output_ports: impl Into<Vec<PortId>>,
274 ) -> Result<Self, WorkflowValidationError> {
275 let input_ports: Vec<PortId> = input_ports.into();
276 let output_ports: Vec<PortId> = output_ports.into();
277 reject_duplicate_ports(&id, &input_ports, &output_ports)?;
278
279 Ok(Self {
280 id,
281 input_ports,
282 output_ports,
283 })
284 }
285
286 #[must_use]
288 pub const fn id(&self) -> &NodeId {
289 &self.id
290 }
291
292 #[must_use]
294 pub fn input_ports(&self) -> &[PortId] {
295 &self.input_ports
296 }
297
298 #[must_use]
300 pub fn output_ports(&self) -> &[PortId] {
301 &self.output_ports
302 }
303}
304
305#[derive(Debug, Clone, PartialEq, Eq)]
307pub struct WorkflowGraph {
308 nodes: Vec<NodeDefinition>,
309 edges: Vec<EdgeDefinition>,
310}
311
312impl WorkflowGraph {
313 pub fn new(
321 nodes: impl Into<Vec<NodeDefinition>>,
322 edges: impl Into<Vec<EdgeDefinition>>,
323 ) -> Result<Self, WorkflowValidationError> {
324 Self::build(nodes, edges, false)
325 }
326
327 pub fn with_cycles_allowed(
334 nodes: impl Into<Vec<NodeDefinition>>,
335 edges: impl Into<Vec<EdgeDefinition>>,
336 ) -> Result<Self, WorkflowValidationError> {
337 Self::build(nodes, edges, true)
338 }
339
340 #[must_use]
342 pub const fn empty() -> Self {
343 Self {
344 nodes: Vec::new(),
345 edges: Vec::new(),
346 }
347 }
348
349 #[must_use]
351 pub fn nodes(&self) -> &[NodeDefinition] {
352 &self.nodes
353 }
354
355 #[must_use]
357 pub fn edges(&self) -> &[EdgeDefinition] {
358 &self.edges
359 }
360
361 pub fn topological_order(&self) -> Result<Vec<NodeId>, WorkflowValidationError> {
368 let topology: GraphTopology = GraphTopology::from_graph(&self.nodes, &self.edges)?;
369 topology.topological_order()
370 }
371
372 fn build(
373 nodes: impl Into<Vec<NodeDefinition>>,
374 edges: impl Into<Vec<EdgeDefinition>>,
375 allow_cycles: bool,
376 ) -> Result<Self, WorkflowValidationError> {
377 let graph: Self = Self {
378 nodes: nodes.into(),
379 edges: edges.into(),
380 };
381 graph.validate(allow_cycles)?;
382 Ok(graph)
383 }
384
385 fn validate(&self, allow_cycles: bool) -> Result<(), WorkflowValidationError> {
386 let topology: GraphTopology = GraphTopology::from_graph(&self.nodes, &self.edges)?;
387 if !allow_cycles {
388 topology.topological_order()?;
389 }
390 Ok(())
391 }
392}
393
394#[derive(Debug, Clone, PartialEq, Eq)]
396pub struct WorkflowDefinition {
397 id: WorkflowId,
398 graph: WorkflowGraph,
399}
400
401impl WorkflowDefinition {
402 #[must_use]
404 pub const fn new(id: WorkflowId, graph: WorkflowGraph) -> Self {
405 Self { id, graph }
406 }
407
408 pub fn from_parts(
414 id: WorkflowId,
415 nodes: impl Into<Vec<NodeDefinition>>,
416 edges: impl Into<Vec<EdgeDefinition>>,
417 ) -> Result<Self, WorkflowValidationError> {
418 let graph: WorkflowGraph = WorkflowGraph::new(nodes, edges)?;
419 Ok(Self::new(id, graph))
420 }
421
422 pub fn empty(name: impl Into<String>) -> Result<Self, IdentifierError> {
428 Ok(Self::new(WorkflowId::new(name)?, WorkflowGraph::empty()))
429 }
430
431 #[must_use]
433 pub const fn id(&self) -> &WorkflowId {
434 &self.id
435 }
436
437 #[must_use]
439 pub const fn graph(&self) -> &WorkflowGraph {
440 &self.graph
441 }
442
443 #[must_use]
445 pub fn nodes(&self) -> &[NodeDefinition] {
446 self.graph.nodes()
447 }
448
449 #[must_use]
451 pub fn edges(&self) -> &[EdgeDefinition] {
452 self.graph.edges()
453 }
454}
455
456struct GraphTopology {
457 node_ids: Vec<NodeId>,
458 inputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>>,
459 outputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>>,
460 outgoing_by_node: BTreeMap<NodeId, BTreeSet<NodeId>>,
461 indegree_by_node: BTreeMap<NodeId, usize>,
462}
463
464impl GraphTopology {
465 fn from_graph(
466 nodes: &[NodeDefinition],
467 edges: &[EdgeDefinition],
468 ) -> Result<Self, WorkflowValidationError> {
469 reject_duplicate_nodes(nodes)?;
470
471 let mut inputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>> = BTreeMap::new();
472 let mut outputs_by_node: BTreeMap<NodeId, BTreeSet<PortId>> = BTreeMap::new();
473 let mut outgoing_by_node: BTreeMap<NodeId, BTreeSet<NodeId>> = BTreeMap::new();
474 let mut indegree_by_node: BTreeMap<NodeId, usize> = BTreeMap::new();
475 let mut node_ids: Vec<NodeId> = Vec::with_capacity(nodes.len());
476
477 for node in nodes {
478 let node_id: NodeId = node.id().clone();
479 node_ids.push(node_id.clone());
480 inputs_by_node.insert(
481 node_id.clone(),
482 node.input_ports().iter().cloned().collect(),
483 );
484 outputs_by_node.insert(
485 node_id.clone(),
486 node.output_ports().iter().cloned().collect(),
487 );
488 outgoing_by_node.insert(node_id.clone(), BTreeSet::new());
489 indegree_by_node.insert(node_id, 0);
490 }
491
492 let mut topology: Self = Self {
493 node_ids,
494 inputs_by_node,
495 outputs_by_node,
496 outgoing_by_node,
497 indegree_by_node,
498 };
499
500 for (edge_index, edge) in edges.iter().enumerate() {
501 topology.validate_endpoint(
502 edge_index,
503 EdgeEndpointRole::Source,
504 edge.source(),
505 PortDirection::Output,
506 )?;
507 topology.validate_endpoint(
508 edge_index,
509 EdgeEndpointRole::Target,
510 edge.target(),
511 PortDirection::Input,
512 )?;
513
514 let Some(outgoing): Option<&mut BTreeSet<NodeId>> =
515 topology.outgoing_by_node.get_mut(edge.source().node_id())
516 else {
517 return Err(WorkflowValidationError::UnknownNode {
518 edge_index,
519 endpoint: EdgeEndpointRole::Source,
520 node_id: edge.source().node_id().clone(),
521 });
522 };
523 outgoing.insert(edge.target().node_id().clone());
524
525 let Some(indegree): Option<&mut usize> =
526 topology.indegree_by_node.get_mut(edge.target().node_id())
527 else {
528 return Err(WorkflowValidationError::UnknownNode {
529 edge_index,
530 endpoint: EdgeEndpointRole::Target,
531 node_id: edge.target().node_id().clone(),
532 });
533 };
534 *indegree += 1;
535 }
536
537 Ok(topology)
538 }
539
540 fn validate_endpoint(
541 &self,
542 edge_index: usize,
543 endpoint: EdgeEndpointRole,
544 edge_endpoint: &EdgeEndpoint,
545 expected: PortDirection,
546 ) -> Result<(), WorkflowValidationError> {
547 let ports_by_node: &BTreeMap<NodeId, BTreeSet<PortId>> = match expected {
548 PortDirection::Input => &self.inputs_by_node,
549 PortDirection::Output => &self.outputs_by_node,
550 };
551
552 let ports: &BTreeSet<PortId> =
553 ports_by_node.get(edge_endpoint.node_id()).ok_or_else(|| {
554 WorkflowValidationError::UnknownNode {
555 edge_index,
556 endpoint,
557 node_id: edge_endpoint.node_id().clone(),
558 }
559 })?;
560
561 if !ports.contains(edge_endpoint.port_id()) {
562 return Err(WorkflowValidationError::UnknownPort {
563 edge_index,
564 endpoint,
565 node_id: edge_endpoint.node_id().clone(),
566 port_id: edge_endpoint.port_id().clone(),
567 expected,
568 });
569 }
570
571 Ok(())
572 }
573
574 fn topological_order(&self) -> Result<Vec<NodeId>, WorkflowValidationError> {
575 let mut indegree_by_node: BTreeMap<NodeId, usize> = self.indegree_by_node.clone();
576 let mut ready: BTreeSet<NodeId> = indegree_by_node
577 .iter()
578 .filter_map(|(node_id, indegree): (&NodeId, &usize)| {
579 (*indegree == 0).then_some(node_id.clone())
580 })
581 .collect();
582 let mut order: Vec<NodeId> = Vec::with_capacity(indegree_by_node.len());
583
584 while let Some(node_id) = ready.pop_first() {
585 order.push(node_id.clone());
586
587 let Some(children): Option<&BTreeSet<NodeId>> = self.outgoing_by_node.get(&node_id)
588 else {
589 continue;
590 };
591
592 for child in children {
593 let Some(indegree): Option<&mut usize> = indegree_by_node.get_mut(child) else {
594 continue;
595 };
596 *indegree -= 1;
597 if *indegree == 0 {
598 ready.insert(child.clone());
599 }
600 }
601 }
602
603 if order.len() == self.node_ids.len() {
604 return Ok(order);
605 }
606
607 let remaining: BTreeSet<NodeId> = self
608 .node_ids
609 .iter()
610 .filter(|node_id: &&NodeId| !order.contains(node_id))
611 .cloned()
612 .collect();
613 let cycle: Vec<NodeId> = self.find_cycle(&remaining);
614 Err(WorkflowValidationError::CycleDetected { cycle })
615 }
616
617 fn find_cycle(&self, remaining: &BTreeSet<NodeId>) -> Vec<NodeId> {
618 #[derive(Clone, Copy, PartialEq, Eq)]
619 enum VisitState {
620 Visiting,
621 Visited,
622 }
623
624 fn dfs(
625 node_id: &NodeId,
626 topology: &GraphTopology,
627 remaining: &BTreeSet<NodeId>,
628 states: &mut BTreeMap<NodeId, VisitState>,
629 stack: &mut Vec<NodeId>,
630 ) -> Option<Vec<NodeId>> {
631 states.insert(node_id.clone(), VisitState::Visiting);
632 stack.push(node_id.clone());
633
634 let Some(children): Option<&BTreeSet<NodeId>> = topology.outgoing_by_node.get(node_id)
635 else {
636 stack.pop();
637 states.insert(node_id.clone(), VisitState::Visited);
638 return None;
639 };
640
641 for child in children {
642 if !remaining.contains(child) {
643 continue;
644 }
645
646 match states.get(child) {
647 Some(VisitState::Visiting) => {
648 if let Some(cycle) = cycle_from_stack(stack, child) {
649 return Some(cycle);
650 }
651 }
652 Some(VisitState::Visited) => {}
653 None => {
654 if let Some(cycle) = dfs(child, topology, remaining, states, stack) {
655 return Some(cycle);
656 }
657 }
658 }
659 }
660
661 stack.pop();
662 states.insert(node_id.clone(), VisitState::Visited);
663 None
664 }
665
666 fn cycle_from_stack(stack: &[NodeId], child: &NodeId) -> Option<Vec<NodeId>> {
667 let start_index: usize = stack.iter().position(|entry: &NodeId| entry == child)?;
668 let mut cycle: Vec<NodeId> = stack.iter().skip(start_index).cloned().collect();
669 cycle.push(child.clone());
670 Some(cycle)
671 }
672
673 let mut states: BTreeMap<NodeId, VisitState> = BTreeMap::new();
674 let mut stack: Vec<NodeId> = Vec::new();
675
676 for node_id in &self.node_ids {
677 if !remaining.contains(node_id) || states.contains_key(node_id) {
678 continue;
679 }
680
681 if let Some(cycle) = dfs(node_id, self, remaining, &mut states, &mut stack) {
682 return cycle;
683 }
684 }
685
686 remaining.iter().cloned().collect()
687 }
688}
689
690fn reject_duplicate_nodes(nodes: &[NodeDefinition]) -> Result<(), WorkflowValidationError> {
691 let mut seen: BTreeSet<NodeId> = BTreeSet::new();
692
693 for node in nodes {
694 if !seen.insert(node.id().clone()) {
695 return Err(WorkflowValidationError::DuplicateNode {
696 node_id: node.id().clone(),
697 });
698 }
699 }
700
701 Ok(())
702}
703
704fn reject_duplicate_ports(
705 node_id: &NodeId,
706 input_ports: &[PortId],
707 output_ports: &[PortId],
708) -> Result<(), WorkflowValidationError> {
709 let mut seen: BTreeSet<PortId> = BTreeSet::new();
710
711 for port_id in input_ports.iter().chain(output_ports) {
712 if !seen.insert(port_id.clone()) {
713 return Err(WorkflowValidationError::DuplicatePort {
714 node_id: node_id.clone(),
715 port_id: port_id.clone(),
716 });
717 }
718 }
719
720 Ok(())
721}
722
#[cfg(test)]
mod tests {
    // Unit tests plus QuickCheck and proptest property tests for node, edge,
    // and graph validation.
    use super::*;
    use pureflow_types::IdentifierKind;
    use proptest::{collection::hash_set, prelude::*};
    use quickcheck::{Arbitrary as QuickArbitrary, Gen, QuickCheck};
    use std::num::NonZeroUsize;
    use std::panic::{self, AssertUnwindSafe};

    /// Proptest strategy for strings of 1 to 15 characters, none of which is
    /// whitespace or a control character.
    // NOTE(review): presumably mirrors the identifier rules enforced by
    // `pureflow_types`; confirm if those rules change.
    fn valid_identifier_strategy() -> impl Strategy<Value = String> {
        prop::collection::vec(
            any::<char>().prop_filter(
                "identifier characters must not be whitespace or control",
                |ch| !ch.is_whitespace() && !ch.is_control(),
            ),
            1..16,
        )
        .prop_map(|chars: Vec<char>| chars.into_iter().collect())
    }

    /// Build a `WorkflowId`, panicking if `value` is rejected.
    fn workflow_id(value: &str) -> WorkflowId {
        WorkflowId::new(value).expect("valid workflow id")
    }

    /// Build a `NodeId`, panicking if `value` is rejected.
    fn node_id(value: &str) -> NodeId {
        NodeId::new(value).expect("valid node id")
    }

    /// Build a `PortId`, panicking if `value` is rejected.
    fn port_id(value: &str) -> PortId {
        PortId::new(value).expect("valid port id")
    }

    /// Shorthand for an `EdgeEndpoint` built from string ids.
    fn endpoint(node: &str, port: &str) -> EdgeEndpoint {
        EdgeEndpoint::new(node_id(node), port_id(port))
    }

    /// A QuickCheck-generated graph that is expected to pass validation.
    #[derive(Debug, Clone)]
    struct GeneratedValidGraph {
        nodes: Vec<NodeDefinition>,
        edges: Vec<EdgeDefinition>,
    }

    impl QuickArbitrary for GeneratedValidGraph {
        // Generates 1 to 5 routable nodes and, for each pair (lower index ->
        // higher index), randomly includes one edge. Edges only point forward
        // in index order, so the graph is always acyclic; nodes that receive
        // no edges stay disconnected.
        fn arbitrary(g: &mut Gen) -> Self {
            let node_count = generated_count(g, 1, 6);
            let nodes: Vec<NodeDefinition> = (0..node_count)
                .map(|index| generated_routable_node(index))
                .collect();
            let mut edges = Vec::new();

            for source in 0..node_count {
                for target in (source + 1)..node_count {
                    if generated_bool(g) {
                        edges.push(generated_edge(source, target));
                    }
                }
            }

            Self { nodes, edges }
        }
    }

    /// A node count in `1..=5` for the fan-in / fan-out properties.
    #[derive(Debug, Clone)]
    struct SmallNodeCount(usize);

    impl QuickArbitrary for SmallNodeCount {
        fn arbitrary(g: &mut Gen) -> Self {
            Self(generated_count(g, 1, 6))
        }
    }

    /// One randomly chosen validation scenario and its expected outcome.
    #[derive(Debug, Clone)]
    struct GeneratedValidationCase {
        scenario: ValidationScenario,
    }

    /// Input under test for `GeneratedValidationCase`.
    #[derive(Debug, Clone)]
    enum ValidationScenario {
        /// Construct a node that uses `port_id` as both an input and an output.
        DuplicatePort {
            node_id: NodeId,
            port_id: PortId,
        },
        /// Build a graph from `nodes` and `edges` and compare with `expected`.
        Graph {
            nodes: Vec<NodeDefinition>,
            edges: Vec<EdgeDefinition>,
            expected: ExpectedGraphResult,
        },
    }

    /// Expected outcome of `WorkflowGraph::new`, compared by variant (plus
    /// endpoint role / port direction where present), not full equality.
    #[derive(Debug, Clone, Copy)]
    enum ExpectedGraphResult {
        Ok,
        DuplicateNode,
        UnknownNode(EdgeEndpointRole),
        UnknownPort(EdgeEndpointRole, PortDirection),
        CycleDetected,
    }

    impl QuickArbitrary for GeneratedValidationCase {
        // Picks one of eight scenarios (0-7) from an arbitrary `u8` modulo 8.
        fn arbitrary(g: &mut Gen) -> Self {
            let scenario = match generated_u8(g) % 8 {
                // A valid acyclic graph.
                0 => {
                    let graph = GeneratedValidGraph::arbitrary(g);
                    ValidationScenario::Graph {
                        nodes: graph.nodes,
                        edges: graph.edges,
                        expected: ExpectedGraphResult::Ok,
                    }
                }
                // Two nodes sharing the id `node_0`.
                1 => ValidationScenario::Graph {
                    nodes: vec![generated_empty_node(0), generated_empty_node(0)],
                    edges: Vec::new(),
                    expected: ExpectedGraphResult::DuplicateNode,
                },
                // A node whose port id is reused for input and output.
                2 => ValidationScenario::DuplicatePort {
                    node_id: generated_node_id(0),
                    port_id: port_id("dup"),
                },
                // Edge source names an undeclared node.
                3 => ValidationScenario::Graph {
                    nodes: vec![generated_sink_node(0)],
                    edges: vec![EdgeDefinition::new(
                        EdgeEndpoint::new(node_id("missing_source"), port_id("out")),
                        EdgeEndpoint::new(generated_node_id(0), port_id("in")),
                    )],
                    expected: ExpectedGraphResult::UnknownNode(EdgeEndpointRole::Source),
                },
                // Edge target names an undeclared node.
                4 => ValidationScenario::Graph {
                    nodes: vec![generated_source_node(0)],
                    edges: vec![EdgeDefinition::new(
                        EdgeEndpoint::new(generated_node_id(0), port_id("out")),
                        EdgeEndpoint::new(node_id("missing_target"), port_id("in")),
                    )],
                    expected: ExpectedGraphResult::UnknownNode(EdgeEndpointRole::Target),
                },
                // Edge source uses `in`, which sink nodes declare only as an input.
                5 => ValidationScenario::Graph {
                    nodes: vec![generated_sink_node(0), generated_sink_node(1)],
                    edges: vec![EdgeDefinition::new(
                        EdgeEndpoint::new(generated_node_id(0), port_id("in")),
                        EdgeEndpoint::new(generated_node_id(1), port_id("in")),
                    )],
                    expected: ExpectedGraphResult::UnknownPort(
                        EdgeEndpointRole::Source,
                        PortDirection::Output,
                    ),
                },
                // Edge target uses `out`, which source nodes declare only as an output.
                6 => ValidationScenario::Graph {
                    nodes: vec![generated_source_node(0), generated_source_node(1)],
                    edges: vec![EdgeDefinition::new(
                        EdgeEndpoint::new(generated_node_id(0), port_id("out")),
                        EdgeEndpoint::new(generated_node_id(1), port_id("out")),
                    )],
                    expected: ExpectedGraphResult::UnknownPort(
                        EdgeEndpointRole::Target,
                        PortDirection::Input,
                    ),
                },
                // A ring of 2 to 6 nodes.
                _ => {
                    let (nodes, edges) = generated_cycle_graph(g);
                    ValidationScenario::Graph {
                        nodes,
                        edges,
                        expected: ExpectedGraphResult::CycleDetected,
                    }
                }
            };

            Self { scenario }
        }
    }

    /// Integer in `min..max_exclusive`, taken from an arbitrary `usize` by
    /// modulo. Callers must pass `max_exclusive > min` (equal bounds divide by
    /// zero).
    fn generated_count(g: &mut Gen, min: usize, max_exclusive: usize) -> usize {
        min + (generated_usize(g) % (max_exclusive - min))
    }

    /// Arbitrary `bool` from the QuickCheck generator.
    fn generated_bool(g: &mut Gen) -> bool {
        <bool as QuickArbitrary>::arbitrary(g)
    }

    /// Arbitrary `u8` from the QuickCheck generator.
    fn generated_u8(g: &mut Gen) -> u8 {
        <u8 as QuickArbitrary>::arbitrary(g)
    }

    /// Arbitrary `usize` from the QuickCheck generator.
    fn generated_usize(g: &mut Gen) -> usize {
        <usize as QuickArbitrary>::arbitrary(g)
    }

    /// Node id `node_{index}`.
    fn generated_node_id(index: usize) -> NodeId {
        node_id(&format!("node_{index}"))
    }

    /// Node with one input `in` and one output `out`.
    fn generated_routable_node(index: usize) -> NodeDefinition {
        NodeDefinition::new(generated_node_id(index), [port_id("in")], [port_id("out")])
            .expect("generated routable node is valid")
    }

    /// Node with only an output port `out`.
    fn generated_source_node(index: usize) -> NodeDefinition {
        NodeDefinition::new(
            generated_node_id(index),
            Vec::<PortId>::new(),
            [port_id("out")],
        )
        .expect("generated source node is valid")
    }

    /// Node with only an input port `in`.
    fn generated_sink_node(index: usize) -> NodeDefinition {
        NodeDefinition::new(
            generated_node_id(index),
            [port_id("in")],
            Vec::<PortId>::new(),
        )
        .expect("generated sink node is valid")
    }

    /// Node with no ports.
    fn generated_empty_node(index: usize) -> NodeDefinition {
        NodeDefinition::new(
            generated_node_id(index),
            Vec::<PortId>::new(),
            Vec::<PortId>::new(),
        )
        .expect("generated empty node is valid")
    }

    /// Edge from `node_{source}.out` to `node_{target}.in`.
    fn generated_edge(source: usize, target: usize) -> EdgeDefinition {
        EdgeDefinition::new(
            EdgeEndpoint::new(generated_node_id(source), port_id("out")),
            EdgeEndpoint::new(generated_node_id(target), port_id("in")),
        )
    }

    /// A ring of 2 to 6 routable nodes: each node feeds the next, and the
    /// last feeds the first.
    fn generated_cycle_graph(g: &mut Gen) -> (Vec<NodeDefinition>, Vec<EdgeDefinition>) {
        let node_count = generated_count(g, 2, 7);
        let nodes = (0..node_count).map(generated_routable_node).collect();
        let edges = (0..node_count)
            .map(|source| generated_edge(source, (source + 1) % node_count))
            .collect();

        (nodes, edges)
    }

    /// One source node (`node_0`) wired to `target_count` sink nodes.
    fn generated_fan_out_graph(target_count: usize) -> (Vec<NodeDefinition>, Vec<EdgeDefinition>) {
        let mut nodes = vec![generated_source_node(0)];
        let mut edges = Vec::new();

        for target in 1..=target_count {
            nodes.push(generated_sink_node(target));
            edges.push(generated_edge(0, target));
        }

        (nodes, edges)
    }

    /// `source_count` source nodes all wired into one sink (`node_{source_count}`).
    fn generated_fan_in_graph(source_count: usize) -> (Vec<NodeDefinition>, Vec<EdgeDefinition>) {
        let sink_index = source_count;
        let mut nodes = Vec::new();
        let mut edges = Vec::new();

        for source in 0..source_count {
            nodes.push(generated_source_node(source));
            edges.push(generated_edge(source, sink_index));
        }

        nodes.push(generated_sink_node(sink_index));
        (nodes, edges)
    }

    /// Run one generated scenario and report whether the outcome matches.
    fn validate_generated_case(case: &GeneratedValidationCase) -> bool {
        match &case.scenario {
            ValidationScenario::DuplicatePort { node_id, port_id } => matches!(
                NodeDefinition::new(node_id.clone(), [port_id.clone()], [port_id.clone()]),
                Err(WorkflowValidationError::DuplicatePort { .. })
            ),
            ValidationScenario::Graph {
                nodes,
                edges,
                expected,
            } => graph_result_matches(WorkflowGraph::new(nodes.clone(), edges.clone()), *expected),
        }
    }

    /// Compare a build result with the expected variant. A cycle error must
    /// carry a non-empty cycle.
    fn graph_result_matches(
        result: Result<WorkflowGraph, WorkflowValidationError>,
        expected: ExpectedGraphResult,
    ) -> bool {
        match (result, expected) {
            (Ok(_), ExpectedGraphResult::Ok) => true,
            (
                Err(WorkflowValidationError::DuplicateNode { .. }),
                ExpectedGraphResult::DuplicateNode,
            ) => true,
            (
                Err(WorkflowValidationError::UnknownNode { endpoint, .. }),
                ExpectedGraphResult::UnknownNode(expected_endpoint),
            ) => endpoint == expected_endpoint,
            (
                // `expected` here is the error's port-direction field and
                // shadows the function parameter inside this arm.
                Err(WorkflowValidationError::UnknownPort {
                    endpoint, expected, ..
                }),
                ExpectedGraphResult::UnknownPort(expected_endpoint, expected_direction),
            ) => endpoint == expected_endpoint && expected == expected_direction,
            (
                Err(WorkflowValidationError::CycleDetected { cycle }),
                ExpectedGraphResult::CycleDetected,
            ) => !cycle.is_empty(),
            _ => false,
        }
    }

    /// `WorkflowDefinition::empty` accepts a valid name and starts with no nodes or edges.
    #[test]
    fn empty_workflow_uses_valid_identifier() {
        let workflow = WorkflowDefinition::empty("pureflow-scaffold").expect("valid id");

        assert_eq!(workflow.id().as_str(), "pureflow-scaffold");
        assert!(workflow.nodes().is_empty());
        assert!(workflow.edges().is_empty());
    }

    /// A name containing whitespace surfaces the identifier error unchanged.
    #[test]
    fn empty_workflow_rejects_invalid_identifier() {
        let err = WorkflowDefinition::empty("bad workflow").expect_err("whitespace must fail");
        assert_eq!(
            err,
            IdentifierError::Whitespace {
                kind: IdentifierKind::Workflow
            }
        );
    }

    /// A minimal producer -> consumer workflow builds and keeps its parts.
    #[test]
    fn valid_workflow_represents_nodes_ports_and_edges() {
        let producer = NodeDefinition::new(
            node_id("producer"),
            Vec::<PortId>::new(),
            [port_id("records")],
        )
        .expect("valid producer");
        let consumer = NodeDefinition::new(
            node_id("consumer"),
            [port_id("records")],
            Vec::<PortId>::new(),
        )
        .expect("valid consumer");
        let edge = EdgeDefinition::new(
            endpoint("producer", "records"),
            endpoint("consumer", "records"),
        );

        let workflow =
            WorkflowDefinition::from_parts(workflow_id("ingest"), [producer, consumer], [edge])
                .expect("valid graph");

        assert_eq!(workflow.id().as_str(), "ingest");
        assert_eq!(workflow.nodes().len(), 2);
        assert_eq!(workflow.edges().len(), 1);
    }

    /// `EdgeDefinition::new` leaves capacity at `Default`, which resolves to the supplied value.
    #[test]
    fn edge_capacity_defaults_to_engine_default_policy() {
        let edge = EdgeDefinition::new(endpoint("producer", "records"), endpoint("consumer", "in"));

        assert_eq!(edge.capacity(), EdgeCapacity::Default);
        assert_eq!(
            edge.capacity()
                .resolve(NonZeroUsize::new(7).expect("nonzero")),
            NonZeroUsize::new(7).expect("nonzero")
        );
    }

    /// An explicit capacity is stored and wins over the default when resolved.
    #[test]
    fn edge_capacity_round_trips_explicit_value() {
        let capacity: NonZeroUsize = NonZeroUsize::new(3).expect("nonzero");
        let edge = EdgeDefinition::with_capacity(
            endpoint("producer", "records"),
            endpoint("consumer", "in"),
            capacity,
        );

        assert_eq!(edge.capacity(), EdgeCapacity::Explicit(capacity));
        assert_eq!(
            edge.capacity()
                .resolve(NonZeroUsize::new(7).expect("nonzero")),
            capacity
        );
    }

    /// The upstream node precedes the downstream node in the topological order.
    #[test]
    fn topological_order_returns_sources_before_sinks() {
        let producer =
            NodeDefinition::new(node_id("producer"), Vec::<PortId>::new(), [port_id("out")])
                .expect("valid producer");
        let consumer =
            NodeDefinition::new(node_id("consumer"), [port_id("in")], Vec::<PortId>::new())
                .expect("valid consumer");
        let edge = EdgeDefinition::new(endpoint("producer", "out"), endpoint("consumer", "in"));
        let graph = WorkflowGraph::new([producer, consumer], [edge]).expect("valid graph");

        assert_eq!(
            graph
                .topological_order()
                .expect("acyclic graph should order"),
            vec![node_id("producer"), node_id("consumer")]
        );
    }

    /// `WorkflowGraph::new` rejects a two-node cycle and names both nodes.
    #[test]
    fn workflow_graph_rejects_cycles_by_default() {
        let first = NodeDefinition::new(node_id("first"), [port_id("in")], [port_id("out")])
            .expect("valid first node");
        let second = NodeDefinition::new(node_id("second"), [port_id("in")], [port_id("out")])
            .expect("valid second node");
        let edges = [
            EdgeDefinition::new(endpoint("first", "out"), endpoint("second", "in")),
            EdgeDefinition::new(endpoint("second", "out"), endpoint("first", "in")),
        ];

        let err = WorkflowGraph::new([first, second], edges).expect_err("cycle must fail");

        assert!(
            matches!(err, WorkflowValidationError::CycleDetected { cycle } if cycle.contains(&node_id("first")) && cycle.contains(&node_id("second")))
        );
    }

    /// A cyclic graph can be built with cycles allowed, yet ordering still reports the cycle.
    #[test]
    fn workflow_graph_with_cycles_allowed_keeps_ordering_diagnostics_available() {
        let first = NodeDefinition::new(node_id("first"), [port_id("in")], [port_id("out")])
            .expect("valid first node");
        let second = NodeDefinition::new(node_id("second"), [port_id("in")], [port_id("out")])
            .expect("valid second node");
        let edges = [
            EdgeDefinition::new(endpoint("first", "out"), endpoint("second", "in")),
            EdgeDefinition::new(endpoint("second", "out"), endpoint("first", "in")),
        ];

        let graph = WorkflowGraph::with_cycles_allowed([first, second], edges)
            .expect("cycle-allowed graph should build");

        let err = graph
            .topological_order()
            .expect_err("cycle should still be reported by ordering");
        assert!(matches!(err, WorkflowValidationError::CycleDetected { .. }));
    }

    /// Two nodes with the same id fail with `DuplicateNode` naming that id.
    #[test]
    fn duplicate_nodes_are_rejected() {
        let first =
            NodeDefinition::new(node_id("step"), Vec::<PortId>::new(), Vec::<PortId>::new())
                .expect("valid node");
        let second =
            NodeDefinition::new(node_id("step"), Vec::<PortId>::new(), Vec::<PortId>::new())
                .expect("valid node");

        let err = WorkflowGraph::new([first, second], Vec::<EdgeDefinition>::new())
            .expect_err("duplicate nodes must fail");

        assert_eq!(
            err,
            WorkflowValidationError::DuplicateNode {
                node_id: node_id("step")
            }
        );
    }

    /// Reusing a port id as both input and output is a `DuplicatePort` error.
    #[test]
    fn duplicate_ports_on_one_node_are_rejected() {
        let err = NodeDefinition::new(node_id("step"), [port_id("value")], [port_id("value")])
            .expect_err("duplicate ports must fail");

        assert_eq!(
            err,
            WorkflowValidationError::DuplicatePort {
                node_id: node_id("step"),
                port_id: port_id("value")
            }
        );
    }

    /// An edge whose source node is undeclared reports `UnknownNode` for edge 0's source.
    #[test]
    fn edge_source_must_reference_existing_node() {
        let consumer = NodeDefinition::new(
            node_id("consumer"),
            [port_id("records")],
            Vec::<PortId>::new(),
        )
        .expect("valid consumer");
        let edge = EdgeDefinition::new(
            endpoint("missing", "records"),
            endpoint("consumer", "records"),
        );

        let err = WorkflowGraph::new([consumer], [edge]).expect_err("missing source must fail");

        assert_eq!(
            err,
            WorkflowValidationError::UnknownNode {
                edge_index: 0,
                endpoint: EdgeEndpointRole::Source,
                node_id: node_id("missing")
            }
        );
    }

    /// An edge source naming an input port reports `UnknownPort` expecting an output.
    #[test]
    fn edge_source_must_reference_output_port() {
        let producer = NodeDefinition::new(
            node_id("producer"),
            [port_id("records")],
            Vec::<PortId>::new(),
        )
        .expect("valid producer");
        let consumer = NodeDefinition::new(
            node_id("consumer"),
            [port_id("records")],
            Vec::<PortId>::new(),
        )
        .expect("valid consumer");
        let edge = EdgeDefinition::new(
            endpoint("producer", "records"),
            endpoint("consumer", "records"),
        );

        let err = WorkflowGraph::new([producer, consumer], [edge])
            .expect_err("input source port must fail");

        assert_eq!(
            err,
            WorkflowValidationError::UnknownPort {
                edge_index: 0,
                endpoint: EdgeEndpointRole::Source,
                node_id: node_id("producer"),
                port_id: port_id("records"),
                expected: PortDirection::Output
            }
        );
    }

    /// An edge target naming an output port reports `UnknownPort` expecting an input.
    #[test]
    fn edge_target_must_reference_input_port() {
        let producer = NodeDefinition::new(
            node_id("producer"),
            Vec::<PortId>::new(),
            [port_id("records")],
        )
        .expect("valid producer");
        let consumer = NodeDefinition::new(
            node_id("consumer"),
            Vec::<PortId>::new(),
            [port_id("records")],
        )
        .expect("valid consumer");
        let edge = EdgeDefinition::new(
            endpoint("producer", "records"),
            endpoint("consumer", "records"),
        );

        let err = WorkflowGraph::new([producer, consumer], [edge])
            .expect_err("output target port must fail");

        assert_eq!(
            err,
            WorkflowValidationError::UnknownPort {
                edge_index: 0,
                endpoint: EdgeEndpointRole::Target,
                node_id: node_id("consumer"),
                port_id: port_id("records"),
                expected: PortDirection::Input
            }
        );
    }

    /// Property: every forward-only (acyclic) generated graph validates.
    #[test]
    fn generated_acyclic_graphs_with_disconnected_nodes_validate() {
        fn property(graph: GeneratedValidGraph) -> bool {
            WorkflowGraph::new(graph.nodes, graph.edges).is_ok()
        }

        QuickCheck::new()
            .tests(128)
            .quickcheck(property as fn(GeneratedValidGraph) -> bool);
    }

    /// Property: each generated scenario yields its expected variant. A panic
    /// during validation is caught and counted as a failure.
    #[test]
    fn generated_validation_cases_return_consistent_error_variants_without_panicking() {
        fn property(case: GeneratedValidationCase) -> bool {
            panic::catch_unwind(AssertUnwindSafe(|| validate_generated_case(&case)))
                .unwrap_or(false)
        }

        QuickCheck::new()
            .tests(128)
            .quickcheck(property as fn(GeneratedValidationCase) -> bool);
    }

    /// Property: one source feeding 1 to 5 sinks validates.
    #[test]
    fn generated_fan_out_topologies_validate() {
        fn property(count: SmallNodeCount) -> bool {
            let (nodes, edges) = generated_fan_out_graph(count.0);

            WorkflowGraph::new(nodes, edges).is_ok()
        }

        QuickCheck::new()
            .tests(128)
            .quickcheck(property as fn(SmallNodeCount) -> bool);
    }

    /// Property: 1 to 5 sources feeding one sink validates.
    #[test]
    fn generated_fan_in_topologies_validate() {
        fn property(count: SmallNodeCount) -> bool {
            let (nodes, edges) = generated_fan_in_graph(count.0);

            WorkflowGraph::new(nodes, edges).is_ok()
        }

        QuickCheck::new()
            .tests(128)
            .quickcheck(property as fn(SmallNodeCount) -> bool);
    }

    /// Build a chain `names[0] -> names[1] -> ...` wired `out` -> `in`.
    ///
    /// The first node gets no input port and the last no output port. Panics
    /// if any name is not a valid node id or the workflow fails validation.
    fn build_linear_workflow(node_names: &[String]) -> WorkflowDefinition {
        let mut nodes: Vec<NodeDefinition> = Vec::new();
        let mut edges: Vec<EdgeDefinition> = Vec::new();

        for (index, node_name) in node_names.iter().enumerate() {
            let mut input_ports: Vec<PortId> = Vec::new();
            let mut output_ports: Vec<PortId> = Vec::new();

            if index > 0 {
                input_ports.push(port_id("in"));
            }

            if index + 1 < node_names.len() {
                output_ports.push(port_id("out"));
            }

            nodes.push(
                NodeDefinition::new(node_id(node_name), input_ports, output_ports)
                    .expect("linear workflow nodes must be valid"),
            );
        }

        for edge in node_names.windows(2) {
            edges.push(EdgeDefinition::new(
                endpoint(&edge[0], "out"),
                endpoint(&edge[1], "in"),
            ));
        }

        WorkflowDefinition::from_parts(workflow_id("flow"), nodes, edges)
            .expect("linear workflow must be valid")
    }

    proptest! {
        // Property: a chain over 1 to 5 unique identifiers validates with one
        // edge per consecutive pair. Names are sorted so the chain order is
        // deterministic for a given set.
        #[test]
        fn linear_workflows_with_unique_valid_node_ids_validate(
            node_names in hash_set(valid_identifier_strategy(), 1..6)
        ) {
            let mut node_names: Vec<String> = node_names.into_iter().collect();
            node_names.sort();

            let workflow: WorkflowDefinition = build_linear_workflow(&node_names);

            prop_assert_eq!(workflow.nodes().len(), node_names.len());
            prop_assert_eq!(workflow.edges().len(), node_names.len().saturating_sub(1));
        }
    }
}