use std::collections::BTreeSet;
use std::error::Error;
use std::fmt;
use pureflow_types::{NodeId, PortId};
use pureflow_workflow::{NodeDefinition, WorkflowDefinition};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortCapabilityDirection {
Receive,
Emit,
}
impl PortCapabilityDirection {
const fn label(self) -> &'static str {
match self {
Self::Receive => "receive",
Self::Emit => "emit",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EffectCapability {
FileSystemRead,
FileSystemWrite,
NetworkOutbound,
ExternalEffect,
ProcessSpawn,
EnvironmentRead,
EnvironmentWrite,
Clock,
}
impl EffectCapability {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::FileSystemRead => "filesystem_read",
Self::FileSystemWrite => "filesystem_write",
Self::NetworkOutbound => "network_outbound",
Self::ExternalEffect => "external_effect",
Self::ProcessSpawn => "process_spawn",
Self::EnvironmentRead => "environment_read",
Self::EnvironmentWrite => "environment_write",
Self::Clock => "clock",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PortCapability {
port_id: PortId,
direction: PortCapabilityDirection,
}
impl PortCapability {
#[must_use]
pub const fn new(port_id: PortId, direction: PortCapabilityDirection) -> Self {
Self { port_id, direction }
}
#[must_use]
pub const fn port_id(&self) -> &PortId {
&self.port_id
}
#[must_use]
pub const fn direction(&self) -> PortCapabilityDirection {
self.direction
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CapabilityValidationError {
DuplicateEffect {
node_id: NodeId,
effect: EffectCapability,
},
DuplicatePortCapability {
node_id: NodeId,
port_id: PortId,
direction: PortCapabilityDirection,
},
ConflictingPortDirection {
node_id: NodeId,
port_id: PortId,
},
UnknownCapabilityNode {
node_id: NodeId,
},
UnknownCapabilityPort {
node_id: NodeId,
port_id: PortId,
},
CapabilityDirectionMismatch {
node_id: NodeId,
port_id: PortId,
claimed: PortCapabilityDirection,
declared: PortCapabilityDirection,
},
UnenforceableEffectCapability {
node_id: NodeId,
effect: EffectCapability,
},
}
impl fmt::Display for CapabilityValidationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DuplicateEffect { node_id, effect } => {
write!(
f,
"node `{node_id}` declares duplicate effect capability `{effect:?}`"
)
}
Self::DuplicatePortCapability {
node_id,
port_id,
direction,
} => write!(
f,
"node `{node_id}` declares duplicate {} capability for port `{port_id}`",
direction.label()
),
Self::ConflictingPortDirection { node_id, port_id } => write!(
f,
"node `{node_id}` declares port `{port_id}` for both receive and emit"
),
Self::UnknownCapabilityNode { node_id } => write!(
f,
"capability descriptor references unknown workflow node `{node_id}`"
),
Self::UnknownCapabilityPort { node_id, port_id } => write!(
f,
"node `{node_id}` capability references unknown workflow port `{port_id}`"
),
Self::CapabilityDirectionMismatch {
node_id,
port_id,
claimed,
declared,
} => write!(
f,
"node `{node_id}` capability claims port `{port_id}` may {} but workflow declares {}",
claimed.label(),
declared.label()
),
Self::UnenforceableEffectCapability { node_id, effect } => write!(
f,
"node `{node_id}` declares effect capability `{effect:?}` that is not enforceable by its execution boundary"
),
}
}
}
impl Error for CapabilityValidationError {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeCapabilities {
node_id: NodeId,
ports: Vec<PortCapability>,
effects: Vec<EffectCapability>,
}
impl NodeCapabilities {
pub fn new(
node_id: NodeId,
ports: impl Into<Vec<PortCapability>>,
effects: impl Into<Vec<EffectCapability>>,
) -> Result<Self, CapabilityValidationError> {
let ports: Vec<PortCapability> = ports.into();
let effects: Vec<EffectCapability> = effects.into();
reject_duplicate_effects(&node_id, &effects)?;
reject_invalid_port_capabilities(&node_id, &ports)?;
Ok(Self {
node_id,
ports,
effects,
})
}
pub fn native_passive(
node_id: NodeId,
ports: impl Into<Vec<PortCapability>>,
) -> Result<Self, CapabilityValidationError> {
Self::new(node_id, ports, Vec::<EffectCapability>::new())
}
#[must_use]
pub const fn node_id(&self) -> &NodeId {
&self.node_id
}
#[must_use]
pub fn ports(&self) -> &[PortCapability] {
&self.ports
}
#[must_use]
pub fn effects(&self) -> &[EffectCapability] {
&self.effects
}
#[must_use]
pub fn allows_effect(&self, effect: EffectCapability) -> bool {
self.effects.contains(&effect)
}
#[must_use]
pub fn allows_port(&self, port_id: &PortId, direction: PortCapabilityDirection) -> bool {
self.ports
.iter()
.any(|port: &PortCapability| port.port_id() == port_id && port.direction() == direction)
}
}
pub fn validate_workflow_capabilities(
workflow: &WorkflowDefinition,
capabilities: &[NodeCapabilities],
) -> Result<(), CapabilityValidationError> {
for capability in capabilities {
let node: &NodeDefinition = workflow
.nodes()
.iter()
.find(|node: &&NodeDefinition| node.id() == capability.node_id())
.ok_or_else(|| CapabilityValidationError::UnknownCapabilityNode {
node_id: capability.node_id().clone(),
})?;
for port in capability.ports() {
let declared: PortCapabilityDirection = workflow_direction_for(node, port.port_id())
.ok_or_else(|| CapabilityValidationError::UnknownCapabilityPort {
node_id: capability.node_id().clone(),
port_id: port.port_id().clone(),
})?;
if port.direction() != declared {
return Err(CapabilityValidationError::CapabilityDirectionMismatch {
node_id: capability.node_id().clone(),
port_id: port.port_id().clone(),
claimed: port.direction(),
declared,
});
}
}
}
Ok(())
}
fn workflow_direction_for(
node: &NodeDefinition,
port_id: &PortId,
) -> Option<PortCapabilityDirection> {
if node.input_ports().contains(port_id) {
Some(PortCapabilityDirection::Receive)
} else if node.output_ports().contains(port_id) {
Some(PortCapabilityDirection::Emit)
} else {
None
}
}
fn reject_duplicate_effects(
node_id: &NodeId,
effects: &[EffectCapability],
) -> Result<(), CapabilityValidationError> {
let mut seen: BTreeSet<EffectCapability> = BTreeSet::new();
for effect in effects {
if !seen.insert(*effect) {
return Err(CapabilityValidationError::DuplicateEffect {
node_id: node_id.clone(),
effect: *effect,
});
}
}
Ok(())
}
fn reject_invalid_port_capabilities(
node_id: &NodeId,
ports: &[PortCapability],
) -> Result<(), CapabilityValidationError> {
let mut receives: BTreeSet<PortId> = BTreeSet::new();
let mut emits: BTreeSet<PortId> = BTreeSet::new();
for port in ports {
let current: &mut BTreeSet<PortId> = match port.direction() {
PortCapabilityDirection::Receive => &mut receives,
PortCapabilityDirection::Emit => &mut emits,
};
if !current.insert(port.port_id().clone()) {
return Err(CapabilityValidationError::DuplicatePortCapability {
node_id: node_id.clone(),
port_id: port.port_id().clone(),
direction: port.direction(),
});
}
}
if let Some(port_id) = receives.intersection(&emits).next() {
return Err(CapabilityValidationError::ConflictingPortDirection {
node_id: node_id.clone(),
port_id: port_id.clone(),
});
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use pureflow_types::WorkflowId;
fn node_id(value: &str) -> NodeId {
NodeId::new(value).expect("valid node id")
}
fn port_id(value: &str) -> PortId {
PortId::new(value).expect("valid port id")
}
fn receive(port: &str) -> PortCapability {
PortCapability::new(port_id(port), PortCapabilityDirection::Receive)
}
fn emit(port: &str) -> PortCapability {
PortCapability::new(port_id(port), PortCapabilityDirection::Emit)
}
fn workflow() -> WorkflowDefinition {
WorkflowDefinition::from_parts(
WorkflowId::new("flow").expect("valid workflow id"),
[
NodeDefinition::new(node_id("source"), Vec::new(), [port_id("out")])
.expect("valid source"),
NodeDefinition::new(node_id("sink"), [port_id("in")], Vec::new())
.expect("valid sink"),
],
Vec::new(),
)
.expect("valid workflow")
}
#[test]
fn valid_capabilities_keep_ports_and_effects_separate() {
let capabilities: NodeCapabilities = NodeCapabilities::new(
node_id("reader"),
[receive("input"), emit("output")],
[EffectCapability::FileSystemRead, EffectCapability::Clock],
)
.expect("valid capabilities");
assert_eq!(capabilities.node_id().as_str(), "reader");
assert!(capabilities.allows_effect(EffectCapability::FileSystemRead));
assert!(capabilities.allows_port(&port_id("input"), PortCapabilityDirection::Receive));
assert!(!capabilities.allows_effect(EffectCapability::ProcessSpawn));
}
#[test]
fn effect_capability_labels_are_stable() {
assert_eq!(EffectCapability::ExternalEffect.as_str(), "external_effect");
assert_eq!(
EffectCapability::NetworkOutbound.as_str(),
"network_outbound"
);
}
#[test]
fn native_passive_capabilities_have_no_effects() {
let capabilities: NodeCapabilities =
NodeCapabilities::native_passive(node_id("reader"), [receive("input")])
.expect("valid passive capabilities");
assert_eq!(capabilities.effects(), []);
assert!(capabilities.allows_port(&port_id("input"), PortCapabilityDirection::Receive));
}
#[test]
fn duplicate_effects_are_rejected() {
let err: CapabilityValidationError = NodeCapabilities::new(
node_id("reader"),
Vec::<PortCapability>::new(),
[
EffectCapability::FileSystemRead,
EffectCapability::FileSystemRead,
],
)
.expect_err("duplicate effect must fail");
assert_eq!(
err,
CapabilityValidationError::DuplicateEffect {
node_id: node_id("reader"),
effect: EffectCapability::FileSystemRead
}
);
}
#[test]
fn duplicate_port_direction_is_rejected() {
let err: CapabilityValidationError = NodeCapabilities::new(
node_id("reader"),
[receive("input"), receive("input")],
Vec::<EffectCapability>::new(),
)
.expect_err("duplicate port direction must fail");
assert_eq!(
err,
CapabilityValidationError::DuplicatePortCapability {
node_id: node_id("reader"),
port_id: port_id("input"),
direction: PortCapabilityDirection::Receive
}
);
}
#[test]
fn conflicting_port_directions_are_rejected() {
let err: CapabilityValidationError = NodeCapabilities::new(
node_id("router"),
[receive("data"), emit("data")],
Vec::<EffectCapability>::new(),
)
.expect_err("conflicting port direction must fail");
assert_eq!(
err,
CapabilityValidationError::ConflictingPortDirection {
node_id: node_id("router"),
port_id: port_id("data")
}
);
}
#[test]
fn workflow_capabilities_accept_matching_node_ports() {
let workflow: WorkflowDefinition = workflow();
let capabilities: Vec<NodeCapabilities> = vec![
NodeCapabilities::new(
node_id("source"),
[emit("out")],
Vec::<EffectCapability>::new(),
)
.expect("valid source capabilities"),
NodeCapabilities::new(
node_id("sink"),
[receive("in")],
Vec::<EffectCapability>::new(),
)
.expect("valid sink capabilities"),
];
validate_workflow_capabilities(&workflow, &capabilities)
.expect("matching workflow capabilities should validate");
}
#[test]
fn workflow_capabilities_reject_unknown_node() {
let workflow: WorkflowDefinition = workflow();
let capabilities: Vec<NodeCapabilities> = vec![
NodeCapabilities::new(
node_id("ghost"),
[emit("out")],
Vec::<EffectCapability>::new(),
)
.expect("self-consistent capability descriptor"),
];
let err: CapabilityValidationError =
validate_workflow_capabilities(&workflow, &capabilities)
.expect_err("unknown workflow node must fail");
assert_eq!(
err,
CapabilityValidationError::UnknownCapabilityNode {
node_id: node_id("ghost")
}
);
}
#[test]
fn workflow_capabilities_reject_unknown_port() {
let workflow: WorkflowDefinition = workflow();
let capabilities: Vec<NodeCapabilities> = vec![
NodeCapabilities::new(
node_id("sink"),
[receive("missing")],
Vec::<EffectCapability>::new(),
)
.expect("self-consistent capability descriptor"),
];
let err: CapabilityValidationError =
validate_workflow_capabilities(&workflow, &capabilities)
.expect_err("unknown workflow port must fail");
assert_eq!(
err,
CapabilityValidationError::UnknownCapabilityPort {
node_id: node_id("sink"),
port_id: port_id("missing")
}
);
}
#[test]
fn workflow_capabilities_reject_direction_mismatch() {
let workflow: WorkflowDefinition = workflow();
let capabilities: Vec<NodeCapabilities> = vec![
NodeCapabilities::new(
node_id("sink"),
[emit("in")],
Vec::<EffectCapability>::new(),
)
.expect("self-consistent capability descriptor"),
];
let err: CapabilityValidationError =
validate_workflow_capabilities(&workflow, &capabilities)
.expect_err("direction mismatch must fail");
assert_eq!(
err,
CapabilityValidationError::CapabilityDirectionMismatch {
node_id: node_id("sink"),
port_id: port_id("in"),
claimed: PortCapabilityDirection::Emit,
declared: PortCapabilityDirection::Receive,
}
);
}
#[test]
fn unenforceable_effect_capability_is_a_capability_error() {
let err: CapabilityValidationError =
CapabilityValidationError::UnenforceableEffectCapability {
node_id: node_id("wasm"),
effect: EffectCapability::Clock,
};
assert!(err.to_string().contains("not enforceable"));
}
}