1use std::collections::BTreeSet;
40use std::error::Error;
41use std::fmt;
42
43use pureflow_types::{NodeId, PortId};
44use pureflow_workflow::{NodeDefinition, WorkflowDefinition};
45
/// Direction in which a node may use one of its ports.
///
/// Workflow input ports correspond to [`PortCapabilityDirection::Receive`]
/// and output ports to [`PortCapabilityDirection::Emit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortCapabilityDirection {
    /// The node may receive on the port.
    Receive,
    /// The node may emit on the port.
    Emit,
}

impl PortCapabilityDirection {
    /// Lower-case verb for this direction, used when rendering error messages.
    const fn label(self) -> &'static str {
        match self {
            Self::Emit => "emit",
            Self::Receive => "receive",
        }
    }
}
63
/// Category of side effect a node may declare.
///
/// The derived ordering follows declaration order, so variants must not be
/// reordered without considering code that sorts or stores these values in
/// ordered collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EffectCapability {
    /// Reading from the file system (`filesystem_read`).
    FileSystemRead,
    /// Writing to the file system (`filesystem_write`).
    FileSystemWrite,
    /// Outbound network access (`network_outbound`).
    NetworkOutbound,
    /// An external effect (`external_effect`).
    ExternalEffect,
    /// Spawning processes (`process_spawn`).
    ProcessSpawn,
    /// Reading the environment (`environment_read`).
    EnvironmentRead,
    /// Writing the environment (`environment_write`).
    EnvironmentWrite,
    /// Access to the clock (`clock`).
    Clock,
}

impl EffectCapability {
    /// Returns the snake_case label for this effect.
    ///
    /// The label is a fixed string per variant and, unlike the `Debug`
    /// output, does not depend on the Rust identifier of the variant.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            // File system.
            Self::FileSystemWrite => "filesystem_write",
            Self::FileSystemRead => "filesystem_read",
            // Environment.
            Self::EnvironmentWrite => "environment_write",
            Self::EnvironmentRead => "environment_read",
            // Everything else.
            Self::Clock => "clock",
            Self::ProcessSpawn => "process_spawn",
            Self::ExternalEffect => "external_effect",
            Self::NetworkOutbound => "network_outbound",
        }
    }
}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct PortCapability {
105 port_id: PortId,
106 direction: PortCapabilityDirection,
107}
108
109impl PortCapability {
110 #[must_use]
112 pub const fn new(port_id: PortId, direction: PortCapabilityDirection) -> Self {
113 Self { port_id, direction }
114 }
115
116 #[must_use]
118 pub const fn port_id(&self) -> &PortId {
119 &self.port_id
120 }
121
122 #[must_use]
124 pub const fn direction(&self) -> PortCapabilityDirection {
125 self.direction
126 }
127}
128
/// Reasons a capability descriptor is rejected.
///
/// The first three variants describe a descriptor that is inconsistent on its
/// own; the `Unknown*` and `CapabilityDirectionMismatch` variants describe a
/// descriptor that disagrees with a workflow definition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CapabilityValidationError {
    /// The same effect capability is listed more than once for a node.
    DuplicateEffect {
        /// Node whose descriptor contains the repeated effect.
        node_id: NodeId,
        /// Effect that was listed more than once.
        effect: EffectCapability,
    },
    /// The same port is listed more than once with the same direction.
    DuplicatePortCapability {
        /// Node whose descriptor contains the repeated port claim.
        node_id: NodeId,
        /// Port that was claimed more than once.
        port_id: PortId,
        /// Direction shared by the repeated claims.
        direction: PortCapabilityDirection,
    },
    /// The same port is claimed for both receive and emit.
    ConflictingPortDirection {
        /// Node whose descriptor contains the conflicting claims.
        node_id: NodeId,
        /// Port claimed in both directions.
        port_id: PortId,
    },
    /// A descriptor names a node that the workflow does not contain.
    UnknownCapabilityNode {
        /// Node identifier that could not be found in the workflow.
        node_id: NodeId,
    },
    /// A descriptor names a port that is neither an input nor an output port
    /// of the workflow node.
    UnknownCapabilityPort {
        /// Node the descriptor belongs to.
        node_id: NodeId,
        /// Port identifier that could not be found on that node.
        port_id: PortId,
    },
    /// A descriptor claims a port in a different direction than the workflow
    /// declares for it.
    CapabilityDirectionMismatch {
        /// Node the descriptor belongs to.
        node_id: NodeId,
        /// Port whose direction disagrees.
        port_id: PortId,
        /// Direction stated by the capability descriptor.
        claimed: PortCapabilityDirection,
        /// Direction derived from the workflow definition.
        declared: PortCapabilityDirection,
    },
    /// A node declares an effect capability that its execution boundary
    /// cannot enforce.
    ///
    /// NOTE(review): no function visible in this file produces this variant;
    /// presumably it is raised by callers elsewhere in the project.
    UnenforceableEffectCapability {
        /// Node that declared the effect.
        node_id: NodeId,
        /// Effect that cannot be enforced.
        effect: EffectCapability,
    },
}
186
187impl fmt::Display for CapabilityValidationError {
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 match self {
190 Self::DuplicateEffect { node_id, effect } => {
191 write!(
192 f,
193 "node `{node_id}` declares duplicate effect capability `{effect:?}`"
194 )
195 }
196 Self::DuplicatePortCapability {
197 node_id,
198 port_id,
199 direction,
200 } => write!(
201 f,
202 "node `{node_id}` declares duplicate {} capability for port `{port_id}`",
203 direction.label()
204 ),
205 Self::ConflictingPortDirection { node_id, port_id } => write!(
206 f,
207 "node `{node_id}` declares port `{port_id}` for both receive and emit"
208 ),
209 Self::UnknownCapabilityNode { node_id } => write!(
210 f,
211 "capability descriptor references unknown workflow node `{node_id}`"
212 ),
213 Self::UnknownCapabilityPort { node_id, port_id } => write!(
214 f,
215 "node `{node_id}` capability references unknown workflow port `{port_id}`"
216 ),
217 Self::CapabilityDirectionMismatch {
218 node_id,
219 port_id,
220 claimed,
221 declared,
222 } => write!(
223 f,
224 "node `{node_id}` capability claims port `{port_id}` may {} but workflow declares {}",
225 claimed.label(),
226 declared.label()
227 ),
228 Self::UnenforceableEffectCapability { node_id, effect } => write!(
229 f,
230 "node `{node_id}` declares effect capability `{effect:?}` that is not enforceable by its execution boundary"
231 ),
232 }
233 }
234}
235
// Marker impl: every method keeps the trait's default, so this error reports
// no underlying `source`.
impl Error for CapabilityValidationError {}
237
238#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct NodeCapabilities {
241 node_id: NodeId,
242 ports: Vec<PortCapability>,
243 effects: Vec<EffectCapability>,
244}
245
246impl NodeCapabilities {
247 pub fn new(
255 node_id: NodeId,
256 ports: impl Into<Vec<PortCapability>>,
257 effects: impl Into<Vec<EffectCapability>>,
258 ) -> Result<Self, CapabilityValidationError> {
259 let ports: Vec<PortCapability> = ports.into();
260 let effects: Vec<EffectCapability> = effects.into();
261 reject_duplicate_effects(&node_id, &effects)?;
262 reject_invalid_port_capabilities(&node_id, &ports)?;
263
264 Ok(Self {
265 node_id,
266 ports,
267 effects,
268 })
269 }
270
271 pub fn native_passive(
278 node_id: NodeId,
279 ports: impl Into<Vec<PortCapability>>,
280 ) -> Result<Self, CapabilityValidationError> {
281 Self::new(node_id, ports, Vec::<EffectCapability>::new())
282 }
283
284 #[must_use]
286 pub const fn node_id(&self) -> &NodeId {
287 &self.node_id
288 }
289
290 #[must_use]
292 pub fn ports(&self) -> &[PortCapability] {
293 &self.ports
294 }
295
296 #[must_use]
298 pub fn effects(&self) -> &[EffectCapability] {
299 &self.effects
300 }
301
302 #[must_use]
304 pub fn allows_effect(&self, effect: EffectCapability) -> bool {
305 self.effects.contains(&effect)
306 }
307
308 #[must_use]
310 pub fn allows_port(&self, port_id: &PortId, direction: PortCapabilityDirection) -> bool {
311 self.ports
312 .iter()
313 .any(|port: &PortCapability| port.port_id() == port_id && port.direction() == direction)
314 }
315}
316
317pub fn validate_workflow_capabilities(
325 workflow: &WorkflowDefinition,
326 capabilities: &[NodeCapabilities],
327) -> Result<(), CapabilityValidationError> {
328 for capability in capabilities {
329 let node: &NodeDefinition = workflow
330 .nodes()
331 .iter()
332 .find(|node: &&NodeDefinition| node.id() == capability.node_id())
333 .ok_or_else(|| CapabilityValidationError::UnknownCapabilityNode {
334 node_id: capability.node_id().clone(),
335 })?;
336
337 for port in capability.ports() {
338 let declared: PortCapabilityDirection = workflow_direction_for(node, port.port_id())
339 .ok_or_else(|| CapabilityValidationError::UnknownCapabilityPort {
340 node_id: capability.node_id().clone(),
341 port_id: port.port_id().clone(),
342 })?;
343
344 if port.direction() != declared {
345 return Err(CapabilityValidationError::CapabilityDirectionMismatch {
346 node_id: capability.node_id().clone(),
347 port_id: port.port_id().clone(),
348 claimed: port.direction(),
349 declared,
350 });
351 }
352 }
353 }
354
355 Ok(())
356}
357
358fn workflow_direction_for(
359 node: &NodeDefinition,
360 port_id: &PortId,
361) -> Option<PortCapabilityDirection> {
362 if node.input_ports().contains(port_id) {
363 Some(PortCapabilityDirection::Receive)
364 } else if node.output_ports().contains(port_id) {
365 Some(PortCapabilityDirection::Emit)
366 } else {
367 None
368 }
369}
370
371fn reject_duplicate_effects(
372 node_id: &NodeId,
373 effects: &[EffectCapability],
374) -> Result<(), CapabilityValidationError> {
375 let mut seen: BTreeSet<EffectCapability> = BTreeSet::new();
376
377 for effect in effects {
378 if !seen.insert(*effect) {
379 return Err(CapabilityValidationError::DuplicateEffect {
380 node_id: node_id.clone(),
381 effect: *effect,
382 });
383 }
384 }
385
386 Ok(())
387}
388
389fn reject_invalid_port_capabilities(
390 node_id: &NodeId,
391 ports: &[PortCapability],
392) -> Result<(), CapabilityValidationError> {
393 let mut receives: BTreeSet<PortId> = BTreeSet::new();
394 let mut emits: BTreeSet<PortId> = BTreeSet::new();
395
396 for port in ports {
397 let current: &mut BTreeSet<PortId> = match port.direction() {
398 PortCapabilityDirection::Receive => &mut receives,
399 PortCapabilityDirection::Emit => &mut emits,
400 };
401
402 if !current.insert(port.port_id().clone()) {
403 return Err(CapabilityValidationError::DuplicatePortCapability {
404 node_id: node_id.clone(),
405 port_id: port.port_id().clone(),
406 direction: port.direction(),
407 });
408 }
409 }
410
411 if let Some(port_id) = receives.intersection(&emits).next() {
412 return Err(CapabilityValidationError::ConflictingPortDirection {
413 node_id: node_id.clone(),
414 port_id: port_id.clone(),
415 });
416 }
417
418 Ok(())
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use pureflow_types::WorkflowId;

    /// Builds a `NodeId` from a fixture literal.
    fn node_id(value: &str) -> NodeId {
        NodeId::new(value).expect("valid node id")
    }

    /// Builds a `PortId` from a fixture literal.
    fn port_id(value: &str) -> PortId {
        PortId::new(value).expect("valid port id")
    }

    /// Receive claim for the named port.
    fn receive(port: &str) -> PortCapability {
        PortCapability::new(port_id(port), PortCapabilityDirection::Receive)
    }

    /// Emit claim for the named port.
    fn emit(port: &str) -> PortCapability {
        PortCapability::new(port_id(port), PortCapabilityDirection::Emit)
    }

    /// Two-node workflow: `source` with output port `out`, `sink` with input
    /// port `in`, and no edges.
    fn workflow() -> WorkflowDefinition {
        WorkflowDefinition::from_parts(
            WorkflowId::new("flow").expect("valid workflow id"),
            [
                NodeDefinition::new(node_id("source"), Vec::new(), [port_id("out")])
                    .expect("valid source"),
                NodeDefinition::new(node_id("sink"), [port_id("in")], Vec::new())
                    .expect("valid sink"),
            ],
            Vec::new(),
        )
        .expect("valid workflow")
    }

    #[test]
    fn valid_capabilities_keep_ports_and_effects_separate() {
        let capabilities: NodeCapabilities = NodeCapabilities::new(
            node_id("reader"),
            [receive("input"), emit("output")],
            [EffectCapability::FileSystemRead, EffectCapability::Clock],
        )
        .expect("valid capabilities");

        assert_eq!(capabilities.node_id().as_str(), "reader");
        assert!(capabilities.allows_effect(EffectCapability::FileSystemRead));
        assert!(capabilities.allows_effect(EffectCapability::Clock));
        assert!(!capabilities.allows_effect(EffectCapability::ProcessSpawn));

        // Each port is allowed only in the direction it was claimed for.
        assert!(capabilities.allows_port(&port_id("input"), PortCapabilityDirection::Receive));
        assert!(capabilities.allows_port(&port_id("output"), PortCapabilityDirection::Emit));
        assert!(!capabilities.allows_port(&port_id("input"), PortCapabilityDirection::Emit));
        assert!(!capabilities.allows_port(&port_id("output"), PortCapabilityDirection::Receive));
        assert!(!capabilities.allows_port(&port_id("other"), PortCapabilityDirection::Receive));

        // Supplied order is preserved by the accessors.
        assert_eq!(capabilities.ports(), [receive("input"), emit("output")]);
        assert_eq!(
            capabilities.effects(),
            [EffectCapability::FileSystemRead, EffectCapability::Clock]
        );
    }

    #[test]
    fn effect_capability_labels_are_stable() {
        assert_eq!(EffectCapability::FileSystemRead.as_str(), "filesystem_read");
        assert_eq!(
            EffectCapability::FileSystemWrite.as_str(),
            "filesystem_write"
        );
        assert_eq!(
            EffectCapability::NetworkOutbound.as_str(),
            "network_outbound"
        );
        assert_eq!(EffectCapability::ExternalEffect.as_str(), "external_effect");
        assert_eq!(EffectCapability::ProcessSpawn.as_str(), "process_spawn");
        assert_eq!(
            EffectCapability::EnvironmentRead.as_str(),
            "environment_read"
        );
        assert_eq!(
            EffectCapability::EnvironmentWrite.as_str(),
            "environment_write"
        );
        assert_eq!(EffectCapability::Clock.as_str(), "clock");
    }

    #[test]
    fn port_direction_labels_are_stable() {
        assert_eq!(PortCapabilityDirection::Receive.label(), "receive");
        assert_eq!(PortCapabilityDirection::Emit.label(), "emit");
    }

    #[test]
    fn native_passive_capabilities_have_no_effects() {
        let capabilities: NodeCapabilities =
            NodeCapabilities::native_passive(node_id("reader"), [receive("input")])
                .expect("valid passive capabilities");

        assert_eq!(capabilities.effects(), []);
        assert!(capabilities.allows_port(&port_id("input"), PortCapabilityDirection::Receive));
    }

    #[test]
    fn duplicate_effects_are_rejected() {
        let err: CapabilityValidationError = NodeCapabilities::new(
            node_id("reader"),
            Vec::<PortCapability>::new(),
            [
                EffectCapability::FileSystemRead,
                EffectCapability::FileSystemRead,
            ],
        )
        .expect_err("duplicate effect must fail");

        assert_eq!(
            err,
            CapabilityValidationError::DuplicateEffect {
                node_id: node_id("reader"),
                effect: EffectCapability::FileSystemRead
            }
        );
    }

    #[test]
    fn duplicate_port_direction_is_rejected() {
        let err: CapabilityValidationError = NodeCapabilities::new(
            node_id("reader"),
            [receive("input"), receive("input")],
            Vec::<EffectCapability>::new(),
        )
        .expect_err("duplicate port direction must fail");

        assert_eq!(
            err,
            CapabilityValidationError::DuplicatePortCapability {
                node_id: node_id("reader"),
                port_id: port_id("input"),
                direction: PortCapabilityDirection::Receive
            }
        );
    }

    #[test]
    fn duplicate_port_capability_message_names_direction() {
        let err: CapabilityValidationError =
            CapabilityValidationError::DuplicatePortCapability {
                node_id: node_id("reader"),
                port_id: port_id("input"),
                direction: PortCapabilityDirection::Receive,
            };

        assert!(err
            .to_string()
            .contains("declares duplicate receive capability for port"));
    }

    #[test]
    fn conflicting_port_directions_are_rejected() {
        let err: CapabilityValidationError = NodeCapabilities::new(
            node_id("router"),
            [receive("data"), emit("data")],
            Vec::<EffectCapability>::new(),
        )
        .expect_err("conflicting port direction must fail");

        assert_eq!(
            err,
            CapabilityValidationError::ConflictingPortDirection {
                node_id: node_id("router"),
                port_id: port_id("data")
            }
        );
    }

    #[test]
    fn workflow_capabilities_accept_matching_node_ports() {
        let workflow: WorkflowDefinition = workflow();
        let capabilities: Vec<NodeCapabilities> = vec![
            NodeCapabilities::new(
                node_id("source"),
                [emit("out")],
                Vec::<EffectCapability>::new(),
            )
            .expect("valid source capabilities"),
            NodeCapabilities::new(
                node_id("sink"),
                [receive("in")],
                Vec::<EffectCapability>::new(),
            )
            .expect("valid sink capabilities"),
        ];

        validate_workflow_capabilities(&workflow, &capabilities)
            .expect("matching workflow capabilities should validate");
    }

    #[test]
    fn workflow_capabilities_accept_empty_descriptor_list() {
        let workflow: WorkflowDefinition = workflow();

        validate_workflow_capabilities(&workflow, &[])
            .expect("no descriptors means nothing to reject");
    }

    #[test]
    fn workflow_capabilities_reject_unknown_node() {
        let workflow: WorkflowDefinition = workflow();
        let capabilities: Vec<NodeCapabilities> = vec![
            NodeCapabilities::new(
                node_id("ghost"),
                [emit("out")],
                Vec::<EffectCapability>::new(),
            )
            .expect("self-consistent capability descriptor"),
        ];

        let err: CapabilityValidationError =
            validate_workflow_capabilities(&workflow, &capabilities)
                .expect_err("unknown workflow node must fail");

        assert_eq!(
            err,
            CapabilityValidationError::UnknownCapabilityNode {
                node_id: node_id("ghost")
            }
        );
    }

    #[test]
    fn workflow_capabilities_reject_unknown_port() {
        let workflow: WorkflowDefinition = workflow();
        let capabilities: Vec<NodeCapabilities> = vec![
            NodeCapabilities::new(
                node_id("sink"),
                [receive("missing")],
                Vec::<EffectCapability>::new(),
            )
            .expect("self-consistent capability descriptor"),
        ];

        let err: CapabilityValidationError =
            validate_workflow_capabilities(&workflow, &capabilities)
                .expect_err("unknown workflow port must fail");

        assert_eq!(
            err,
            CapabilityValidationError::UnknownCapabilityPort {
                node_id: node_id("sink"),
                port_id: port_id("missing")
            }
        );
    }

    #[test]
    fn workflow_capabilities_reject_direction_mismatch() {
        let workflow: WorkflowDefinition = workflow();
        let capabilities: Vec<NodeCapabilities> = vec![
            NodeCapabilities::new(
                node_id("sink"),
                [emit("in")],
                Vec::<EffectCapability>::new(),
            )
            .expect("self-consistent capability descriptor"),
        ];

        let err: CapabilityValidationError =
            validate_workflow_capabilities(&workflow, &capabilities)
                .expect_err("direction mismatch must fail");

        assert_eq!(
            err,
            CapabilityValidationError::CapabilityDirectionMismatch {
                node_id: node_id("sink"),
                port_id: port_id("in"),
                claimed: PortCapabilityDirection::Emit,
                declared: PortCapabilityDirection::Receive,
            }
        );

        // The message states the claimed direction first, then the declared one.
        assert!(err
            .to_string()
            .contains("may emit but workflow declares receive"));
    }

    #[test]
    fn unenforceable_effect_capability_is_a_capability_error() {
        let err: CapabilityValidationError =
            CapabilityValidationError::UnenforceableEffectCapability {
                node_id: node_id("wasm"),
                effect: EffectCapability::Clock,
            };

        assert!(err.to_string().contains("not enforceable"));
    }
}