use super::{FabricCapability, FabricCapabilityGrant, FabricCapabilityId, GrantedFabricToken};
use crate::cx::Cx;
use crate::messaging::ir::{MorphismPlan, MorphismTransform, SubjectFamily};
use crate::messaging::subject::{Subject, SubjectPattern, SubjectPatternError, SubjectToken};
use std::fmt;
use thiserror::Error;
/// Direction tag carried by a compiled routing program and its audit trail.
///
/// The derived `Default` is [`RoutingDirection::Export`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RoutingDirection {
    /// Rendered as `import` by the `Display` implementation.
    Import,
    /// Rendered as `export` by the `Display` implementation; the default.
    #[default]
    Export,
}

impl fmt::Display for RoutingDirection {
    /// Writes the lowercase name of the direction, ignoring width and fill flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Import => "import",
            Self::Export => "export",
        })
    }
}
/// Kind of fabric operation a routing program is compiled for.
///
/// The derived `Default` is [`RoutingOperationKind::Publish`]; the derived
/// ordering follows declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub enum RoutingOperationKind {
    /// Publish operation; the default kind.
    #[default]
    Publish,
    /// Subscribe operation.
    Subscribe,
    /// Stream-creation operation.
    CreateStream,
    /// Space-transformation operation.
    TransformSpace,
}
impl RoutingOperationKind {
    /// Wraps `subject` in the [`FabricCapability`] variant that has the same
    /// name as this operation kind.
    fn capability_for(self, subject: SubjectPattern) -> FabricCapability {
        match self {
            Self::Publish => FabricCapability::Publish { subject },
            Self::Subscribe => FabricCapability::Subscribe { subject },
            Self::CreateStream => FabricCapability::CreateStream { subject },
            Self::TransformSpace => FabricCapability::TransformSpace { subject },
        }
    }
}
impl fmt::Display for RoutingOperationKind {
    /// Writes the snake_case name of the operation kind, ignoring width and
    /// fill flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Publish => "publish",
            Self::Subscribe => "subscribe",
            Self::CreateStream => "create_stream",
            Self::TransformSpace => "transform_space",
        })
    }
}
/// A concrete operation request presented to a routing program.
///
/// `Publish` carries a [`Subject`]; every other variant carries a
/// [`SubjectPattern`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingRequest {
    /// Publish request for the given subject.
    Publish(Subject),
    /// Subscribe request for the given pattern.
    Subscribe(SubjectPattern),
    /// Stream-creation request for the given pattern.
    CreateStream(SubjectPattern),
    /// Space-transformation request for the given pattern.
    TransformSpace(SubjectPattern),
}
impl RoutingRequest {
    /// Returns the operation kind that corresponds to this request's variant.
    #[must_use]
    pub const fn operation(&self) -> RoutingOperationKind {
        match self {
            Self::Publish(_) => RoutingOperationKind::Publish,
            Self::Subscribe(_) => RoutingOperationKind::Subscribe,
            Self::CreateStream(_) => RoutingOperationKind::CreateStream,
            Self::TransformSpace(_) => RoutingOperationKind::TransformSpace,
        }
    }

    /// Returns the request's subject as an owned pattern.
    ///
    /// A `Publish` subject is converted with `SubjectPattern::from`; the
    /// pattern carried by any other variant is cloned.
    #[must_use]
    pub fn subject_pattern(&self) -> SubjectPattern {
        match self {
            Self::Publish(subject) => SubjectPattern::from(subject),
            Self::Subscribe(pattern)
            | Self::CreateStream(pattern)
            | Self::TransformSpace(pattern) => pattern.clone(),
        }
    }

    /// Builds the capability a caller must hold for this request: the
    /// request's operation kind applied to its subject pattern.
    fn required_capability(&self) -> FabricCapability {
        let kind = self.operation();
        let subject = self.subject_pattern();
        kind.capability_for(subject)
    }
}
/// One named step in a routing program's recorded step list.
///
/// The derived ordering follows declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RoutingProgramStep {
    /// Rendered as `match_source_pattern`.
    MatchSourcePattern,
    /// Rendered as `check_allowed_family`.
    CheckAllowedFamily,
    /// Rendered as `check_capability`.
    CheckCapability,
    /// Rendered as `rewrite_target_prefix`.
    RewriteTargetPrefix,
    /// Rendered as `emit_audit`.
    EmitAudit,
}

impl fmt::Display for RoutingProgramStep {
    /// Writes the snake_case name of the step, ignoring width and fill flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::MatchSourcePattern => "match_source_pattern",
            Self::CheckAllowedFamily => "check_allowed_family",
            Self::CheckCapability => "check_capability",
            Self::RewriteTargetPrefix => "rewrite_target_prefix",
            Self::EmitAudit => "emit_audit",
        })
    }
}
/// A routing program compiled from a [`MorphismPlan`] for one direction and
/// one operation kind.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutingProgram {
    /// Program name, copied from the plan.
    name: String,
    /// Direction the program was compiled for.
    direction: RoutingDirection,
    /// Operation kind the program accepts.
    operation: RoutingOperationKind,
    /// Pattern that requested subjects must fall within.
    source_pattern: SubjectPattern,
    /// Literal-only prefix that replaces the source pattern's leading literals.
    target_prefix: SubjectPattern,
    /// Subject families the program admits.
    allowed_families: Vec<SubjectFamily>,
    /// Step list recorded into every audit trail the program produces.
    steps: Vec<RoutingProgramStep>,
}
impl RoutingProgram {
    /// Compiles `plan` into a program tagged [`RoutingDirection::Import`].
    ///
    /// # Errors
    ///
    /// Fails when the effective target prefix does not parse or contains a
    /// wildcard segment, when no subject family remains after the plan's
    /// family filters, or when the source pattern does not start with a
    /// literal segment.
    ///
    /// # Panics
    ///
    /// Panics if `plan.source_pattern` does not parse as a subject pattern;
    /// the plan is expected to have been validated before compilation.
    pub fn compile_import(
        plan: &MorphismPlan,
        operation: RoutingOperationKind,
    ) -> Result<Self, RoutingProgramCompileError> {
        Self::compile(plan, RoutingDirection::Import, operation)
    }

    /// Compiles `plan` into a program tagged [`RoutingDirection::Export`].
    ///
    /// # Errors
    ///
    /// Fails when the effective target prefix does not parse or contains a
    /// wildcard segment, when no subject family remains after the plan's
    /// family filters, or when the source pattern does not start with a
    /// literal segment.
    ///
    /// # Panics
    ///
    /// Panics if `plan.source_pattern` does not parse as a subject pattern;
    /// the plan is expected to have been validated before compilation.
    pub fn compile_export(
        plan: &MorphismPlan,
        operation: RoutingOperationKind,
    ) -> Result<Self, RoutingProgramCompileError> {
        Self::compile(plan, RoutingDirection::Export, operation)
    }

    /// Shared compilation path for both directions.
    ///
    /// Validation order: target prefix parse, target prefix literal-only
    /// check, non-empty allowed families, source pattern literal anchor.
    fn compile(
        plan: &MorphismPlan,
        direction: RoutingDirection,
        operation: RoutingOperationKind,
    ) -> Result<Self, RoutingProgramCompileError> {
        let target_prefix_raw = effective_target_prefix(plan);
        let target_prefix = SubjectPattern::parse(&target_prefix_raw).map_err(|source| {
            RoutingProgramCompileError::InvalidTargetPrefix {
                prefix: target_prefix_raw.clone(),
                source,
            }
        })?;
        // Last use of the raw prefix, so the error takes ownership instead of
        // cloning it.
        ensure_literal_only_prefix(&target_prefix).map_err(|source| {
            RoutingProgramCompileError::NonLiteralTargetPrefix {
                prefix: target_prefix_raw,
                source,
            }
        })?;

        let allowed_families = effective_allowed_families(plan);
        if allowed_families.is_empty() {
            return Err(RoutingProgramCompileError::NoAllowedFamilies {
                program: plan.name.clone(),
            });
        }

        // Invariant: a validated plan always carries a parseable source pattern.
        let source_pattern =
            SubjectPattern::parse(plan.source_pattern.as_str()).expect("validated morphism plan");
        ensure_literal_source_anchor(&source_pattern).map_err(|()| {
            RoutingProgramCompileError::NonLiteralSourcePatternAnchor {
                pattern: source_pattern.as_str().to_owned(),
            }
        })?;

        Ok(Self {
            name: plan.name.clone(),
            direction,
            operation,
            source_pattern,
            target_prefix,
            allowed_families,
            steps: vec![
                RoutingProgramStep::MatchSourcePattern,
                RoutingProgramStep::CheckAllowedFamily,
                RoutingProgramStep::CheckCapability,
                RoutingProgramStep::RewriteTargetPrefix,
                RoutingProgramStep::EmitAudit,
            ],
        })
    }

    /// Name of the program, as copied from the plan it was compiled from.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Direction the program was compiled for.
    #[must_use]
    pub const fn direction(&self) -> RoutingDirection {
        self.direction
    }

    /// Operation kind the program accepts.
    #[must_use]
    pub const fn operation(&self) -> RoutingOperationKind {
        self.operation
    }

    /// Subject families the program admits.
    #[must_use]
    pub fn allowed_families(&self) -> &[SubjectFamily] {
        &self.allowed_families
    }

    /// Pattern that requested subjects must fall within.
    #[must_use]
    pub fn source_pattern(&self) -> &SubjectPattern {
        &self.source_pattern
    }

    /// Literal-only prefix applied to routed destinations.
    #[must_use]
    pub fn target_prefix(&self) -> &SubjectPattern {
        &self.target_prefix
    }

    /// Step list recorded into every audit trail produced by this program.
    #[must_use]
    pub fn steps(&self) -> &[RoutingProgramStep] {
        &self.steps
    }

    /// Authorizes `request` against the capability of the grant held by
    /// `token`, recording the token's grant id as an in-process authorization.
    ///
    /// # Errors
    ///
    /// Returns a [`CapabilityRoutingError`] when the operation kind does not
    /// match the program, the family is not admitted, the subject falls
    /// outside the source pattern, or the grant does not allow the required
    /// capability.
    pub fn authorize_in_process<T>(
        &self,
        token: &GrantedFabricToken<T>,
        family: SubjectFamily,
        request: &RoutingRequest,
    ) -> Result<AuthorizedRoute, CapabilityRoutingError> {
        self.authorize_inner(family, request, |required| {
            token.grant().capability().allows(required)
        })
        .map(|route| {
            route.with_authorization(RoutingAuthorization::InProcess {
                grant_id: token.grant_id(),
            })
        })
    }

    /// Authorizes `request` against the capability of `grant`, recording the
    /// grant's id as an in-process authorization.
    ///
    /// # Errors
    ///
    /// Returns a [`CapabilityRoutingError`] when the operation kind does not
    /// match the program, the family is not admitted, the subject falls
    /// outside the source pattern, or the grant does not allow the required
    /// capability.
    pub fn authorize_with_grant(
        &self,
        grant: &FabricCapabilityGrant,
        family: SubjectFamily,
        request: &RoutingRequest,
    ) -> Result<AuthorizedRoute, CapabilityRoutingError> {
        self.authorize_inner(family, request, |required| {
            grant.capability().allows(required)
        })
        .map(|route| {
            route.with_authorization(RoutingAuthorization::InProcess {
                grant_id: grant.id(),
            })
        })
    }

    /// Authorizes `request` by asking `cx` whether it holds the required
    /// fabric capability, recording a distributed authorization.
    ///
    /// # Errors
    ///
    /// Returns a [`CapabilityRoutingError`] when the operation kind does not
    /// match the program, the family is not admitted, the subject falls
    /// outside the source pattern, or `cx` rejects the required capability.
    pub fn authorize_distributed<Caps>(
        &self,
        cx: &Cx<Caps>,
        family: SubjectFamily,
        request: &RoutingRequest,
    ) -> Result<AuthorizedRoute, CapabilityRoutingError> {
        self.authorize_inner(family, request, |required| {
            cx.check_fabric_capability(required)
        })
        .map(|route| route.with_authorization(RoutingAuthorization::Distributed))
    }

    /// Shared authorization path.
    ///
    /// Checks, in order: operation kind, subject family, that the requested
    /// subject is covered by the program's source pattern, and finally the
    /// caller-supplied `allows` predicate. On success the destination is the
    /// requested subject with the source pattern's leading literal segments
    /// replaced by the target prefix. The returned audit trail has no
    /// authorization recorded yet; the public wrappers fill it in.
    fn authorize_inner(
        &self,
        family: SubjectFamily,
        request: &RoutingRequest,
        allows: impl Fn(&FabricCapability) -> bool,
    ) -> Result<AuthorizedRoute, CapabilityRoutingError> {
        if request.operation() != self.operation {
            return Err(CapabilityRoutingError::OperationMismatch {
                program: self.name.clone(),
                expected: self.operation,
                actual: request.operation(),
            });
        }
        if !self.allowed_families.contains(&family) {
            return Err(CapabilityRoutingError::UnsupportedFamily {
                program: self.name.clone(),
                family,
            });
        }

        let required = request.required_capability();
        let admitted = self.operation.capability_for(self.source_pattern.clone());
        if !admitted.allows(&required) {
            return Err(CapabilityRoutingError::SubjectOutsideProgram {
                program: self.name.clone(),
                source_pattern: self.source_pattern.as_str().to_owned(),
                requested: request.subject_pattern().as_str().to_owned(),
            });
        }
        if !allows(&required) {
            return Err(CapabilityRoutingError::CapabilityDenied {
                program: self.name.clone(),
                required,
            });
        }

        let source = request.subject_pattern();
        let destination = rewrite_destination(&source, &self.source_pattern, &self.target_prefix);
        Ok(AuthorizedRoute {
            destination,
            audit: RoutingAuditTrail {
                program: self.name.clone(),
                direction: self.direction,
                family,
                // `required` is only moved on the denied path above, so it is
                // reused here rather than rebuilt from the request.
                authorized_capability: required,
                authorization: None,
                steps: self.steps.clone(),
            },
        })
    }
}
/// Result of a successful authorization: the rewritten destination together
/// with the audit trail describing how it was authorized.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthorizedRoute {
    /// Destination produced by rewriting the requested subject.
    destination: SubjectPattern,
    /// Audit record for this authorization.
    audit: RoutingAuditTrail,
}
impl AuthorizedRoute {
#[must_use]
fn with_authorization(mut self, authorization: RoutingAuthorization) -> Self {
self.audit.authorization = Some(authorization);
self
}
#[must_use]
pub fn destination(&self) -> &SubjectPattern {
&self.destination
}
#[must_use]
pub fn audit(&self) -> &RoutingAuditTrail {
&self.audit
}
}
/// How a route was authorized.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingAuthorization {
    /// Authorized against a specific capability grant.
    InProcess {
        /// Identifier of the grant that was checked.
        grant_id: FabricCapabilityId,
    },
    /// Authorized through the context's fabric capability check; no grant id
    /// is recorded.
    Distributed,
}
impl RoutingAuthorization {
    /// Returns the grant id for an in-process authorization, or `None` for a
    /// distributed one.
    #[must_use]
    pub const fn grant_id(&self) -> Option<FabricCapabilityId> {
        match *self {
            Self::InProcess { grant_id } => Some(grant_id),
            Self::Distributed => None,
        }
    }
}
/// Audit record attached to every [`AuthorizedRoute`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RoutingAuditTrail {
    /// Name of the routing program that produced the route.
    program: String,
    /// Direction of that program.
    direction: RoutingDirection,
    /// Subject family the request was submitted under.
    family: SubjectFamily,
    /// Capability that was required and checked for the request.
    authorized_capability: FabricCapability,
    /// How the route was authorized; `None` until an authorization is attached.
    authorization: Option<RoutingAuthorization>,
    /// Copy of the program's step list.
    steps: Vec<RoutingProgramStep>,
}
impl RoutingAuditTrail {
    /// Name of the routing program that produced the route.
    #[must_use]
    pub fn program(&self) -> &str {
        self.program.as_str()
    }

    /// Direction of the routing program.
    #[must_use]
    pub const fn direction(&self) -> RoutingDirection {
        self.direction
    }

    /// Subject family the request was submitted under.
    #[must_use]
    pub const fn family(&self) -> SubjectFamily {
        self.family
    }

    /// Capability that was required and checked for the request.
    #[must_use]
    pub fn authorized_capability(&self) -> &FabricCapability {
        &self.authorized_capability
    }

    /// Recorded authorization, if one has been attached.
    #[must_use]
    pub fn authorization(&self) -> Option<&RoutingAuthorization> {
        self.authorization.as_ref()
    }

    /// Step list copied from the routing program.
    #[must_use]
    pub fn steps(&self) -> &[RoutingProgramStep] {
        self.steps.as_slice()
    }
}
/// Reasons a morphism plan cannot be compiled into a routing program.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum RoutingProgramCompileError {
    /// The effective target prefix failed to parse as a subject pattern.
    #[error("routing program target prefix `{prefix}` is invalid")]
    InvalidTargetPrefix {
        /// Raw prefix text that was rejected.
        prefix: String,
        /// Underlying parse error.
        source: SubjectPatternError,
    },
    /// The effective target prefix parsed but contains a wildcard segment.
    #[error("routing program target prefix `{prefix}` must contain only literal segments")]
    NonLiteralTargetPrefix {
        /// Raw prefix text that was rejected.
        prefix: String,
        /// Underlying literal-only violation.
        source: SubjectPatternError,
    },
    /// No subject family remained after applying the plan's family filters.
    #[error("routing program `{program}` must admit at least one subject family")]
    NoAllowedFamilies {
        /// Name of the plan being compiled.
        program: String,
    },
    /// The source pattern does not begin with a literal segment.
    #[error("routing program source pattern `{pattern}` must start with a literal segment")]
    NonLiteralSourcePatternAnchor {
        /// Text of the offending source pattern.
        pattern: String,
    },
}
/// Reasons a routing program refuses to authorize a request.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum CapabilityRoutingError {
    /// The request's operation kind differs from the program's.
    #[error("routing program `{program}` expects `{expected}` operations but received `{actual}`")]
    OperationMismatch {
        /// Name of the routing program.
        program: String,
        /// Operation kind the program was compiled for.
        expected: RoutingOperationKind,
        /// Operation kind of the rejected request.
        actual: RoutingOperationKind,
    },
    /// The request's subject family is not in the program's allowed list.
    #[error("routing program `{program}` does not admit subject family `{family:?}`")]
    UnsupportedFamily {
        /// Name of the routing program.
        program: String,
        /// Family that was rejected.
        family: SubjectFamily,
    },
    /// The requested subject is not covered by the program's source pattern.
    #[error(
        "routing program `{program}` only covers source pattern `{source_pattern}`, but received `{requested}`"
    )]
    SubjectOutsideProgram {
        /// Name of the routing program.
        program: String,
        /// Text of the program's source pattern.
        source_pattern: String,
        /// Text of the requested subject pattern.
        requested: String,
    },
    /// The caller does not hold the capability the request requires.
    #[error("routing program `{program}` denied missing capability `{required}`")]
    CapabilityDenied {
        /// Name of the routing program.
        program: String,
        /// Capability that was required but not allowed.
        required: FabricCapability,
    },
}
/// Computes the families a compiled program admits.
///
/// Starts from the plan's allowed families with duplicates removed (first
/// occurrence wins, order preserved), then for each `FilterFamily` transform
/// keeps only the entries equal to that transform's family.
fn effective_allowed_families(plan: &MorphismPlan) -> Vec<SubjectFamily> {
    let mut families: Vec<SubjectFamily> = Vec::new();
    for &family in &plan.allowed_families {
        let already_listed = families.iter().any(|seen| *seen == family);
        if !already_listed {
            families.push(family);
        }
    }

    let filters = plan.transforms.iter().filter_map(|transform| {
        if let MorphismTransform::FilterFamily { family } = transform {
            Some(family)
        } else {
            None
        }
    });
    for wanted in filters {
        families.retain(|candidate| candidate == wanted);
    }
    families
}
/// Picks the target prefix text to compile.
///
/// The `to` value of the last `RenamePrefix` transform wins; when the plan has
/// no such transform, the plan's own `target_prefix` is used.
fn effective_target_prefix(plan: &MorphismPlan) -> String {
    let last_rename = plan
        .transforms
        .iter()
        .rev()
        .find_map(|transform| match transform {
            MorphismTransform::RenamePrefix { to, .. } => Some(to),
            MorphismTransform::FilterFamily { .. }
            | MorphismTransform::EscalateDeliveryClass { .. }
            | MorphismTransform::PreserveReplySpace
            | MorphismTransform::AttachEvidencePolicy { .. } => None,
        });
    last_rename.unwrap_or(&plan.target_prefix).clone()
}
/// Accepts `pattern` only when every segment is a literal.
///
/// Returns `SubjectPatternError::LiteralOnlyPatternRequired` carrying the
/// pattern text as soon as a `One` or `Tail` segment is present.
fn ensure_literal_only_prefix(pattern: &SubjectPattern) -> Result<(), SubjectPatternError> {
    let has_wildcard = pattern.segments().iter().any(|segment| match segment {
        SubjectToken::Literal(_) => false,
        SubjectToken::One | SubjectToken::Tail => true,
    });
    if has_wildcard {
        return Err(SubjectPatternError::LiteralOnlyPatternRequired(
            pattern.as_str().to_owned(),
        ));
    }
    Ok(())
}
/// Accepts `pattern` only when its first segment is a literal.
///
/// An empty pattern or one that starts with a wildcard yields `Err(())`.
fn ensure_literal_source_anchor(pattern: &SubjectPattern) -> Result<(), ()> {
    let anchored = matches!(
        pattern.segments().first(),
        Some(SubjectToken::Literal(_))
    );
    if anchored {
        Ok(())
    } else {
        Err(())
    }
}
/// Collects the literal segments at the start of `pattern`, stopping at the
/// first wildcard segment.
fn leading_literal_prefix(pattern: &SubjectPattern) -> Vec<String> {
    pattern
        .segments()
        .iter()
        .map_while(|segment| match segment {
            SubjectToken::Literal(value) => Some(value.clone()),
            SubjectToken::One | SubjectToken::Tail => None,
        })
        .collect()
}
/// Copies the segments of a literal-only `pattern` into a fresh token vector.
///
/// # Panics
///
/// Panics via `unreachable!` if a wildcard segment is encountered.
fn literal_target_tokens(pattern: &SubjectPattern) -> Vec<SubjectToken> {
    let segments = pattern.segments();
    let mut tokens = Vec::with_capacity(segments.len());
    for segment in segments {
        match segment {
            SubjectToken::Literal(value) => tokens.push(SubjectToken::Literal(value.clone())),
            SubjectToken::One | SubjectToken::Tail => {
                unreachable!("target prefix is validated as literal-only during compilation")
            }
        }
    }
    tokens
}
/// Builds the routed destination for `source`.
///
/// Drops as many leading segments from `source` as `source_pattern` has
/// leading literals, then prepends the tokens of `target_prefix`.
///
/// # Panics
///
/// Panics if the combined tokens are rejected by `SubjectPattern::from_tokens`,
/// or if `target_prefix` contains a wildcard segment.
fn rewrite_destination(
    source: &SubjectPattern,
    source_pattern: &SubjectPattern,
    target_prefix: &SubjectPattern,
) -> SubjectPattern {
    let skipped = leading_literal_prefix(source_pattern).len();
    let mut tokens = literal_target_tokens(target_prefix);
    tokens.extend(source.segments().iter().skip(skipped).cloned());
    SubjectPattern::from_tokens(tokens)
        .expect("rewritten routing destination must remain syntactically valid")
}
// Unit tests for routing program compilation and authorization.
#[cfg(test)]
mod tests {
use super::super::{EventFamily, PublishPermit};
use super::*;
use crate::Budget;
use crate::messaging::class::DeliveryClass;
use crate::messaging::ir::{
CapabilityPermission, CapabilityTokenSchema, MorphismPlan, MorphismTransform,
SubjectFamily, SubjectPattern as IrSubjectPattern,
};
use crate::types::{RegionId, TaskId};
use crate::util::ArenaIndex;
// Builds a context whose region and task ids both use arena index (0, 0),
// with an infinite budget.
fn test_cx() -> Cx {
Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
)
}
// Token schema used when granting the in-process publish token: event family,
// ephemeral-interactive delivery, publish permission.
fn publish_schema() -> CapabilityTokenSchema {
CapabilityTokenSchema {
name: "fabric.route.publish".to_owned(),
families: vec![SubjectFamily::Event],
delivery_classes: vec![DeliveryClass::EphemeralInteractive],
permissions: vec![CapabilityPermission::Publish],
}
}
// Baseline plan shared by the tests: source `orders.>`, target prefix
// `federated.orders`, families Event and Command narrowed to Event by the
// `FilterFamily` transform.
fn export_plan() -> MorphismPlan {
MorphismPlan {
name: "orders-export".to_owned(),
source_pattern: IrSubjectPattern::new("orders.>"),
target_prefix: "federated.orders".to_owned(),
allowed_families: vec![SubjectFamily::Event, SubjectFamily::Command],
transforms: vec![
MorphismTransform::FilterFamily {
family: SubjectFamily::Event,
},
MorphismTransform::PreserveReplySpace,
],
}
}
// A target prefix containing the tail wildcard must be rejected at compile time.
#[test]
fn compile_export_program_rejects_non_literal_target_prefix() {
let mut plan = export_plan();
plan.target_prefix = "federated.>".to_owned();
let error = RoutingProgram::compile_export(&plan, RoutingOperationKind::Publish)
.expect_err("wildcard target prefixes must fail closed");
assert_eq!(
error,
RoutingProgramCompileError::NonLiteralTargetPrefix {
prefix: "federated.>".to_owned(),
source: SubjectPatternError::LiteralOnlyPatternRequired("federated.>".to_owned()),
}
);
}
// A source pattern whose first segment is a wildcard must be rejected at
// compile time.
#[test]
fn compile_export_program_rejects_source_pattern_without_literal_anchor() {
let mut plan = export_plan();
plan.source_pattern = IrSubjectPattern::new("*.orders.created");
let error = RoutingProgram::compile_export(&plan, RoutingOperationKind::Publish)
.expect_err("wildcard-leading source patterns must fail closed");
assert_eq!(
error,
RoutingProgramCompileError::NonLiteralSourcePatternAnchor {
pattern: "*.orders.created".to_owned(),
}
);
}
// Happy path for the distributed check: with the capability granted on the
// context, the subject is rewritten under the target prefix and every audit
// field is populated.
#[test]
fn distributed_publish_route_requires_capability_and_records_audit() {
let program = RoutingProgram::compile_export(&export_plan(), RoutingOperationKind::Publish)
.expect("routing program");
let cx = test_cx();
cx.grant_fabric_capability(FabricCapability::Publish {
subject: SubjectPattern::new("orders.created"),
})
.expect("publish grant");
let route = program
.authorize_distributed(
&cx,
SubjectFamily::Event,
&RoutingRequest::Publish(Subject::new("orders.created")),
)
.expect("distributed route should pass");
// The leading `orders` literal is replaced by `federated.orders`.
assert_eq!(route.destination().as_str(), "federated.orders.created");
assert_eq!(route.audit().program(), "orders-export");
assert_eq!(route.audit().direction(), RoutingDirection::Export);
assert_eq!(route.audit().family(), SubjectFamily::Event);
assert_eq!(
route.audit().authorized_capability(),
&FabricCapability::Publish {
subject: SubjectPattern::new("orders.created"),
}
);
assert_eq!(
route.audit().authorization(),
Some(&RoutingAuthorization::Distributed)
);
assert_eq!(route.audit().steps(), program.steps());
}
// Without any grant on the context the request is denied, and the error
// carries the capability that was required.
#[test]
fn distributed_publish_route_denies_missing_capability() {
let program = RoutingProgram::compile_export(&export_plan(), RoutingOperationKind::Publish)
.expect("routing program");
let cx = test_cx();
let error = program
.authorize_distributed(
&cx,
SubjectFamily::Event,
&RoutingRequest::Publish(Subject::new("orders.created")),
)
.expect_err("missing capability must fail closed");
assert_eq!(
error,
CapabilityRoutingError::CapabilityDenied {
program: "orders-export".to_owned(),
required: FabricCapability::Publish {
subject: SubjectPattern::new("orders.created"),
},
}
);
}
// The bare subject `orders` is not covered by the source pattern `orders.>`,
// so the request is rejected as outside the program even though a matching
// grant exists.
#[test]
fn distributed_publish_route_rejects_bare_prefix_for_tail_program() {
let program = RoutingProgram::compile_export(&export_plan(), RoutingOperationKind::Publish)
.expect("routing program");
let cx = test_cx();
cx.grant_fabric_capability(FabricCapability::Publish {
subject: SubjectPattern::new("orders.>"),
})
.expect("publish grant");
let error = program
.authorize_distributed(
&cx,
SubjectFamily::Event,
&RoutingRequest::Publish(Subject::new("orders")),
)
.expect_err("bare prefix must be outside the tail-wildcard program");
assert_eq!(
error,
CapabilityRoutingError::SubjectOutsideProgram {
program: "orders-export".to_owned(),
source_pattern: "orders.>".to_owned(),
requested: "orders".to_owned(),
}
);
}
// The in-process and distributed paths must agree on destination, capability
// and family; only the recorded authorization differs.
#[test]
fn in_process_and_distributed_publish_routes_are_equivalent() {
let program = RoutingProgram::compile_export(&export_plan(), RoutingOperationKind::Publish)
.expect("routing program");
let cx = test_cx();
let token: GrantedFabricToken<PublishPermit<EventFamily>> = cx
.grant_publish_capability::<EventFamily>(
SubjectPattern::new("orders.>"),
&publish_schema(),
DeliveryClass::EphemeralInteractive,
)
.expect("publish token");
let request = RoutingRequest::Publish(Subject::new("orders.shipped.eu"));
let in_process = program
.authorize_in_process(&token, SubjectFamily::Event, &request)
.expect("in-process route");
let distributed = program
.authorize_distributed(&cx, SubjectFamily::Event, &request)
.expect("distributed route");
assert_eq!(in_process.destination(), distributed.destination());
assert_eq!(
in_process.audit().authorized_capability(),
distributed.audit().authorized_capability()
);
assert_eq!(in_process.audit().family(), distributed.audit().family());
assert_eq!(
in_process.audit().authorization(),
Some(&RoutingAuthorization::InProcess {
grant_id: token.grant_id(),
})
);
assert_eq!(
distributed.audit().authorization(),
Some(&RoutingAuthorization::Distributed)
);
}
// The plan's `FilterFamily` transform removes Command, so a Command request
// is rejected even though the plan lists Command and the capability is granted.
#[test]
fn family_filters_fail_closed_before_routing() {
let program = RoutingProgram::compile_export(&export_plan(), RoutingOperationKind::Publish)
.expect("routing program");
let cx = test_cx();
cx.grant_fabric_capability(FabricCapability::Publish {
subject: SubjectPattern::new("orders.created"),
})
.expect("publish grant");
let error = program
.authorize_distributed(
&cx,
SubjectFamily::Command,
&RoutingRequest::Publish(Subject::new("orders.created")),
)
.expect_err("filtered-out families must fail");
assert_eq!(
error,
CapabilityRoutingError::UnsupportedFamily {
program: "orders-export".to_owned(),
family: SubjectFamily::Command,
}
);
}
}