Skip to main content

pureflow_contract/
lib.rs

1//! Node contract data and validation for Pureflow.
2//!
3//! ## Fragment: contract-data-boundary
4//!
5//! This crate keeps the contract layer thin: it owns typed contract metadata
6//! and validation against validated workflow topology plus existing capability
7//! descriptors. It does not add a second capability model, a parser, or a
8//! runtime.
9//!
10//! ## Fragment: contract-validation-boundary
11//!
12//! Validation is intentionally staged. Workflow structure is still owned by
13//! `pureflow-workflow`, capability descriptors remain in `pureflow-core`, and
14//! this crate only verifies that the two line up with the contract metadata
15//! supplied for inspection or execution planning.
16
17use std::collections::BTreeMap;
18use std::error::Error;
19use std::fmt;
20
21use pureflow_core::{
22    RetryDisposition,
23    capability::{
24        CapabilityValidationError, EffectCapability, NodeCapabilities,
25        validate_workflow_capabilities,
26    },
27};
28use pureflow_types::{NodeId, PortId};
29use pureflow_workflow::{PortDirection, WorkflowDefinition};
30
31/// Opaque schema reference attached to a contract port.
32#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
33pub struct SchemaRef(String);
34
35impl SchemaRef {
36    /// Create an opaque schema reference.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the supplied reference is empty or whitespace-only.
41    pub fn new(value: impl Into<String>) -> Result<Self, ContractValidationError> {
42        let value: String = value.into();
43        if value.trim().is_empty() {
44            return Err(ContractValidationError::EmptySchemaRef);
45        }
46
47        Ok(Self(value))
48    }
49
50    /// Raw schema reference string.
51    #[must_use]
52    pub fn as_str(&self) -> &str {
53        &self.0
54    }
55}
56
57impl fmt::Display for SchemaRef {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.write_str(&self.0)
60    }
61}
62
/// Execution mode a node contract declares for its node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExecutionMode {
    /// The node runs natively on the host.
    Native,
    /// The node runs as a WASM component.
    Wasm,
    /// The node runs behind a process boundary (reserved for future use).
    Process,
}
73
/// Determinism a node contract declares for its node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Determinism {
    /// Same input and execution metadata always produce the same result.
    Deterministic,
    /// Results may differ from one run to the next.
    NonDeterministic,
    /// The contract makes no determinism declaration yet.
    Unknown,
}
84
85/// Contract-side declaration for one node port.
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct PortContract {
88    port_id: PortId,
89    direction: PortDirection,
90    schema: Option<SchemaRef>,
91}
92
93impl PortContract {
94    /// Create a port contract.
95    #[must_use]
96    pub const fn new(port_id: PortId, direction: PortDirection, schema: Option<SchemaRef>) -> Self {
97        Self {
98            port_id,
99            direction,
100            schema,
101        }
102    }
103
104    /// Declared port identifier.
105    #[must_use]
106    pub const fn port_id(&self) -> &PortId {
107        &self.port_id
108    }
109
110    /// Declared port direction.
111    #[must_use]
112    pub const fn direction(&self) -> PortDirection {
113        self.direction
114    }
115
116    /// Schema reference attached to this port, if any.
117    #[must_use]
118    pub const fn schema(&self) -> Option<&SchemaRef> {
119        self.schema.as_ref()
120    }
121}
122
123/// Validation error for contract metadata.
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub enum ContractValidationError {
126    /// A schema reference was empty or whitespace-only.
127    EmptySchemaRef,
128    /// A node contract duplicated the same port identifier.
129    DuplicatePortContract {
130        /// Node whose contract is invalid.
131        node_id: NodeId,
132        /// Duplicated port identifier.
133        port_id: PortId,
134    },
135    /// A contract referenced a workflow node that does not exist.
136    UnknownWorkflowNode {
137        /// Missing node identifier.
138        node_id: NodeId,
139    },
140    /// A contract referenced a port that does not exist on the workflow node.
141    UnknownWorkflowPort {
142        /// Node identifier.
143        node_id: NodeId,
144        /// Port identifier.
145        port_id: PortId,
146        /// Direction declared by the contract.
147        direction: PortDirection,
148    },
149    /// A contract declared a port with a different direction than the workflow.
150    PortDirectionMismatch {
151        /// Node identifier.
152        node_id: NodeId,
153        /// Port identifier.
154        port_id: PortId,
155        /// Direction declared by the workflow.
156        workflow: PortDirection,
157        /// Direction declared by the contract.
158        contract: PortDirection,
159    },
160    /// A contract referenced a node with no matching capability descriptor.
161    MissingCapabilityDescriptor {
162        /// Node identifier.
163        node_id: NodeId,
164    },
165    /// A contract declared a schema mismatch on a workflow edge.
166    SchemaMismatch {
167        /// Zero-based edge index.
168        edge_index: usize,
169        /// Source node identifier.
170        source_node_id: NodeId,
171        /// Source port identifier.
172        source_port_id: PortId,
173        /// Target node identifier.
174        target_node_id: NodeId,
175        /// Target port identifier.
176        target_port_id: PortId,
177    },
178    /// Capability validation failed.
179    Capability {
180        /// Underlying capability validation error.
181        error: CapabilityValidationError,
182    },
183}
184
185impl fmt::Display for ContractValidationError {
186    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187        match self {
188            Self::EmptySchemaRef => write!(f, "schema reference cannot be empty"),
189            Self::DuplicatePortContract { node_id, port_id } => {
190                write!(
191                    f,
192                    "node `{node_id}` declares duplicate contract port `{port_id}`"
193                )
194            }
195            Self::UnknownWorkflowNode { node_id } => {
196                write!(f, "contract references unknown workflow node `{node_id}`")
197            }
198            Self::UnknownWorkflowPort {
199                node_id,
200                port_id,
201                direction,
202            } => write!(
203                f,
204                "node `{node_id}` contract references unknown {} workflow port `{port_id}`",
205                port_direction_label(*direction)
206            ),
207            Self::PortDirectionMismatch {
208                node_id,
209                port_id,
210                workflow,
211                contract,
212            } => write!(
213                f,
214                "node `{node_id}` contract port `{port_id}` is {} but workflow declares {}",
215                port_direction_label(*contract),
216                port_direction_label(*workflow)
217            ),
218            Self::MissingCapabilityDescriptor { node_id } => {
219                write!(
220                    f,
221                    "no capability descriptor supplied for workflow node `{node_id}`"
222                )
223            }
224            Self::SchemaMismatch {
225                edge_index,
226                source_node_id,
227                source_port_id,
228                target_node_id,
229                target_port_id,
230            } => write!(
231                f,
232                "edge {edge_index} from `{source_node_id}:{source_port_id}` to `{target_node_id}:{target_port_id}` has incompatible schemas"
233            ),
234            Self::Capability { error } => write!(f, "capability validation failed: {error}"),
235        }
236    }
237}
238
239impl Error for ContractValidationError {}
240
241impl From<CapabilityValidationError> for ContractValidationError {
242    fn from(error: CapabilityValidationError) -> Self {
243        Self::Capability { error }
244    }
245}
246
247/// Contract metadata for one node.
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct NodeContract {
250    id: NodeId,
251    ports: Vec<PortContract>,
252    execution_mode: ExecutionMode,
253    determinism: Determinism,
254    retry: RetryDisposition,
255}
256
257impl NodeContract {
258    /// Create a contract for one node.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the contract repeats a port identifier.
263    pub fn new(
264        id: NodeId,
265        ports: impl Into<Vec<PortContract>>,
266        execution_mode: ExecutionMode,
267        determinism: Determinism,
268        retry: RetryDisposition,
269    ) -> Result<Self, ContractValidationError> {
270        let ports: Vec<PortContract> = ports.into();
271        reject_duplicate_ports(&id, &ports)?;
272
273        Ok(Self {
274            id,
275            ports,
276            execution_mode,
277            determinism,
278            retry,
279        })
280    }
281
282    /// Node identifier.
283    #[must_use]
284    pub const fn id(&self) -> &NodeId {
285        &self.id
286    }
287
288    /// Declared ports.
289    #[must_use]
290    pub fn ports(&self) -> &[PortContract] {
291        &self.ports
292    }
293
294    /// Declared execution mode.
295    #[must_use]
296    pub const fn execution_mode(&self) -> ExecutionMode {
297        self.execution_mode
298    }
299
300    /// Declared determinism.
301    #[must_use]
302    pub const fn determinism(&self) -> Determinism {
303        self.determinism
304    }
305
306    /// Declared retry disposition.
307    #[must_use]
308    pub const fn retry(&self) -> RetryDisposition {
309        self.retry
310    }
311
312    fn port_map(&self) -> BTreeMap<&PortId, &PortContract> {
313        self.ports
314            .iter()
315            .map(|port: &PortContract| (port.port_id(), port))
316            .collect()
317    }
318}
319
320/// Validate workflow topology, capability descriptors, and node contracts together.
321///
322/// # Errors
323///
324/// Returns an error if contracts do not match the workflow topology, if a
325/// capability descriptor is missing or inconsistent, or if schemas disagree on
326/// a connected edge.
327pub fn validate_workflow_contracts(
328    workflow: &WorkflowDefinition,
329    contracts: &[NodeContract],
330    capabilities: &[NodeCapabilities],
331) -> Result<(), ContractValidationError> {
332    validate_workflow_capabilities(workflow, capabilities)?;
333
334    let contract_map: BTreeMap<&NodeId, &NodeContract> = contracts
335        .iter()
336        .map(|contract: &NodeContract| (contract.id(), contract))
337        .collect();
338    let capability_map: BTreeMap<&NodeId, &NodeCapabilities> = capabilities
339        .iter()
340        .map(|capability: &NodeCapabilities| (capability.node_id(), capability))
341        .collect();
342
343    for node in workflow.nodes() {
344        let contract: &NodeContract = contract_map.get(node.id()).copied().ok_or_else(|| {
345            ContractValidationError::UnknownWorkflowNode {
346                node_id: node.id().clone(),
347            }
348        })?;
349        let capability: &NodeCapabilities =
350            capability_map.get(node.id()).copied().ok_or_else(|| {
351                ContractValidationError::MissingCapabilityDescriptor {
352                    node_id: node.id().clone(),
353                }
354            })?;
355
356        validate_node_contract(node, contract)?;
357        validate_enforceable_capabilities(contract, capability)?;
358    }
359
360    for contract in contracts {
361        if workflow
362            .nodes()
363            .iter()
364            .all(|node: &pureflow_workflow::NodeDefinition| node.id() != contract.id())
365        {
366            return Err(ContractValidationError::UnknownWorkflowNode {
367                node_id: contract.id().clone(),
368            });
369        }
370    }
371
372    validate_edge_schema_compatibility(workflow, &contract_map)
373}
374
375fn validate_enforceable_capabilities(
376    contract: &NodeContract,
377    capability: &NodeCapabilities,
378) -> Result<(), ContractValidationError> {
379    match contract.execution_mode() {
380        ExecutionMode::Native => Ok(()),
381        ExecutionMode::Wasm | ExecutionMode::Process => {
382            for effect in capability.effects() {
383                if !strict_boundary_supports_effect(contract.execution_mode(), *effect) {
384                    return Err(CapabilityValidationError::UnenforceableEffectCapability {
385                        node_id: capability.node_id().clone(),
386                        effect: *effect,
387                    }
388                    .into());
389                }
390            }
391
392            Ok(())
393        }
394    }
395}
396
397const fn strict_boundary_supports_effect(
398    _execution_mode: ExecutionMode,
399    _effect: EffectCapability,
400) -> bool {
401    // The current strict boundaries import no host effects. Future WASI/process
402    // adapters should add explicit allowlist entries here as imports land.
403    false
404}
405
406fn validate_node_contract(
407    node: &pureflow_workflow::NodeDefinition,
408    contract: &NodeContract,
409) -> Result<(), ContractValidationError> {
410    let workflow_ports: BTreeMap<&PortId, PortDirection> = node
411        .input_ports()
412        .iter()
413        .map(|port_id: &PortId| (port_id, PortDirection::Input))
414        .chain(
415            node.output_ports()
416                .iter()
417                .map(|port_id: &PortId| (port_id, PortDirection::Output)),
418        )
419        .collect();
420    let contract_ports: BTreeMap<&PortId, &PortContract> = contract.port_map();
421
422    for (port_id, workflow_direction) in &workflow_ports {
423        let contract_port: &&PortContract = contract_ports.get(port_id).ok_or_else(|| {
424            ContractValidationError::UnknownWorkflowPort {
425                node_id: node.id().clone(),
426                port_id: (*port_id).clone(),
427                direction: *workflow_direction,
428            }
429        })?;
430
431        if contract_port.direction() != *workflow_direction {
432            return Err(ContractValidationError::PortDirectionMismatch {
433                node_id: node.id().clone(),
434                port_id: (*port_id).clone(),
435                workflow: *workflow_direction,
436                contract: contract_port.direction(),
437            });
438        }
439    }
440
441    for (port_id, contract_port) in &contract_ports {
442        if !workflow_ports.contains_key(port_id) {
443            return Err(ContractValidationError::UnknownWorkflowPort {
444                node_id: node.id().clone(),
445                port_id: (*port_id).clone(),
446                direction: contract_port.direction(),
447            });
448        }
449    }
450
451    Ok(())
452}
453
454fn validate_edge_schema_compatibility(
455    workflow: &WorkflowDefinition,
456    contracts: &BTreeMap<&NodeId, &NodeContract>,
457) -> Result<(), ContractValidationError> {
458    // Schema compatibility is exact equality while SchemaRef remains opaque.
459    for (edge_index, edge) in workflow.edges().iter().enumerate() {
460        let source_contract: &&NodeContract =
461            contracts.get(edge.source().node_id()).ok_or_else(|| {
462                ContractValidationError::UnknownWorkflowNode {
463                    node_id: edge.source().node_id().clone(),
464                }
465            })?;
466        let target_contract: &&NodeContract =
467            contracts.get(edge.target().node_id()).ok_or_else(|| {
468                ContractValidationError::UnknownWorkflowNode {
469                    node_id: edge.target().node_id().clone(),
470                }
471            })?;
472
473        let source_schema: Option<&SchemaRef> = source_contract
474            .ports()
475            .iter()
476            .find(|port: &&PortContract| port.port_id() == edge.source().port_id())
477            .and_then(PortContract::schema);
478        let target_schema: Option<&SchemaRef> = target_contract
479            .ports()
480            .iter()
481            .find(|port: &&PortContract| port.port_id() == edge.target().port_id())
482            .and_then(PortContract::schema);
483
484        if let (Some(source_schema), Some(target_schema)) = (source_schema, target_schema)
485            && source_schema != target_schema
486        {
487            return Err(ContractValidationError::SchemaMismatch {
488                edge_index,
489                source_node_id: edge.source().node_id().clone(),
490                source_port_id: edge.source().port_id().clone(),
491                target_node_id: edge.target().node_id().clone(),
492                target_port_id: edge.target().port_id().clone(),
493            });
494        }
495    }
496
497    Ok(())
498}
499
500fn reject_duplicate_ports(
501    node_id: &NodeId,
502    ports: &[PortContract],
503) -> Result<(), ContractValidationError> {
504    let mut seen: BTreeMap<&PortId, PortDirection> = BTreeMap::new();
505
506    for port in ports {
507        if seen.insert(port.port_id(), port.direction()).is_some() {
508            return Err(ContractValidationError::DuplicatePortContract {
509                node_id: node_id.clone(),
510                port_id: port.port_id().clone(),
511            });
512        }
513    }
514
515    Ok(())
516}
517
518const fn port_direction_label(direction: PortDirection) -> &'static str {
519    match direction {
520        PortDirection::Input => "input",
521        PortDirection::Output => "output",
522    }
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    use pureflow_core::{
        RetryDisposition,
        capability::{EffectCapability, NodeCapabilities, PortCapability, PortCapabilityDirection},
    };
    use pureflow_test_kit::{NodeBuilder, WorkflowBuilder, node_id, port_id, workflow_id};
    use pureflow_workflow::{EdgeDefinition, EdgeEndpoint, NodeDefinition};
    use quickcheck::{Arbitrary, Gen, QuickCheck};

    /// Schema attached to every port of a generated workflow case.
    const GENERATED_SCHEMA: &str = "schema://generated-packet";

    // ---- fixture helpers -------------------------------------------------

    fn schema(value: &str) -> SchemaRef {
        SchemaRef::new(value).expect("valid schema ref")
    }

    /// Input port contract carrying the given schema.
    fn input_port(name: &str, schema_ref: &str) -> PortContract {
        PortContract::new(port_id(name), PortDirection::Input, Some(schema(schema_ref)))
    }

    /// Output port contract carrying the given schema.
    fn output_port(name: &str, schema_ref: &str) -> PortContract {
        PortContract::new(port_id(name), PortDirection::Output, Some(schema(schema_ref)))
    }

    /// Capability entry for a port that receives.
    fn receives(name: &str) -> PortCapability {
        PortCapability::new(port_id(name), PortCapabilityDirection::Receive)
    }

    /// Capability entry for a port that emits.
    fn emits(name: &str) -> PortCapability {
        PortCapability::new(port_id(name), PortCapabilityDirection::Emit)
    }

    /// Deterministic, retry-safe contract for `node` with the given ports and mode.
    fn contract(
        node: &str,
        ports: Vec<PortContract>,
        execution_mode: ExecutionMode,
    ) -> NodeContract {
        NodeContract::new(
            node_id(node),
            ports,
            execution_mode,
            Determinism::Deterministic,
            RetryDisposition::Safe,
        )
        .expect("valid contract")
    }

    /// Capability descriptor for `node` that also declares the clock effect.
    fn capabilities(node: &str, ports: Vec<PortCapability>) -> NodeCapabilities {
        NodeCapabilities::new(node_id(node), ports, [EffectCapability::Clock])
            .expect("valid capability")
    }

    /// Capability descriptor for `node` built with `native_passive`.
    fn passive_capabilities(node: &str, ports: Vec<PortCapability>) -> NodeCapabilities {
        NodeCapabilities::native_passive(node_id(node), ports).expect("valid capability")
    }

    fn generated_node_id(index: usize) -> NodeId {
        node_id(&format!("node_{index}"))
    }

    fn generated_input_port(source_index: usize) -> PortId {
        port_id(&format!("in_from_{source_index}"))
    }

    fn generated_output_port(target_index: usize) -> PortId {
        port_id(&format!("out_to_{target_index}"))
    }

    // ---- quickcheck generators -------------------------------------------

    /// Arbitrary string that `SchemaRef::new` accepts.
    #[derive(Debug, Clone)]
    struct NonEmptySchemaString(String);

    impl Arbitrary for NonEmptySchemaString {
        fn arbitrary(g: &mut Gen) -> Self {
            let candidate = String::arbitrary(g);
            // Blank candidates are replaced so the generator never yields a rejected value.
            Self(if candidate.trim().is_empty() {
                "schema://generated".to_string()
            } else {
                candidate
            })
        }
    }

    /// Workflow together with contracts and capabilities derived from it.
    #[derive(Debug, Clone)]
    struct MatchingWorkflowContractCase {
        workflow: WorkflowDefinition,
        contracts: Vec<NodeContract>,
        capabilities: Vec<NodeCapabilities>,
    }

    impl Arbitrary for MatchingWorkflowContractCase {
        fn arbitrary(g: &mut Gen) -> Self {
            let node_count = usize::arbitrary(g) % 5 + 1;
            let mut inputs_by_node: Vec<BTreeSet<PortId>> = vec![BTreeSet::new(); node_count];
            let mut outputs_by_node: Vec<BTreeSet<PortId>> = vec![BTreeSet::new(); node_count];
            let mut edges = Vec::new();

            // Edges only run from a lower node index to a higher one.
            for source in 0..node_count {
                for target in (source + 1)..node_count {
                    if !bool::arbitrary(g) {
                        continue;
                    }

                    let source_port = generated_output_port(target);
                    let target_port = generated_input_port(source);

                    outputs_by_node[source].insert(source_port.clone());
                    inputs_by_node[target].insert(target_port.clone());
                    edges.push(EdgeDefinition::new(
                        EdgeEndpoint::new(generated_node_id(source), source_port),
                        EdgeEndpoint::new(generated_node_id(target), target_port),
                    ));
                }
            }

            let nodes: Vec<NodeDefinition> = inputs_by_node
                .iter()
                .zip(&outputs_by_node)
                .enumerate()
                .map(|(index, (inputs, outputs))| {
                    NodeDefinition::new(
                        generated_node_id(index),
                        inputs.iter().cloned().collect::<Vec<_>>(),
                        outputs.iter().cloned().collect::<Vec<_>>(),
                    )
                    .expect("generated node topology is valid")
                })
                .collect();
            let workflow =
                WorkflowDefinition::from_parts(workflow_id("generated_flow"), nodes, edges)
                    .expect("generated workflow is acyclic and valid");

            let mut contracts = Vec::new();
            let mut capabilities = Vec::new();

            for node in workflow.nodes() {
                // Inputs are listed before outputs in both derived port lists.
                let contract_ports: Vec<PortContract> = node
                    .input_ports()
                    .iter()
                    .map(|port| {
                        PortContract::new(
                            port.clone(),
                            PortDirection::Input,
                            Some(schema(GENERATED_SCHEMA)),
                        )
                    })
                    .chain(node.output_ports().iter().map(|port| {
                        PortContract::new(
                            port.clone(),
                            PortDirection::Output,
                            Some(schema(GENERATED_SCHEMA)),
                        )
                    }))
                    .collect();
                let capability_ports: Vec<PortCapability> = node
                    .input_ports()
                    .iter()
                    .map(|port| PortCapability::new(port.clone(), PortCapabilityDirection::Receive))
                    .chain(node.output_ports().iter().map(|port| {
                        PortCapability::new(port.clone(), PortCapabilityDirection::Emit)
                    }))
                    .collect();

                contracts.push(
                    NodeContract::new(
                        node.id().clone(),
                        contract_ports,
                        ExecutionMode::Native,
                        Determinism::Deterministic,
                        RetryDisposition::Safe,
                    )
                    .expect("generated contract matches workflow ports"),
                );
                capabilities.push(
                    NodeCapabilities::native_passive(node.id().clone(), capability_ports)
                        .expect("generated capabilities match workflow ports"),
                );
            }

            Self {
                workflow,
                contracts,
                capabilities,
            }
        }
    }

    // ---- property tests ----------------------------------------------------

    #[test]
    fn generated_non_empty_schema_refs_round_trip() {
        fn property(input: NonEmptySchemaString) -> bool {
            let schema = SchemaRef::new(input.0.clone()).expect("generated schema ref is valid");
            schema.as_str() == input.0
        }

        QuickCheck::new()
            .tests(128)
            .quickcheck(property as fn(NonEmptySchemaString) -> bool);
    }

    #[test]
    fn generated_matching_workflow_contracts_validate() {
        fn property(case: MatchingWorkflowContractCase) -> bool {
            validate_workflow_contracts(&case.workflow, &case.contracts, &case.capabilities).is_ok()
        }

        QuickCheck::new()
            .tests(128)
            .quickcheck(property as fn(MatchingWorkflowContractCase) -> bool);
    }

    // ---- constructor tests -------------------------------------------------

    #[test]
    fn node_contract_rejects_duplicate_ports() {
        // Same identifier with different directions still counts as a duplicate.
        let err = NodeContract::new(
            node_id("worker"),
            [
                PortContract::new(port_id("in"), PortDirection::Input, None),
                PortContract::new(port_id("in"), PortDirection::Output, None),
            ],
            ExecutionMode::Native,
            Determinism::Deterministic,
            RetryDisposition::Safe,
        )
        .expect_err("duplicate ports must fail");

        assert_eq!(
            err,
            ContractValidationError::DuplicatePortContract {
                node_id: node_id("worker"),
                port_id: port_id("in")
            }
        );
    }

    #[test]
    fn schema_ref_rejects_empty_values() {
        let err = SchemaRef::new("   ").expect_err("empty schema ref must fail");

        assert_eq!(err, ContractValidationError::EmptySchemaRef);
    }

    // ---- workflow validation tests -----------------------------------------

    #[test]
    fn validate_workflow_contracts_accepts_matching_contracts() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .node(NodeBuilder::new("sink").input("in").build())
            .edge("source", "out", "sink", "in")
            .build();
        let contracts = vec![
            contract(
                "source",
                vec![output_port("out", "schema://packet")],
                ExecutionMode::Native,
            ),
            contract(
                "sink",
                vec![input_port("in", "schema://packet")],
                ExecutionMode::Native,
            ),
        ];
        let descriptors = vec![
            capabilities("source", vec![emits("out")]),
            capabilities("sink", vec![receives("in")]),
        ];

        validate_workflow_contracts(&workflow, &contracts, &descriptors)
            .expect("matching contracts should validate");
    }

    #[test]
    fn validate_workflow_contracts_rejects_schema_mismatch() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .node(NodeBuilder::new("sink").input("in").build())
            .edge("source", "out", "sink", "in")
            .build();
        let contracts = vec![
            contract(
                "source",
                vec![output_port("out", "schema://packet-a")],
                ExecutionMode::Native,
            ),
            contract(
                "sink",
                vec![input_port("in", "schema://packet-b")],
                ExecutionMode::Native,
            ),
        ];
        let descriptors = vec![
            capabilities("source", vec![emits("out")]),
            capabilities("sink", vec![receives("in")]),
        ];

        let err = validate_workflow_contracts(&workflow, &contracts, &descriptors)
            .expect_err("schema mismatch must fail");

        assert!(matches!(
            err,
            ContractValidationError::SchemaMismatch { .. }
        ));
    }

    #[test]
    fn validate_workflow_contracts_rejects_missing_capability_descriptor() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .build();
        let contracts = vec![contract(
            "source",
            vec![output_port("out", "schema://packet")],
            ExecutionMode::Native,
        )];

        let err = validate_workflow_contracts(&workflow, &contracts, &[])
            .expect_err("missing capability descriptor must fail");

        assert_eq!(
            err,
            ContractValidationError::MissingCapabilityDescriptor {
                node_id: node_id("source")
            }
        );
    }

    #[test]
    fn validate_workflow_contracts_accepts_native_advisory_effects() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .build();
        let contracts = vec![contract(
            "source",
            vec![output_port("out", "schema://packet")],
            ExecutionMode::Native,
        )];
        // `capabilities` declares the clock effect; native mode must still pass.
        let descriptors = vec![capabilities("source", vec![emits("out")])];

        validate_workflow_contracts(&workflow, &contracts, &descriptors)
            .expect("native effects are advisory");
    }

    #[test]
    fn validate_workflow_contracts_accepts_wasm_without_host_effects() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("wasm").input("in").output("out").build())
            .build();
        let contracts = vec![contract(
            "wasm",
            vec![
                input_port("in", "schema://p"),
                output_port("out", "schema://p"),
            ],
            ExecutionMode::Wasm,
        )];
        let descriptors = vec![passive_capabilities(
            "wasm",
            vec![receives("in"), emits("out")],
        )];

        validate_workflow_contracts(&workflow, &contracts, &descriptors)
            .expect("import-free WASM contract should validate");
    }

    #[test]
    fn validate_workflow_contracts_rejects_wasm_effects_without_imports() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("wasm").input("in").build())
            .build();
        let contracts = vec![contract(
            "wasm",
            vec![input_port("in", "schema://packet")],
            ExecutionMode::Wasm,
        )];
        let descriptors = vec![capabilities("wasm", vec![receives("in")])];

        let err = validate_workflow_contracts(&workflow, &contracts, &descriptors)
            .expect_err("WASM effect without host import must fail");

        assert_eq!(
            err,
            ContractValidationError::Capability {
                error: CapabilityValidationError::UnenforceableEffectCapability {
                    node_id: node_id("wasm"),
                    effect: EffectCapability::Clock,
                }
            }
        );
    }

    #[test]
    fn validate_workflow_contracts_rejects_process_effects_without_adapter() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("worker").input("in").build())
            .build();
        let contracts = vec![contract(
            "worker",
            vec![input_port("in", "schema://packet")],
            ExecutionMode::Process,
        )];
        let descriptors = vec![capabilities("worker", vec![receives("in")])];

        let err = validate_workflow_contracts(&workflow, &contracts, &descriptors)
            .expect_err("process effect without adapter must fail");

        assert!(matches!(
            err,
            ContractValidationError::Capability {
                error: CapabilityValidationError::UnenforceableEffectCapability { .. }
            }
        ));
    }
}