use obzenflow_core::control_middleware::{CircuitBreakerContractMode, ControlMiddlewareProvider};
use obzenflow_core::event::types::SeqNo;
use obzenflow_core::{ContractResult, StageId, ViolationCause};
use obzenflow_core::event::types::ViolationCause as EventViolationCause;
use std::sync::Arc;
#[derive(Clone)]
pub enum EdgeContractDecision {
Pass,
Fail(EventViolationCause),
}
pub struct EdgeContext {
pub upstream_stage: StageId,
pub downstream_stage: StageId,
pub advertised_writer_seq: Option<SeqNo>,
pub reader_seq: SeqNo,
}
pub struct PolicyHints {
pub breaker_mode: Option<CircuitBreakerContractMode>,
pub has_opened_since_registration: bool,
pub has_fallback_configured: bool,
}
pub trait ContractPolicy: Send + Sync {
fn apply(
&self,
results: &[ContractResult],
edge: &EdgeContext,
hints: &PolicyHints,
prior: EdgeContractDecision,
) -> EdgeContractDecision;
}
pub struct ContractPolicyStack {
policies: Vec<Box<dyn ContractPolicy>>,
}
impl ContractPolicyStack {
pub fn new(policies: Vec<Box<dyn ContractPolicy>>) -> Self {
Self { policies }
}
pub fn decide(
&self,
results: &[ContractResult],
edge: &EdgeContext,
hints: &PolicyHints,
) -> EdgeContractDecision {
let mut decision = EdgeContractDecision::Pass;
for policy in &self.policies {
decision = policy.apply(results, edge, hints, decision);
}
decision
}
}
pub struct TransportStrictPolicy;
impl ContractPolicy for TransportStrictPolicy {
fn apply(
&self,
results: &[ContractResult],
_edge: &EdgeContext,
_hints: &PolicyHints,
_prior: EdgeContractDecision,
) -> EdgeContractDecision {
for result in results {
if let ContractResult::Failed(violation) = result {
let event_cause = match &violation.cause {
ViolationCause::SeqDivergence { advertised, reader } => {
EventViolationCause::SeqDivergence {
advertised: *advertised,
reader: *reader,
}
}
ViolationCause::ContentMismatch { .. } => {
EventViolationCause::Other("content_mismatch".into())
}
ViolationCause::DeliveryMismatch { .. } => {
EventViolationCause::Other("delivery_mismatch".into())
}
ViolationCause::AccountingMismatch { .. } => {
EventViolationCause::Other("accounting_mismatch".into())
}
ViolationCause::Divergence {
predicate,
observed,
threshold,
window_seconds,
} => EventViolationCause::Divergence {
predicate: predicate.clone(),
observed: *observed,
threshold: *threshold,
window_seconds: *window_seconds,
},
ViolationCause::Other(msg) => EventViolationCause::Other(msg.clone()),
};
return EdgeContractDecision::Fail(event_cause);
}
}
EdgeContractDecision::Pass
}
}
pub struct BreakerAwarePolicy;
impl ContractPolicy for BreakerAwarePolicy {
fn apply(
&self,
_results: &[ContractResult],
_edge: &EdgeContext,
hints: &PolicyHints,
prior: EdgeContractDecision,
) -> EdgeContractDecision {
if hints.breaker_mode != Some(CircuitBreakerContractMode::BreakerAware)
|| !hints.has_opened_since_registration
|| !hints.has_fallback_configured
{
return prior;
}
match prior {
EdgeContractDecision::Fail(EventViolationCause::SeqDivergence { .. }) => {
EdgeContractDecision::Pass
}
EdgeContractDecision::Fail(EventViolationCause::Divergence { predicate, .. })
if predicate == "signal_to_data_ratio" || predicate == "signals_when_no_data" =>
{
EdgeContractDecision::Pass
}
_ => prior,
}
}
}
pub fn build_policy_stack_for_upstream(
upstream_stage: StageId,
control_middleware: &Arc<dyn ControlMiddlewareProvider>,
) -> ContractPolicyStack {
let mut policies: Vec<Box<dyn ContractPolicy>> = Vec::new();
policies.push(Box::new(TransportStrictPolicy));
if let Some(info) = control_middleware.circuit_breaker_contract_info(&upstream_stage) {
if info.mode == CircuitBreakerContractMode::BreakerAware {
policies.push(Box::new(BreakerAwarePolicy));
}
}
ContractPolicyStack::new(policies)
}
#[cfg(test)]
mod tests {
use super::*;
use obzenflow_core::event::types::SeqNo;
use obzenflow_core::{
ContractEvidence, ContractResult, ContractViolation, StageId, ViolationCause,
};
use serde_json::json;
fn make_violation(cause: ViolationCause) -> ContractResult {
ContractResult::Failed(ContractViolation {
contract_name: "transport".to_string(),
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
detected_at: chrono::Utc::now(),
cause,
details: json!({}),
})
}
#[test]
fn transport_strict_policy_passes_on_no_failures() {
let policy = TransportStrictPolicy;
let edge = EdgeContext {
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
advertised_writer_seq: None,
reader_seq: SeqNo(0),
};
let hints = PolicyHints {
breaker_mode: None,
has_opened_since_registration: false,
has_fallback_configured: false,
};
let results = vec![ContractResult::Passed(ContractEvidence {
contract_name: "transport".to_string(),
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
verified_at: chrono::Utc::now(),
details: json!({}),
})];
let decision = policy.apply(&results, &edge, &hints, EdgeContractDecision::Pass);
assert!(matches!(decision, EdgeContractDecision::Pass));
}
#[test]
fn transport_strict_policy_fails_on_seq_divergence() {
let policy = TransportStrictPolicy;
let edge = EdgeContext {
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
advertised_writer_seq: Some(SeqNo(3)),
reader_seq: SeqNo(1),
};
let hints = PolicyHints {
breaker_mode: None,
has_opened_since_registration: false,
has_fallback_configured: false,
};
let results = vec![make_violation(ViolationCause::SeqDivergence {
advertised: Some(SeqNo(3)),
reader: SeqNo(1),
})];
let decision = policy.apply(&results, &edge, &hints, EdgeContractDecision::Pass);
match decision {
EdgeContractDecision::Fail(EventViolationCause::SeqDivergence {
advertised,
reader,
}) => {
assert_eq!(advertised, Some(SeqNo(3)));
assert_eq!(reader, SeqNo(1));
}
_ => panic!("expected SeqDivergence failure, got different decision"),
}
}
#[test]
fn breaker_aware_policy_respects_prior_when_not_configured() {
let policy = BreakerAwarePolicy;
let edge = EdgeContext {
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
advertised_writer_seq: Some(SeqNo(3)),
reader_seq: SeqNo(1),
};
let hints = PolicyHints {
breaker_mode: Some(CircuitBreakerContractMode::Strict),
has_opened_since_registration: false,
has_fallback_configured: false,
};
let prior = EdgeContractDecision::Fail(EventViolationCause::SeqDivergence {
advertised: Some(SeqNo(3)),
reader: SeqNo(1),
});
let decision = policy.apply(&[], &edge, &hints, prior);
assert!(matches!(
decision,
EdgeContractDecision::Fail(EventViolationCause::SeqDivergence { .. })
));
}
#[test]
fn breaker_aware_policy_requires_open_and_fallback() {
let policy = BreakerAwarePolicy;
let edge = EdgeContext {
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
advertised_writer_seq: Some(SeqNo(3)),
reader_seq: SeqNo(1),
};
let prior = EdgeContractDecision::Fail(EventViolationCause::SeqDivergence {
advertised: Some(SeqNo(3)),
reader: SeqNo(1),
});
let hints_never_opened = PolicyHints {
breaker_mode: Some(CircuitBreakerContractMode::BreakerAware),
has_opened_since_registration: false,
has_fallback_configured: true,
};
let decision = policy.apply(&[], &edge, &hints_never_opened, prior.clone());
assert!(matches!(
decision,
EdgeContractDecision::Fail(EventViolationCause::SeqDivergence { .. })
));
let hints_no_fallback = PolicyHints {
breaker_mode: Some(CircuitBreakerContractMode::BreakerAware),
has_opened_since_registration: true,
has_fallback_configured: false,
};
let decision = policy.apply(&[], &edge, &hints_no_fallback, prior.clone());
assert!(matches!(
decision,
EdgeContractDecision::Fail(EventViolationCause::SeqDivergence { .. })
));
}
#[test]
fn breaker_aware_policy_overrides_seq_divergence_when_safe() {
let policy = BreakerAwarePolicy;
let edge = EdgeContext {
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
advertised_writer_seq: Some(SeqNo(3)),
reader_seq: SeqNo(1),
};
let hints = PolicyHints {
breaker_mode: Some(CircuitBreakerContractMode::BreakerAware),
has_opened_since_registration: true,
has_fallback_configured: true,
};
let prior = EdgeContractDecision::Fail(EventViolationCause::SeqDivergence {
advertised: Some(SeqNo(3)),
reader: SeqNo(1),
});
let decision = policy.apply(&[], &edge, &hints, prior);
assert!(matches!(decision, EdgeContractDecision::Pass));
}
#[test]
fn breaker_aware_policy_does_not_override_other_failures() {
let policy = BreakerAwarePolicy;
let edge = EdgeContext {
upstream_stage: StageId::new(),
downstream_stage: StageId::new(),
advertised_writer_seq: Some(SeqNo(3)),
reader_seq: SeqNo(1),
};
let hints = PolicyHints {
breaker_mode: Some(CircuitBreakerContractMode::BreakerAware),
has_opened_since_registration: true,
has_fallback_configured: true,
};
let prior =
EdgeContractDecision::Fail(EventViolationCause::Other("content_mismatch".into()));
let decision = policy.apply(&[], &edge, &hints, prior);
match decision {
EdgeContractDecision::Fail(EventViolationCause::Other(reason)) => {
assert_eq!(reason, "content_mismatch");
}
_ => panic!("expected Other failure to be preserved by BreakerAwarePolicy"),
}
}
}