1use std::collections::BTreeMap;
18use std::error::Error;
19use std::fmt;
20
21use pureflow_core::{
22 RetryDisposition,
23 capability::{
24 CapabilityValidationError, EffectCapability, NodeCapabilities,
25 validate_workflow_capabilities,
26 },
27};
28use pureflow_types::{NodeId, PortId};
29use pureflow_workflow::{PortDirection, WorkflowDefinition};
30
31#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
33pub struct SchemaRef(String);
34
35impl SchemaRef {
36 pub fn new(value: impl Into<String>) -> Result<Self, ContractValidationError> {
42 let value: String = value.into();
43 if value.trim().is_empty() {
44 return Err(ContractValidationError::EmptySchemaRef);
45 }
46
47 Ok(Self(value))
48 }
49
50 #[must_use]
52 pub fn as_str(&self) -> &str {
53 &self.0
54 }
55}
56
57impl fmt::Display for SchemaRef {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.write_str(&self.0)
60 }
61}
62
/// Execution mode declared by a node contract.
///
/// Contract validation in this file accepts the declared effects of a
/// `Native` node as-is, while every declared effect of a `Wasm` or `Process`
/// node is checked against what the boundary can enforce.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionMode {
    /// Native execution; declared effects are not checked by contract validation.
    Native,
    /// WebAssembly execution; declared effects are checked by contract validation.
    Wasm,
    /// Process execution; declared effects are checked by contract validation.
    Process,
}
73
/// Determinism declared by a node contract.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Determinism {
    /// The node is declared deterministic.
    Deterministic,
    /// The node is declared non-deterministic.
    NonDeterministic,
    /// No determinism claim is made for the node.
    Unknown,
}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct PortContract {
88 port_id: PortId,
89 direction: PortDirection,
90 schema: Option<SchemaRef>,
91}
92
93impl PortContract {
94 #[must_use]
96 pub const fn new(port_id: PortId, direction: PortDirection, schema: Option<SchemaRef>) -> Self {
97 Self {
98 port_id,
99 direction,
100 schema,
101 }
102 }
103
104 #[must_use]
106 pub const fn port_id(&self) -> &PortId {
107 &self.port_id
108 }
109
110 #[must_use]
112 pub const fn direction(&self) -> PortDirection {
113 self.direction
114 }
115
116 #[must_use]
118 pub const fn schema(&self) -> Option<&SchemaRef> {
119 self.schema.as_ref()
120 }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq)]
125pub enum ContractValidationError {
126 EmptySchemaRef,
128 DuplicatePortContract {
130 node_id: NodeId,
132 port_id: PortId,
134 },
135 UnknownWorkflowNode {
137 node_id: NodeId,
139 },
140 UnknownWorkflowPort {
142 node_id: NodeId,
144 port_id: PortId,
146 direction: PortDirection,
148 },
149 PortDirectionMismatch {
151 node_id: NodeId,
153 port_id: PortId,
155 workflow: PortDirection,
157 contract: PortDirection,
159 },
160 MissingCapabilityDescriptor {
162 node_id: NodeId,
164 },
165 SchemaMismatch {
167 edge_index: usize,
169 source_node_id: NodeId,
171 source_port_id: PortId,
173 target_node_id: NodeId,
175 target_port_id: PortId,
177 },
178 Capability {
180 error: CapabilityValidationError,
182 },
183}
184
185impl fmt::Display for ContractValidationError {
186 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187 match self {
188 Self::EmptySchemaRef => write!(f, "schema reference cannot be empty"),
189 Self::DuplicatePortContract { node_id, port_id } => {
190 write!(
191 f,
192 "node `{node_id}` declares duplicate contract port `{port_id}`"
193 )
194 }
195 Self::UnknownWorkflowNode { node_id } => {
196 write!(f, "contract references unknown workflow node `{node_id}`")
197 }
198 Self::UnknownWorkflowPort {
199 node_id,
200 port_id,
201 direction,
202 } => write!(
203 f,
204 "node `{node_id}` contract references unknown {} workflow port `{port_id}`",
205 port_direction_label(*direction)
206 ),
207 Self::PortDirectionMismatch {
208 node_id,
209 port_id,
210 workflow,
211 contract,
212 } => write!(
213 f,
214 "node `{node_id}` contract port `{port_id}` is {} but workflow declares {}",
215 port_direction_label(*contract),
216 port_direction_label(*workflow)
217 ),
218 Self::MissingCapabilityDescriptor { node_id } => {
219 write!(
220 f,
221 "no capability descriptor supplied for workflow node `{node_id}`"
222 )
223 }
224 Self::SchemaMismatch {
225 edge_index,
226 source_node_id,
227 source_port_id,
228 target_node_id,
229 target_port_id,
230 } => write!(
231 f,
232 "edge {edge_index} from `{source_node_id}:{source_port_id}` to `{target_node_id}:{target_port_id}` has incompatible schemas"
233 ),
234 Self::Capability { error } => write!(f, "capability validation failed: {error}"),
235 }
236 }
237}
238
239impl Error for ContractValidationError {}
240
241impl From<CapabilityValidationError> for ContractValidationError {
242 fn from(error: CapabilityValidationError) -> Self {
243 Self::Capability { error }
244 }
245}
246
247#[derive(Debug, Clone, PartialEq, Eq)]
249pub struct NodeContract {
250 id: NodeId,
251 ports: Vec<PortContract>,
252 execution_mode: ExecutionMode,
253 determinism: Determinism,
254 retry: RetryDisposition,
255}
256
257impl NodeContract {
258 pub fn new(
264 id: NodeId,
265 ports: impl Into<Vec<PortContract>>,
266 execution_mode: ExecutionMode,
267 determinism: Determinism,
268 retry: RetryDisposition,
269 ) -> Result<Self, ContractValidationError> {
270 let ports: Vec<PortContract> = ports.into();
271 reject_duplicate_ports(&id, &ports)?;
272
273 Ok(Self {
274 id,
275 ports,
276 execution_mode,
277 determinism,
278 retry,
279 })
280 }
281
282 #[must_use]
284 pub const fn id(&self) -> &NodeId {
285 &self.id
286 }
287
288 #[must_use]
290 pub fn ports(&self) -> &[PortContract] {
291 &self.ports
292 }
293
294 #[must_use]
296 pub const fn execution_mode(&self) -> ExecutionMode {
297 self.execution_mode
298 }
299
300 #[must_use]
302 pub const fn determinism(&self) -> Determinism {
303 self.determinism
304 }
305
306 #[must_use]
308 pub const fn retry(&self) -> RetryDisposition {
309 self.retry
310 }
311
312 fn port_map(&self) -> BTreeMap<&PortId, &PortContract> {
313 self.ports
314 .iter()
315 .map(|port: &PortContract| (port.port_id(), port))
316 .collect()
317 }
318}
319
320pub fn validate_workflow_contracts(
328 workflow: &WorkflowDefinition,
329 contracts: &[NodeContract],
330 capabilities: &[NodeCapabilities],
331) -> Result<(), ContractValidationError> {
332 validate_workflow_capabilities(workflow, capabilities)?;
333
334 let contract_map: BTreeMap<&NodeId, &NodeContract> = contracts
335 .iter()
336 .map(|contract: &NodeContract| (contract.id(), contract))
337 .collect();
338 let capability_map: BTreeMap<&NodeId, &NodeCapabilities> = capabilities
339 .iter()
340 .map(|capability: &NodeCapabilities| (capability.node_id(), capability))
341 .collect();
342
343 for node in workflow.nodes() {
344 let contract: &NodeContract = contract_map.get(node.id()).copied().ok_or_else(|| {
345 ContractValidationError::UnknownWorkflowNode {
346 node_id: node.id().clone(),
347 }
348 })?;
349 let capability: &NodeCapabilities =
350 capability_map.get(node.id()).copied().ok_or_else(|| {
351 ContractValidationError::MissingCapabilityDescriptor {
352 node_id: node.id().clone(),
353 }
354 })?;
355
356 validate_node_contract(node, contract)?;
357 validate_enforceable_capabilities(contract, capability)?;
358 }
359
360 for contract in contracts {
361 if workflow
362 .nodes()
363 .iter()
364 .all(|node: &pureflow_workflow::NodeDefinition| node.id() != contract.id())
365 {
366 return Err(ContractValidationError::UnknownWorkflowNode {
367 node_id: contract.id().clone(),
368 });
369 }
370 }
371
372 validate_edge_schema_compatibility(workflow, &contract_map)
373}
374
375fn validate_enforceable_capabilities(
376 contract: &NodeContract,
377 capability: &NodeCapabilities,
378) -> Result<(), ContractValidationError> {
379 match contract.execution_mode() {
380 ExecutionMode::Native => Ok(()),
381 ExecutionMode::Wasm | ExecutionMode::Process => {
382 for effect in capability.effects() {
383 if !strict_boundary_supports_effect(contract.execution_mode(), *effect) {
384 return Err(CapabilityValidationError::UnenforceableEffectCapability {
385 node_id: capability.node_id().clone(),
386 effect: *effect,
387 }
388 .into());
389 }
390 }
391
392 Ok(())
393 }
394 }
395}
396
397const fn strict_boundary_supports_effect(
398 _execution_mode: ExecutionMode,
399 _effect: EffectCapability,
400) -> bool {
401 false
404}
405
406fn validate_node_contract(
407 node: &pureflow_workflow::NodeDefinition,
408 contract: &NodeContract,
409) -> Result<(), ContractValidationError> {
410 let workflow_ports: BTreeMap<&PortId, PortDirection> = node
411 .input_ports()
412 .iter()
413 .map(|port_id: &PortId| (port_id, PortDirection::Input))
414 .chain(
415 node.output_ports()
416 .iter()
417 .map(|port_id: &PortId| (port_id, PortDirection::Output)),
418 )
419 .collect();
420 let contract_ports: BTreeMap<&PortId, &PortContract> = contract.port_map();
421
422 for (port_id, workflow_direction) in &workflow_ports {
423 let contract_port: &&PortContract = contract_ports.get(port_id).ok_or_else(|| {
424 ContractValidationError::UnknownWorkflowPort {
425 node_id: node.id().clone(),
426 port_id: (*port_id).clone(),
427 direction: *workflow_direction,
428 }
429 })?;
430
431 if contract_port.direction() != *workflow_direction {
432 return Err(ContractValidationError::PortDirectionMismatch {
433 node_id: node.id().clone(),
434 port_id: (*port_id).clone(),
435 workflow: *workflow_direction,
436 contract: contract_port.direction(),
437 });
438 }
439 }
440
441 for (port_id, contract_port) in &contract_ports {
442 if !workflow_ports.contains_key(port_id) {
443 return Err(ContractValidationError::UnknownWorkflowPort {
444 node_id: node.id().clone(),
445 port_id: (*port_id).clone(),
446 direction: contract_port.direction(),
447 });
448 }
449 }
450
451 Ok(())
452}
453
454fn validate_edge_schema_compatibility(
455 workflow: &WorkflowDefinition,
456 contracts: &BTreeMap<&NodeId, &NodeContract>,
457) -> Result<(), ContractValidationError> {
458 for (edge_index, edge) in workflow.edges().iter().enumerate() {
460 let source_contract: &&NodeContract =
461 contracts.get(edge.source().node_id()).ok_or_else(|| {
462 ContractValidationError::UnknownWorkflowNode {
463 node_id: edge.source().node_id().clone(),
464 }
465 })?;
466 let target_contract: &&NodeContract =
467 contracts.get(edge.target().node_id()).ok_or_else(|| {
468 ContractValidationError::UnknownWorkflowNode {
469 node_id: edge.target().node_id().clone(),
470 }
471 })?;
472
473 let source_schema: Option<&SchemaRef> = source_contract
474 .ports()
475 .iter()
476 .find(|port: &&PortContract| port.port_id() == edge.source().port_id())
477 .and_then(PortContract::schema);
478 let target_schema: Option<&SchemaRef> = target_contract
479 .ports()
480 .iter()
481 .find(|port: &&PortContract| port.port_id() == edge.target().port_id())
482 .and_then(PortContract::schema);
483
484 if let (Some(source_schema), Some(target_schema)) = (source_schema, target_schema)
485 && source_schema != target_schema
486 {
487 return Err(ContractValidationError::SchemaMismatch {
488 edge_index,
489 source_node_id: edge.source().node_id().clone(),
490 source_port_id: edge.source().port_id().clone(),
491 target_node_id: edge.target().node_id().clone(),
492 target_port_id: edge.target().port_id().clone(),
493 });
494 }
495 }
496
497 Ok(())
498}
499
500fn reject_duplicate_ports(
501 node_id: &NodeId,
502 ports: &[PortContract],
503) -> Result<(), ContractValidationError> {
504 let mut seen: BTreeMap<&PortId, PortDirection> = BTreeMap::new();
505
506 for port in ports {
507 if seen.insert(port.port_id(), port.direction()).is_some() {
508 return Err(ContractValidationError::DuplicatePortContract {
509 node_id: node_id.clone(),
510 port_id: port.port_id().clone(),
511 });
512 }
513 }
514
515 Ok(())
516}
517
518const fn port_direction_label(direction: PortDirection) -> &'static str {
519 match direction {
520 PortDirection::Input => "input",
521 PortDirection::Output => "output",
522 }
523}
524
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeSet;

    use pureflow_core::{
        RetryDisposition,
        capability::{EffectCapability, NodeCapabilities, PortCapability, PortCapabilityDirection},
    };
    use pureflow_test_kit::{NodeBuilder, WorkflowBuilder, node_id, port_id, workflow_id};
    use pureflow_workflow::{EdgeDefinition, EdgeEndpoint, NodeDefinition};
    use quickcheck::{Arbitrary, Gen, QuickCheck};

    /// Schema attached to every port produced by the workflow generator.
    const GENERATED_SCHEMA: &str = "schema://generated-packet";

    /// Builds a schema reference, panicking when the value is rejected.
    fn schema(value: &str) -> SchemaRef {
        SchemaRef::new(value).expect("valid schema ref")
    }

    /// Builds an input port contract carrying the given schema.
    fn input_port(name: &str, schema_uri: &str) -> PortContract {
        PortContract::new(port_id(name), PortDirection::Input, Some(schema(schema_uri)))
    }

    /// Builds an output port contract carrying the given schema.
    fn output_port(name: &str, schema_uri: &str) -> PortContract {
        PortContract::new(port_id(name), PortDirection::Output, Some(schema(schema_uri)))
    }

    /// Builds a receive capability for the named port.
    fn receives(name: &str) -> PortCapability {
        PortCapability::new(port_id(name), PortCapabilityDirection::Receive)
    }

    /// Builds an emit capability for the named port.
    fn emits(name: &str) -> PortCapability {
        PortCapability::new(port_id(name), PortCapabilityDirection::Emit)
    }

    /// Builds a deterministic, retry-safe contract for `node`.
    fn contract(
        node: &str,
        ports: Vec<PortContract>,
        execution_mode: ExecutionMode,
    ) -> NodeContract {
        let built = NodeContract::new(
            node_id(node),
            ports,
            execution_mode,
            Determinism::Deterministic,
            RetryDisposition::Safe,
        );
        built.expect("valid contract")
    }

    /// Builds capabilities for `node` that declare the clock effect.
    fn capabilities(node: &str, ports: Vec<PortCapability>) -> NodeCapabilities {
        let declared_effects = [EffectCapability::Clock];
        NodeCapabilities::new(node_id(node), ports, declared_effects).expect("valid capability")
    }

    /// Builds capabilities for `node` through the `native_passive` constructor.
    fn passive_capabilities(node: &str, ports: Vec<PortCapability>) -> NodeCapabilities {
        let built = NodeCapabilities::native_passive(node_id(node), ports);
        built.expect("valid capability")
    }

    /// Arbitrary string that `SchemaRef::new` accepts.
    #[derive(Debug, Clone)]
    struct NonEmptySchemaString(String);

    impl Arbitrary for NonEmptySchemaString {
        fn arbitrary(g: &mut Gen) -> Self {
            let candidate = String::arbitrary(g);
            // Blank candidates are swapped for a fixed, valid reference.
            let accepted = if candidate.trim().is_empty() {
                "schema://generated".to_string()
            } else {
                candidate
            };
            Self(accepted)
        }
    }

    /// Generated workflow with contracts and capabilities that match it exactly.
    #[derive(Debug, Clone)]
    struct MatchingWorkflowContractCase {
        workflow: WorkflowDefinition,
        contracts: Vec<NodeContract>,
        capabilities: Vec<NodeCapabilities>,
    }

    impl Arbitrary for MatchingWorkflowContractCase {
        fn arbitrary(g: &mut Gen) -> Self {
            let node_count = usize::arbitrary(g) % 5 + 1;
            let mut inputs_by_node = vec![BTreeSet::<PortId>::new(); node_count];
            let mut outputs_by_node = vec![BTreeSet::<PortId>::new(); node_count];
            let mut edges = Vec::new();

            // Edges only run from a lower node index to a higher one.
            for source in 0..node_count {
                for target in (source + 1)..node_count {
                    if !bool::arbitrary(g) {
                        continue;
                    }

                    let source_port = generated_output_port(target);
                    let target_port = generated_input_port(source);

                    outputs_by_node[source].insert(source_port.clone());
                    inputs_by_node[target].insert(target_port.clone());
                    edges.push(EdgeDefinition::new(
                        EdgeEndpoint::new(generated_node_id(source), source_port),
                        EdgeEndpoint::new(generated_node_id(target), target_port),
                    ));
                }
            }

            let nodes: Vec<NodeDefinition> = inputs_by_node
                .iter()
                .zip(&outputs_by_node)
                .enumerate()
                .map(|(index, (inputs, outputs))| {
                    NodeDefinition::new(
                        generated_node_id(index),
                        inputs.iter().cloned().collect::<Vec<_>>(),
                        outputs.iter().cloned().collect::<Vec<_>>(),
                    )
                    .expect("generated node topology is valid")
                })
                .collect();
            let workflow =
                WorkflowDefinition::from_parts(workflow_id("generated_flow"), nodes, edges)
                    .expect("generated workflow is acyclic and valid");

            let mut node_contracts = Vec::with_capacity(node_count);
            let mut node_capabilities = Vec::with_capacity(node_count);

            for node in workflow.nodes() {
                // Inputs come first, then outputs, in both lists.
                let contract_ports: Vec<PortContract> = node
                    .input_ports()
                    .iter()
                    .map(|port| {
                        PortContract::new(
                            port.clone(),
                            PortDirection::Input,
                            Some(schema(GENERATED_SCHEMA)),
                        )
                    })
                    .chain(node.output_ports().iter().map(|port| {
                        PortContract::new(
                            port.clone(),
                            PortDirection::Output,
                            Some(schema(GENERATED_SCHEMA)),
                        )
                    }))
                    .collect();
                let capability_ports: Vec<PortCapability> = node
                    .input_ports()
                    .iter()
                    .map(|port| PortCapability::new(port.clone(), PortCapabilityDirection::Receive))
                    .chain(
                        node.output_ports().iter().map(|port| {
                            PortCapability::new(port.clone(), PortCapabilityDirection::Emit)
                        }),
                    )
                    .collect();

                node_contracts.push(
                    NodeContract::new(
                        node.id().clone(),
                        contract_ports,
                        ExecutionMode::Native,
                        Determinism::Deterministic,
                        RetryDisposition::Safe,
                    )
                    .expect("generated contract matches workflow ports"),
                );
                node_capabilities.push(
                    NodeCapabilities::native_passive(node.id().clone(), capability_ports)
                        .expect("generated capabilities match workflow ports"),
                );
            }

            Self {
                workflow,
                contracts: node_contracts,
                capabilities: node_capabilities,
            }
        }
    }

    /// Id of the generated node at `index`.
    fn generated_node_id(index: usize) -> NodeId {
        let name = format!("node_{index}");
        node_id(&name)
    }

    /// Id of the input port that receives from the node at `source_index`.
    fn generated_input_port(source_index: usize) -> PortId {
        let name = format!("in_from_{source_index}");
        port_id(&name)
    }

    /// Id of the output port that sends to the node at `target_index`.
    fn generated_output_port(target_index: usize) -> PortId {
        let name = format!("out_to_{target_index}");
        port_id(&name)
    }

    #[test]
    fn generated_non_empty_schema_refs_round_trip() {
        fn property(input: NonEmptySchemaString) -> bool {
            let NonEmptySchemaString(raw) = input;
            let parsed = SchemaRef::new(raw.clone()).expect("generated schema ref is valid");
            parsed.as_str() == raw
        }

        let mut runner = QuickCheck::new().tests(128);
        runner.quickcheck(property as fn(NonEmptySchemaString) -> bool);
    }

    #[test]
    fn generated_matching_workflow_contracts_validate() {
        fn property(case: MatchingWorkflowContractCase) -> bool {
            let outcome =
                validate_workflow_contracts(&case.workflow, &case.contracts, &case.capabilities);
            outcome.is_ok()
        }

        let mut runner = QuickCheck::new().tests(128);
        runner.quickcheck(property as fn(MatchingWorkflowContractCase) -> bool);
    }

    #[test]
    fn node_contract_rejects_duplicate_ports() {
        // Same id, different directions: still a duplicate.
        let clashing_ports = [
            PortContract::new(port_id("in"), PortDirection::Input, None),
            PortContract::new(port_id("in"), PortDirection::Output, None),
        ];

        let err = NodeContract::new(
            node_id("worker"),
            clashing_ports,
            ExecutionMode::Native,
            Determinism::Deterministic,
            RetryDisposition::Safe,
        )
        .expect_err("duplicate ports must fail");

        let expected = ContractValidationError::DuplicatePortContract {
            node_id: node_id("worker"),
            port_id: port_id("in"),
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn schema_ref_rejects_empty_values() {
        let outcome = SchemaRef::new(" ");
        let err = outcome.expect_err("empty schema ref must fail");

        assert_eq!(err, ContractValidationError::EmptySchemaRef);
    }

    #[test]
    fn validate_workflow_contracts_accepts_matching_contracts() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .node(NodeBuilder::new("sink").input("in").build())
            .edge("source", "out", "sink", "in")
            .build();
        let node_contracts = vec![
            contract(
                "source",
                vec![output_port("out", "schema://packet")],
                ExecutionMode::Native,
            ),
            contract(
                "sink",
                vec![input_port("in", "schema://packet")],
                ExecutionMode::Native,
            ),
        ];
        let node_capabilities = vec![
            capabilities("source", vec![emits("out")]),
            capabilities("sink", vec![receives("in")]),
        ];

        let outcome = validate_workflow_contracts(&workflow, &node_contracts, &node_capabilities);
        outcome.expect("matching contracts should validate");
    }

    #[test]
    fn validate_workflow_contracts_rejects_schema_mismatch() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .node(NodeBuilder::new("sink").input("in").build())
            .edge("source", "out", "sink", "in")
            .build();
        let node_contracts = vec![
            contract(
                "source",
                vec![output_port("out", "schema://packet-a")],
                ExecutionMode::Native,
            ),
            contract(
                "sink",
                vec![input_port("in", "schema://packet-b")],
                ExecutionMode::Native,
            ),
        ];
        let node_capabilities = vec![
            capabilities("source", vec![emits("out")]),
            capabilities("sink", vec![receives("in")]),
        ];

        let err = validate_workflow_contracts(&workflow, &node_contracts, &node_capabilities)
            .expect_err("schema mismatch must fail");

        assert!(matches!(
            err,
            ContractValidationError::SchemaMismatch { .. }
        ));
    }

    #[test]
    fn validate_workflow_contracts_rejects_missing_capability_descriptor() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .build();
        let node_contracts = vec![contract(
            "source",
            vec![output_port("out", "schema://packet")],
            ExecutionMode::Native,
        )];

        let err = validate_workflow_contracts(&workflow, &node_contracts, &[])
            .expect_err("missing capability descriptor must fail");

        let expected = ContractValidationError::MissingCapabilityDescriptor {
            node_id: node_id("source"),
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn validate_workflow_contracts_accepts_native_advisory_effects() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("source").output("out").build())
            .build();
        let node_contracts = vec![contract(
            "source",
            vec![output_port("out", "schema://packet")],
            ExecutionMode::Native,
        )];
        let node_capabilities = vec![capabilities("source", vec![emits("out")])];

        let outcome = validate_workflow_contracts(&workflow, &node_contracts, &node_capabilities);
        outcome.expect("native effects are advisory");
    }

    #[test]
    fn validate_workflow_contracts_accepts_wasm_without_host_effects() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("wasm").input("in").output("out").build())
            .build();
        let node_contracts = vec![contract(
            "wasm",
            vec![
                input_port("in", "schema://p"),
                output_port("out", "schema://p"),
            ],
            ExecutionMode::Wasm,
        )];
        let node_capabilities = vec![passive_capabilities(
            "wasm",
            vec![receives("in"), emits("out")],
        )];

        let outcome = validate_workflow_contracts(&workflow, &node_contracts, &node_capabilities);
        outcome.expect("import-free WASM contract should validate");
    }

    #[test]
    fn validate_workflow_contracts_rejects_wasm_effects_without_imports() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("wasm").input("in").build())
            .build();
        let node_contracts = vec![contract(
            "wasm",
            vec![input_port("in", "schema://packet")],
            ExecutionMode::Wasm,
        )];
        let node_capabilities = vec![capabilities("wasm", vec![receives("in")])];

        let err = validate_workflow_contracts(&workflow, &node_contracts, &node_capabilities)
            .expect_err("WASM effect without host import must fail");

        let expected = ContractValidationError::Capability {
            error: CapabilityValidationError::UnenforceableEffectCapability {
                node_id: node_id("wasm"),
                effect: EffectCapability::Clock,
            },
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn validate_workflow_contracts_rejects_process_effects_without_adapter() {
        let workflow = WorkflowBuilder::new("flow")
            .node(NodeBuilder::new("worker").input("in").build())
            .build();
        let node_contracts = vec![contract(
            "worker",
            vec![input_port("in", "schema://packet")],
            ExecutionMode::Process,
        )];
        let node_capabilities = vec![capabilities("worker", vec![receives("in")])];

        let err = validate_workflow_contracts(&workflow, &node_contracts, &node_capabilities)
            .expect_err("process effect without adapter must fail");

        assert!(matches!(
            err,
            ContractValidationError::Capability {
                error: CapabilityValidationError::UnenforceableEffectCapability { .. }
            }
        ));
    }
}