Skip to main content

pureflow_core/
capability.rs

1//! Capability descriptors that constrain runtime behavior without owning graph shape.
2//!
3//! ## Fragment: capability-structure-vs-boundary
4//!
5//! Workflow shape and node capability are intentionally modeled in separate
6//! crates. The workflow model answers "what ports exist and how are they
7//! connected?", while this module answers "what is a node allowed to receive,
8//! emit, or ask the runtime to do?" Keeping those concerns apart prevents the
9//! graph model from quietly becoming a security or isolation policy surface.
10//!
11//! ## Fragment: capability-port-claims
12//!
13//! Port capabilities are duplicated as claims instead of reusing workflow port
14//! declarations directly because they serve a different purpose. Workflow ports
15//! describe topology; capability ports describe permitted runtime behavior. The
16//! duplication is intentional even though later validation must keep the two in
17//! sync.
18//!
19//! ## Fragment: capability-workflow-cross-validation
20//!
21//! Cross-validation lives here rather than in the workflow crate because a
22//! mismatch is still fundamentally a capability problem: the workflow only
23//! promises that a port exists, while this module decides whether a node's
24//! declared permissions line up with that topology. The validator therefore
25//! consumes a `WorkflowDefinition` as read-only structure and keeps the
26//! capability error vocabulary as the single place callers inspect.
27//!
28//! ## Fragment: capability-effect-taxonomy
29//!
30//! The current `EffectCapability` enum is intentionally modest and concrete.
31//! It names host effects the runtime can plausibly mediate today without
32//! pretending that native nodes are sandboxed. For native execution these
33//! capabilities are advisory metadata; for future WASM or process-backed nodes
34//! they become part of a real enforcement boundary. Observability concerns such
35//! as logging, tracing, and metadata collection are deliberately excluded unless
36//! a node asks the host to write to an external sink; routine runtime telemetry
37//! belongs to the metadata layer.
38
39use std::collections::BTreeSet;
40use std::error::Error;
41use std::fmt;
42
43use pureflow_types::{NodeId, PortId};
44use pureflow_workflow::{NodeDefinition, WorkflowDefinition};
45
/// Which way messages travel across a port that a node lays claim to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortCapabilityDirection {
    /// Messages may arrive at the node through the port.
    Receive,
    /// Messages may leave the node through the port.
    Emit,
}

impl PortCapabilityDirection {
    /// Lowercase verb for this direction, used when composing error text.
    const fn label(self) -> &'static str {
        // Kept as an exhaustive match so a new direction forces an update here.
        match self {
            Self::Emit => "emit",
            Self::Receive => "receive",
        }
    }
}
63
/// Host-side effect that a node may ask the runtime boundary to perform.
///
/// Variant order is significant: the derived `Ord` follows declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EffectCapability {
    /// Reading host filesystem resources.
    FileSystemRead,
    /// Writing host filesystem resources.
    FileSystemWrite,
    /// Opening outbound network connections.
    NetworkOutbound,
    /// Performing an effect against an external tool, service, database, or API.
    ExternalEffect,
    /// Spawning child processes.
    ProcessSpawn,
    /// Reading the process environment.
    EnvironmentRead,
    /// Mutating the process environment.
    EnvironmentWrite,
    /// Using wall-clock time or timers.
    Clock,
}

impl EffectCapability {
    /// Returns the stable, machine-facing `snake_case` label of this effect.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            // Filesystem access.
            Self::FileSystemWrite => "filesystem_write",
            Self::FileSystemRead => "filesystem_read",
            // Process environment access.
            Self::EnvironmentWrite => "environment_write",
            Self::EnvironmentRead => "environment_read",
            // Everything that reaches outside the current process.
            Self::NetworkOutbound => "network_outbound",
            Self::ProcessSpawn => "process_spawn",
            Self::ExternalEffect => "external_effect",
            // Time.
            Self::Clock => "clock",
        }
    }
}
101
102/// A named claim that a node may use a port in one direction.
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct PortCapability {
105    port_id: PortId,
106    direction: PortCapabilityDirection,
107}
108
109impl PortCapability {
110    /// Create a port capability claim.
111    #[must_use]
112    pub const fn new(port_id: PortId, direction: PortCapabilityDirection) -> Self {
113        Self { port_id, direction }
114    }
115
116    /// Port claimed by this capability.
117    #[must_use]
118    pub const fn port_id(&self) -> &PortId {
119        &self.port_id
120    }
121
122    /// Direction claimed by this capability.
123    #[must_use]
124    pub const fn direction(&self) -> PortCapabilityDirection {
125        self.direction
126    }
127}
128
129/// Validation error for node capability descriptors.
130#[derive(Debug, Clone, PartialEq, Eq)]
131pub enum CapabilityValidationError {
132    /// A node declared the same effect capability more than once.
133    DuplicateEffect {
134        /// Node whose capability descriptor is invalid.
135        node_id: NodeId,
136        /// Duplicated effect capability.
137        effect: EffectCapability,
138    },
139    /// A node declared the same port and direction more than once.
140    DuplicatePortCapability {
141        /// Node whose capability descriptor is invalid.
142        node_id: NodeId,
143        /// Duplicated port.
144        port_id: PortId,
145        /// Duplicated direction.
146        direction: PortCapabilityDirection,
147    },
148    /// A node declared one port as both receive and emit.
149    ConflictingPortDirection {
150        /// Node whose capability descriptor is invalid.
151        node_id: NodeId,
152        /// Port with conflicting direction claims.
153        port_id: PortId,
154    },
155    /// A capability descriptor referenced a node absent from the workflow.
156    UnknownCapabilityNode {
157        /// Node whose capability descriptor does not match the workflow.
158        node_id: NodeId,
159    },
160    /// A capability descriptor referenced a port absent from the workflow node.
161    UnknownCapabilityPort {
162        /// Node whose capability descriptor does not match the workflow.
163        node_id: NodeId,
164        /// Port claimed by the capability descriptor.
165        port_id: PortId,
166    },
167    /// A capability descriptor claimed the wrong direction for a declared port.
168    CapabilityDirectionMismatch {
169        /// Node whose capability descriptor does not match the workflow.
170        node_id: NodeId,
171        /// Port whose workflow direction and capability claim disagree.
172        port_id: PortId,
173        /// Direction claimed by the capability descriptor.
174        claimed: PortCapabilityDirection,
175        /// Direction declared by the workflow topology.
176        declared: PortCapabilityDirection,
177    },
178    /// A strict runtime boundary cannot enforce a declared effect capability.
179    UnenforceableEffectCapability {
180        /// Node whose effect cannot be enforced.
181        node_id: NodeId,
182        /// Effect that is not enforceable for the node's execution boundary.
183        effect: EffectCapability,
184    },
185}
186
187impl fmt::Display for CapabilityValidationError {
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189        match self {
190            Self::DuplicateEffect { node_id, effect } => {
191                write!(
192                    f,
193                    "node `{node_id}` declares duplicate effect capability `{effect:?}`"
194                )
195            }
196            Self::DuplicatePortCapability {
197                node_id,
198                port_id,
199                direction,
200            } => write!(
201                f,
202                "node `{node_id}` declares duplicate {} capability for port `{port_id}`",
203                direction.label()
204            ),
205            Self::ConflictingPortDirection { node_id, port_id } => write!(
206                f,
207                "node `{node_id}` declares port `{port_id}` for both receive and emit"
208            ),
209            Self::UnknownCapabilityNode { node_id } => write!(
210                f,
211                "capability descriptor references unknown workflow node `{node_id}`"
212            ),
213            Self::UnknownCapabilityPort { node_id, port_id } => write!(
214                f,
215                "node `{node_id}` capability references unknown workflow port `{port_id}`"
216            ),
217            Self::CapabilityDirectionMismatch {
218                node_id,
219                port_id,
220                claimed,
221                declared,
222            } => write!(
223                f,
224                "node `{node_id}` capability claims port `{port_id}` may {} but workflow declares {}",
225                claimed.label(),
226                declared.label()
227            ),
228            Self::UnenforceableEffectCapability { node_id, effect } => write!(
229                f,
230                "node `{node_id}` declares effect capability `{effect:?}` that is not enforceable by its execution boundary"
231            ),
232        }
233    }
234}
235
236impl Error for CapabilityValidationError {}
237
238/// Validated capability descriptor for one node.
239#[derive(Debug, Clone, PartialEq, Eq)]
240pub struct NodeCapabilities {
241    node_id: NodeId,
242    ports: Vec<PortCapability>,
243    effects: Vec<EffectCapability>,
244}
245
246impl NodeCapabilities {
247    /// Create a validated node capability descriptor.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if the descriptor repeats an effect, repeats a
252    /// port-direction claim, or declares one port as both receiving and
253    /// emitting.
254    pub fn new(
255        node_id: NodeId,
256        ports: impl Into<Vec<PortCapability>>,
257        effects: impl Into<Vec<EffectCapability>>,
258    ) -> Result<Self, CapabilityValidationError> {
259        let ports: Vec<PortCapability> = ports.into();
260        let effects: Vec<EffectCapability> = effects.into();
261        reject_duplicate_effects(&node_id, &effects)?;
262        reject_invalid_port_capabilities(&node_id, &ports)?;
263
264        Ok(Self {
265            node_id,
266            ports,
267            effects,
268        })
269    }
270
271    /// Create a validated native descriptor with no external effects.
272    ///
273    /// # Errors
274    ///
275    /// Returns an error if the descriptor repeats a port-direction claim or
276    /// declares one port as both receiving and emitting.
277    pub fn native_passive(
278        node_id: NodeId,
279        ports: impl Into<Vec<PortCapability>>,
280    ) -> Result<Self, CapabilityValidationError> {
281        Self::new(node_id, ports, Vec::<EffectCapability>::new())
282    }
283
284    /// Node constrained by this capability descriptor.
285    #[must_use]
286    pub const fn node_id(&self) -> &NodeId {
287        &self.node_id
288    }
289
290    /// Port capabilities claimed by the node.
291    #[must_use]
292    pub fn ports(&self) -> &[PortCapability] {
293        &self.ports
294    }
295
296    /// Effect capabilities claimed by the node.
297    #[must_use]
298    pub fn effects(&self) -> &[EffectCapability] {
299        &self.effects
300    }
301
302    /// Return whether this descriptor grants a specific effect capability.
303    #[must_use]
304    pub fn allows_effect(&self, effect: EffectCapability) -> bool {
305        self.effects.contains(&effect)
306    }
307
308    /// Return whether this descriptor grants a specific port-direction capability.
309    #[must_use]
310    pub fn allows_port(&self, port_id: &PortId, direction: PortCapabilityDirection) -> bool {
311        self.ports
312            .iter()
313            .any(|port: &PortCapability| port.port_id() == port_id && port.direction() == direction)
314    }
315}
316
317/// Validate that node capability descriptors align with one workflow topology.
318///
319/// # Errors
320///
321/// Returns an error if a capability descriptor references an unknown node,
322/// references an unknown port on a known node, or claims a direction that
323/// disagrees with the workflow declaration.
324pub fn validate_workflow_capabilities(
325    workflow: &WorkflowDefinition,
326    capabilities: &[NodeCapabilities],
327) -> Result<(), CapabilityValidationError> {
328    for capability in capabilities {
329        let node: &NodeDefinition = workflow
330            .nodes()
331            .iter()
332            .find(|node: &&NodeDefinition| node.id() == capability.node_id())
333            .ok_or_else(|| CapabilityValidationError::UnknownCapabilityNode {
334                node_id: capability.node_id().clone(),
335            })?;
336
337        for port in capability.ports() {
338            let declared: PortCapabilityDirection = workflow_direction_for(node, port.port_id())
339                .ok_or_else(|| CapabilityValidationError::UnknownCapabilityPort {
340                    node_id: capability.node_id().clone(),
341                    port_id: port.port_id().clone(),
342                })?;
343
344            if port.direction() != declared {
345                return Err(CapabilityValidationError::CapabilityDirectionMismatch {
346                    node_id: capability.node_id().clone(),
347                    port_id: port.port_id().clone(),
348                    claimed: port.direction(),
349                    declared,
350                });
351            }
352        }
353    }
354
355    Ok(())
356}
357
358fn workflow_direction_for(
359    node: &NodeDefinition,
360    port_id: &PortId,
361) -> Option<PortCapabilityDirection> {
362    if node.input_ports().contains(port_id) {
363        Some(PortCapabilityDirection::Receive)
364    } else if node.output_ports().contains(port_id) {
365        Some(PortCapabilityDirection::Emit)
366    } else {
367        None
368    }
369}
370
371fn reject_duplicate_effects(
372    node_id: &NodeId,
373    effects: &[EffectCapability],
374) -> Result<(), CapabilityValidationError> {
375    let mut seen: BTreeSet<EffectCapability> = BTreeSet::new();
376
377    for effect in effects {
378        if !seen.insert(*effect) {
379            return Err(CapabilityValidationError::DuplicateEffect {
380                node_id: node_id.clone(),
381                effect: *effect,
382            });
383        }
384    }
385
386    Ok(())
387}
388
389fn reject_invalid_port_capabilities(
390    node_id: &NodeId,
391    ports: &[PortCapability],
392) -> Result<(), CapabilityValidationError> {
393    let mut receives: BTreeSet<PortId> = BTreeSet::new();
394    let mut emits: BTreeSet<PortId> = BTreeSet::new();
395
396    for port in ports {
397        let current: &mut BTreeSet<PortId> = match port.direction() {
398            PortCapabilityDirection::Receive => &mut receives,
399            PortCapabilityDirection::Emit => &mut emits,
400        };
401
402        if !current.insert(port.port_id().clone()) {
403            return Err(CapabilityValidationError::DuplicatePortCapability {
404                node_id: node_id.clone(),
405                port_id: port.port_id().clone(),
406                direction: port.direction(),
407            });
408        }
409    }
410
411    if let Some(port_id) = receives.intersection(&emits).next() {
412        return Err(CapabilityValidationError::ConflictingPortDirection {
413            node_id: node_id.clone(),
414            port_id: port_id.clone(),
415        });
416    }
417
418    Ok(())
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use pureflow_types::WorkflowId;

    // ---- fixtures ---------------------------------------------------------

    /// Builds a `NodeId` from a literal the test expects to be accepted.
    fn node_id(value: &str) -> NodeId {
        NodeId::new(value).expect("valid node id")
    }

    /// Builds a `PortId` from a literal the test expects to be accepted.
    fn port_id(value: &str) -> PortId {
        PortId::new(value).expect("valid port id")
    }

    /// Claim that `port` may be received from.
    fn receive(port: &str) -> PortCapability {
        let direction = PortCapabilityDirection::Receive;
        PortCapability::new(port_id(port), direction)
    }

    /// Claim that `port` may be emitted to.
    fn emit(port: &str) -> PortCapability {
        let direction = PortCapabilityDirection::Emit;
        PortCapability::new(port_id(port), direction)
    }

    /// Empty effect list with its element type spelled out once.
    fn no_effects() -> Vec<EffectCapability> {
        Vec::new()
    }

    /// Empty port-claim list with its element type spelled out once.
    fn no_ports() -> Vec<PortCapability> {
        Vec::new()
    }

    /// Two-node workflow: `source` has output `out`, `sink` has input `in`,
    /// and the third `from_parts` argument is left empty.
    fn workflow() -> WorkflowDefinition {
        let id = WorkflowId::new("flow").expect("valid workflow id");
        let source = NodeDefinition::new(node_id("source"), Vec::new(), [port_id("out")])
            .expect("valid source");
        let sink = NodeDefinition::new(node_id("sink"), [port_id("in")], Vec::new())
            .expect("valid sink");

        WorkflowDefinition::from_parts(id, [source, sink], Vec::new()).expect("valid workflow")
    }

    // ---- descriptor construction -----------------------------------------

    #[test]
    fn valid_capabilities_keep_ports_and_effects_separate() {
        let ports = [receive("input"), emit("output")];
        let effects = [EffectCapability::FileSystemRead, EffectCapability::Clock];

        let reader =
            NodeCapabilities::new(node_id("reader"), ports, effects).expect("valid capabilities");

        assert_eq!(reader.node_id().as_str(), "reader");
        assert!(reader.allows_effect(EffectCapability::FileSystemRead));
        assert!(reader.allows_port(&port_id("input"), PortCapabilityDirection::Receive));
        assert!(!reader.allows_effect(EffectCapability::ProcessSpawn));
    }

    #[test]
    fn effect_capability_labels_are_stable() {
        let external = EffectCapability::ExternalEffect;
        let network = EffectCapability::NetworkOutbound;

        assert_eq!(external.as_str(), "external_effect");
        assert_eq!(network.as_str(), "network_outbound");
    }

    #[test]
    fn native_passive_capabilities_have_no_effects() {
        let reader = NodeCapabilities::native_passive(node_id("reader"), [receive("input")])
            .expect("valid passive capabilities");

        assert!(reader.effects().is_empty());
        assert!(reader.allows_port(&port_id("input"), PortCapabilityDirection::Receive));
    }

    #[test]
    fn duplicate_effects_are_rejected() {
        let repeated = [
            EffectCapability::FileSystemRead,
            EffectCapability::FileSystemRead,
        ];

        let err = NodeCapabilities::new(node_id("reader"), no_ports(), repeated)
            .expect_err("duplicate effect must fail");

        let expected = CapabilityValidationError::DuplicateEffect {
            node_id: node_id("reader"),
            effect: EffectCapability::FileSystemRead,
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn duplicate_port_direction_is_rejected() {
        let repeated = [receive("input"), receive("input")];

        let err = NodeCapabilities::new(node_id("reader"), repeated, no_effects())
            .expect_err("duplicate port direction must fail");

        let expected = CapabilityValidationError::DuplicatePortCapability {
            node_id: node_id("reader"),
            port_id: port_id("input"),
            direction: PortCapabilityDirection::Receive,
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn conflicting_port_directions_are_rejected() {
        let contradictory = [receive("data"), emit("data")];

        let err = NodeCapabilities::new(node_id("router"), contradictory, no_effects())
            .expect_err("conflicting port direction must fail");

        let expected = CapabilityValidationError::ConflictingPortDirection {
            node_id: node_id("router"),
            port_id: port_id("data"),
        };
        assert_eq!(err, expected);
    }

    // ---- cross-validation against a workflow ------------------------------

    #[test]
    fn workflow_capabilities_accept_matching_node_ports() {
        let flow = workflow();
        let source = NodeCapabilities::new(node_id("source"), [emit("out")], no_effects())
            .expect("valid source capabilities");
        let sink = NodeCapabilities::new(node_id("sink"), [receive("in")], no_effects())
            .expect("valid sink capabilities");

        validate_workflow_capabilities(&flow, &[source, sink])
            .expect("matching workflow capabilities should validate");
    }

    #[test]
    fn workflow_capabilities_reject_unknown_node() {
        let flow = workflow();
        let ghost = NodeCapabilities::new(node_id("ghost"), [emit("out")], no_effects())
            .expect("self-consistent capability descriptor");

        let err = validate_workflow_capabilities(&flow, &[ghost])
            .expect_err("unknown workflow node must fail");

        let expected = CapabilityValidationError::UnknownCapabilityNode {
            node_id: node_id("ghost"),
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn workflow_capabilities_reject_unknown_port() {
        let flow = workflow();
        let sink = NodeCapabilities::new(node_id("sink"), [receive("missing")], no_effects())
            .expect("self-consistent capability descriptor");

        let err = validate_workflow_capabilities(&flow, &[sink])
            .expect_err("unknown workflow port must fail");

        let expected = CapabilityValidationError::UnknownCapabilityPort {
            node_id: node_id("sink"),
            port_id: port_id("missing"),
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn workflow_capabilities_reject_direction_mismatch() {
        let flow = workflow();
        // `in` is an input of `sink`, so claiming it for emit must be refused.
        let sink = NodeCapabilities::new(node_id("sink"), [emit("in")], no_effects())
            .expect("self-consistent capability descriptor");

        let err = validate_workflow_capabilities(&flow, &[sink])
            .expect_err("direction mismatch must fail");

        let expected = CapabilityValidationError::CapabilityDirectionMismatch {
            node_id: node_id("sink"),
            port_id: port_id("in"),
            claimed: PortCapabilityDirection::Emit,
            declared: PortCapabilityDirection::Receive,
        };
        assert_eq!(err, expected);
    }

    #[test]
    fn unenforceable_effect_capability_is_a_capability_error() {
        let err = CapabilityValidationError::UnenforceableEffectCapability {
            node_id: node_id("wasm"),
            effect: EffectCapability::Clock,
        };

        let rendered = err.to_string();
        assert!(rendered.contains("not enforceable"));
    }
}