use super::class::{AckKind, DeliveryClass, DeliveryClassPolicy, DeliveryClassPolicyError};
use super::control::{
ControlHandlerId, ControlRegistry, ControlRegistryError, NamespaceControlScope,
SystemSubjectFamily,
};
use super::morphism::{ExportPlan, ImportPlan, Morphism, MorphismCompileError};
use super::subject::{NamespaceKernel, NamespaceKernelError, Subject};
use crate::lab::conformal::{HealthThresholdCalibrator, HealthThresholdConfig, ThresholdMode};
use crate::obligation::eprocess::{
AlertState as EProcessAlertState, LeakMonitor, MonitorConfig as LeakMonitorConfig,
};
use crate::obligation::ledger::{ObligationLedger, ObligationToken};
use crate::record::{ObligationAbortReason, ObligationKind, ObligationState, SourceLocation};
use crate::types::{ObligationId, RegionId, TaskId, Time};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::fmt;
use std::panic::Location;
use std::time::Duration;
use thiserror::Error;
/// Shape of a payload as declared in a service contract.
///
/// Serialized by serde as an internally tagged enum: the variant name is
/// written in snake_case under the `kind` key.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum PayloadShape {
/// Empty payload; this is the `Default` variant.
#[default]
Empty,
/// JSON document payload.
JsonDocument,
/// Binary blob payload.
BinaryBlob,
/// Subject-encoded payload.
SubjectEncoded,
/// Payload described by a named schema.
NamedSchema {
/// Schema name; `PayloadShape::validate` rejects a name that is empty
/// after trimming whitespace.
schema: String,
},
}
impl PayloadShape {
    /// Validates this payload shape.
    ///
    /// `field` is the name reported in the error so callers can tell which
    /// contract field failed.
    ///
    /// # Errors
    ///
    /// Returns [`ServiceContractError::EmptyNamedSchema`] when the shape is
    /// `NamedSchema` and the schema name is empty after trimming whitespace.
    fn validate(&self, field: &str) -> Result<(), ServiceContractError> {
        match self {
            Self::NamedSchema { schema } if schema.trim().is_empty() => {
                Err(ServiceContractError::EmptyNamedSchema {
                    field: field.to_owned(),
                })
            }
            Self::Empty
            | Self::JsonDocument
            | Self::BinaryBlob
            | Self::SubjectEncoded
            | Self::NamedSchema { .. } => Ok(()),
        }
    }
}
/// Reply pattern declared in a service contract.
///
/// Serialized by serde as an internally tagged enum keyed by `kind`, with
/// snake_case variant names.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ReplyShape {
/// No reply; this is the `Default` variant.
#[default]
None,
/// Unary reply.
Unary {
/// Payload shape of the reply.
shape: PayloadShape,
},
/// Streamed reply.
Stream {
/// Payload shape carried by the variant.
/// NOTE(review): presumably the shape of each streamed item — confirm.
shape: PayloadShape,
},
}
impl ReplyShape {
    /// Validates the payload shape carried by this reply shape, if any.
    ///
    /// `ReplyShape::None` is always valid; `Unary` and `Stream` delegate to
    /// [`PayloadShape::validate`] with the same `field` label.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`PayloadShape::validate`] returns.
    fn validate(&self, field: &str) -> Result<(), ServiceContractError> {
        let payload = match self {
            Self::None => return Ok(()),
            Self::Unary { shape } => shape,
            Self::Stream { shape } => shape,
        };
        payload.validate(field)
    }
}
/// Cleanup urgency level carried by `BudgetSemantics`.
///
/// The derived `Ord` follows declaration order, so
/// `Background < Prompt < Immediate`. Serialized with snake_case names.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum CleanupUrgency {
/// Lowest urgency level.
Background,
/// Middle urgency level; this is the `Default` variant.
#[default]
Prompt,
/// Highest urgency level.
Immediate,
}
impl fmt::Display for CleanupUrgency {
    /// Writes the lowercase label of the urgency level
    /// (`background`, `prompt`, or `immediate`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Background => "background",
            Self::Prompt => "prompt",
            Self::Immediate => "immediate",
        })
    }
}
/// Budget-related settings of a service contract.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BudgetSemantics {
/// Cleanup urgency level.
pub cleanup_urgency: CleanupUrgency,
/// Default timeout, if any; `BudgetSemantics::validate` rejects a zero duration.
pub default_timeout: Option<Duration>,
/// NOTE(review): presumably whether callers may override the timeout — confirm against the request-validation code.
pub allow_timeout_override: bool,
/// NOTE(review): presumably whether caller priority hints are honored — confirm against the request-validation code.
pub honor_priority_hints: bool,
}
impl Default for BudgetSemantics {
    /// Default budget: prompt cleanup, a 30 second default timeout,
    /// timeout overrides allowed, priority hints not honored.
    fn default() -> Self {
        let thirty_seconds = Duration::from_secs(30);
        Self {
            honor_priority_hints: false,
            allow_timeout_override: true,
            default_timeout: Some(thirty_seconds),
            cleanup_urgency: CleanupUrgency::Prompt,
        }
    }
}
impl BudgetSemantics {
    /// Validates the budget settings.
    ///
    /// # Errors
    ///
    /// Returns [`ServiceContractError::ZeroDuration`] (field
    /// `budget_semantics.default_timeout`) when a default timeout is set
    /// and is zero. An absent timeout is valid.
    fn validate(&self) -> Result<(), ServiceContractError> {
        match self.default_timeout {
            Some(timeout) if timeout.is_zero() => Err(ServiceContractError::ZeroDuration {
                field: "budget_semantics.default_timeout".to_owned(),
            }),
            Some(_) | None => Ok(()),
        }
    }
}
/// Cancellation obligation level declared in a service contract.
///
/// The derived `Ord` follows declaration order, so
/// `BestEffortDrain < DrainBeforeReply < DrainAndCompensate`.
/// Serialized with snake_case names.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum CancellationObligations {
/// Best-effort drain.
BestEffortDrain,
/// Drain before reply; this is the `Default` variant.
#[default]
DrainBeforeReply,
/// Drain and compensate.
DrainAndCompensate,
}
impl fmt::Display for CancellationObligations {
    /// Writes the hyphenated label of the obligation level, e.g.
    /// `drain-before-reply`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::BestEffortDrain => "best-effort-drain",
            Self::DrainBeforeReply => "drain-before-reply",
            Self::DrainAndCompensate => "drain-and-compensate",
        })
    }
}
/// Capture flags of a service contract.
///
/// Four independent booleans, hence the `struct_excessive_bools` allowance.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[allow(clippy::struct_excessive_bools)]
pub struct CaptureRules {
/// Capture requests (defaults to `true`).
pub capture_requests: bool,
/// Capture replies (defaults to `true`).
pub capture_replies: bool,
/// Record payload hashes (defaults to `true`).
pub record_payload_hashes: bool,
/// Record branch artifacts (defaults to `false`).
pub record_branch_artifacts: bool,
}
impl Default for CaptureRules {
    /// Default capture rules: everything enabled except branch artifacts.
    fn default() -> Self {
        let (enabled, disabled) = (true, false);
        Self {
            record_branch_artifacts: disabled,
            record_payload_hashes: enabled,
            capture_replies: enabled,
            capture_requests: enabled,
        }
    }
}
/// Compensation level declared by a contract or offered by a provider.
///
/// The derived `Ord` follows declaration order, so
/// `None < BestEffort < Required`; `ProviderTerms::validate_against`
/// compares provider and contract values with `<`.
/// Serialized with snake_case names.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum CompensationSemantics {
/// No compensation; this is the `Default` variant.
#[default]
None,
/// Best-effort compensation.
BestEffort,
/// Compensation is required.
Required,
}
impl fmt::Display for CompensationSemantics {
    /// Writes the label of the compensation level
    /// (`none`, `best-effort`, or `required`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::None => "none",
            Self::BestEffort => "best-effort",
            Self::Required => "required",
        })
    }
}
/// Mobility constraint declared by a contract or offered by a provider.
///
/// Serialized by serde as an internally tagged enum keyed by `kind`, with
/// snake_case variant names.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum MobilityConstraint {
/// No restriction; this is the `Default` variant.
#[default]
Unrestricted,
/// Bounded to a named region.
BoundedRegion {
/// Region name; `MobilityConstraint::validate` rejects a name that is
/// empty after trimming whitespace.
region: String,
},
/// Pinned.
Pinned,
}
impl MobilityConstraint {
    /// Validates this constraint.
    ///
    /// `field` is the name reported in the error.
    ///
    /// # Errors
    ///
    /// Returns [`ServiceContractError::EmptyBoundedRegion`] when the
    /// constraint is `BoundedRegion` and the region name is empty after
    /// trimming whitespace.
    fn validate(&self, field: &str) -> Result<(), ServiceContractError> {
        match self {
            Self::BoundedRegion { region } if region.trim().is_empty() => {
                Err(ServiceContractError::EmptyBoundedRegion {
                    field: field.to_owned(),
                })
            }
            Self::Unrestricted | Self::BoundedRegion { .. } | Self::Pinned => Ok(()),
        }
    }

    /// Returns whether `self` (the offered constraint) satisfies `required`.
    ///
    /// - A required `Unrestricted` is satisfied by anything.
    /// - A required `BoundedRegion` is satisfied by `Pinned` or by a
    ///   `BoundedRegion` naming the same region, but not by `Unrestricted`.
    /// - A required `Pinned` is satisfied only by `Pinned`.
    #[must_use]
    pub fn satisfies(&self, required: &Self) -> bool {
        match (required, self) {
            (Self::Unrestricted, _) | (_, Self::Pinned) => true,
            (Self::BoundedRegion { region: wanted }, Self::BoundedRegion { region: offered }) => {
                offered == wanted
            }
            (Self::BoundedRegion { .. } | Self::Pinned, _) => false,
        }
    }
}
impl fmt::Display for MobilityConstraint {
    /// Writes `unrestricted`, `pinned`, or `bounded-region(<region>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::BoundedRegion { region } => write!(f, "bounded-region({region})"),
            Self::Unrestricted => f.write_str("unrestricted"),
            Self::Pinned => f.write_str("pinned"),
        }
    }
}
/// Evidence level required by a contract or offered by a provider.
///
/// The derived `Ord` follows declaration order, so
/// `Minimal < Standard < Detailed < Forensic`;
/// `ProviderTerms::validate_against` compares provider and contract values
/// with `<`. Serialized with snake_case names.
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize,
)]
#[serde(rename_all = "snake_case")]
pub enum EvidenceLevel {
/// Lowest level.
Minimal,
/// Standard level; this is the `Default` variant.
#[default]
Standard,
/// Detailed level.
Detailed,
/// Highest level.
Forensic,
}
impl fmt::Display for EvidenceLevel {
    /// Writes the lowercase label of the evidence level.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Minimal => "minimal",
            Self::Standard => "standard",
            Self::Detailed => "detailed",
            Self::Forensic => "forensic",
        })
    }
}
/// Overload policy declared in a service contract.
///
/// Serialized by serde as an internally tagged enum keyed by `kind`, with
/// snake_case variant names.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum OverloadPolicy {
/// Reject new work; this is the `Default` variant.
#[default]
RejectNew,
/// Queue within budget.
QueueWithinBudget {
/// Queue capacity; `OverloadPolicy::validate` rejects zero.
max_pending: u32,
},
/// Drop ephemeral work.
DropEphemeral,
/// Fail fast.
FailFast,
}
impl OverloadPolicy {
    /// Validates this overload policy.
    ///
    /// # Errors
    ///
    /// Returns [`ServiceContractError::InvalidQueueCapacity`] when the
    /// policy is `QueueWithinBudget` with `max_pending == 0`.
    fn validate(&self) -> Result<(), ServiceContractError> {
        match self {
            Self::QueueWithinBudget { max_pending: 0 } => {
                Err(ServiceContractError::InvalidQueueCapacity)
            }
            Self::RejectNew
            | Self::QueueWithinBudget { .. }
            | Self::DropEphemeral
            | Self::FailFast => Ok(()),
        }
    }
}
/// Full schema of a service contract.
///
/// `ServiceContractSchema::validate` checks the request shape, reply shape,
/// budget semantics, mobility constraints, and overload policy.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServiceContractSchema {
/// Shape of the request payload.
pub request_shape: PayloadShape,
/// Shape of the reply.
pub reply_shape: ReplyShape,
/// Cancellation obligation level.
pub cancellation_obligations: CancellationObligations,
/// Budget settings.
pub budget_semantics: BudgetSemantics,
/// Delivery class of the contract; `ProviderTerms::validate_against`
/// treats it as the floor a provider must meet.
pub durability_class: DeliveryClass,
/// Capture flags.
pub capture_rules: CaptureRules,
/// Compensation level the contract requires.
pub compensation_semantics: CompensationSemantics,
/// Mobility constraint the contract requires.
pub mobility_constraints: MobilityConstraint,
/// Evidence level the contract requires.
pub evidence_requirements: EvidenceLevel,
/// Overload policy.
pub overload_policy: OverloadPolicy,
}
impl Default for ServiceContractSchema {
    /// Default contract: JSON request, unary JSON reply, obligation-backed
    /// durability, no compensation, unrestricted mobility, standard
    /// evidence, and the defaults of the remaining component types.
    fn default() -> Self {
        let json = || PayloadShape::JsonDocument;
        let reply_shape = ReplyShape::Unary { shape: json() };
        Self {
            request_shape: json(),
            reply_shape,
            cancellation_obligations: CancellationObligations::default(),
            budget_semantics: BudgetSemantics::default(),
            durability_class: DeliveryClass::ObligationBacked,
            capture_rules: CaptureRules::default(),
            compensation_semantics: CompensationSemantics::None,
            mobility_constraints: MobilityConstraint::Unrestricted,
            evidence_requirements: EvidenceLevel::Standard,
            overload_policy: OverloadPolicy::default(),
        }
    }
}
impl ServiceContractSchema {
    /// Validates the contract schema.
    ///
    /// Checks run in this order and stop at the first failure: request
    /// shape, reply shape, budget semantics, mobility constraints, overload
    /// policy.
    ///
    /// # Errors
    ///
    /// Returns the first [`ServiceContractError`] reported by a component
    /// validator.
    pub fn validate(&self) -> Result<(), ServiceContractError> {
        self.request_shape
            .validate("request_shape")
            .and_then(|()| self.reply_shape.validate("reply_shape"))
            .and_then(|()| self.budget_semantics.validate())
            .and_then(|()| self.mobility_constraints.validate("mobility_constraints"))
            .and_then(|()| self.overload_policy.validate())
    }
}
/// Terms offered by a provider, checked against a contract by
/// `ProviderTerms::validate_against`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderTerms {
/// Delivery classes the provider admits; each must lie between the
/// contract's durability class and `guaranteed_durability`.
pub admissible_classes: DeliveryClassPolicy,
/// Delivery class the provider guarantees; must not be below the
/// contract's durability class.
pub guaranteed_durability: DeliveryClass,
/// Compensation level offered; must not be below the contract's.
pub compensation_policy: CompensationSemantics,
/// Mobility constraint offered; must satisfy the contract's.
pub mobility_constraint: MobilityConstraint,
/// Evidence level offered; must not be below the contract's.
pub evidence_level: EvidenceLevel,
}
impl ProviderTerms {
    /// Checks that these provider terms are compatible with `contract`.
    ///
    /// Checks run in this order and stop at the first failure:
    /// 1. the provider's own mobility constraint is well-formed;
    /// 2. guaranteed durability is not below the contract's durability class;
    /// 3. compensation policy is not below the contract's;
    /// 4. evidence level is not below the contract's;
    /// 5. the mobility constraint satisfies the contract's;
    /// 6. every admissible class is at least the contract's durability class
    ///    and at most the guaranteed durability.
    ///
    /// # Errors
    ///
    /// Returns the [`ServiceContractError`] variant matching the first
    /// failed check.
    pub fn validate_against(
        &self,
        contract: &ServiceContractSchema,
    ) -> Result<(), ServiceContractError> {
        let guaranteed = self.guaranteed_durability;
        let floor = contract.durability_class;

        self.mobility_constraint
            .validate("provider_terms.mobility_constraint")?;

        if guaranteed < floor {
            return Err(ServiceContractError::ProviderGuaranteeBelowContractFloor {
                guaranteed_durability: guaranteed,
                required_durability: floor,
            });
        }

        let (offered_compensation, wanted_compensation) =
            (self.compensation_policy, contract.compensation_semantics);
        if offered_compensation < wanted_compensation {
            return Err(ServiceContractError::ProviderCompensationBelowContract {
                provider: offered_compensation,
                required: wanted_compensation,
            });
        }

        let (offered_evidence, wanted_evidence) =
            (self.evidence_level, contract.evidence_requirements);
        if offered_evidence < wanted_evidence {
            return Err(ServiceContractError::ProviderEvidenceBelowContract {
                provider: offered_evidence,
                required: wanted_evidence,
            });
        }

        let mobility_ok = self
            .mobility_constraint
            .satisfies(&contract.mobility_constraints);
        if !mobility_ok {
            return Err(ServiceContractError::ProviderMobilityIncompatible {
                provider: self.mobility_constraint.clone(),
                required: contract.mobility_constraints.clone(),
            });
        }

        for class in self.admissible_classes.admissible_classes() {
            let class = *class;
            // The floor check comes first so a class that is both too low
            // and too high reports the floor violation.
            if class < floor {
                return Err(ServiceContractError::ProviderClassBelowContractFloor {
                    class,
                    required_durability: floor,
                });
            }
            if class > guaranteed {
                return Err(
                    ServiceContractError::ProviderClassAboveGuaranteedDurability {
                        class,
                        guaranteed_durability: guaranteed,
                    },
                );
            }
        }

        Ok(())
    }
}
/// Optional per-request settings supplied by a caller.
///
/// The derived `Default` leaves every option unset.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct CallerOptions {
/// Delivery class requested by the caller, if any.
pub requested_class: Option<DeliveryClass>,
/// Timeout override requested by the caller, if any.
pub timeout_override: Option<Duration>,
/// Priority hint supplied by the caller, if any.
pub priority_hint: Option<u8>,
}
/// Resolved parameters of a service request.
///
/// `RequestCertificate::from_validated` copies `delivery_class` and
/// `timeout` from this struct.
/// NOTE(review): presumably produced by validating `CallerOptions` against a
/// contract and provider terms — the producing code is not in this view.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ValidatedServiceRequest {
/// Delivery class of the request.
pub delivery_class: DeliveryClass,
/// Timeout of the request, if any.
pub timeout: Option<Duration>,
/// Priority hint, if any.
pub priority_hint: Option<u8>,
/// Guaranteed durability.
pub guaranteed_durability: DeliveryClass,
/// Evidence level.
pub evidence_level: EvidenceLevel,
/// Mobility constraint.
pub mobility_constraint: MobilityConstraint,
/// Compensation policy.
pub compensation_policy: CompensationSemantics,
/// Overload policy.
pub overload_policy: OverloadPolicy,
}
/// Reason a service request or reply delivery failed.
///
/// Serialized with snake_case names. `ServiceFailure::abort_reason` maps
/// each variant to the `ObligationAbortReason` recorded in the ledger.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ServiceFailure {
/// The request was cancelled.
Cancelled,
/// The request timed out.
TimedOut,
/// The request was rejected.
Rejected,
/// The service was overloaded.
Overloaded,
/// A transport error occurred.
TransportError,
/// An application error occurred.
ApplicationError,
}
impl ServiceFailure {
fn abort_reason(self) -> ObligationAbortReason {
match self {
Self::Cancelled => ObligationAbortReason::Cancel,
Self::TimedOut | Self::Rejected => ObligationAbortReason::Explicit,
Self::Overloaded | Self::TransportError | Self::ApplicationError => {
ObligationAbortReason::Error
}
}
}
}
impl fmt::Display for ServiceFailure {
    /// Writes the snake_case label of the failure, e.g. `timed_out`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Cancelled => "cancelled",
            Self::TimedOut => "timed_out",
            Self::Rejected => "rejected",
            Self::Overloaded => "overloaded",
            Self::TransportError => "transport_error",
            Self::ApplicationError => "application_error",
        })
    }
}
/// One entry in a service obligation's transfer lineage, pushed by
/// `ServiceObligation::transfer`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServiceTransferHop {
/// Morphism label supplied to the transfer.
pub morphism: String,
/// Callee the obligation was transferred to.
pub callee: String,
/// Subject the obligation was transferred to.
pub subject: String,
/// Time of the transfer.
pub transferred_at: Time,
}
/// Certificate describing an issued service request.
///
/// All fields take part in `RequestCertificate::digest`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RequestCertificate {
/// Request identifier; checked by `validate` via `validate_service_text`.
pub request_id: String,
/// Caller identity; checked by `validate` via `validate_service_text`.
pub caller: String,
/// Request subject; checked by `validate` via `validate_service_text`.
pub subject: String,
/// Delivery class of the request.
pub delivery_class: DeliveryClass,
/// Reply-space rule; prefix-carrying variants have their prefix checked by `validate`.
pub reply_space_rule: super::ir::ReplySpaceRule,
/// Service class label; checked by `validate` via `validate_service_text`.
pub service_class: String,
/// Capability fingerprint.
pub capability_fingerprint: u64,
/// Issue time.
pub issued_at: Time,
/// Request timeout, if any; `validate` rejects a zero duration.
pub timeout: Option<Duration>,
}
impl RequestCertificate {
#[must_use]
#[allow(clippy::too_many_arguments)]
pub fn from_validated(
request_id: String,
caller: String,
subject: String,
validated: &ValidatedServiceRequest,
reply_space_rule: super::ir::ReplySpaceRule,
service_class: String,
capability_fingerprint: u64,
issued_at: Time,
) -> Self {
Self {
request_id,
caller,
subject,
delivery_class: validated.delivery_class,
reply_space_rule,
service_class,
capability_fingerprint,
issued_at,
timeout: validated.timeout,
}
}
pub fn validate(&self) -> Result<(), ServiceObligationError> {
validate_service_text("request_id", &self.request_id)?;
validate_service_text("caller", &self.caller)?;
validate_service_text("subject", &self.subject)?;
validate_service_text("service_class", &self.service_class)?;
match &self.reply_space_rule {
super::ir::ReplySpaceRule::CallerInbox => {}
super::ir::ReplySpaceRule::SharedPrefix { prefix }
| super::ir::ReplySpaceRule::DedicatedPrefix { prefix } => {
validate_service_text("reply_space_rule.prefix", prefix)?;
}
}
if self.timeout.is_some_and(|d| d.is_zero()) {
return Err(ServiceObligationError::ZeroTimeout);
}
Ok(())
}
#[must_use]
pub fn digest(&self) -> u64 {
use std::hash::{Hash, Hasher};
let mut hasher = crate::util::DetHasher::default();
self.request_id.hash(&mut hasher);
self.caller.hash(&mut hasher);
self.subject.hash(&mut hasher);
(self.delivery_class as u8).hash(&mut hasher);
match &self.reply_space_rule {
super::ir::ReplySpaceRule::CallerInbox => 0u8.hash(&mut hasher),
super::ir::ReplySpaceRule::SharedPrefix { prefix } => {
1u8.hash(&mut hasher);
prefix.hash(&mut hasher);
}
super::ir::ReplySpaceRule::DedicatedPrefix { prefix } => {
2u8.hash(&mut hasher);
prefix.hash(&mut hasher);
}
}
self.service_class.hash(&mut hasher);
self.capability_fingerprint.hash(&mut hasher);
self.issued_at.hash(&mut hasher);
self.timeout.hash(&mut hasher);
hasher.finish()
}
}
/// Certificate describing a reply to a service request.
///
/// All fields take part in `ReplyCertificate::digest`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReplyCertificate {
/// Identifier of the request being answered.
pub request_id: String,
/// Callee identity.
pub callee: String,
/// Delivery class of the reply.
pub delivery_class: DeliveryClass,
/// Parent service obligation id; `validate` requires it when
/// `delivery_class >= DeliveryClass::ObligationBacked`.
pub service_obligation_id: Option<ObligationId>,
/// Digest of the reply payload.
pub payload_digest: u64,
/// Whether the reply is chunked.
pub is_chunked: bool,
/// Chunk count; `validate` requires `Some` when `is_chunked` is true and
/// `None` otherwise.
pub total_chunks: Option<u32>,
/// Issue time.
pub issued_at: Time,
/// Service latency recorded for the reply.
pub service_latency: Duration,
}
impl ReplyCertificate {
    /// Builds a non-chunked reply certificate from a reply commit.
    ///
    /// The payload digest is computed by hashing `commit.payload` with
    /// `crate::util::DetHasher`. `is_chunked` is `false` and `total_chunks`
    /// is `None`.
    #[must_use]
    pub fn from_commit(
        commit: &ServiceReplyCommit,
        callee: String,
        issued_at: Time,
        service_latency: Duration,
    ) -> Self {
        use std::hash::{Hash, Hasher};

        let payload_digest = {
            let mut state = crate::util::DetHasher::default();
            commit.payload.hash(&mut state);
            state.finish()
        };
        Self {
            request_id: commit.request_id.clone(),
            callee,
            delivery_class: commit.delivery_class,
            service_obligation_id: commit.service_obligation_id,
            payload_digest,
            is_chunked: false,
            total_chunks: None,
            issued_at,
            service_latency,
        }
    }

    /// Validates the certificate.
    ///
    /// # Errors
    ///
    /// - errors from `validate_service_text` for `request_id` / `callee`;
    /// - [`ServiceObligationError::ChunkedReplyMissingCount`] when the reply
    ///   is chunked but carries no chunk count;
    /// - [`ServiceObligationError::UnaryReplyChunkCountPresent`] when the
    ///   reply is not chunked but carries a chunk count;
    /// - [`ServiceObligationError::TrackedReplyMissingParentObligationId`]
    ///   when the delivery class is at least `ObligationBacked` and no
    ///   service obligation id is present.
    pub fn validate(&self) -> Result<(), ServiceObligationError> {
        validate_service_text("request_id", &self.request_id)?;
        validate_service_text("callee", &self.callee)?;

        match (self.is_chunked, self.total_chunks) {
            (true, None) => return Err(ServiceObligationError::ChunkedReplyMissingCount),
            (false, Some(_)) => {
                return Err(ServiceObligationError::UnaryReplyChunkCountPresent);
            }
            (true, Some(_)) | (false, None) => {}
        }

        let tracked = self.delivery_class >= DeliveryClass::ObligationBacked;
        if tracked && self.service_obligation_id.is_none() {
            return Err(
                ServiceObligationError::TrackedReplyMissingParentObligationId {
                    delivery_class: self.delivery_class,
                },
            );
        }
        Ok(())
    }

    /// Computes a digest over every field of the certificate using
    /// `crate::util::DetHasher`.
    ///
    /// The field order fed to the hasher is part of the digest and must not
    /// change.
    #[must_use]
    pub fn digest(&self) -> u64 {
        use std::hash::{Hash, Hasher};

        let mut state = crate::util::DetHasher::default();
        let state_ref = &mut state;
        self.request_id.hash(state_ref);
        self.callee.hash(state_ref);
        (self.delivery_class as u8).hash(state_ref);
        self.service_obligation_id.hash(state_ref);
        self.payload_digest.hash(state_ref);
        self.is_chunked.hash(state_ref);
        self.total_chunks.hash(state_ref);
        self.issued_at.hash(state_ref);
        self.service_latency.hash(state_ref);
        state.finish()
    }
}
/// Tracks the chunks of a chunked reply until the stream is finalized.
#[derive(Debug)]
pub struct ChunkedReplyObligation {
/// Identifier of the chunk family.
pub family_id: String,
/// Parent service obligation id; required by `new` when
/// `delivery_class >= DeliveryClass::ObligationBacked`.
pub service_obligation_id: Option<ObligationId>,
/// Identifier of the request being answered.
pub request_id: String,
/// Expected number of chunks; `None` means no expected count was declared.
pub expected_chunks: Option<u32>,
// Number of chunks accepted so far by `receive_chunk`.
received_chunks: u32,
// Set by a successful `finalize`; no more chunks are accepted afterwards.
finalized: bool,
/// Delivery class of the reply.
pub delivery_class: DeliveryClass,
/// Acknowledgement boundary for chunks; checked by `new` via `validate_reply_boundary`.
pub chunk_ack_boundary: AckKind,
}
impl ChunkedReplyObligation {
    /// Creates a tracker for a chunked reply with zero chunks received.
    ///
    /// # Errors
    ///
    /// - errors from `validate_service_text` for `family_id` / `request_id`;
    /// - [`ServiceObligationError::ChunkedReplyZeroExpected`] when
    ///   `expected_chunks` is `Some(0)`;
    /// - [`ServiceObligationError::TrackedReplyMissingParentObligationId`]
    ///   when `delivery_class` is at least `ObligationBacked` and
    ///   `service_obligation_id` is `None`;
    /// - errors from `validate_reply_boundary` for the chunk ack boundary.
    pub fn new(
        family_id: String,
        request_id: String,
        service_obligation_id: Option<ObligationId>,
        expected_chunks: Option<u32>,
        delivery_class: DeliveryClass,
        chunk_ack_boundary: AckKind,
    ) -> Result<Self, ServiceObligationError> {
        validate_service_text("family_id", &family_id)?;
        validate_service_text("request_id", &request_id)?;
        if expected_chunks == Some(0) {
            return Err(ServiceObligationError::ChunkedReplyZeroExpected);
        }
        if delivery_class >= DeliveryClass::ObligationBacked && service_obligation_id.is_none() {
            return Err(
                ServiceObligationError::TrackedReplyMissingParentObligationId { delivery_class },
            );
        }
        validate_reply_boundary(delivery_class, chunk_ack_boundary, false)?;
        Ok(Self {
            family_id,
            service_obligation_id,
            request_id,
            expected_chunks,
            received_chunks: 0,
            finalized: false,
            delivery_class,
            chunk_ack_boundary,
        })
    }

    /// Records one received chunk and returns its zero-based index.
    ///
    /// When no expected count was declared the counter saturates at
    /// `u32::MAX` instead of overflowing.
    ///
    /// # Errors
    ///
    /// - [`ServiceObligationError::AlreadyResolved`] when the stream has
    ///   already been finalized;
    /// - [`ServiceObligationError::ChunkedReplyOverflow`] when the expected
    ///   number of chunks has already been received. `received` reports the
    ///   count that would have resulted, saturated at `u32::MAX`.
    pub fn receive_chunk(&mut self) -> Result<u32, ServiceObligationError> {
        if self.finalized {
            return Err(ServiceObligationError::AlreadyResolved {
                operation: "receive chunk on finalized stream",
            });
        }
        if let Some(expected) = self.expected_chunks
            && self.received_chunks >= expected
        {
            return Err(ServiceObligationError::ChunkedReplyOverflow {
                expected,
                // Saturate: with `expected == u32::MAX` a plain `+ 1` would
                // overflow (panic in debug builds, wrap to 0 in release).
                received: self.received_chunks.saturating_add(1),
            });
        }
        let index = self.received_chunks;
        self.received_chunks = self.received_chunks.saturating_add(1);
        Ok(index)
    }

    /// Finalizes the stream and returns the number of chunks received.
    ///
    /// # Errors
    ///
    /// - [`ServiceObligationError::AlreadyResolved`] when already finalized;
    /// - [`ServiceObligationError::ChunkedReplyIncomplete`] when an expected
    ///   count was declared and the received count differs from it. The
    ///   stream stays un-finalized in that case.
    pub fn finalize(&mut self) -> Result<u32, ServiceObligationError> {
        if self.finalized {
            return Err(ServiceObligationError::AlreadyResolved {
                operation: "finalize chunked reply",
            });
        }
        if let Some(expected) = self.expected_chunks
            && self.received_chunks != expected
        {
            return Err(ServiceObligationError::ChunkedReplyIncomplete {
                expected,
                received: self.received_chunks,
            });
        }
        self.finalized = true;
        Ok(self.received_chunks)
    }

    /// Returns `true` when an expected count was declared and at least that
    /// many chunks have been received. Always `false` without an expected
    /// count.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.expected_chunks
            .is_some_and(|expected| self.received_chunks >= expected)
    }

    /// Number of chunks received so far.
    #[must_use]
    pub fn received_chunks(&self) -> u32 {
        self.received_chunks
    }

    /// Whether [`ChunkedReplyObligation::finalize`] has succeeded.
    #[must_use]
    pub fn is_finalized(&self) -> bool {
        self.finalized
    }

    /// Builds the chunked [`ReplyCertificate`] for a finalized stream.
    ///
    /// The certificate has `is_chunked == true` and `total_chunks` set to
    /// the number of chunks received.
    ///
    /// # Errors
    ///
    /// - [`ServiceObligationError::ChunkedReplyNotFinalized`] when the
    ///   stream has not been finalized;
    /// - [`ServiceObligationError::ChunkedReplyIncomplete`] when an expected
    ///   count was declared and the received count differs from it.
    pub fn certificate(
        &self,
        callee: String,
        payload_digest: u64,
        issued_at: Time,
        service_latency: Duration,
    ) -> Result<ReplyCertificate, ServiceObligationError> {
        if !self.finalized {
            return Err(ServiceObligationError::ChunkedReplyNotFinalized);
        }
        if let Some(expected) = self.expected_chunks
            && self.received_chunks != expected
        {
            return Err(ServiceObligationError::ChunkedReplyIncomplete {
                expected,
                received: self.received_chunks,
            });
        }
        Ok(ReplyCertificate {
            request_id: self.request_id.clone(),
            callee,
            delivery_class: self.delivery_class,
            service_obligation_id: self.service_obligation_id,
            payload_digest,
            is_chunked: true,
            total_chunks: Some(self.received_chunks),
            issued_at,
            service_latency,
        })
    }
}
/// A service request in flight, optionally backed by a ledger obligation.
///
/// Created by `ServiceObligation::allocate`; resolved by
/// `commit_with_reply`, `abort`, or `timeout`.
#[derive(Debug)]
pub struct ServiceObligation {
/// Request identifier.
pub request_id: String,
/// Caller identity.
pub caller: String,
/// Current callee; overwritten by `transfer`.
pub callee: String,
/// Current subject; overwritten by `transfer`.
pub subject: String,
/// Delivery class of the request.
pub delivery_class: DeliveryClass,
/// Creation time.
pub created_at: Time,
/// Request timeout, if any; never zero (rejected by `allocate`).
pub timeout: Option<Duration>,
/// Transfer history; `transfer` appends one hop per call.
pub lineage: Vec<ServiceTransferHop>,
// Set once `commit_with_reply` succeeds.
resolved: bool,
// Ledger token; present only when the delivery class is at least
// `ObligationBacked`, and taken when the obligation is committed or aborted.
token: Option<ObligationToken>,
}
impl ServiceObligation {
/// Allocates a service obligation for a request.
///
/// The four text arguments are converted to `String` and checked with
/// `validate_service_text` (the target is reported as `callee`). When
/// `delivery_class` is at least `DeliveryClass::ObligationBacked`, a
/// `Lease` obligation is acquired from `ledger` for `holder` / `region`;
/// otherwise the obligation is untracked and holds no token.
///
/// `#[track_caller]` makes `Location::caller()` report the call site of
/// `allocate`, which is recorded as the obligation's source location.
///
/// # Errors
///
/// Propagates `validate_service_text` errors, or returns
/// `ServiceObligationError::ZeroTimeout` when `timeout` is `Some` zero
/// duration. Validation happens before the ledger is touched.
#[track_caller]
#[allow(clippy::too_many_arguments)]
pub fn allocate(
ledger: &mut ObligationLedger,
request_id: impl Into<String>,
caller: impl Into<String>,
target: impl Into<String>,
subject: impl Into<String>,
delivery_class: DeliveryClass,
holder: TaskId,
region: RegionId,
created_at: Time,
timeout: Option<Duration>,
) -> Result<Self, ServiceObligationError> {
let request_id = request_id.into();
let caller = caller.into();
let service_target = target.into();
let subject = subject.into();
validate_service_text("request_id", &request_id)?;
validate_service_text("caller", &caller)?;
validate_service_text("callee", &service_target)?;
validate_service_text("subject", &subject)?;
if timeout.is_some_and(|value| value.is_zero()) {
return Err(ServiceObligationError::ZeroTimeout);
}
// Only obligation-backed (or stronger) classes are tracked in the ledger.
let token = if delivery_class >= DeliveryClass::ObligationBacked {
let description = format!("service request {request_id}: {caller} -> {service_target}");
Some(ledger.acquire_with_context(
ObligationKind::Lease,
holder,
region,
created_at,
SourceLocation::from_panic_location(Location::caller()),
None,
Some(description),
))
} else {
None
};
Ok(Self {
request_id,
caller,
callee: service_target,
subject,
delivery_class,
created_at,
timeout,
lineage: Vec::new(),
resolved: false,
token,
})
}
/// Returns `AlreadyResolved { operation }` when the obligation has been
/// marked resolved; `operation` names the attempted action.
fn ensure_active(&self, operation: &'static str) -> Result<(), ServiceObligationError> {
if self.resolved {
return Err(ServiceObligationError::AlreadyResolved { operation });
}
Ok(())
}
/// Id of the backing ledger obligation, or `None` when no token is held
/// (untracked, or already committed/aborted).
#[must_use]
pub fn obligation_id(&self) -> Option<ObligationId> {
self.token.as_ref().map(ObligationToken::id)
}
/// Whether a ledger token is currently held.
#[must_use]
pub fn is_tracked(&self) -> bool {
self.token.is_some()
}
/// Transfers the obligation to a new callee and subject.
///
/// Overwrites `callee` and `subject` and appends a `ServiceTransferHop`
/// to `lineage`. Nothing is modified when validation fails.
///
/// # Errors
///
/// Returns `AlreadyResolved` when the obligation is resolved, or
/// propagates `validate_service_text` errors for the three text arguments.
pub fn transfer(
&mut self,
callee: impl Into<String>,
subject: impl Into<String>,
morphism: impl Into<String>,
transferred_at: Time,
) -> Result<(), ServiceObligationError> {
self.ensure_active("transfer")?;
let callee = callee.into();
let subject = subject.into();
let morphism = morphism.into();
validate_service_text("transfer.callee", &callee)?;
validate_service_text("transfer.subject", &subject)?;
validate_service_text("transfer.morphism", &morphism)?;
self.callee.clone_from(&callee);
self.subject.clone_from(&subject);
self.lineage.push(ServiceTransferHop {
morphism,
callee,
subject,
transferred_at,
});
Ok(())
}
/// Resolves the obligation with a reply payload.
///
/// For a tracked obligation the ledger token is committed at `now`; when
/// `requires_follow_up_reply` holds for the boundary/receipt combination,
/// a `ReplyObligation` carrying a copy of the payload is allocated for
/// the same holder and region. The obligation is then marked resolved.
///
/// # Errors
///
/// - `AlreadyResolved` when the obligation is resolved;
/// - errors from `validate_reply_boundary`;
/// - `ReplyTrackingUnavailable` when the obligation holds no token but a
///   follow-up reply is required; the obligation stays unresolved.
#[track_caller]
pub fn commit_with_reply(
&mut self,
ledger: &mut ObligationLedger,
now: Time,
payload: impl Into<Vec<u8>>,
delivery_boundary: AckKind,
receipt_required: bool,
) -> Result<ServiceReplyCommit, ServiceObligationError> {
self.ensure_active("commit_with_reply")?;
validate_reply_boundary(self.delivery_class, delivery_boundary, receipt_required)?;
// Captured before the token is taken so the commit can still report it.
let service_obligation_id = self.obligation_id();
let payload = payload.into();
let reply_obligation = if let Some(token) = self.token.take() {
// Read holder/region/id before the token is consumed by `commit`.
let holder = token.holder();
let region = token.region();
let service_obligation_id = token.id();
ledger.commit(token, now);
if requires_follow_up_reply(delivery_boundary, receipt_required) {
Some(ReplyObligation::allocate(
ledger,
service_obligation_id,
holder,
region,
now,
payload.clone(),
delivery_boundary,
receipt_required,
))
} else {
None
}
} else if requires_follow_up_reply(delivery_boundary, receipt_required) {
return Err(ServiceObligationError::ReplyTrackingUnavailable {
delivery_class: self.delivery_class,
requested_boundary: delivery_boundary,
receipt_required,
});
} else {
None
};
self.resolved = true;
Ok(ServiceReplyCommit {
request_id: self.request_id.clone(),
service_obligation_id,
payload,
delivery_class: self.delivery_class,
reply_obligation,
})
}
/// Aborts the obligation, consuming it.
///
/// When a ledger token is held it is aborted at `now` with the reason
/// given by `failure.abort_reason()`.
///
/// # Errors
///
/// Returns `AlreadyResolved` when the obligation is resolved.
pub fn abort(
mut self,
ledger: &mut ObligationLedger,
now: Time,
failure: ServiceFailure,
) -> Result<ServiceAbortReceipt, ServiceObligationError> {
self.ensure_active("abort")?;
let obligation_id = self.obligation_id();
if let Some(token) = self.token.take() {
ledger.abort(token, now, failure.abort_reason());
}
Ok(ServiceAbortReceipt {
request_id: self.request_id,
obligation_id,
failure,
delivery_class: self.delivery_class,
})
}
/// Aborts the obligation with `ServiceFailure::TimedOut`, consuming it.
///
/// # Errors
///
/// Returns `AlreadyResolved` (operation `timeout`) when the obligation
/// is resolved.
pub fn timeout(
self,
ledger: &mut ObligationLedger,
now: Time,
) -> Result<ServiceAbortReceipt, ServiceObligationError> {
// Checked here as well so the error names `timeout` rather than `abort`.
self.ensure_active("timeout")?;
self.abort(ledger, now, ServiceFailure::TimedOut)
}
}
/// Result of `ServiceObligation::commit_with_reply`.
#[derive(Debug)]
pub struct ServiceReplyCommit {
/// Identifier of the request that was answered.
pub request_id: String,
/// Id of the committed service obligation; `None` when it was untracked.
pub service_obligation_id: Option<ObligationId>,
/// Reply payload.
pub payload: Vec<u8>,
/// Delivery class of the request.
pub delivery_class: DeliveryClass,
/// Follow-up reply obligation, present when one was allocated at commit time.
pub reply_obligation: Option<ReplyObligation>,
}
/// Receipt returned by `ServiceObligation::abort` and
/// `ServiceObligation::timeout`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceAbortReceipt {
/// Identifier of the aborted request.
pub request_id: String,
/// Id of the aborted ledger obligation; `None` when it was untracked.
pub obligation_id: Option<ObligationId>,
/// Failure that caused the abort.
pub failure: ServiceFailure,
/// Delivery class of the request.
pub delivery_class: DeliveryClass,
}
/// Ledger-backed obligation covering delivery of a reply.
///
/// Allocated by `ServiceObligation::commit_with_reply`; resolved by
/// `commit_delivery`, `abort_delivery`, or `timeout`.
#[derive(Debug)]
pub struct ReplyObligation {
/// Id of the service obligation this reply belongs to.
pub service_obligation_id: ObligationId,
/// Acknowledgement boundary for the delivery.
pub delivery_boundary: AckKind,
/// Whether a receipt is required.
pub receipt_required: bool,
/// Reply payload.
pub payload: Vec<u8>,
// Id of this reply's own ledger obligation, cached from the token.
obligation_id: ObligationId,
// Ledger token; taken when the delivery is committed or aborted.
token: Option<ObligationToken>,
}
impl ReplyObligation {
/// Acquires an `Ack` obligation from `ledger` and wraps it.
///
/// `#[track_caller]` makes `Location::caller()` report the caller of
/// `allocate`, which is recorded as the obligation's source location.
#[track_caller]
#[allow(clippy::too_many_arguments)]
fn allocate(
ledger: &mut ObligationLedger,
service_obligation_id: ObligationId,
holder: TaskId,
region: RegionId,
created_at: Time,
payload: Vec<u8>,
delivery_boundary: AckKind,
receipt_required: bool,
) -> Self {
let description = format!("reply obligation for service {service_obligation_id:?}");
let token = ledger.acquire_with_context(
ObligationKind::Ack,
holder,
region,
created_at,
SourceLocation::from_panic_location(Location::caller()),
None,
Some(description),
);
// Cache the id so it stays available after the token is consumed.
let obligation_id = token.id();
Self {
service_obligation_id,
delivery_boundary,
receipt_required,
payload,
obligation_id,
token: Some(token),
}
}
/// Id of this reply's ledger obligation.
#[must_use]
pub const fn obligation_id(&self) -> ObligationId {
self.obligation_id
}
/// Commits the reply delivery in the ledger at `now`, consuming the
/// obligation.
///
/// # Panics
///
/// Panics if the token is no longer present.
pub fn commit_delivery(
mut self,
ledger: &mut ObligationLedger,
now: Time,
) -> ReplyDeliveryReceipt {
let token = self
.token
.take()
.expect("reply obligation token must be present until resolved");
ledger.commit(token, now);
ReplyDeliveryReceipt {
obligation_id: self.obligation_id,
service_obligation_id: self.service_obligation_id,
delivery_boundary: self.delivery_boundary,
receipt_required: self.receipt_required,
}
}
/// Aborts the reply delivery in the ledger at `now` with the reason given
/// by `failure.abort_reason()`, consuming the obligation.
///
/// # Panics
///
/// Panics if the token is no longer present.
pub fn abort_delivery(
mut self,
ledger: &mut ObligationLedger,
now: Time,
failure: ServiceFailure,
) -> ReplyAbortReceipt {
let token = self
.token
.take()
.expect("reply obligation token must be present until resolved");
ledger.abort(token, now, failure.abort_reason());
ReplyAbortReceipt {
obligation_id: self.obligation_id,
service_obligation_id: self.service_obligation_id,
delivery_boundary: self.delivery_boundary,
failure,
}
}
/// Aborts the reply delivery with `ServiceFailure::TimedOut`.
pub fn timeout(self, ledger: &mut ObligationLedger, now: Time) -> ReplyAbortReceipt {
self.abort_delivery(ledger, now, ServiceFailure::TimedOut)
}
}
/// Receipt returned by `ReplyObligation::commit_delivery`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplyDeliveryReceipt {
/// Id of the committed reply obligation.
pub obligation_id: ObligationId,
/// Id of the parent service obligation.
pub service_obligation_id: ObligationId,
/// Acknowledgement boundary of the delivery.
pub delivery_boundary: AckKind,
/// Whether a receipt was required.
pub receipt_required: bool,
}
/// Receipt returned by `ReplyObligation::abort_delivery` and
/// `ReplyObligation::timeout`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplyAbortReceipt {
/// Id of the aborted reply obligation.
pub obligation_id: ObligationId,
/// Id of the parent service obligation.
pub service_obligation_id: ObligationId,
/// Acknowledgement boundary of the delivery.
pub delivery_boundary: AckKind,
/// Failure that caused the abort.
pub failure: ServiceFailure,
}
/// Retry law of a quantitative contract.
///
/// Serialized by serde as an internally tagged enum keyed by `kind`, with
/// snake_case variant names. Parameter constraints are enforced by
/// `QuantitativeContract::validate`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum RetryLaw {
/// No retries.
None,
/// Fixed-interval retries.
Fixed {
/// Retry interval; must be non-zero.
interval: Duration,
/// Maximum number of attempts; must be at least 1.
max_attempts: u32,
},
/// Exponential backoff.
ExponentialBackoff {
/// Initial delay; must be non-zero.
initial_delay: Duration,
/// Backoff multiplier; must be finite and greater than 1.0 per the
/// validation error message.
multiplier: f64,
/// Maximum delay; must be non-zero and not below `initial_delay`.
max_delay: Duration,
/// Maximum number of attempts; must be at least 1.
max_attempts: u32,
},
/// Budget-bounded retries.
BudgetBounded {
/// Retry interval; must be non-zero.
interval: Duration,
},
}
/// Monitoring policy of a quantitative contract.
///
/// Serialized by serde as an internally tagged enum keyed by `kind`, with
/// snake_case variant names. Parameter constraints are enforced by
/// `QuantitativeContract::validate`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum MonitoringPolicy {
/// Passive monitoring; no parameters.
Passive,
/// Sampled monitoring.
Sampled {
/// Sampling ratio; must be in (0.0, 1.0] per the validation error message.
sampling_ratio: f64,
/// Window size; must be non-zero.
window_size: u32,
},
/// E-process monitoring.
EProcess {
/// Confidence; must be in (0.0, 1.0) per the validation error message.
confidence: f64,
/// Evidence cap; must be positive and not below the alert threshold
/// derived from `confidence`.
max_evidence: f64,
},
/// Conformal monitoring.
Conformal {
/// Target coverage; must be in (0.0, 1.0) per the validation error message.
target_coverage: f64,
/// Calibration size; must be non-zero.
calibration_size: u32,
},
}
/// Quantitative contract: a latency target with a target probability, plus
/// retry and monitoring settings.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct QuantitativeContract {
/// Contract name; must not be empty after trimming whitespace.
pub name: String,
/// Delivery class the contract applies to.
pub delivery_class: DeliveryClass,
/// Target latency; must be non-zero. `latency_satisfies` treats a
/// measurement as satisfying when it is at most this value.
pub target_latency: Duration,
/// Target probability; must be in (0.0, 1.0] per the validation error message.
pub target_probability: f64,
/// Retry law.
pub retry_law: RetryLaw,
/// Monitoring policy.
pub monitoring_policy: MonitoringPolicy,
/// NOTE(review): presumably whether violations are recorded — the
/// consuming code is not in this view.
pub record_violations: bool,
}
/// Errors returned by `QuantitativeContract::validate`.
///
/// Each variant's `#[error]` message states the constraint that was violated.
#[derive(Debug, Clone, PartialEq, Error)]
pub enum QuantitativeContractError {
/// The contract name is empty after trimming whitespace.
#[error("quantitative contract name must not be empty")]
EmptyName,
/// The target latency is zero.
#[error("target latency must be greater than zero")]
ZeroLatency,
/// The target probability failed the probability check; carries the value.
#[error("target probability {0} must be in (0.0, 1.0]")]
InvalidProbability(f64),
/// The backoff multiplier failed its check; carries the value.
#[error("exponential backoff multiplier {0} must be > 1.0")]
InvalidMultiplier(f64),
/// `max_attempts` is zero.
#[error("max_attempts must be >= 1")]
ZeroMaxAttempts,
/// The sampling ratio failed the probability check; carries the value.
#[error("sampling ratio {0} must be in (0.0, 1.0]")]
InvalidSamplingRatio(f64),
/// The e-process confidence failed its check; carries the value.
#[error("e-process confidence {0} must be in (0.0, 1.0)")]
InvalidConfidence(f64),
/// The conformal target coverage failed its check; carries the value.
#[error("conformal target coverage {0} must be in (0.0, 1.0)")]
InvalidCoverage(f64),
/// The conformal calibration size is zero.
#[error("conformal calibration_size must be > 0")]
ZeroCalibrationSize,
/// A retry interval is zero.
#[error("retry interval must be > 0")]
ZeroRetryInterval,
/// The backoff maximum delay is zero.
#[error("max delay must be > 0")]
ZeroMaxDelay,
/// The backoff maximum delay is below the initial delay.
#[error(
"max delay {max_delay:?} must be greater than or equal to initial delay {initial_delay:?}"
)]
MaxDelayBelowInitialDelay {
/// Configured initial delay.
initial_delay: Duration,
/// Configured maximum delay.
max_delay: Duration,
},
/// The backoff initial delay is zero.
#[error("initial delay must be > 0")]
ZeroInitialDelay,
/// The sampled-monitoring window size is zero.
#[error("monitoring window_size must be > 0")]
ZeroWindowSize,
/// The e-process `max_evidence` failed the positivity check.
#[error("e-process max_evidence must be > 0")]
ZeroMaxEvidence,
/// The e-process `max_evidence` is below the alert threshold.
#[error(
"e-process max_evidence {max_evidence} must be greater than or equal to alert threshold {threshold}"
)]
MaxEvidenceBelowAlertThreshold {
/// Configured evidence cap.
max_evidence: f64,
/// Alert threshold computed from the configured confidence.
threshold: f64,
},
}
impl QuantitativeContract {
    /// Validates the contract.
    ///
    /// Checks run in this order and stop at the first failure: name,
    /// target latency, target probability, retry law, monitoring policy.
    ///
    /// # Errors
    ///
    /// Returns the [`QuantitativeContractError`] variant for the first
    /// violated constraint.
    pub fn validate(&self) -> Result<(), QuantitativeContractError> {
        let probability = self.target_probability;
        if self.name.trim().is_empty() {
            Err(QuantitativeContractError::EmptyName)
        } else if self.target_latency.is_zero() {
            Err(QuantitativeContractError::ZeroLatency)
        } else if !is_finite_probability(probability) {
            Err(QuantitativeContractError::InvalidProbability(probability))
        } else {
            self.validate_retry_law()?;
            self.validate_monitoring_policy()
        }
    }

    /// Validates the parameters of the configured retry law.
    fn validate_retry_law(&self) -> Result<(), QuantitativeContractError> {
        match &self.retry_law {
            RetryLaw::None => Ok(()),
            // Both interval-based laws check the interval first.
            RetryLaw::Fixed { interval, .. } | RetryLaw::BudgetBounded { interval }
                if interval.is_zero() =>
            {
                Err(QuantitativeContractError::ZeroRetryInterval)
            }
            RetryLaw::Fixed {
                max_attempts: 0, ..
            } => Err(QuantitativeContractError::ZeroMaxAttempts),
            RetryLaw::Fixed { .. } | RetryLaw::BudgetBounded { .. } => Ok(()),
            RetryLaw::ExponentialBackoff {
                initial_delay,
                multiplier,
                max_delay,
                max_attempts,
            } => {
                let (initial_delay, max_delay) = (*initial_delay, *max_delay);
                if initial_delay.is_zero() {
                    Err(QuantitativeContractError::ZeroInitialDelay)
                } else if !is_finite_gt_one(*multiplier) {
                    Err(QuantitativeContractError::InvalidMultiplier(*multiplier))
                } else if max_delay.is_zero() {
                    Err(QuantitativeContractError::ZeroMaxDelay)
                } else if max_delay < initial_delay {
                    Err(QuantitativeContractError::MaxDelayBelowInitialDelay {
                        initial_delay,
                        max_delay,
                    })
                } else if *max_attempts == 0 {
                    Err(QuantitativeContractError::ZeroMaxAttempts)
                } else {
                    Ok(())
                }
            }
        }
    }

    /// Validates the parameters of the configured monitoring policy.
    fn validate_monitoring_policy(&self) -> Result<(), QuantitativeContractError> {
        match &self.monitoring_policy {
            MonitoringPolicy::Passive => Ok(()),
            MonitoringPolicy::Sampled {
                sampling_ratio,
                window_size,
            } => {
                let ratio = *sampling_ratio;
                if !is_finite_probability(ratio) {
                    Err(QuantitativeContractError::InvalidSamplingRatio(ratio))
                } else if *window_size == 0 {
                    Err(QuantitativeContractError::ZeroWindowSize)
                } else {
                    Ok(())
                }
            }
            MonitoringPolicy::EProcess {
                confidence,
                max_evidence,
            } => {
                let (confidence, max_evidence) = (*confidence, *max_evidence);
                if !is_finite_open_probability(confidence) {
                    return Err(QuantitativeContractError::InvalidConfidence(confidence));
                }
                if !is_finite_positive(max_evidence) {
                    return Err(QuantitativeContractError::ZeroMaxEvidence);
                }
                // The threshold is only computed once the confidence is known
                // to be valid.
                let threshold = quantitative_eprocess_threshold(confidence);
                if max_evidence < threshold {
                    Err(QuantitativeContractError::MaxEvidenceBelowAlertThreshold {
                        max_evidence,
                        threshold,
                    })
                } else {
                    Ok(())
                }
            }
            MonitoringPolicy::Conformal {
                target_coverage,
                calibration_size,
            } => {
                let coverage = *target_coverage;
                if !is_finite_open_probability(coverage) {
                    Err(QuantitativeContractError::InvalidCoverage(coverage))
                } else if *calibration_size == 0 {
                    Err(QuantitativeContractError::ZeroCalibrationSize)
                } else {
                    Ok(())
                }
            }
        }
    }

    /// Returns whether `measured` is within the target latency (inclusive).
    #[must_use]
    pub fn latency_satisfies(&self, measured: Duration) -> bool {
        self.target_latency >= measured
    }
}
/// State of a quantitative contract.
///
/// The derived `Ord` follows declaration order, so
/// `Healthy < AtRisk < Violated`. Serialized with snake_case names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum QuantitativeContractState {
/// Healthy; this is the `Default` variant.
#[default]
Healthy,
/// At risk.
AtRisk,
/// Violated.
Violated,
}
impl fmt::Display for QuantitativeContractState {
    /// Writes the snake_case label of the state, e.g. `at_risk`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Healthy => "healthy",
            Self::AtRisk => "at_risk",
            Self::Violated => "violated",
        })
    }
}
/// Policy recommendation attached to a quantitative contract evaluation.
///
/// Serialized with snake_case names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum QuantitativePolicyRecommendation {
/// Keep the current policy; this is the `Default` variant.
#[default]
KeepCurrent,
/// Apply the retry law.
ApplyRetryLaw,
/// Escalate.
Escalate,
}
impl fmt::Display for QuantitativePolicyRecommendation {
    /// Writes the snake_case label of the recommendation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::KeepCurrent => "keep_current",
            Self::ApplyRetryLaw => "apply_retry_law",
            Self::Escalate => "escalate",
        })
    }
}
/// Serializable mirror of the e-process monitor's alert state.
///
/// Each variant corresponds one-to-one to the `EProcessAlertState` variant of
/// the same name. Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QuantitativeMonitorAlertState {
/// Mirrors `EProcessAlertState::Clear`.
Clear,
/// Mirrors `EProcessAlertState::Watching`.
Watching,
/// Mirrors `EProcessAlertState::Alert`.
Alert,
}
impl From<EProcessAlertState> for QuantitativeMonitorAlertState {
    /// Maps each e-process alert state onto the variant of the same name.
    fn from(upstream: EProcessAlertState) -> Self {
        match upstream {
            EProcessAlertState::Alert => Self::Alert,
            EProcessAlertState::Watching => Self::Watching,
            EProcessAlertState::Clear => Self::Clear,
        }
    }
}
/// Policy-specific evidence attached to a contract evaluation.
///
/// There is one variant per monitoring policy. Serialized as an internally
/// tagged enum (`kind` field) with snake_case variant names.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum QuantitativeMonitorEvidence {
/// No additional evidence (passive monitoring).
Passive,
/// Evidence from the sampled sliding window.
Sampled {
/// Sampling ratio copied from the policy.
sampling_ratio: f64,
/// Number of observations that were sampled so far.
sampled_observations: u64,
/// Configured window size copied from the policy.
window_size: u32,
/// Fraction of hits among the samples currently in the window.
window_hit_rate: f64,
},
/// Evidence from the e-process monitor.
EProcess {
/// Confidence level copied from the policy.
confidence: f64,
/// Reported e-value, limited to at most `max_evidence`.
e_value: f64,
/// Alert threshold reported by the monitor.
threshold: f64,
/// Evidence cap copied from the policy.
max_evidence: f64,
/// `true` when the monitor's raw e-value reached or exceeded `max_evidence`.
capped: bool,
/// Alert state reported by the monitor after the observation.
alert_state: QuantitativeMonitorAlertState,
},
/// Evidence from the conformal calibrator.
Conformal {
/// Target coverage copied from the policy.
target_coverage: f64,
/// Configured calibration size copied from the policy.
calibration_size: u32,
/// Sample count the calibrator reports for this contract's metric.
calibration_samples: usize,
/// Latency threshold reported by the calibrator, when it has one.
threshold_latency: Option<Duration>,
/// Coverage rate reported by the calibrator; `None` while still calibrating.
coverage: Option<f64>,
/// Whether the latest observation conformed; `None` while still calibrating.
latest_conforming: Option<bool>,
},
}
/// Snapshot produced by `QuantitativeContractMonitor::observe_latency` for a
/// single latency observation.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct QuantitativeContractEvaluation {
/// Name of the evaluated contract.
pub contract_name: String,
/// Delivery class copied from the contract.
pub delivery_class: DeliveryClass,
/// Latency of the observation that triggered this evaluation.
pub latest_latency: Duration,
/// Target latency copied from the contract.
pub target_latency: Duration,
/// Total observations seen by the monitor, including this one.
pub observations: u64,
/// Number of observations that satisfied the target latency.
pub hit_count: u64,
/// `hit_count / observations` at the time of the evaluation.
pub observed_probability: f64,
/// Target probability copied from the contract.
pub target_probability: f64,
/// Resulting contract state.
pub state: QuantitativeContractState,
/// Recommendation derived from `state` and the contract's retry law.
pub recommendation: QuantitativePolicyRecommendation,
/// Policy-specific evidence backing the evaluation.
pub evidence: QuantitativeMonitorEvidence,
}
/// Stateful monitor that evaluates latency observations against a
/// `QuantitativeContract`.
#[derive(Debug)]
pub struct QuantitativeContractMonitor {
// Contract being monitored; validated in `new`.
contract: QuantitativeContract,
// Total number of observations (saturating counter).
observations: u64,
// Number of observations that met the target latency (saturating counter).
hit_count: u64,
// Sliding window of sampled hit/miss outcomes (used by the `Sampled` policy).
sampled_window: VecDeque<bool>,
// Number of observations that were sampled (used by the `Sampled` policy).
sampled_observations: u64,
// Present only for the `EProcess` policy.
eprocess: Option<LeakMonitor>,
// Present only for the `Conformal` policy.
conformal: Option<HealthThresholdCalibrator>,
// Most recent evaluation, if any observation has been made.
last_evaluation: Option<QuantitativeContractEvaluation>,
}
impl QuantitativeContractMonitor {
/// Creates a monitor for `contract`.
///
/// The contract is validated first. An e-process monitor is created only
/// for the `EProcess` policy and a conformal calibrator only for the
/// `Conformal` policy; all counters start at zero.
///
/// # Errors
///
/// Returns the error produced by `contract.validate()`.
///
/// # Panics
///
/// Panics if the `u32` calibration size does not fit in `usize`.
pub fn new(contract: QuantitativeContract) -> Result<Self, QuantitativeContractError> {
contract.validate()?;
let eprocess = match contract.monitoring_policy {
MonitoringPolicy::EProcess {
confidence,
max_evidence: _,
} => Some(new_quantitative_eprocess(
contract.target_latency,
confidence,
)),
_ => None,
};
let conformal = match contract.monitoring_policy {
MonitoringPolicy::Conformal {
target_coverage,
calibration_size,
} => Some(HealthThresholdCalibrator::new(
// The calibrator is configured with the miscoverage level
// (1 - target coverage) and an upper threshold.
HealthThresholdConfig::new(1.0 - target_coverage, ThresholdMode::Upper)
.min_samples(usize::try_from(calibration_size).expect("u32 fits usize")),
)),
_ => None,
};
Ok(Self {
contract,
observations: 0,
hit_count: 0,
sampled_window: VecDeque::new(),
sampled_observations: 0,
eprocess,
conformal,
last_evaluation: None,
})
}
/// Returns the monitored contract.
#[must_use]
pub fn contract(&self) -> &QuantitativeContract {
&self.contract
}
/// Returns the cumulative hit rate (`hit_count / observations`), or `0.0`
/// when nothing has been observed yet.
#[must_use]
pub fn observed_probability(&self) -> f64 {
ratio(self.hit_count, self.observations)
}
/// Returns the evaluation produced by the most recent observation, if any.
#[must_use]
pub fn last_evaluation(&self) -> Option<&QuantitativeContractEvaluation> {
self.last_evaluation.as_ref()
}
/// Returns a copy of the last evaluation when it should be recorded as
/// evidence: the contract has `record_violations` set and the last state is
/// not `Healthy`. Returns `None` otherwise, or when nothing was observed.
#[must_use]
pub fn policy_change_evidence(&self) -> Option<QuantitativeContractEvaluation> {
let evaluation = self.last_evaluation.as_ref()?;
if self.contract.record_violations && evaluation.state != QuantitativeContractState::Healthy
{
Some(evaluation.clone())
} else {
None
}
}
/// Records one latency observation and returns the resulting evaluation.
///
/// The counters are updated, a baseline state is derived from the
/// cumulative hit rate, and the configured monitoring policy may then raise
/// that state. The evaluation is also stored as the last evaluation.
///
/// # Panics
///
/// Panics if the `u32` window size does not fit in `usize`, or if the
/// policy-specific monitor that `new` creates is missing.
#[allow(clippy::too_many_lines)]
pub fn observe_latency(&mut self, latency: Duration) -> QuantitativeContractEvaluation {
self.observations = self.observations.saturating_add(1);
let hit = self.contract.latency_satisfies(latency);
if hit {
self.hit_count = self.hit_count.saturating_add(1);
}
let observed_probability = self.observed_probability();
let mut state = self.baseline_state(hit, observed_probability);
let evidence = match &self.contract.monitoring_policy {
MonitoringPolicy::Passive => QuantitativeMonitorEvidence::Passive,
MonitoringPolicy::Sampled {
sampling_ratio,
window_size,
} => {
let window_size_usize = usize::try_from(*window_size).expect("u32 fits usize");
if should_sample_observation(self.observations, *sampling_ratio) {
self.sampled_observations = self.sampled_observations.saturating_add(1);
self.sampled_window.push_back(hit);
// Keep only the most recent `window_size` samples.
while self.sampled_window.len() > window_size_usize {
self.sampled_window.pop_front();
}
}
let window_hit_rate = bool_ratio(&self.sampled_window);
// Only a full window can mark the contract as violated.
if self.sampled_window.len() >= window_size_usize
&& window_hit_rate < self.contract.target_probability
{
state = state.max(QuantitativeContractState::Violated);
}
QuantitativeMonitorEvidence::Sampled {
sampling_ratio: *sampling_ratio,
sampled_observations: self.sampled_observations,
window_size: *window_size,
window_hit_rate,
}
}
MonitoringPolicy::EProcess {
confidence,
max_evidence,
} => {
let (reported_e_value, threshold, capped, alert_state) = {
let Some(monitor) = self.eprocess.as_mut() else {
unreachable!("e-process monitor must exist for e-process policies");
};
monitor.observe(duration_to_monitor_nanos(latency));
let raw_e_value = monitor.e_value();
let capped = raw_e_value >= *max_evidence;
(
// The reported e-value never exceeds the configured cap.
raw_e_value.min(*max_evidence),
monitor.threshold(),
capped,
QuantitativeMonitorAlertState::from(monitor.alert_state()),
)
};
state = match alert_state {
QuantitativeMonitorAlertState::Clear => state,
QuantitativeMonitorAlertState::Watching => {
state.max(QuantitativeContractState::AtRisk)
}
QuantitativeMonitorAlertState::Alert => QuantitativeContractState::Violated,
};
// Once an alert coincides with the evidence cap, start over with a
// fresh monitor for subsequent observations.
if capped && matches!(alert_state, QuantitativeMonitorAlertState::Alert) {
self.eprocess = Some(new_quantitative_eprocess(
self.contract.target_latency,
*confidence,
));
}
QuantitativeMonitorEvidence::EProcess {
confidence: *confidence,
e_value: reported_e_value,
threshold,
max_evidence: *max_evidence,
capped,
alert_state,
}
}
MonitoringPolicy::Conformal {
target_coverage,
calibration_size,
} => {
let Some(calibrator) = self.conformal.as_mut() else {
unreachable!("conformal monitor must exist for conformal policies");
};
// The contract name doubles as the calibrator's metric key.
let metric = self.contract.name.as_str();
let latency_ms = duration_to_millis(latency);
let (threshold_latency, coverage, latest_conforming) = if calibrator
.is_metric_calibrated(metric)
{
let Some(check) = calibrator.check_and_track(metric, latency_ms) else {
unreachable!("calibrated conformal monitor must return a check");
};
let coverage = calibrator.coverage_rates().get(metric).copied();
if !check.conforming || coverage.is_some_and(|value| value < *target_coverage) {
state = state.max(QuantitativeContractState::AtRisk);
}
(
Some(duration_from_millis(check.threshold)),
coverage,
Some(check.conforming),
)
} else {
// Not calibrated yet: the observation is used as a calibration
// sample and no conformity verdict is reported.
calibrator.calibrate(metric, latency_ms);
(
calibrator.threshold(metric).map(duration_from_millis),
None,
None,
)
};
let calibration_samples =
calibrator.metric_counts().get(metric).copied().unwrap_or(0);
QuantitativeMonitorEvidence::Conformal {
target_coverage: *target_coverage,
calibration_size: *calibration_size,
calibration_samples,
threshold_latency,
coverage,
latest_conforming,
}
}
};
// An at-risk contract without a retry law has nothing to apply, so it
// escalates directly.
let recommendation = match state {
QuantitativeContractState::Healthy => QuantitativePolicyRecommendation::KeepCurrent,
QuantitativeContractState::AtRisk => {
if matches!(self.contract.retry_law, RetryLaw::None) {
QuantitativePolicyRecommendation::Escalate
} else {
QuantitativePolicyRecommendation::ApplyRetryLaw
}
}
QuantitativeContractState::Violated => QuantitativePolicyRecommendation::Escalate,
};
let evaluation = QuantitativeContractEvaluation {
contract_name: self.contract.name.clone(),
delivery_class: self.contract.delivery_class,
latest_latency: latency,
target_latency: self.contract.target_latency,
observations: self.observations,
hit_count: self.hit_count,
observed_probability,
target_probability: self.contract.target_probability,
state,
recommendation,
evidence,
};
self.last_evaluation = Some(evaluation.clone());
evaluation
}
/// Derives the policy-independent state from the cumulative hit rate.
///
/// With at least three observations and a hit rate below the target the
/// contract is `Violated`. Otherwise a miss, or a hit rate below the target,
/// makes it `AtRisk`. In every other case it is `Healthy`.
fn baseline_state(&self, hit: bool, observed_probability: f64) -> QuantitativeContractState {
if self.observations >= 3 && observed_probability < self.contract.target_probability {
QuantitativeContractState::Violated
} else if !hit || observed_probability < self.contract.target_probability {
QuantitativeContractState::AtRisk
} else {
QuantitativeContractState::Healthy
}
}
}
/// Builds the e-process monitor used for latency contracts.
///
/// The significance level comes from `confidence`, the expected lifetime is
/// the target latency in (saturated) nanoseconds, and the monitor is
/// configured with a minimum of three observations.
fn new_quantitative_eprocess(target_latency: Duration, confidence: f64) -> LeakMonitor {
    let config = LeakMonitorConfig {
        alpha: quantitative_eprocess_alpha(confidence),
        expected_lifetime_ns: duration_to_monitor_nanos(target_latency),
        min_observations: 3,
    };
    LeakMonitor::new(config)
}
/// Converts a confidence level into a significance level `1 - confidence`,
/// clamped to `[f64::EPSILON, 1 - f64::EPSILON]`.
fn quantitative_eprocess_alpha(confidence: f64) -> f64 {
    let lower = f64::EPSILON;
    let upper = 1.0 - f64::EPSILON;
    let significance = 1.0 - confidence;
    significance.clamp(lower, upper)
}
/// Returns the e-process alert threshold for `confidence`: the reciprocal of
/// the clamped significance level.
fn quantitative_eprocess_threshold(confidence: f64) -> f64 {
    let alpha = quantitative_eprocess_alpha(confidence);
    alpha.recip()
}
/// Converts `duration` to whole nanoseconds, saturating at `u64::MAX`.
fn duration_to_monitor_nanos(duration: Duration) -> u64 {
    // A checked conversion saturates without needing a truncating cast.
    u64::try_from(duration.as_nanos()).unwrap_or(u64::MAX)
}
/// Converts `duration` to fractional milliseconds.
fn duration_to_millis(duration: Duration) -> f64 {
    let seconds = duration.as_secs_f64();
    seconds * 1000.0
}
/// Converts fractional milliseconds into a `Duration`.
///
/// Non-finite, zero and negative inputs map to `Duration::ZERO`. Finite values
/// too large for a `Duration` saturate to `Duration::MAX` instead of
/// panicking, which is what `Duration::from_secs_f64` would do.
fn duration_from_millis(millis: f64) -> Duration {
    if !millis.is_finite() || millis <= 0.0 {
        return Duration::ZERO;
    }
    // Only overflow can fail here: the value is known to be finite and positive.
    Duration::try_from_secs_f64(millis / 1000.0).unwrap_or(Duration::MAX)
}
/// Returns `numerator / denominator` as `f64`, or `0.0` for a zero denominator.
#[allow(clippy::cast_precision_loss)]
fn ratio(numerator: u64, denominator: u64) -> f64 {
    match denominator {
        0 => 0.0,
        total => numerator as f64 / total as f64,
    }
}
/// Returns the fraction of `true` entries in `values` (`0.0` when empty).
fn bool_ratio(values: &VecDeque<bool>) -> f64 {
    let true_count = values.iter().copied().filter(|flag| *flag).count();
    let hits = u64::try_from(true_count).expect("usize fits u64");
    let total = u64::try_from(values.len()).expect("usize fits u64");
    ratio(hits, total)
}
/// Decides whether the observation at 1-based `index` is sampled.
///
/// A ratio of `1.0` or more samples everything. Otherwise an observation is
/// sampled exactly when `floor(index * ratio)` is greater than
/// `floor((index - 1) * ratio)`. Index `0` is never sampled for ratios
/// below `1.0`.
#[allow(clippy::cast_precision_loss)]
fn should_sample_observation(index: u64, sampling_ratio: f64) -> bool {
    match index {
        _ if sampling_ratio >= 1.0 => true,
        0 => false,
        current => {
            let bucket_of = |position: u64| (position as f64 * sampling_ratio).floor();
            bucket_of(current) > bucket_of(current - 1)
        }
    }
}
/// Returns `true` for finite values strictly greater than zero.
fn is_finite_positive(value: f64) -> bool {
    0.0 < value && value < f64::INFINITY
}
/// Returns `true` for finite values strictly greater than one.
fn is_finite_gt_one(value: f64) -> bool {
    1.0 < value && value < f64::INFINITY
}
/// Returns `true` for values in the half-open interval `(0, 1]`.
fn is_finite_probability(value: f64) -> bool {
    // Every value inside (0, 1] is finite; NaN fails both comparisons.
    0.0 < value && value <= 1.0
}
/// Returns `true` for values in the open interval `(0, 1)`.
fn is_finite_open_probability(value: f64) -> bool {
    // Every value inside (0, 1) is finite; NaN fails both comparisons.
    0.0 < value && value < 1.0
}
/// Errors raised while validating or resolving service obligations and their
/// reply certificates.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ServiceObligationError {
/// A required text field was empty or contained only whitespace.
#[error("service obligation field `{field}` must not be empty")]
EmptyField {
/// Name of the offending field.
field: &'static str,
},
/// A timeout of zero was supplied.
#[error("service obligation timeout must be greater than zero")]
ZeroTimeout,
/// An operation was attempted on an obligation that is already resolved.
#[error("service obligation already resolved; cannot {operation}")]
AlreadyResolved {
/// Name of the rejected operation.
operation: &'static str,
},
/// The requested reply boundary is below the delivery class's minimum.
#[error(
"reply boundary `{requested_boundary}` is weaker than minimum `{minimum_boundary}` for delivery class `{delivery_class}`"
)]
ReplyBoundaryBelowMinimum {
/// Delivery class whose minimum was not met.
delivery_class: DeliveryClass,
/// Minimum boundary of that delivery class.
minimum_boundary: AckKind,
/// Boundary that was requested.
requested_boundary: AckKind,
},
/// A receipt was required but the boundary is not `received`.
#[error(
"receipt-required replies must use the `received` boundary, not `{requested_boundary}`"
)]
ReceiptRequiresReceivedBoundary {
/// Boundary that was requested.
requested_boundary: AckKind,
},
/// The delivery class cannot track the follow-up reply that was requested.
#[error(
"delivery class `{delivery_class}` cannot support tracked reply boundary `{requested_boundary}` (receipt_required={receipt_required})"
)]
ReplyTrackingUnavailable {
/// Delivery class that lacks reply tracking.
delivery_class: DeliveryClass,
/// Boundary that was requested.
requested_boundary: AckKind,
/// Whether a receipt was required.
receipt_required: bool,
},
/// Tracked reply state needs a parent obligation id, but none was given.
#[error(
"delivery class `{delivery_class}` requires a parent service obligation id for tracked reply state"
)]
TrackedReplyMissingParentObligationId {
/// Delivery class that requires the parent obligation id.
delivery_class: DeliveryClass,
},
/// A chunked reply certificate did not declare `total_chunks`.
#[error("chunked reply certificate must declare total_chunks")]
ChunkedReplyMissingCount,
/// A unary reply certificate declared `total_chunks`.
#[error("unary reply certificate must not declare total_chunks")]
UnaryReplyChunkCountPresent,
/// A chunked reply declared zero expected chunks.
#[error("chunked reply expected_chunks must be > 0")]
ChunkedReplyZeroExpected,
/// A chunked reply certificate was requested before the stream was finalized.
#[error("chunked reply certificate requires a finalized stream")]
ChunkedReplyNotFinalized,
/// Fewer chunks were received than expected.
#[error("chunked reply incomplete: expected {expected}, received {received}")]
ChunkedReplyIncomplete {
/// Number of chunks expected.
expected: u32,
/// Number of chunks received.
received: u32,
},
/// More chunks were received than expected.
#[error("chunked reply overflow: expected {expected}, received {received}")]
ChunkedReplyOverflow {
/// Number of chunks expected.
expected: u32,
/// Number of chunks received.
received: u32,
},
}
/// Rejects `value` when it is empty or whitespace-only, naming `field` in the error.
fn validate_service_text(field: &'static str, value: &str) -> Result<(), ServiceObligationError> {
    match value.trim() {
        "" => Err(ServiceObligationError::EmptyField { field }),
        _ => Ok(()),
    }
}
/// Returns `true` when a receipt is required or the boundary is ordered
/// after `AckKind::Served`.
fn requires_follow_up_reply(delivery_boundary: AckKind, receipt_required: bool) -> bool {
    if receipt_required {
        return true;
    }
    delivery_boundary > AckKind::Served
}
/// Validates a requested reply boundary against the delivery class.
///
/// Checks run in this order and the first failure is returned:
/// 1. the boundary must not be below the class's minimum acknowledgement;
/// 2. a required receipt demands the `Received` boundary;
/// 3. classes below `ObligationBacked` cannot carry a follow-up reply.
fn validate_reply_boundary(
    delivery_class: DeliveryClass,
    delivery_boundary: AckKind,
    receipt_required: bool,
) -> Result<(), ServiceObligationError> {
    let floor = delivery_class.minimum_ack();
    if delivery_boundary < floor {
        Err(ServiceObligationError::ReplyBoundaryBelowMinimum {
            delivery_class,
            minimum_boundary: floor,
            requested_boundary: delivery_boundary,
        })
    } else if receipt_required && delivery_boundary != AckKind::Received {
        Err(ServiceObligationError::ReceiptRequiresReceivedBoundary {
            requested_boundary: delivery_boundary,
        })
    } else if delivery_class < DeliveryClass::ObligationBacked
        && requires_follow_up_reply(delivery_boundary, receipt_required)
    {
        Err(ServiceObligationError::ReplyTrackingUnavailable {
            delivery_class,
            requested_boundary: delivery_boundary,
            receipt_required,
        })
    } else {
        Ok(())
    }
}
/// A named service together with its contract schema and the provider's terms.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServiceRegistration {
/// Service name; `ServiceRegistration::new` rejects blank names.
pub service_name: String,
/// Contract schema of the service.
pub contract: ServiceContractSchema,
/// Terms offered by the provider; `new` validates them against `contract`.
pub provider_terms: ProviderTerms,
}
impl ServiceRegistration {
    /// Creates a registration after validating its parts.
    ///
    /// # Errors
    ///
    /// Returns `ServiceContractError::EmptyServiceName` for a blank name, or
    /// the error from validating `contract` or from validating
    /// `provider_terms` against it, in that order.
    pub fn new(
        service_name: impl Into<String>,
        contract: ServiceContractSchema,
        provider_terms: ProviderTerms,
    ) -> Result<Self, ServiceContractError> {
        let registration = Self {
            service_name: service_name.into(),
            contract,
            provider_terms,
        };
        if registration.service_name.trim().is_empty() {
            return Err(ServiceContractError::EmptyServiceName);
        }
        registration.contract.validate()?;
        registration
            .provider_terms
            .validate_against(&registration.contract)?;
        Ok(registration)
    }

    /// Validates the caller's options against this registration and resolves
    /// the effective request parameters.
    ///
    /// The effective timeout is the caller's override when present, otherwise
    /// the contract's default timeout.
    ///
    /// # Errors
    ///
    /// Fails when the timeout override is zero, is not allowed by the
    /// contract, or exceeds the contract's default timeout. Also fails when a
    /// priority hint is supplied but not honored, or when no admissible
    /// delivery class can be selected for the caller.
    pub fn validate_caller(
        &self,
        caller: &CallerOptions,
    ) -> Result<ValidatedServiceRequest, ServiceContractError> {
        let budget = &self.contract.budget_semantics;
        if let Some(requested_timeout) = caller.timeout_override {
            if requested_timeout.is_zero() {
                return Err(ServiceContractError::ZeroDuration {
                    field: "caller_options.timeout_override".to_owned(),
                });
            }
            if !budget.allow_timeout_override {
                return Err(ServiceContractError::TimeoutOverrideNotAllowed);
            }
            if let Some(default_timeout) = budget.default_timeout
                && requested_timeout > default_timeout
            {
                return Err(ServiceContractError::TimeoutOverrideExceedsDefault {
                    requested_timeout,
                    default_timeout,
                });
            }
        }
        if caller.priority_hint.is_some() && !budget.honor_priority_hints {
            return Err(ServiceContractError::PriorityHintsNotAllowed);
        }
        let terms = &self.provider_terms;
        let delivery_class = terms
            .admissible_classes
            .select_for_caller(caller.requested_class)?;
        let timeout = caller.timeout_override.or(budget.default_timeout);
        Ok(ValidatedServiceRequest {
            delivery_class,
            timeout,
            priority_hint: caller.priority_hint,
            guaranteed_durability: terms.guaranteed_durability,
            evidence_level: terms.evidence_level,
            mobility_constraint: terms.mobility_constraint.clone(),
            compensation_policy: terms.compensation_policy,
            overload_policy: self.contract.overload_policy.clone(),
        })
    }
}
/// Errors raised by `FabricServiceBoundary` operations.
///
/// The `transparent` variants forward errors from lower layers unchanged.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ServiceBoundaryError {
/// The request subject is not owned by the namespace.
#[error("service subject `{subject}` is outside namespace `{namespace}`")]
SubjectOutsideNamespace {
/// Rejected request subject.
subject: String,
/// Service pattern of the namespace.
namespace: String,
},
/// The morphism's source language does not match the boundary subject.
#[error(
"service boundary subject `{subject}` is not covered by morphism source language `{source_language}`"
)]
MorphismDoesNotCoverBoundary {
/// Boundary request subject.
subject: String,
/// Source language of the morphism.
source_language: String,
},
/// The transfer target is not matched by the morphism's destination language.
#[error(
"target subject `{subject}` is not covered by morphism destination language `{dest_language}`"
)]
TargetOutsideMorphismDestination {
/// Transfer target subject.
subject: String,
/// Destination language of the morphism.
dest_language: String,
},
/// The morphism's destination language is not exactly the target subject.
#[error(
"subject-only service transfer to `{target_subject}` requires exact morphism destination, got `{dest_language}`"
)]
TransferRequiresExactMorphismDestination {
/// Transfer target subject.
target_subject: String,
/// Destination language of the morphism.
dest_language: String,
},
/// The validated request's mobility constraint is not unrestricted.
#[error(
"subject-only service transfer to `{target_subject}` requires unrestricted mobility, got `{constraint}`"
)]
TransferRequiresUnrestrictedMobility {
/// Mobility constraint of the validated request.
constraint: MobilityConstraint,
/// Transfer target subject.
target_subject: String,
},
/// Admission and obligation carry different request ids.
#[error(
"service admission request `{admission_request_id}` does not match obligation request `{obligation_request_id}`"
)]
AdmissionRequestMismatch {
/// Request id on the admission certificate.
admission_request_id: String,
/// Request id on the obligation.
obligation_request_id: String,
},
/// The admission certificate's subject differs from the boundary subject.
#[error(
"service admission subject `{admission_subject}` does not match boundary subject `{boundary_subject}`"
)]
AdmissionSubjectMismatch {
/// Subject on the admission certificate.
admission_subject: String,
/// Request subject of the boundary.
boundary_subject: String,
},
/// The admission certificate's service class differs from the boundary's service name.
#[error(
"service admission class `{admission_service}` does not match boundary service `{boundary_service}`"
)]
AdmissionServiceMismatch {
/// Service class on the admission certificate.
admission_service: String,
/// Service name registered at the boundary.
boundary_service: String,
},
/// Admission and obligation name different callers.
#[error(
"service admission caller `{admission_caller}` does not match obligation caller `{obligation_caller}`"
)]
AdmissionCallerMismatch {
/// Caller on the admission certificate.
admission_caller: String,
/// Caller on the obligation.
obligation_caller: String,
},
/// Admission and obligation carry different delivery classes.
#[error(
"service admission delivery class `{admission_class}` does not match obligation delivery class `{obligation_class}`"
)]
AdmissionDeliveryClassMismatch {
/// Delivery class on the admission certificate.
admission_class: DeliveryClass,
/// Delivery class on the obligation.
obligation_class: DeliveryClass,
},
/// Admission and obligation carry different timeouts.
#[error(
"service admission timeout {admission_timeout:?} does not match obligation timeout {obligation_timeout:?}"
)]
AdmissionTimeoutMismatch {
/// Timeout on the admission certificate.
admission_timeout: Option<Duration>,
/// Timeout on the obligation.
obligation_timeout: Option<Duration>,
},
/// The obligation's subject differs from the boundary's request subject.
#[error(
"service obligation subject `{subject}` is not currently owned by boundary subject `{boundary_subject}`"
)]
ObligationOutsideBoundary {
/// Subject recorded on the obligation.
subject: String,
/// Request subject of the boundary.
boundary_subject: String,
},
/// Forwarded namespace kernel error.
#[error(transparent)]
Namespace(#[from] NamespaceKernelError),
/// Forwarded service contract error.
#[error(transparent)]
Contract(#[from] ServiceContractError),
/// Forwarded control registry error.
#[error(transparent)]
ControlRegistry(#[from] ControlRegistryError),
/// Forwarded morphism compilation error.
#[error(transparent)]
MorphismCompile(#[from] MorphismCompileError),
/// Forwarded service obligation error.
#[error(transparent)]
Obligation(#[from] ServiceObligationError),
/// The import transfer targets the boundary's own request subject.
#[error(
"recursive import transfer: target subject `{target_subject}` matches boundary subject `{boundary_subject}`"
)]
RecursiveImportTransfer {
/// Transfer target subject.
target_subject: String,
/// Request subject of the boundary.
boundary_subject: String,
},
}
/// Control-plane subjects derived for a service boundary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServiceControlSubjects {
/// Service discovery subject of the namespace.
pub discovery: Subject,
/// Health subject (`status` within the health scope).
pub health: Subject,
/// Advisory subject (`advisory` within the advisory scope).
pub advisories: Subject,
}
/// Handler ids returned when a boundary registers its control handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServiceControlHandlers {
/// Handler registered for the health scope.
pub health: ControlHandlerId,
/// Handler registered for the advisory scope.
pub advisories: ControlHandlerId,
}
/// Result of admitting a request at a service boundary.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServiceAdmission {
/// Request parameters resolved from the caller's options.
pub validated: ValidatedServiceRequest,
/// Certificate issued for the admitted request.
pub certificate: RequestCertificate,
}
/// Parameters for transferring an admitted request through an import morphism.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ImportTransferRequest {
/// Callee passed on to the obligation transfer.
pub callee: String,
/// Subject the request is transferred to.
pub target_subject: Subject,
/// Morphism name passed on to the obligation transfer.
pub morphism_name: String,
/// Optional reply-space rule used when compiling the import plan.
pub requested_reply_space: Option<super::ir::ReplySpaceRule>,
/// Time at which the transfer takes place.
pub transferred_at: Time,
}
impl ImportTransferRequest {
    /// Assembles a transfer request, converting `callee` and `morphism_name`
    /// into owned strings.
    #[must_use]
    pub fn new(
        callee: impl Into<String>,
        target_subject: Subject,
        morphism_name: impl Into<String>,
        requested_reply_space: Option<super::ir::ReplySpaceRule>,
        transferred_at: Time,
    ) -> Self {
        let callee: String = callee.into();
        let morphism_name: String = morphism_name.into();
        Self {
            callee,
            target_subject,
            morphism_name,
            requested_reply_space,
            transferred_at,
        }
    }
}
/// A registered service bound to one request subject inside a namespace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FabricServiceBoundary {
// Namespace that must own `request_subject` (checked in `new`).
namespace: NamespaceKernel,
// Subject on which the service receives requests.
request_subject: Subject,
// Registration describing the service exposed at this boundary.
registration: ServiceRegistration,
}
impl FabricServiceBoundary {
    /// Binds `registration` to `request_subject` inside `namespace`.
    ///
    /// # Errors
    ///
    /// Returns [`ServiceBoundaryError::SubjectOutsideNamespace`] when the
    /// namespace does not own `request_subject`.
    pub fn new(
        namespace: NamespaceKernel,
        request_subject: Subject,
        registration: ServiceRegistration,
    ) -> Result<Self, ServiceBoundaryError> {
        if namespace.owns_subject(&request_subject) {
            return Ok(Self {
                namespace,
                request_subject,
                registration,
            });
        }
        Err(ServiceBoundaryError::SubjectOutsideNamespace {
            subject: request_subject.as_str().to_owned(),
            namespace: namespace.service_pattern().as_str().to_owned(),
        })
    }

    /// Returns the namespace that owns this boundary.
    #[must_use]
    pub fn namespace(&self) -> &NamespaceKernel {
        &self.namespace
    }

    /// Returns the subject on which requests arrive.
    #[must_use]
    pub fn request_subject(&self) -> &Subject {
        &self.request_subject
    }

    /// Returns the service registration bound to this boundary.
    #[must_use]
    pub fn registration(&self) -> &ServiceRegistration {
        &self.registration
    }

    /// Derives the discovery, health and advisory subjects for this boundary.
    ///
    /// # Errors
    ///
    /// Propagates failures from building the health or advisory subject.
    pub fn control_subjects(&self) -> Result<ServiceControlSubjects, ServiceBoundaryError> {
        let discovery = self.namespace.service_discovery_subject();
        let health = self.health_scope().subject("status")?;
        let advisories = self.advisory_scope().subject("advisory")?;
        Ok(ServiceControlSubjects {
            discovery,
            health,
            advisories,
        })
    }

    /// Registers the namespace-default handlers for the health and advisory
    /// scopes, in that order, and returns their ids.
    ///
    /// # Errors
    ///
    /// Propagates registration failures from `registry`.
    pub fn register_control_handlers(
        &self,
        registry: &mut ControlRegistry,
    ) -> Result<ServiceControlHandlers, ServiceBoundaryError> {
        let health = registry.register_namespace_default(&self.health_scope())?;
        let advisories = registry.register_namespace_default(&self.advisory_scope())?;
        Ok(ServiceControlHandlers { health, advisories })
    }

    /// Validates the caller's options and issues a request certificate for
    /// this boundary's subject and service.
    ///
    /// # Errors
    ///
    /// Propagates caller validation and certificate validation failures.
    pub fn admit_request(
        &self,
        request_id: impl Into<String>,
        caller: impl Into<String>,
        caller_options: &CallerOptions,
        reply_space_rule: super::ir::ReplySpaceRule,
        capability_fingerprint: u64,
        issued_at: Time,
    ) -> Result<ServiceAdmission, ServiceBoundaryError> {
        let validated = self.registration.validate_caller(caller_options)?;
        let request_id: String = request_id.into();
        let caller: String = caller.into();
        let subject = self.request_subject.as_str().to_owned();
        let service_class = self.registration.service_name.clone();
        let certificate = RequestCertificate::from_validated(
            request_id,
            caller,
            subject,
            &validated,
            reply_space_rule,
            service_class,
            capability_fingerprint,
            issued_at,
        );
        certificate.validate()?;
        Ok(ServiceAdmission {
            validated,
            certificate,
        })
    }

    /// Compiles an import plan once the morphism is known to cover this boundary.
    ///
    /// # Errors
    ///
    /// Fails when the morphism's source language does not match the request
    /// subject, or when plan compilation fails.
    pub fn compile_import_plan(
        &self,
        morphism: &Morphism,
        requested_reply_space: Option<super::ir::ReplySpaceRule>,
    ) -> Result<ImportPlan, ServiceBoundaryError> {
        self.ensure_morphism_covers_boundary(morphism)?;
        morphism
            .compile_import_plan(requested_reply_space)
            .map_err(ServiceBoundaryError::from)
    }

    /// Compiles an export plan once the morphism is known to cover this boundary.
    ///
    /// # Errors
    ///
    /// Fails when the morphism's source language does not match the request
    /// subject, or when plan compilation fails.
    pub fn compile_export_plan(
        &self,
        morphism: &Morphism,
        requested_reply_space: Option<super::ir::ReplySpaceRule>,
    ) -> Result<ExportPlan, ServiceBoundaryError> {
        self.ensure_morphism_covers_boundary(morphism)?;
        morphism
            .compile_export_plan(requested_reply_space)
            .map_err(ServiceBoundaryError::from)
    }

    /// Transfers an admitted request to another subject through an import morphism.
    ///
    /// All validation and plan compilation happen before the obligation is
    /// touched, so a failed check leaves `obligation` unmodified.
    ///
    /// # Errors
    ///
    /// Returns the first failing check, in this order:
    /// 1. admission/obligation consistency;
    /// 2. unrestricted mobility;
    /// 3. target covered by the morphism destination;
    /// 4. target equal to the morphism destination;
    /// 5. target different from the boundary subject;
    /// 6. import plan compilation;
    /// 7. the obligation transfer itself.
    pub fn transfer_request_via_import(
        &self,
        admission: &ServiceAdmission,
        obligation: &mut ServiceObligation,
        request: ImportTransferRequest,
        morphism: &Morphism,
    ) -> Result<ImportPlan, ServiceBoundaryError> {
        self.ensure_transfer_request_matches(admission, obligation)?;
        let target = &request.target_subject;
        Self::ensure_transfer_mobility(&admission.validated, target)?;
        Self::ensure_morphism_target(morphism, target)?;
        Self::ensure_exact_morphism_target(morphism, target)?;
        let boundary_subject = self.request_subject.as_str();
        if target.as_str() == boundary_subject {
            return Err(ServiceBoundaryError::RecursiveImportTransfer {
                target_subject: target.as_str().to_owned(),
                boundary_subject: boundary_subject.to_owned(),
            });
        }
        let plan = self.compile_import_plan(morphism, request.requested_reply_space)?;
        obligation.transfer(
            request.callee,
            request.target_subject.as_str(),
            request.morphism_name,
            request.transferred_at,
        )?;
        Ok(plan)
    }

    /// Aborts `obligation` with the `Cancelled` failure and returns the abort receipt.
    ///
    /// # Errors
    ///
    /// Propagates the failure from aborting the obligation.
    pub fn cancel_request(
        &self,
        obligation: ServiceObligation,
        ledger: &mut ObligationLedger,
        aborted_at: Time,
    ) -> Result<ServiceAbortReceipt, ServiceBoundaryError> {
        obligation
            .abort(ledger, aborted_at, ServiceFailure::Cancelled)
            .map_err(ServiceBoundaryError::from)
    }

    /// Control scope of the `Health` family for this namespace.
    fn health_scope(&self) -> NamespaceControlScope {
        let family = SystemSubjectFamily::Health;
        NamespaceControlScope::from_namespace(family, &self.namespace)
    }

    /// Control scope used for advisories; it is built on the `Route` family.
    fn advisory_scope(&self) -> NamespaceControlScope {
        let family = SystemSubjectFamily::Route;
        NamespaceControlScope::from_namespace(family, &self.namespace)
    }

    /// Requires the morphism's source language to match the request subject.
    fn ensure_morphism_covers_boundary(
        &self,
        morphism: &Morphism,
    ) -> Result<(), ServiceBoundaryError> {
        if morphism.source_language.matches(&self.request_subject) {
            return Ok(());
        }
        Err(ServiceBoundaryError::MorphismDoesNotCoverBoundary {
            subject: self.request_subject.as_str().to_owned(),
            source_language: morphism.source_language.as_str().to_owned(),
        })
    }

    /// Requires the morphism's destination language to match `target_subject`.
    fn ensure_morphism_target(
        morphism: &Morphism,
        target_subject: &Subject,
    ) -> Result<(), ServiceBoundaryError> {
        if morphism.dest_language.matches(target_subject) {
            return Ok(());
        }
        Err(ServiceBoundaryError::TargetOutsideMorphismDestination {
            subject: target_subject.as_str().to_owned(),
            dest_language: morphism.dest_language.as_str().to_owned(),
        })
    }

    /// Requires the morphism's destination language to be textually equal to
    /// `target_subject`.
    fn ensure_exact_morphism_target(
        morphism: &Morphism,
        target_subject: &Subject,
    ) -> Result<(), ServiceBoundaryError> {
        if morphism.dest_language.as_str() == target_subject.as_str() {
            return Ok(());
        }
        Err(
            ServiceBoundaryError::TransferRequiresExactMorphismDestination {
                target_subject: target_subject.as_str().to_owned(),
                dest_language: morphism.dest_language.as_str().to_owned(),
            },
        )
    }

    /// Requires the validated request to carry an unrestricted mobility constraint.
    fn ensure_transfer_mobility(
        validated: &ValidatedServiceRequest,
        target_subject: &Subject,
    ) -> Result<(), ServiceBoundaryError> {
        if validated.mobility_constraint == MobilityConstraint::Unrestricted {
            return Ok(());
        }
        Err(ServiceBoundaryError::TransferRequiresUnrestrictedMobility {
            constraint: validated.mobility_constraint.clone(),
            target_subject: target_subject.as_str().to_owned(),
        })
    }

    /// Cross-checks the admission certificate, the obligation and this boundary.
    ///
    /// Compared in order: request id, certificate subject, service name,
    /// caller, delivery class, timeout and finally the obligation's subject.
    /// The first mismatch is returned.
    fn ensure_transfer_request_matches(
        &self,
        admission: &ServiceAdmission,
        obligation: &ServiceObligation,
    ) -> Result<(), ServiceBoundaryError> {
        let certificate = &admission.certificate;
        let boundary_subject = self.request_subject.as_str();
        let boundary_service = &self.registration.service_name;

        if certificate.request_id != obligation.request_id {
            return Err(ServiceBoundaryError::AdmissionRequestMismatch {
                admission_request_id: certificate.request_id.clone(),
                obligation_request_id: obligation.request_id.clone(),
            });
        }
        if certificate.subject != boundary_subject {
            return Err(ServiceBoundaryError::AdmissionSubjectMismatch {
                admission_subject: certificate.subject.clone(),
                boundary_subject: boundary_subject.to_owned(),
            });
        }
        if &certificate.service_class != boundary_service {
            return Err(ServiceBoundaryError::AdmissionServiceMismatch {
                admission_service: certificate.service_class.clone(),
                boundary_service: boundary_service.clone(),
            });
        }
        if certificate.caller != obligation.caller {
            return Err(ServiceBoundaryError::AdmissionCallerMismatch {
                admission_caller: certificate.caller.clone(),
                obligation_caller: obligation.caller.clone(),
            });
        }
        if certificate.delivery_class != obligation.delivery_class {
            return Err(ServiceBoundaryError::AdmissionDeliveryClassMismatch {
                admission_class: certificate.delivery_class,
                obligation_class: obligation.delivery_class,
            });
        }
        if certificate.timeout != obligation.timeout {
            return Err(ServiceBoundaryError::AdmissionTimeoutMismatch {
                admission_timeout: certificate.timeout,
                obligation_timeout: obligation.timeout,
            });
        }
        if obligation.subject != boundary_subject {
            return Err(ServiceBoundaryError::ObligationOutsideBoundary {
                subject: obligation.subject.clone(),
                boundary_subject: boundary_subject.to_owned(),
            });
        }
        Ok(())
    }
}
/// Rejects `value` when it is empty or whitespace-only, naming `field` in the error.
fn validate_workflow_text(field: &'static str, value: &str) -> Result<(), WorkflowStateError> {
    match value.trim() {
        "" => Err(WorkflowStateError::EmptyField { field }),
        _ => Ok(()),
    }
}
/// Errors raised while building or advancing workflow state.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum WorkflowStateError {
/// A required text field was empty or contained only whitespace.
#[error("workflow field `{field}` must not be empty")]
EmptyField {
/// Name of the offending field.
field: &'static str,
},
/// A duration field was zero.
#[error("workflow duration field `{field}` must be greater than zero")]
ZeroDuration {
/// Name of the offending field.
field: &'static str,
},
/// The requested operation is not valid for the step's current status.
#[error("workflow step `{step_id}` cannot {operation} while in status `{status}`")]
InvalidStepTransition {
/// Identifier of the step.
step_id: String,
/// Name of the rejected operation.
operation: &'static str,
/// Label of the step's current status.
status: &'static str,
},
/// A saga was declared without steps.
#[error("workflow saga `{saga_id}` must contain at least one step")]
EmptySaga {
/// Identifier of the saga.
saga_id: String,
},
/// The saga has no step that can be run.
#[error("workflow saga `{saga_id}` has no runnable step")]
NoRunnableStep {
/// Identifier of the saga.
saga_id: String,
},
/// The saga's current step index is outside its step list.
#[error(
"workflow saga `{saga_id}` current_step {current_step} is out of bounds for {len} steps"
)]
InvalidCurrentStep {
/// Identifier of the saga.
saga_id: String,
/// Offending step index.
current_step: usize,
/// Number of steps in the saga.
len: usize,
},
/// The obligation id has no record in the ledger.
#[error("workflow obligation `{obligation_id:?}` is not present in the ledger")]
UnknownObligation {
/// Id that could not be found.
obligation_id: ObligationId,
},
/// The obligation is still reserved but the handle holds no token to commit it.
#[error(
"workflow obligation `{obligation_id:?}` is still pending but no live token is available to commit it"
)]
MissingLiveToken {
/// Id of the pending obligation.
obligation_id: ObligationId,
},
/// The obligation has already been resolved.
#[error("workflow obligation `{obligation_id:?}` is already resolved as `{state:?}`")]
ObligationAlreadyResolved {
/// Id of the resolved obligation.
obligation_id: ObligationId,
/// State recorded in the ledger.
state: ObligationState,
},
}
/// Role an obligation plays within a workflow step.
///
/// Serialized as an internally tagged enum (`kind` field) with snake_case
/// variant names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WorkflowObligationRole {
/// Obligation to reply at a given boundary.
Reply {
/// Acknowledgement boundary of the reply.
delivery_boundary: AckKind,
/// Whether a receipt is required.
receipt_required: bool,
},
/// Lease on a named resource.
Lease {
/// Resource name; must not be blank (checked by `validate`).
resource: String,
},
/// Timeout obligation.
Timeout,
/// Compensation obligation for a subject.
Compensation {
/// Subject associated with the compensation.
subject: Subject,
},
/// Deadline obligation.
Deadline {
/// Time of the deadline.
deadline: Time,
},
}
impl WorkflowObligationRole {
    /// Validates role-specific fields; only `Lease` has one (its resource
    /// name must not be blank).
    fn validate(&self) -> Result<(), WorkflowStateError> {
        match self {
            Self::Lease { resource } => {
                validate_workflow_text("workflow_obligation.lease.resource", resource)
            }
            Self::Reply { .. }
            | Self::Timeout
            | Self::Compensation { .. }
            | Self::Deadline { .. } => Ok(()),
        }
    }

    /// Maps the role onto the ledger's obligation kind.
    const fn obligation_kind(&self) -> ObligationKind {
        match self {
            Self::Timeout | Self::Deadline { .. } => ObligationKind::IoOp,
            Self::Compensation { .. } => ObligationKind::SendPermit,
            Self::Lease { .. } => ObligationKind::Lease,
            Self::Reply { .. } => ObligationKind::Ack,
        }
    }

    /// Builds a human-readable label for the role.
    fn label(&self) -> String {
        match self {
            Self::Timeout => "timeout".to_owned(),
            Self::Lease { resource } => format!("lease {resource}"),
            Self::Deadline { deadline } => format!("deadline at {deadline:?}"),
            Self::Compensation { subject } => format!("compensation {}", subject.as_str()),
            Self::Reply {
                delivery_boundary,
                receipt_required,
            } => {
                format!("reply boundary {delivery_boundary} (receipt_required={receipt_required})")
            }
        }
    }

    /// Returns `true` for the `Compensation` role.
    const fn is_compensation(&self) -> bool {
        matches!(self, Self::Compensation { .. })
    }
}
/// Handle to a ledger obligation allocated on behalf of a workflow step.
#[derive(Debug, Serialize, Deserialize)]
pub struct WorkflowObligationHandle {
/// Role the obligation plays in the workflow.
pub role: WorkflowObligationRole,
/// Id of the obligation in the ledger.
pub obligation_id: ObligationId,
/// Description supplied at allocation time.
pub description: String,
/// Time at which the obligation was allocated.
pub allocated_at: Time,
// Live ledger token. It is skipped by serde, so a deserialized handle has
// no token.
#[serde(skip, default)]
token: Option<ObligationToken>,
}
impl WorkflowObligationHandle {
    /// Acquires a ledger obligation for `role` and wraps it in a handle.
    ///
    /// The caller's source location is recorded with the obligation.
    ///
    /// # Errors
    ///
    /// Fails when the role is invalid or `description` is blank. Nothing is
    /// acquired from the ledger in that case.
    #[track_caller]
    fn allocate(
        ledger: &mut ObligationLedger,
        role: WorkflowObligationRole,
        description: String,
        holder: TaskId,
        region: RegionId,
        now: Time,
    ) -> Result<Self, WorkflowStateError> {
        role.validate()?;
        validate_workflow_text("workflow_obligation.description", &description)?;
        let call_site = SourceLocation::from_panic_location(Location::caller());
        let token = ledger.acquire_with_context(
            role.obligation_kind(),
            holder,
            region,
            now,
            call_site,
            None,
            Some(description.clone()),
        );
        let obligation_id = token.id();
        Ok(Self {
            role,
            obligation_id,
            description,
            allocated_at: now,
            token: Some(token),
        })
    }

    /// Looks up the obligation's current state in `ledger`.
    ///
    /// # Errors
    ///
    /// Returns `UnknownObligation` when the ledger has no record for the id.
    pub fn state(&self, ledger: &ObligationLedger) -> Result<ObligationState, WorkflowStateError> {
        match ledger.get(self.obligation_id) {
            Some(record) => Ok(record.state),
            None => Err(WorkflowStateError::UnknownObligation {
                obligation_id: self.obligation_id,
            }),
        }
    }

    /// Returns `true` while the obligation is still in the `Reserved` state.
    ///
    /// # Errors
    ///
    /// Returns `UnknownObligation` when the ledger has no record for the id.
    pub fn is_owed(&self, ledger: &ObligationLedger) -> Result<bool, WorkflowStateError> {
        self.state(ledger)
            .map(|current| current == ObligationState::Reserved)
    }

    /// Commits the obligation using the live token.
    ///
    /// # Errors
    ///
    /// Without a live token this always fails: `UnknownObligation` if the
    /// ledger has no record, `MissingLiveToken` if the obligation is still
    /// reserved, and `ObligationAlreadyResolved` otherwise.
    fn commit(
        &mut self,
        ledger: &mut ObligationLedger,
        now: Time,
    ) -> Result<(), WorkflowStateError> {
        if let Some(token) = self.token.take() {
            ledger.commit(token, now);
            return Ok(());
        }
        let obligation_id = self.obligation_id;
        Err(match self.state(ledger)? {
            ObligationState::Reserved => WorkflowStateError::MissingLiveToken { obligation_id },
            state => WorkflowStateError::ObligationAlreadyResolved {
                obligation_id,
                state,
            },
        })
    }

    /// Aborts the obligation with `reason`.
    ///
    /// The live token is used when available. Without one, a still-reserved
    /// obligation is aborted by id.
    ///
    /// # Errors
    ///
    /// Returns `UnknownObligation` if the ledger has no record, or
    /// `ObligationAlreadyResolved` if the obligation is no longer reserved.
    fn abort(
        &mut self,
        ledger: &mut ObligationLedger,
        now: Time,
        reason: ObligationAbortReason,
    ) -> Result<(), WorkflowStateError> {
        if let Some(token) = self.token.take() {
            ledger.abort(token, now, reason);
            return Ok(());
        }
        let obligation_id = self.obligation_id;
        match self.state(ledger)? {
            ObligationState::Reserved => {
                ledger.abort_by_id(obligation_id, now, reason);
                Ok(())
            }
            state => Err(WorkflowStateError::ObligationAlreadyResolved {
                obligation_id,
                state,
            }),
        }
    }
}
/// Snapshot describing one obligation that a workflow step still owes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowOwedObligation {
/// Identifier of the step that holds the obligation.
pub step_id: String,
/// Subject of the step that holds the obligation.
pub subject: Subject,
/// Role the obligation plays for that step.
pub role: WorkflowObligationRole,
/// Ledger identifier of the obligation.
pub obligation_id: ObligationId,
/// Description recorded on the obligation handle.
pub description: String,
/// Ledger state observed when the snapshot was built.
pub state: ObligationState,
}
/// Lifecycle status of a [`WorkflowStep`].
///
/// Serialized as an internally tagged enum (`kind`) with snake_case variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WorkflowStepStatus {
/// Initial status assigned by `WorkflowStep::new`; nothing allocated yet.
Pending,
/// Set by `WorkflowStep::start` once the declared obligations are allocated.
Active,
/// Set by `WorkflowStep::complete`.
Completed,
/// Set by `WorkflowStep::fail` when the step has no compensation path.
Failed {
/// Failure that ended the step.
failure: ServiceFailure,
},
/// Set by `WorkflowStep::fail` when compensation obligations were allocated.
Compensating {
/// Failure that triggered compensation.
failure: ServiceFailure,
},
/// Set by `WorkflowStep::complete_compensation`.
Compensated {
/// Failure that was compensated.
failure: ServiceFailure,
},
}
impl WorkflowStepStatus {
    /// Lower-case label for the status, used when reporting invalid transitions.
    const fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            Self::Pending => "pending",
            Self::Active => "active",
            Self::Compensating { .. } => "compensating",
            Self::Completed => "completed",
            Self::Failed { .. } => "failed",
            Self::Compensated { .. } => "compensated",
        };
        label
    }

    /// `true` for the statuses a step never leaves: completed, failed, compensated.
    const fn is_terminal(self) -> bool {
        match self {
            Self::Pending | Self::Active | Self::Compensating { .. } => false,
            Self::Completed | Self::Failed { .. } | Self::Compensated { .. } => true,
        }
    }
}
/// One step of a workflow: a subject, the obligations it declares, and an
/// optional compensation path used when the step fails.
#[derive(Debug, Serialize, Deserialize)]
pub struct WorkflowStep {
/// Identifier of the step.
pub step_id: String,
/// Subject the step operates on.
pub subject: Subject,
/// Handles for obligations allocated so far (empty until the step is started).
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub obligations: Vec<WorkflowObligationHandle>,
/// Subjects for which compensation obligations are allocated on failure.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub compensation_path: Vec<Subject>,
/// Optional timeout; `WorkflowStep::new` rejects a zero duration.
pub timeout: Option<Duration>,
/// Current lifecycle status.
pub status: WorkflowStepStatus,
// Roles declared at construction; turned into ledger obligations by `start`.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
declared_obligations: Vec<WorkflowObligationRole>,
}
impl WorkflowStep {
pub fn new(
step_id: impl Into<String>,
subject: Subject,
obligations: Vec<WorkflowObligationRole>,
compensation_path: Vec<Subject>,
timeout: Option<Duration>,
) -> Result<Self, WorkflowStateError> {
let step_id = step_id.into();
validate_workflow_text("workflow_step.step_id", &step_id)?;
if timeout.is_some_and(|value| value.is_zero()) {
return Err(WorkflowStateError::ZeroDuration {
field: "workflow_step.timeout",
});
}
for obligation in &obligations {
obligation.validate()?;
}
Ok(Self {
step_id,
subject,
obligations: Vec::new(),
compensation_path,
timeout,
status: WorkflowStepStatus::Pending,
declared_obligations: obligations,
})
}
fn outstanding_ids(
&self,
ledger: &ObligationLedger,
) -> Result<Vec<ObligationId>, WorkflowStateError> {
let mut ids = Vec::new();
for obligation in &self.obligations {
if obligation.is_owed(ledger)? {
ids.push(obligation.obligation_id);
}
}
Ok(ids)
}
fn ensure_status(
&self,
expected: WorkflowStepStatus,
operation: &'static str,
) -> Result<(), WorkflowStateError> {
if self.status != expected {
return Err(WorkflowStateError::InvalidStepTransition {
step_id: self.step_id.clone(),
operation,
status: self.status.as_str(),
});
}
Ok(())
}
#[track_caller]
pub fn start(
&mut self,
ledger: &mut ObligationLedger,
holder: TaskId,
region: RegionId,
now: Time,
) -> Result<(), WorkflowStateError> {
self.ensure_status(WorkflowStepStatus::Pending, "start")?;
for role in self.declared_obligations.iter().cloned() {
let description = format!(
"workflow step {} on {} owes {}",
self.step_id,
self.subject.as_str(),
role.label()
);
self.obligations.push(WorkflowObligationHandle::allocate(
ledger,
role,
description,
holder,
region,
now,
)?);
}
self.status = WorkflowStepStatus::Active;
Ok(())
}
pub fn complete(
&mut self,
ledger: &mut ObligationLedger,
now: Time,
) -> Result<(), WorkflowStateError> {
self.ensure_status(WorkflowStepStatus::Active, "complete")?;
for obligation in &mut self.obligations {
if obligation.role.is_compensation() || !obligation.is_owed(ledger)? {
continue;
}
obligation.commit(ledger, now)?;
}
self.status = WorkflowStepStatus::Completed;
Ok(())
}
#[track_caller]
pub fn fail(
&mut self,
ledger: &mut ObligationLedger,
now: Time,
failure: ServiceFailure,
holder: TaskId,
region: RegionId,
) -> Result<(), WorkflowStateError> {
self.ensure_status(WorkflowStepStatus::Active, "fail")?;
for obligation in &mut self.obligations {
if obligation.role.is_compensation() || !obligation.is_owed(ledger)? {
continue;
}
obligation.abort(ledger, now, failure.abort_reason())?;
}
if self.compensation_path.is_empty() {
self.status = WorkflowStepStatus::Failed { failure };
return Ok(());
}
for subject in self.compensation_path.iter().cloned() {
let role = WorkflowObligationRole::Compensation { subject };
let description = format!(
"workflow step {} compensation for {} via {}",
self.step_id,
self.subject.as_str(),
role.label()
);
self.obligations.push(WorkflowObligationHandle::allocate(
ledger,
role,
description,
holder,
region,
now,
)?);
}
self.status = WorkflowStepStatus::Compensating { failure };
Ok(())
}
pub fn complete_compensation(
&mut self,
ledger: &mut ObligationLedger,
now: Time,
) -> Result<(), WorkflowStateError> {
let WorkflowStepStatus::Compensating { failure } = self.status else {
return Err(WorkflowStateError::InvalidStepTransition {
step_id: self.step_id.clone(),
operation: "complete compensation for",
status: self.status.as_str(),
});
};
for obligation in &mut self.obligations {
if !obligation.role.is_compensation() || !obligation.is_owed(ledger)? {
continue;
}
obligation.commit(ledger, now)?;
}
self.status = WorkflowStepStatus::Compensated { failure };
Ok(())
}
pub fn what_is_still_owed(
&self,
ledger: &ObligationLedger,
) -> Result<Vec<WorkflowOwedObligation>, WorkflowStateError> {
let mut owed = Vec::new();
for obligation in &self.obligations {
let state = obligation.state(ledger)?;
if state == ObligationState::Reserved {
owed.push(WorkflowOwedObligation {
step_id: self.step_id.clone(),
subject: self.subject.clone(),
role: obligation.role.clone(),
obligation_id: obligation.obligation_id,
description: obligation.description.clone(),
state,
});
}
}
Ok(owed)
}
}
/// Kind of event recorded in a saga's evidence trail.
///
/// Serialized as an internally tagged enum (`kind`) with snake_case variant names.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SagaEvidenceEvent {
/// Recorded by `SagaState::start_next_step` after a step was started.
Started,
/// Recorded by `SagaState::complete_current_step`.
StepCompleted,
/// Recorded by `SagaState::fail_current_step`.
StepFailed {
/// Failure reported for the step.
failure: ServiceFailure,
},
/// Recorded by `SagaState::fail_current_step` when the step entered `Compensating`.
CompensationActivated,
/// Recorded by `SagaState::complete_current_compensation`.
CompensationCompleted,
/// Recorded by `SagaState::recover_from_replay`.
Recovered,
}
/// One entry of a saga's evidence trail.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SagaEvidenceRecord {
/// Time supplied by the operation that recorded the event.
pub recorded_at: Time,
/// Identifier of the step the event refers to, when one could be resolved.
pub step_id: Option<String>,
/// Subject of the step the event refers to, when one could be resolved.
pub subject: Option<Subject>,
/// What happened.
pub event: SagaEvidenceEvent,
/// Obligation ids reported as outstanding when the event was recorded.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub outstanding_obligations: Vec<ObligationId>,
}
/// Serializable state of a saga: its ordered steps, a cursor, and an evidence trail.
#[derive(Debug, Serialize, Deserialize)]
pub struct SagaState {
/// Identifier of the saga.
pub saga_id: String,
/// Steps of the saga, in order.
pub steps: Vec<WorkflowStep>,
/// Index into `steps` of the step currently being driven, if any.
pub current_step: Option<usize>,
/// Ids of steps whose compensation has been completed (no duplicates are added).
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub compensated_steps: Vec<String>,
/// Evidence records appended by the saga operations, oldest first.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub evidence_trail: Vec<SagaEvidenceRecord>,
}
impl SagaState {
    /// Creates a saga over `steps` with no current step and empty bookkeeping.
    ///
    /// # Errors
    /// Fails when `saga_id` does not pass workflow text validation, or with
    /// `EmptySaga` when `steps` is empty.
    pub fn new(
        saga_id: impl Into<String>,
        steps: Vec<WorkflowStep>,
    ) -> Result<Self, WorkflowStateError> {
        let saga_id = saga_id.into();
        validate_workflow_text("saga_state.saga_id", &saga_id)?;
        if steps.is_empty() {
            Err(WorkflowStateError::EmptySaga { saga_id })
        } else {
            Ok(Self {
                saga_id,
                steps,
                current_step: None,
                compensated_steps: Vec::new(),
                evidence_trail: Vec::new(),
            })
        }
    }

    /// Index of the earliest step whose status is not terminal, if any.
    fn first_non_terminal_step(&self) -> Option<usize> {
        self.steps
            .iter()
            .enumerate()
            .find_map(|(index, step)| (!step.status.is_terminal()).then_some(index))
    }

    /// Resolves `current_step` to an in-bounds index.
    ///
    /// # Errors
    /// `NoRunnableStep` when no current step is set; `InvalidCurrentStep` when
    /// the stored index is out of bounds.
    fn current_index(&self) -> Result<usize, WorkflowStateError> {
        let Some(index) = self.current_step else {
            return Err(WorkflowStateError::NoRunnableStep {
                saga_id: self.saga_id.clone(),
            });
        };
        let len = self.steps.len();
        if index < len {
            Ok(index)
        } else {
            Err(WorkflowStateError::InvalidCurrentStep {
                saga_id: self.saga_id.clone(),
                current_step: index,
                len,
            })
        }
    }

    /// Appends an evidence record; step id and subject are filled in only when
    /// `step_index` resolves to an existing step.
    fn push_evidence(
        &mut self,
        step_index: Option<usize>,
        recorded_at: Time,
        event: SagaEvidenceEvent,
        outstanding_obligations: Vec<ObligationId>,
    ) {
        let step = step_index.and_then(|index| self.steps.get(index));
        let record = SagaEvidenceRecord {
            recorded_at,
            step_id: step.map(|step| step.step_id.clone()),
            subject: step.map(|step| step.subject.clone()),
            event,
            outstanding_obligations,
        };
        self.evidence_trail.push(record);
    }

    /// Starts the next runnable step and records a `Started` event.
    ///
    /// A current step that is still `Pending` is started as-is. A current step
    /// that is `Active` or `Compensating` makes this call a no-op. In every
    /// other case the first `Pending` step in the saga is started.
    ///
    /// # Errors
    /// `InvalidCurrentStep` when the cursor is out of bounds, `NoRunnableStep`
    /// when no `Pending` step is left, or any error from starting the step.
    #[track_caller]
    pub fn start_next_step(
        &mut self,
        ledger: &mut ObligationLedger,
        holder: TaskId,
        region: RegionId,
        now: Time,
    ) -> Result<(), WorkflowStateError> {
        // `Some(index)` means the cursor itself is startable; `None` means we
        // have to search for the first pending step.
        let resumable = match self.current_step {
            None => None,
            Some(index) if index >= self.steps.len() => {
                return Err(WorkflowStateError::InvalidCurrentStep {
                    saga_id: self.saga_id.clone(),
                    current_step: index,
                    len: self.steps.len(),
                });
            }
            Some(index) => match self.steps[index].status {
                WorkflowStepStatus::Pending => Some(index),
                WorkflowStepStatus::Active | WorkflowStepStatus::Compensating { .. } => {
                    return Ok(());
                }
                WorkflowStepStatus::Completed
                | WorkflowStepStatus::Failed { .. }
                | WorkflowStepStatus::Compensated { .. } => None,
            },
        };
        let index = match resumable {
            Some(index) => index,
            None => self
                .steps
                .iter()
                .position(|step| matches!(step.status, WorkflowStepStatus::Pending))
                .ok_or_else(|| WorkflowStateError::NoRunnableStep {
                    saga_id: self.saga_id.clone(),
                })?,
        };

        self.steps[index].start(ledger, holder, region, now)?;
        self.current_step = Some(index);
        let still_owed = self.steps[index].outstanding_ids(ledger)?;
        self.push_evidence(Some(index), now, SagaEvidenceEvent::Started, still_owed);
        Ok(())
    }

    /// Completes the current step, records `StepCompleted`, and advances the
    /// cursor to the next non-terminal step after it (or `None`).
    ///
    /// # Errors
    /// Cursor errors from [`Self::current_index`] or any error from completing
    /// the step.
    pub fn complete_current_step(
        &mut self,
        ledger: &mut ObligationLedger,
        now: Time,
    ) -> Result<(), WorkflowStateError> {
        let index = self.current_index()?;
        let step = &mut self.steps[index];
        step.complete(ledger, now)?;
        let still_owed = step.outstanding_ids(ledger)?;
        self.push_evidence(
            Some(index),
            now,
            SagaEvidenceEvent::StepCompleted,
            still_owed,
        );

        // Only steps after the completed one are considered for the cursor.
        let after = index + 1;
        self.current_step = self.steps[after..]
            .iter()
            .position(|step| !step.status.is_terminal())
            .map(|offset| after + offset);
        Ok(())
    }

    /// Fails the current step and records `StepFailed`, followed by
    /// `CompensationActivated` when the step entered `Compensating`.
    ///
    /// Both records carry the same list of outstanding obligation ids.
    ///
    /// # Errors
    /// Cursor errors from [`Self::current_index`] or any error from failing
    /// the step.
    #[track_caller]
    pub fn fail_current_step(
        &mut self,
        ledger: &mut ObligationLedger,
        now: Time,
        failure: ServiceFailure,
        holder: TaskId,
        region: RegionId,
    ) -> Result<(), WorkflowStateError> {
        let index = self.current_index()?;
        self.steps[index].fail(ledger, now, failure, holder, region)?;
        let still_owed = self.steps[index].outstanding_ids(ledger)?;

        let compensating = matches!(
            self.steps[index].status,
            WorkflowStepStatus::Compensating { .. }
        );
        // Copy the id list only when a second record will actually need it.
        let for_activation = compensating.then(|| still_owed.clone());

        self.push_evidence(
            Some(index),
            now,
            SagaEvidenceEvent::StepFailed { failure },
            still_owed,
        );
        if let Some(ids) = for_activation {
            self.push_evidence(
                Some(index),
                now,
                SagaEvidenceEvent::CompensationActivated,
                ids,
            );
        }
        Ok(())
    }

    /// Completes compensation for the current step, remembers its id in
    /// `compensated_steps` (once), records `CompensationCompleted`, and moves
    /// the cursor to the first non-terminal step of the saga.
    ///
    /// # Errors
    /// Cursor errors from [`Self::current_index`] or any error from completing
    /// the compensation.
    pub fn complete_current_compensation(
        &mut self,
        ledger: &mut ObligationLedger,
        now: Time,
    ) -> Result<(), WorkflowStateError> {
        let index = self.current_index()?;
        self.steps[index].complete_compensation(ledger, now)?;

        let step_id = &self.steps[index].step_id;
        if !self.compensated_steps.contains(step_id) {
            self.compensated_steps.push(step_id.clone());
        }

        let still_owed = self.steps[index].outstanding_ids(ledger)?;
        self.push_evidence(
            Some(index),
            now,
            SagaEvidenceEvent::CompensationCompleted,
            still_owed,
        );
        self.current_step = self.first_non_terminal_step();
        Ok(())
    }

    /// Gathers the still-owed obligations of every step, in step order.
    ///
    /// # Errors
    /// Propagates the first lookup error reported by a step.
    pub fn what_is_still_owed(
        &self,
        ledger: &ObligationLedger,
    ) -> Result<Vec<WorkflowOwedObligation>, WorkflowStateError> {
        let mut collected = Vec::new();
        for step in self.steps.iter() {
            let step_owed = step.what_is_still_owed(ledger)?;
            collected.extend(step_owed);
        }
        Ok(collected)
    }

    /// Re-anchors a replayed saga against `ledger` and records `Recovered`.
    ///
    /// Every obligation handle must be known to the ledger. The cursor is kept
    /// when it points at a non-terminal step; otherwise it moves to the first
    /// non-terminal step (or `None`).
    ///
    /// # Errors
    /// `UnknownObligation` for a handle missing from the ledger, or
    /// `InvalidCurrentStep` when the stored cursor is out of bounds.
    pub fn recover_from_replay(
        mut self,
        ledger: &ObligationLedger,
        recovered_at: Time,
    ) -> Result<Self, WorkflowStateError> {
        // The lookups are done purely for their error: a missing ledger record
        // aborts recovery before anything is changed.
        for obligation in self.steps.iter().flat_map(|step| step.obligations.iter()) {
            obligation.state(ledger)?;
        }

        self.current_step = match self.current_step {
            Some(index) if index >= self.steps.len() => {
                return Err(WorkflowStateError::InvalidCurrentStep {
                    saga_id: self.saga_id.clone(),
                    current_step: index,
                    len: self.steps.len(),
                });
            }
            Some(index) if !self.steps[index].status.is_terminal() => Some(index),
            _ => self.first_non_terminal_step(),
        };

        let mut still_owed = Vec::new();
        for owed in self.what_is_still_owed(ledger)? {
            still_owed.push(owed.obligation_id);
        }
        let resumed_at = self.current_step;
        self.push_evidence(
            resumed_at,
            recovered_at,
            SagaEvidenceEvent::Recovered,
            still_owed,
        );
        Ok(self)
    }
}
/// Validation failures for service contracts, provider terms, and caller options.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ServiceContractError {
/// The service name was empty.
#[error("service name must not be empty")]
EmptyServiceName,
/// A named payload schema at `field` had an empty (or whitespace-only) name.
#[error("named schema at `{field}` must not be empty")]
EmptyNamedSchema {
/// Path of the offending field.
field: String,
},
/// A bounded-region mobility constraint at `field` had no region label.
#[error("bounded-region mobility constraint at `{field}` must declare a region label")]
EmptyBoundedRegion {
/// Path of the offending field.
field: String,
},
/// A duration at `field` was zero.
#[error("duration at `{field}` must be greater than zero")]
ZeroDuration {
/// Path of the offending field.
field: String,
},
/// A queue-within-budget overload policy did not declare a positive `max_pending`.
#[error("queue-within-budget overload policy must declare max_pending > 0")]
InvalidQueueCapacity,
/// The provider's guaranteed durability is weaker than the contract floor.
#[error(
"provider guaranteed durability {guaranteed_durability} is weaker than contract floor {required_durability}"
)]
ProviderGuaranteeBelowContractFloor {
/// Durability the provider guarantees.
guaranteed_durability: DeliveryClass,
/// Durability the contract requires.
required_durability: DeliveryClass,
},
/// The provider's compensation semantics are weaker than the contract requires.
#[error("provider compensation `{provider}` is weaker than contract requirement `{required}`")]
ProviderCompensationBelowContract {
/// Compensation semantics offered by the provider.
provider: CompensationSemantics,
/// Compensation semantics required by the contract.
required: CompensationSemantics,
},
/// The provider's evidence level is weaker than the contract requires.
#[error(
"provider evidence level `{provider}` is weaker than contract requirement `{required}`"
)]
ProviderEvidenceBelowContract {
/// Evidence level offered by the provider.
provider: EvidenceLevel,
/// Evidence level required by the contract.
required: EvidenceLevel,
},
/// The provider's mobility constraint does not satisfy the contract's.
#[error("provider mobility `{provider}` does not satisfy contract requirement `{required}`")]
ProviderMobilityIncompatible {
/// Mobility constraint declared by the provider.
provider: MobilityConstraint,
/// Mobility constraint required by the contract.
required: MobilityConstraint,
},
/// A delivery class admitted by the provider is below the contract floor.
#[error("provider admitted delivery class {class} below contract floor {required_durability}")]
ProviderClassBelowContractFloor {
/// The admitted delivery class.
class: DeliveryClass,
/// Durability the contract requires.
required_durability: DeliveryClass,
},
/// A delivery class admitted by the provider exceeds its guaranteed durability.
#[error(
"provider admitted delivery class {class} above guaranteed durability {guaranteed_durability}"
)]
ProviderClassAboveGuaranteedDurability {
/// The admitted delivery class.
class: DeliveryClass,
/// Durability the provider guarantees.
guaranteed_durability: DeliveryClass,
},
/// The caller supplied a timeout override but the budget semantics forbid it.
#[error("caller timeout overrides are not allowed by the contract budget semantics")]
TimeoutOverrideNotAllowed,
/// The caller's timeout override is longer than the contract default.
#[error(
"caller timeout override {requested_timeout:?} exceeds contract default timeout {default_timeout:?}"
)]
TimeoutOverrideExceedsDefault {
/// Timeout requested by the caller.
requested_timeout: Duration,
/// Default timeout declared by the contract.
default_timeout: Duration,
},
/// The caller supplied a priority hint but the budget semantics forbid it.
#[error("caller priority hints are not allowed by the contract budget semantics")]
PriorityHintsNotAllowed,
/// A delivery-class policy error, forwarded transparently.
#[error(transparent)]
DeliveryClassPolicy(#[from] DeliveryClassPolicyError),
}
#[cfg(test)]
mod tests {
use super::*;
use crate::messaging::ir::ReplySpaceRule;
use crate::messaging::morphism::{
FabricCapability, MorphismClass, MorphismPlanDirection, ResponsePolicy,
ReversibilityRequirement, SharingPolicy, SubjectTransform,
};
use crate::messaging::subject::SubjectPattern;
use crate::record::{ObligationAbortReason, ObligationState};
use crate::util::ArenaIndex;
/// Baseline provider terms for the tests: obligation-backed default class,
/// mobility-safe guarantee, pinned mobility.
fn provider_terms() -> ProviderTerms {
    let admissible_classes = DeliveryClassPolicy::new(
        DeliveryClass::ObligationBacked,
        [DeliveryClass::ObligationBacked, DeliveryClass::MobilitySafe],
    )
    .expect("provider policy");

    ProviderTerms {
        admissible_classes,
        guaranteed_durability: DeliveryClass::MobilitySafe,
        compensation_policy: CompensationSemantics::BestEffort,
        mobility_constraint: MobilityConstraint::Pinned,
        evidence_level: EvidenceLevel::Detailed,
    }
}
/// Baseline provider terms with only the mobility constraint replaced.
fn provider_terms_with_mobility(mobility_constraint: MobilityConstraint) -> ProviderTerms {
    let mut terms = provider_terms();
    terms.mobility_constraint = mobility_constraint;
    terms
}
/// Contract fixture: default schema that honors priority hints, with
/// best-effort compensation, unrestricted mobility and standard evidence.
fn contract() -> ServiceContractSchema {
    let budget_semantics = BudgetSemantics {
        honor_priority_hints: true,
        ..BudgetSemantics::default()
    };

    ServiceContractSchema {
        budget_semantics,
        compensation_semantics: CompensationSemantics::BestEffort,
        mobility_constraints: MobilityConstraint::Unrestricted,
        evidence_requirements: EvidenceLevel::Standard,
        ..ServiceContractSchema::default()
    }
}
/// Namespace kernel fixture for tenant `acme`, service `orders`.
fn namespace() -> NamespaceKernel {
    let kernel = NamespaceKernel::new("acme", "orders");
    kernel.expect("namespace kernel")
}
/// Request subject used by the `orders` service boundary fixtures.
fn service_subject() -> Subject {
    let raw = "tenant.acme.service.orders.lookup";
    Subject::new(raw)
}
/// Boundary for `fabric.echo` using the baseline (pinned-mobility) provider terms.
fn service_boundary() -> FabricServiceBoundary {
    let schema = contract();
    let terms = provider_terms();
    let registration = ServiceRegistration::new("fabric.echo", schema, terms).expect("valid");

    let boundary = FabricServiceBoundary::new(namespace(), service_subject(), registration);
    boundary.expect("valid service boundary")
}
fn transferable_service_boundary() -> FabricServiceBoundary {
let registration = ServiceRegistration::new(
"fabric.echo",
contract(),
provider_terms_with_mobility(MobilityConstraint::Unrestricted),
)
.expect("valid");
FabricServiceBoundary::new(namespace(), service_subject(), registration)
.expect("valid service boundary")
}
/// Unrestricted-mobility boundary on the `orders` subject registered under
/// `service_name`.
fn service_boundary_with_name(service_name: &str) -> FabricServiceBoundary {
    let schema = contract();
    let terms = provider_terms_with_mobility(MobilityConstraint::Unrestricted);
    let registration = ServiceRegistration::new(service_name, schema, terms).expect("valid");

    let boundary = FabricServiceBoundary::new(namespace(), service_subject(), registration);
    boundary.expect("valid service boundary")
}
/// Unrestricted-mobility boundary for `fabric.inventory` in the `inventory`
/// namespace with its own lookup subject.
fn inventory_service_boundary() -> FabricServiceBoundary {
    let schema = contract();
    let terms = provider_terms_with_mobility(MobilityConstraint::Unrestricted);
    let registration = ServiceRegistration::new("fabric.inventory", schema, terms).expect("valid");

    let kernel = NamespaceKernel::new("acme", "inventory").expect("namespace kernel");
    let subject = Subject::new("tenant.acme.service.inventory.lookup");
    FabricServiceBoundary::new(kernel, subject, registration).expect("valid service boundary")
}
/// Authoritative, bijective morphism renaming the `orders` lookup subject to
/// the `edge-orders` lookup subject.
fn authoritative_import_morphism() -> Morphism {
    let origin = "tenant.acme.service.orders.lookup";
    let edge = "tenant.acme.service.edge-orders.lookup";

    let transform = SubjectTransform::RenamePrefix {
        from: SubjectPattern::new(origin),
        to: SubjectPattern::new(edge),
    };
    let capability_requirements = vec![
        FabricCapability::CarryAuthority,
        FabricCapability::ReplyAuthority,
    ];

    Morphism {
        source_language: SubjectPattern::new(origin),
        dest_language: SubjectPattern::new(edge),
        class: MorphismClass::Authoritative,
        transform,
        reversibility: ReversibilityRequirement::Bijective,
        capability_requirements,
        sharing_policy: SharingPolicy::TenantScoped,
        response_policy: ResponsePolicy::ReplyAuthoritative,
        ..Morphism::default()
    }
}
/// Fixed task id fixture (arena index 11, generation argument 0).
fn make_task() -> TaskId {
    let slot = ArenaIndex::new(11, 0);
    TaskId::from_arena(slot)
}
/// Fixed region id fixture (arena index 7, generation argument 0).
fn make_region() -> RegionId {
    let slot = ArenaIndex::new(7, 0);
    RegionId::from_arena(slot)
}
/// Builds a workflow step fixture with a five-second timeout, turning each
/// compensation path entry into a `Subject`.
fn workflow_step(
    step_id: &str,
    subject: &str,
    obligations: Vec<WorkflowObligationRole>,
    compensation_path: &[&str],
) -> WorkflowStep {
    let compensation_subjects = compensation_path
        .iter()
        .map(|&path| Subject::new(path))
        .collect();
    let timeout = Some(Duration::from_secs(5));

    let step = WorkflowStep::new(
        step_id,
        Subject::new(subject),
        obligations,
        compensation_subjects,
        timeout,
    );
    step.expect("valid workflow step")
}
/// A registration built from the fixture contract and provider terms succeeds
/// and keeps the service name and guaranteed durability it was given.
#[test]
fn service_registration_accepts_valid_contract() {
let registration =
ServiceRegistration::new("fabric.echo", contract(), provider_terms()).expect("valid");
assert_eq!(registration.service_name, "fabric.echo");
assert_eq!(
registration.provider_terms.guaranteed_durability,
DeliveryClass::MobilitySafe
);
}
/// Pins the exact request, discovery, health, and advisory subject strings
/// exposed by the fixture boundary.
#[test]
fn service_boundary_binds_request_and_control_subjects() {
let boundary = service_boundary();
let subjects = boundary.control_subjects().expect("subjects");
assert_eq!(
boundary.request_subject().as_str(),
"tenant.acme.service.orders.lookup"
);
assert_eq!(
subjects.discovery.as_str(),
"tenant.acme.service.orders.discover"
);
assert_eq!(
subjects.health.as_str(),
"$SYS.FABRIC.HEALTH.TENANT.acme.SERVICE.orders.status"
);
assert_eq!(
subjects.advisories.as_str(),
"$SYS.FABRIC.ROUTE.TENANT.acme.SERVICE.orders.advisory"
);
}
/// After registering control handlers, the health and advisory subjects each
/// match exactly one handler, and its id is the one returned by registration.
#[test]
fn service_boundary_registers_namespace_control_handlers() {
let boundary = service_boundary();
let subjects = boundary.control_subjects().expect("subjects");
let mut registry = ControlRegistry::new();
let handlers = boundary
.register_control_handlers(&mut registry)
.expect("register handlers");
let health_matches = registry.matching_handlers(&subjects.health);
assert_eq!(health_matches.len(), 1);
assert_eq!(health_matches[0].id, handlers.health);
let advisory_matches = registry.matching_handlers(&subjects.advisories);
assert_eq!(advisory_matches.len(), 1);
assert_eq!(advisory_matches[0].id, handlers.advisories);
}
/// Admitting a request yields the requested delivery class and a certificate
/// carrying the boundary subject, service name, and reply-space rule.
#[test]
fn service_boundary_admission_issues_request_certificate() {
let boundary = service_boundary();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
timeout_override: Some(Duration::from_secs(5)),
priority_hint: Some(200),
};
let admission = boundary
.admit_request(
"req-service-boundary",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
0xfeed_u64,
Time::from_nanos(10),
)
.expect("admit request");
assert_eq!(
admission.validated.delivery_class,
DeliveryClass::MobilitySafe
);
assert_eq!(
admission.certificate.subject,
"tenant.acme.service.orders.lookup"
);
assert_eq!(admission.certificate.service_class, "fabric.echo");
assert_eq!(
admission.certificate.reply_space_rule,
ReplySpaceRule::CallerInbox
);
}
/// Happy path: a transfer through the authoritative import morphism rewrites
/// the obligation's subject and callee, appends one lineage entry, and returns
/// an import plan with the requested reply space.
#[test]
fn service_boundary_transfer_compiles_import_plan_and_updates_obligation() {
let boundary = transferable_service_boundary();
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = boundary
.admit_request(
"req-transfer",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
99,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.validated.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.validated.timeout,
)
.expect("allocate obligation");
let plan = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.edge-orders.lookup"),
"import/orders->edge",
Some(ReplySpaceRule::DedicatedPrefix {
prefix: "tenant.acme.service.edge-orders.lookup".to_owned(),
}),
Time::from_nanos(3),
),
&authoritative_import_morphism(),
)
.expect("transfer through import plan");
assert_eq!(obligation.subject, "tenant.acme.service.edge-orders.lookup");
assert_eq!(obligation.callee, "orders-edge");
assert_eq!(obligation.lineage.len(), 1);
assert_eq!(obligation.lineage[0].morphism, "import/orders->edge");
assert_eq!(plan.direction, MorphismPlanDirection::Import);
assert_eq!(
plan.selected_reply_space,
Some(ReplySpaceRule::DedicatedPrefix {
prefix: "tenant.acme.service.edge-orders.lookup".to_owned(),
})
);
}
/// Compiling an import plan fails with `MorphismDoesNotCoverBoundary` when the
/// morphism's source language names a different subject than the boundary's.
#[test]
fn service_boundary_rejects_import_morphism_outside_request_subject() {
let boundary = service_boundary();
let morphism = Morphism {
source_language: SubjectPattern::new("tenant.acme.service.inventory.lookup"),
..authoritative_import_morphism()
};
let error = boundary
.compile_import_plan(&morphism, None)
.expect_err("morphism should not cover this boundary");
assert_eq!(
error,
ServiceBoundaryError::MorphismDoesNotCoverBoundary {
subject: "tenant.acme.service.orders.lookup".to_owned(),
source_language: "tenant.acme.service.inventory.lookup".to_owned(),
}
);
}
/// A boundary whose provider terms are pinned rejects the transfer with
/// `TransferRequiresUnrestrictedMobility` and leaves the obligation untouched.
#[test]
fn service_boundary_transfer_rejects_non_unrestricted_mobility() {
let boundary = service_boundary();
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = boundary
.admit_request(
"req-transfer-pinned",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
11,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer-pinned",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.validated.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.validated.timeout,
)
.expect("allocate obligation");
let error = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.edge-orders.lookup"),
"import/orders->edge",
None,
Time::from_nanos(3),
),
&authoritative_import_morphism(),
)
.expect_err("pinned mobility should fail closed");
assert_eq!(
error,
ServiceBoundaryError::TransferRequiresUnrestrictedMobility {
constraint: MobilityConstraint::Pinned,
target_subject: "tenant.acme.service.edge-orders.lookup".to_owned(),
}
);
assert!(obligation.lineage.is_empty());
assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// A transfer target that is not in the morphism's destination language is
/// rejected with `TargetOutsideMorphismDestination`; the obligation is unchanged.
#[test]
fn service_boundary_transfer_rejects_target_outside_destination_language() {
let boundary = transferable_service_boundary();
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = boundary
.admit_request(
"req-transfer-miss",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
12,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer-miss",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.validated.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.validated.timeout,
)
.expect("allocate obligation");
let error = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.wrong.lookup"),
"import/orders->edge",
None,
Time::from_nanos(3),
),
&authoritative_import_morphism(),
)
.expect_err("target outside morphism destination should fail closed");
assert_eq!(
error,
ServiceBoundaryError::TargetOutsideMorphismDestination {
subject: "tenant.acme.service.wrong.lookup".to_owned(),
dest_language: "tenant.acme.service.edge-orders.lookup".to_owned(),
}
);
assert!(obligation.lineage.is_empty());
assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// A morphism whose destination language is a wildcard pattern (`...>`) is
/// rejected with `TransferRequiresExactMorphismDestination`; the obligation is
/// unchanged.
#[test]
fn service_boundary_transfer_rejects_broad_destination_language_for_subject_only_transfer() {
let boundary = transferable_service_boundary();
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = boundary
.admit_request(
"req-transfer-broad-dest",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
22,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer-broad-dest",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.validated.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.validated.timeout,
)
.expect("allocate obligation");
let broad_destination = Morphism {
dest_language: SubjectPattern::new("tenant.acme.service.edge-orders.>"),
..authoritative_import_morphism()
};
let error = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.edge-orders.lookup"),
"import/orders->edge",
None,
Time::from_nanos(3),
),
&broad_destination,
)
.expect_err("broad destination language should fail closed");
assert_eq!(
error,
ServiceBoundaryError::TransferRequiresExactMorphismDestination {
target_subject: "tenant.acme.service.edge-orders.lookup".to_owned(),
dest_language: "tenant.acme.service.edge-orders.>".to_owned(),
}
);
assert!(obligation.lineage.is_empty());
assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// An admission issued for one request id cannot be used to transfer an
/// obligation allocated under another: `AdmissionRequestMismatch`, obligation
/// unchanged.
#[test]
fn service_boundary_transfer_rejects_admission_request_mismatch() {
let boundary = transferable_service_boundary();
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = boundary
.admit_request(
"req-transfer-a",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
13,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer-b",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.validated.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.validated.timeout,
)
.expect("allocate obligation");
let error = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.edge-orders.lookup"),
"import/orders->edge",
None,
Time::from_nanos(3),
),
&authoritative_import_morphism(),
)
.expect_err("mismatched admission should fail closed");
assert_eq!(
error,
ServiceBoundaryError::AdmissionRequestMismatch {
admission_request_id: "req-transfer-a".to_owned(),
obligation_request_id: "req-transfer-b".to_owned(),
}
);
assert!(obligation.lineage.is_empty());
assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// An obligation whose subject was moved away from the boundary's request
/// subject before the transfer is rejected with `ObligationOutsideBoundary`.
#[test]
fn service_boundary_transfer_rejects_obligation_outside_boundary_subject() {
let boundary = transferable_service_boundary();
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = boundary
.admit_request(
"req-transfer-shifted",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
14,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer-shifted",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.validated.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.validated.timeout,
)
.expect("allocate obligation");
// Simulate an obligation that already points at the edge subject.
obligation.subject = "tenant.acme.service.edge-orders.lookup".to_owned();
let error = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.edge-orders.lookup"),
"import/orders->edge",
None,
Time::from_nanos(3),
),
&authoritative_import_morphism(),
)
.expect_err("obligation outside boundary should fail closed");
assert_eq!(
error,
ServiceBoundaryError::ObligationOutsideBoundary {
subject: "tenant.acme.service.edge-orders.lookup".to_owned(),
boundary_subject: "tenant.acme.service.orders.lookup".to_owned(),
}
);
assert!(obligation.lineage.is_empty());
}
/// An admission issued by the inventory boundary cannot drive a transfer on
/// the orders boundary: `AdmissionSubjectMismatch`, obligation unchanged.
#[test]
fn service_boundary_transfer_rejects_admission_subject_mismatch() {
let boundary = transferable_service_boundary();
let other_boundary = inventory_service_boundary();
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = other_boundary
.admit_request(
"req-transfer-subject",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
15,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer-subject",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.certificate.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.certificate.timeout,
)
.expect("allocate obligation");
let error = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.edge-orders.lookup"),
"import/orders->edge",
None,
Time::from_nanos(3),
),
&authoritative_import_morphism(),
)
.expect_err("admission from another boundary should fail closed");
assert_eq!(
error,
ServiceBoundaryError::AdmissionSubjectMismatch {
admission_subject: "tenant.acme.service.inventory.lookup".to_owned(),
boundary_subject: "tenant.acme.service.orders.lookup".to_owned(),
}
);
assert!(obligation.lineage.is_empty());
assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// An admission issued under a different service name on the same subject is
/// rejected with `AdmissionServiceMismatch`; the obligation is unchanged.
#[test]
fn service_boundary_transfer_rejects_admission_service_mismatch() {
let boundary = transferable_service_boundary();
let other_boundary = service_boundary_with_name("fabric.inventory");
let mut ledger = ObligationLedger::new();
let caller = CallerOptions {
requested_class: Some(DeliveryClass::MobilitySafe),
..CallerOptions::default()
};
let admission = other_boundary
.admit_request(
"req-transfer-service",
"caller-a",
&caller,
ReplySpaceRule::CallerInbox,
16,
Time::from_nanos(1),
)
.expect("admission");
let mut obligation = ServiceObligation::allocate(
&mut ledger,
"req-transfer-service",
"caller-a",
"orders-origin",
boundary.request_subject().as_str(),
admission.certificate.delivery_class,
make_task(),
make_region(),
Time::from_nanos(2),
admission.certificate.timeout,
)
.expect("allocate obligation");
let error = boundary
.transfer_request_via_import(
&admission,
&mut obligation,
ImportTransferRequest::new(
"orders-edge",
Subject::new("tenant.acme.service.edge-orders.lookup"),
"import/orders->edge",
None,
Time::from_nanos(3),
),
&authoritative_import_morphism(),
)
.expect_err("admission from another service should fail closed");
assert_eq!(
error,
ServiceBoundaryError::AdmissionServiceMismatch {
admission_service: "fabric.inventory".to_owned(),
boundary_service: "fabric.echo".to_owned(),
}
);
assert!(obligation.lineage.is_empty());
assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// A transfer must fail closed when the admission was issued to `caller-b`
/// but the obligation was allocated for `caller-a`.
#[test]
fn service_boundary_transfer_rejects_admission_caller_mismatch() {
    let request_id = "req-transfer-caller";
    let boundary = transferable_service_boundary();
    let mut ledger = ObligationLedger::new();
    let options = CallerOptions {
        requested_class: Some(DeliveryClass::MobilitySafe),
        ..CallerOptions::default()
    };
    // Admission caller and obligation caller differ on purpose.
    let admission = boundary
        .admit_request(
            request_id,
            "caller-b",
            &options,
            ReplySpaceRule::CallerInbox,
            17,
            Time::from_nanos(1),
        )
        .expect("admission");
    let mut obligation = ServiceObligation::allocate(
        &mut ledger,
        request_id,
        "caller-a",
        "orders-origin",
        boundary.request_subject().as_str(),
        admission.certificate.delivery_class,
        make_task(),
        make_region(),
        Time::from_nanos(2),
        admission.certificate.timeout,
    )
    .expect("allocate obligation");
    let transfer = ImportTransferRequest::new(
        "orders-edge",
        Subject::new("tenant.acme.service.edge-orders.lookup"),
        "import/orders->edge",
        None,
        Time::from_nanos(3),
    );
    let morphism = authoritative_import_morphism();
    let error = boundary
        .transfer_request_via_import(&admission, &mut obligation, transfer, &morphism)
        .expect_err("caller mismatch should fail closed");
    let expected = ServiceBoundaryError::AdmissionCallerMismatch {
        admission_caller: "caller-b".to_owned(),
        obligation_caller: "caller-a".to_owned(),
    };
    assert_eq!(error, expected);
    assert!(obligation.lineage.is_empty());
    assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// A transfer must fail closed when the presented admission carries a
/// different delivery class than the obligation: the obligation is allocated
/// from a default admission, the transfer presents a mobility-safe admission.
#[test]
fn service_boundary_transfer_rejects_admission_delivery_class_mismatch() {
    let request_id = "req-transfer-class";
    let caller_id = "caller-a";
    let boundary = transferable_service_boundary();
    let mut ledger = ObligationLedger::new();
    let baseline_admission = boundary
        .admit_request(
            request_id,
            caller_id,
            &CallerOptions::default(),
            ReplySpaceRule::CallerInbox,
            18,
            Time::from_nanos(1),
        )
        .expect("default admission");
    let mobility_options = CallerOptions {
        requested_class: Some(DeliveryClass::MobilitySafe),
        ..CallerOptions::default()
    };
    let admission = boundary
        .admit_request(
            request_id,
            caller_id,
            &mobility_options,
            ReplySpaceRule::CallerInbox,
            19,
            Time::from_nanos(2),
        )
        .expect("mobility-safe admission");
    // The obligation follows the baseline admission, not the one presented later.
    let mut obligation = ServiceObligation::allocate(
        &mut ledger,
        request_id,
        caller_id,
        "orders-origin",
        boundary.request_subject().as_str(),
        baseline_admission.certificate.delivery_class,
        make_task(),
        make_region(),
        Time::from_nanos(3),
        baseline_admission.certificate.timeout,
    )
    .expect("allocate obligation");
    let transfer = ImportTransferRequest::new(
        "orders-edge",
        Subject::new("tenant.acme.service.edge-orders.lookup"),
        "import/orders->edge",
        None,
        Time::from_nanos(4),
    );
    let morphism = authoritative_import_morphism();
    let error = boundary
        .transfer_request_via_import(&admission, &mut obligation, transfer, &morphism)
        .expect_err("delivery-class mismatch should fail closed");
    let expected = ServiceBoundaryError::AdmissionDeliveryClassMismatch {
        admission_class: DeliveryClass::MobilitySafe,
        obligation_class: DeliveryClass::ObligationBacked,
    };
    assert_eq!(error, expected);
    assert!(obligation.lineage.is_empty());
    assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// A transfer must fail closed when the presented admission carries a
/// different timeout than the obligation: the obligation takes the default
/// admission's timeout, the presented admission overrides it to five seconds.
#[test]
fn service_boundary_transfer_rejects_admission_timeout_mismatch() {
    let request_id = "req-transfer-timeout";
    let caller_id = "caller-a";
    let boundary = transferable_service_boundary();
    let mut ledger = ObligationLedger::new();
    let baseline_admission = boundary
        .admit_request(
            request_id,
            caller_id,
            &CallerOptions::default(),
            ReplySpaceRule::CallerInbox,
            20,
            Time::from_nanos(1),
        )
        .expect("default admission");
    let timed_options = CallerOptions {
        timeout_override: Some(Duration::from_secs(5)),
        ..CallerOptions::default()
    };
    let admission = boundary
        .admit_request(
            request_id,
            caller_id,
            &timed_options,
            ReplySpaceRule::CallerInbox,
            21,
            Time::from_nanos(2),
        )
        .expect("timed admission");
    let mut obligation = ServiceObligation::allocate(
        &mut ledger,
        request_id,
        caller_id,
        "orders-origin",
        boundary.request_subject().as_str(),
        baseline_admission.certificate.delivery_class,
        make_task(),
        make_region(),
        Time::from_nanos(3),
        baseline_admission.certificate.timeout,
    )
    .expect("allocate obligation");
    let transfer = ImportTransferRequest::new(
        "orders-edge",
        Subject::new("tenant.acme.service.edge-orders.lookup"),
        "import/orders->edge",
        None,
        Time::from_nanos(4),
    );
    let morphism = authoritative_import_morphism();
    let error = boundary
        .transfer_request_via_import(&admission, &mut obligation, transfer, &morphism)
        .expect_err("timeout mismatch should fail closed");
    let expected = ServiceBoundaryError::AdmissionTimeoutMismatch {
        admission_timeout: Some(Duration::from_secs(5)),
        obligation_timeout: Some(Duration::from_secs(30)),
    };
    assert_eq!(error, expected);
    assert!(obligation.lineage.is_empty());
    assert_eq!(obligation.subject, boundary.request_subject().as_str());
}
/// Cancelling a request through the boundary must abort the tracked
/// obligation in the ledger with `ObligationAbortReason::Cancel`.
#[test]
fn service_boundary_cancellation_propagates_to_service_obligation_abort() {
    let boundary = service_boundary();
    let mut ledger = ObligationLedger::new();
    let options = CallerOptions {
        requested_class: Some(DeliveryClass::ObligationBacked),
        ..CallerOptions::default()
    };
    let admission = boundary
        .admit_request(
            "req-cancel",
            "caller-a",
            &options,
            ReplySpaceRule::CallerInbox,
            42,
            Time::from_nanos(1),
        )
        .expect("admission");
    let pending = ServiceObligation::allocate(
        &mut ledger,
        "req-cancel",
        "caller-a",
        "orders-origin",
        boundary.request_subject().as_str(),
        admission.validated.delivery_class,
        make_task(),
        make_region(),
        Time::from_nanos(2),
        admission.validated.timeout,
    )
    .expect("allocate obligation");
    let tracked_id = pending.obligation_id().expect("tracked obligation");

    let cancelled = boundary
        .cancel_request(pending, &mut ledger, Time::from_nanos(3))
        .expect("cancel request");

    assert_eq!(cancelled.failure, ServiceFailure::Cancelled);
    assert_eq!(cancelled.obligation_id, Some(tracked_id));
    assert_eq!(ledger.pending_count(), 0);
    // The ledger record itself must reflect the cancellation.
    let ledger_entry = ledger.get(tracked_id).expect("ledger record");
    assert_eq!(ledger_entry.state, ObligationState::Aborted);
    assert_eq!(
        ledger_entry.abort_reason,
        Some(ObligationAbortReason::Cancel)
    );
}
/// Registration must be refused when the provider guarantees only
/// `DurableOrdered` durability; the expected error reports
/// `ObligationBacked` as the required durability.
#[test]
fn service_registration_rejects_provider_terms_below_contract() {
    let weak_terms = ProviderTerms {
        guaranteed_durability: DeliveryClass::DurableOrdered,
        ..provider_terms()
    };
    let rejection = ServiceRegistration::new("fabric.echo", contract(), weak_terms)
        .expect_err("durability floor should be enforced");
    let expected = ServiceContractError::ProviderGuaranteeBelowContractFloor {
        guaranteed_durability: DeliveryClass::DurableOrdered,
        required_durability: DeliveryClass::ObligationBacked,
    };
    assert_eq!(rejection, expected);
}
/// A caller request within the contract bounds is accepted, and the validated
/// request echoes the requested class, timeout and priority hint.
#[test]
fn validate_caller_accepts_in_bounds_request() {
    let registration =
        ServiceRegistration::new("fabric.echo", contract(), provider_terms()).expect("valid");
    let five_seconds = Duration::from_secs(5);
    let options = CallerOptions {
        requested_class: Some(DeliveryClass::MobilitySafe),
        timeout_override: Some(five_seconds),
        priority_hint: Some(200),
    };

    let accepted = registration
        .validate_caller(&options)
        .expect("caller request should be valid");

    assert_eq!(accepted.delivery_class, DeliveryClass::MobilitySafe);
    assert_eq!(accepted.timeout, Some(five_seconds));
    assert_eq!(accepted.priority_hint, Some(200));
    assert_eq!(accepted.mobility_constraint, MobilityConstraint::Pinned);
}
/// Requesting `ForensicReplayable` against this registration must be rejected
/// with a wrapped delivery-class policy error.
#[test]
fn validate_caller_rejects_out_of_bounds_delivery_class() {
    let registration =
        ServiceRegistration::new("fabric.echo", contract(), provider_terms()).expect("valid");
    let options = CallerOptions {
        requested_class: Some(DeliveryClass::ForensicReplayable),
        ..CallerOptions::default()
    };

    let rejection = registration
        .validate_caller(&options)
        .expect_err("caller class should be rejected");

    let policy_error = DeliveryClassPolicyError::RequestedClassNotAdmissible {
        requested: DeliveryClass::ForensicReplayable,
        default_class: DeliveryClass::ObligationBacked,
    };
    assert_eq!(
        rejection,
        ServiceContractError::DeliveryClassPolicy(policy_error)
    );
}
/// With `allow_timeout_override` switched off in the contract, any caller
/// timeout override is rejected.
#[test]
fn validate_caller_rejects_timeout_override_when_disabled() {
    let mut schema = contract();
    schema.budget_semantics.allow_timeout_override = false;
    let registration =
        ServiceRegistration::new("fabric.echo", schema, provider_terms()).expect("valid");
    let options = CallerOptions {
        timeout_override: Some(Duration::from_secs(1)),
        ..CallerOptions::default()
    };

    let rejection = registration
        .validate_caller(&options)
        .expect_err("timeout override should be rejected");

    assert_eq!(rejection, ServiceContractError::TimeoutOverrideNotAllowed);
}
/// A caller timeout override of six seconds is rejected when the contract
/// default timeout is five seconds.
#[test]
fn validate_caller_rejects_timeout_override_above_contract_default() {
    let contract_default = Duration::from_secs(5);
    let requested = Duration::from_secs(6);
    let mut schema = contract();
    schema.budget_semantics.default_timeout = Some(contract_default);
    let registration =
        ServiceRegistration::new("fabric.echo", schema, provider_terms()).expect("valid");
    let options = CallerOptions {
        timeout_override: Some(requested),
        ..CallerOptions::default()
    };

    let rejection = registration
        .validate_caller(&options)
        .expect_err("timeout override should stay within the provider default");

    let expected = ServiceContractError::TimeoutOverrideExceedsDefault {
        requested_timeout: requested,
        default_timeout: contract_default,
    };
    assert_eq!(rejection, expected);
}
/// Boundary case: an override exactly equal to the contract default timeout
/// is still admissible.
#[test]
fn validate_caller_accepts_timeout_override_equal_to_contract_default() {
    let contract_default = Duration::from_secs(5);
    let mut schema = contract();
    schema.budget_semantics.default_timeout = Some(contract_default);
    let registration =
        ServiceRegistration::new("fabric.echo", schema, provider_terms()).expect("valid");
    let options = CallerOptions {
        timeout_override: Some(contract_default),
        ..CallerOptions::default()
    };

    let accepted = registration
        .validate_caller(&options)
        .expect("equal timeout override should remain admissible");

    assert_eq!(accepted.timeout, Some(contract_default));
}
/// An `EphemeralInteractive` request never touches the ledger: allocation is
/// untracked and committing with an `Accepted` boundary creates no reply
/// obligation.
#[test]
fn ephemeral_request_reply_stays_untracked() {
    let mut ledger = ObligationLedger::new();
    let mut ephemeral = ServiceObligation::allocate(
        &mut ledger,
        "req-ephemeral",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::EphemeralInteractive,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("ephemeral path should be valid");
    assert!(!ephemeral.is_tracked());
    assert_eq!(ledger.pending_count(), 0);

    let committed = ephemeral
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(2),
            b"ok".to_vec(),
            AckKind::Accepted,
            false,
        )
        .expect("cheap path commit should succeed");

    assert_eq!(committed.service_obligation_id, None);
    assert!(committed.reply_obligation.is_none());
    assert_eq!(ledger.pending_count(), 0);
}
/// A tracked (`MobilitySafe`) request commits with a receipt-required reply:
/// the service obligation is replaced by a pending reply obligation, which is
/// cleared once delivery is committed.
#[test]
fn obligation_backed_request_commits_and_creates_reply_obligation() {
    let mut ledger = ObligationLedger::new();
    let mut request = ServiceObligation::allocate(
        &mut ledger,
        "req-1",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::MobilitySafe,
        make_task(),
        make_region(),
        Time::from_nanos(10),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    let service_id = request.obligation_id().expect("tracked obligation id");
    assert_eq!(ledger.pending_count(), 1);

    let committed = request
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(20),
            b"payload".to_vec(),
            AckKind::Received,
            true,
        )
        .expect("tracked commit should succeed");
    assert_eq!(committed.service_obligation_id, Some(service_id));
    assert_eq!(committed.payload, b"payload".to_vec());

    let reply = committed
        .reply_obligation
        .expect("reply obligation expected");
    let reply_id = reply.obligation_id();
    assert_eq!(reply.service_obligation_id, service_id);
    // Still one pending entry: the reply obligation.
    assert_eq!(ledger.pending_count(), 1);

    let delivered = reply.commit_delivery(&mut ledger, Time::from_nanos(30));
    assert_eq!(delivered.obligation_id, reply_id);
    assert_eq!(delivered.service_obligation_id, service_id);
    assert_eq!(delivered.delivery_boundary, AckKind::Received);
    assert_eq!(ledger.pending_count(), 0);
}
/// Aborting a tracked obligation with `ApplicationError` returns that failure
/// and records `ObligationAbortReason::Error` in the ledger.
#[test]
fn service_obligation_abort_records_typed_failure() {
    let mut ledger = ObligationLedger::new();
    let tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-2",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(5),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    let tracked_id = tracked.obligation_id().expect("tracked id");

    let outcome = tracked
        .abort(
            &mut ledger,
            Time::from_nanos(15),
            ServiceFailure::ApplicationError,
        )
        .expect("abort should succeed");

    assert_eq!(outcome.obligation_id, Some(tracked_id));
    assert_eq!(outcome.failure, ServiceFailure::ApplicationError);
    assert_eq!(ledger.pending_count(), 0);
    let ledger_entry = ledger.get(tracked_id).expect("ledger record exists");
    assert_eq!(ledger_entry.state, ObligationState::Aborted);
    assert_eq!(
        ledger_entry.abort_reason,
        Some(ObligationAbortReason::Error)
    );
}
/// A successful transfer keeps the obligation id, rewrites callee and subject,
/// and appends exactly one lineage entry naming the morphism.
#[test]
fn service_obligation_transfer_preserves_identity_and_lineage() {
    let morphism_name = "import/orders->edge";
    let mut ledger = ObligationLedger::new();
    let mut tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-3",
        "caller",
        "callee-a",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    let original_id = tracked.obligation_id().expect("tracked id");

    tracked
        .transfer(
            "callee-b",
            "svc.echo.imported",
            morphism_name,
            Time::from_nanos(2),
        )
        .expect("transfer should succeed");

    assert_eq!(tracked.obligation_id(), Some(original_id));
    assert_eq!(tracked.callee, "callee-b");
    assert_eq!(tracked.subject, "svc.echo.imported");
    assert_eq!(tracked.lineage.len(), 1);
    assert_eq!(tracked.lineage[0].morphism, morphism_name);
}
/// A transfer with an empty callee is rejected without disturbing the tracked
/// obligation, which can still be aborted afterwards.
#[test]
fn invalid_transfer_preserves_tracked_obligation() {
    let mut ledger = ObligationLedger::new();
    let mut tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-transfer-invalid",
        "caller",
        "callee-a",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    let tracked_id = tracked.obligation_id().expect("tracked id");

    // Empty callee is the invalid input under test.
    let rejection = tracked
        .transfer(
            "",
            "svc.echo.imported",
            "import/orders->edge",
            Time::from_nanos(2),
        )
        .expect_err("invalid transfer should be rejected");
    let expected = ServiceObligationError::EmptyField {
        field: "transfer.callee",
    };
    assert_eq!(rejection, expected);
    assert_eq!(tracked.obligation_id(), Some(tracked_id));
    assert_eq!(ledger.pending_count(), 1);

    // The obligation is still resolvable after the failed transfer.
    tracked
        .abort(
            &mut ledger,
            Time::from_nanos(3),
            ServiceFailure::ApplicationError,
        )
        .expect("abort should succeed");
    assert_eq!(ledger.pending_count(), 0);
}
/// Timing out a tracked obligation reports `TimedOut` and is recorded in the
/// ledger as an abort with `ObligationAbortReason::Explicit`.
#[test]
fn service_obligation_timeout_is_explicit_abort() {
    let mut ledger = ObligationLedger::new();
    let tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-4",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(3),
        Some(Duration::from_secs(1)),
    )
    .expect("tracked request should allocate");
    let tracked_id = tracked.obligation_id().expect("tracked id");

    let expired = tracked
        .timeout(&mut ledger, Time::from_nanos(100))
        .expect("timeout should abort successfully");

    assert_eq!(expired.failure, ServiceFailure::TimedOut);
    let ledger_entry = ledger.get(tracked_id).expect("ledger record exists");
    assert_eq!(ledger_entry.state, ObligationState::Aborted);
    assert_eq!(
        ledger_entry.abort_reason,
        Some(ObligationAbortReason::Explicit)
    );
    assert_eq!(ledger.pending_count(), 0);
}
/// Once an obligation has been committed, a second `commit_with_reply` is
/// rejected with `AlreadyResolved`.
#[test]
fn resolved_service_obligation_rejects_second_resolution() {
    let mut ledger = ObligationLedger::new();
    let mut tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-resolved-twice",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");

    let first = tracked
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(2),
            b"payload".to_vec(),
            AckKind::Served,
            false,
        )
        .expect("first resolution should succeed");
    assert!(first.reply_obligation.is_none());
    assert_eq!(ledger.pending_count(), 0);

    let rejection = tracked
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(3),
            b"payload".to_vec(),
            AckKind::Served,
            false,
        )
        .expect_err("resolved obligation should reject a second commit");
    let expected = ServiceObligationError::AlreadyResolved {
        operation: "commit_with_reply",
    };
    assert_eq!(rejection, expected);
}
/// Once an obligation has been committed, `abort` is rejected with
/// `AlreadyResolved`.
#[test]
fn resolved_service_obligation_rejects_abort_after_commit() {
    let mut ledger = ObligationLedger::new();
    let mut tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-resolved-abort",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    tracked
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(2),
            b"payload".to_vec(),
            AckKind::Served,
            false,
        )
        .expect("first resolution should succeed");

    let rejection = tracked
        .abort(
            &mut ledger,
            Time::from_nanos(3),
            ServiceFailure::ApplicationError,
        )
        .expect_err("resolved obligation should reject abort");

    let expected = ServiceObligationError::AlreadyResolved { operation: "abort" };
    assert_eq!(rejection, expected);
}
/// Once an obligation has been committed, `timeout` is rejected with
/// `AlreadyResolved`.
#[test]
fn resolved_service_obligation_rejects_timeout_after_commit() {
    let mut ledger = ObligationLedger::new();
    let mut tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-resolved-timeout",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    tracked
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(2),
            b"payload".to_vec(),
            AckKind::Served,
            false,
        )
        .expect("first resolution should succeed");

    let rejection = tracked
        .timeout(&mut ledger, Time::from_nanos(3))
        .expect_err("resolved obligation should reject timeout");

    let expected = ServiceObligationError::AlreadyResolved {
        operation: "timeout",
    };
    assert_eq!(rejection, expected);
}
/// Committing a `MobilitySafe` request with a `Committed` boundary is rejected
/// (the expected error names `Received` as the minimum), and the obligation
/// stays pending so it can still be aborted.
#[test]
fn tracked_reply_boundary_below_minimum_preserves_obligation() {
    let mut ledger = ObligationLedger::new();
    let mut tracked = ServiceObligation::allocate(
        &mut ledger,
        "req-boundary-floor",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::MobilitySafe,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    let tracked_id = tracked.obligation_id().expect("tracked id");

    let rejection = tracked
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(2),
            b"payload".to_vec(),
            AckKind::Committed,
            false,
        )
        .expect_err("boundary below durable floor should be rejected");
    let expected = ServiceObligationError::ReplyBoundaryBelowMinimum {
        delivery_class: DeliveryClass::MobilitySafe,
        minimum_boundary: AckKind::Received,
        requested_boundary: AckKind::Committed,
    };
    assert_eq!(rejection, expected);
    // The failed commit must not have resolved the obligation.
    assert_eq!(tracked.obligation_id(), Some(tracked_id));
    assert_eq!(ledger.pending_count(), 1);

    let outcome = tracked
        .abort(
            &mut ledger,
            Time::from_nanos(3),
            ServiceFailure::ApplicationError,
        )
        .expect("abort should succeed");
    assert_eq!(outcome.obligation_id, Some(tracked_id));
    assert_eq!(ledger.pending_count(), 0);
}
/// Requesting a receipt while using the `Served` boundary is rejected with
/// `ReceiptRequiresReceivedBoundary`.
#[test]
fn receipt_required_reply_must_use_received_boundary() {
    let mut ledger = ObligationLedger::new();
    let mut ephemeral = ServiceObligation::allocate(
        &mut ledger,
        "req-receipt-boundary",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::EphemeralInteractive,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("ephemeral request should allocate");

    let rejection = ephemeral
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(2),
            b"payload".to_vec(),
            AckKind::Served,
            true,
        )
        .expect_err("receipt-required replies must use the received boundary");

    let expected = ServiceObligationError::ReceiptRequiresReceivedBoundary {
        requested_boundary: AckKind::Served,
    };
    assert_eq!(rejection, expected);
    assert_eq!(ledger.pending_count(), 0);
}
/// An `EphemeralInteractive` request cannot commit with the `Received`
/// boundary: the call fails with `ReplyTrackingUnavailable` and nothing is
/// left pending in the ledger.
#[test]
fn untracked_delivery_class_rejects_follow_up_reply_tracking() {
    let mut ledger = ObligationLedger::new();
    let mut ephemeral = ServiceObligation::allocate(
        &mut ledger,
        "req-untracked-follow-up",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::EphemeralInteractive,
        make_task(),
        make_region(),
        Time::from_nanos(1),
        Some(Duration::from_secs(5)),
    )
    .expect("ephemeral request should allocate");

    let rejection = ephemeral
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(2),
            b"payload".to_vec(),
            AckKind::Received,
            false,
        )
        .expect_err("cheap path should not pretend to support tracked reply delivery");

    let expected = ServiceObligationError::ReplyTrackingUnavailable {
        delivery_class: DeliveryClass::EphemeralInteractive,
        requested_boundary: AckKind::Received,
        receipt_required: false,
    };
    assert_eq!(rejection, expected);
    assert_eq!(ledger.pending_count(), 0);
}
/// Aborting delivery of a reply obligation with `TransportError` reports that
/// failure, records `ObligationAbortReason::Error`, and clears the ledger.
#[test]
fn reply_obligation_abort_records_failure_and_clears_pending() {
    let mut ledger = ObligationLedger::new();
    let mut request = ServiceObligation::allocate(
        &mut ledger,
        "req-reply-abort",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::MobilitySafe,
        make_task(),
        make_region(),
        Time::from_nanos(10),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    let committed = request
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(20),
            b"payload".to_vec(),
            AckKind::Received,
            true,
        )
        .expect("tracked commit should succeed");
    let reply = committed
        .reply_obligation
        .expect("reply obligation expected");
    let reply_id = reply.obligation_id();

    let outcome = reply.abort_delivery(
        &mut ledger,
        Time::from_nanos(30),
        ServiceFailure::TransportError,
    );

    assert_eq!(outcome.obligation_id, reply_id);
    assert_eq!(outcome.failure, ServiceFailure::TransportError);
    assert_eq!(outcome.delivery_boundary, AckKind::Received);
    let ledger_entry = ledger.get(reply_id).expect("reply record exists");
    assert_eq!(ledger_entry.state, ObligationState::Aborted);
    assert_eq!(
        ledger_entry.abort_reason,
        Some(ObligationAbortReason::Error)
    );
    assert_eq!(ledger.pending_count(), 0);
}
/// Timing out a reply obligation reports `TimedOut` and is recorded in the
/// ledger as an abort with `ObligationAbortReason::Explicit`.
#[test]
fn reply_obligation_timeout_is_explicit_abort() {
    let mut ledger = ObligationLedger::new();
    let mut request = ServiceObligation::allocate(
        &mut ledger,
        "req-reply-timeout",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::MobilitySafe,
        make_task(),
        make_region(),
        Time::from_nanos(10),
        Some(Duration::from_secs(5)),
    )
    .expect("tracked request should allocate");
    let committed = request
        .commit_with_reply(
            &mut ledger,
            Time::from_nanos(20),
            b"payload".to_vec(),
            AckKind::Received,
            true,
        )
        .expect("tracked commit should succeed");
    let reply = committed
        .reply_obligation
        .expect("reply obligation expected");
    let reply_id = reply.obligation_id();

    let expired = reply.timeout(&mut ledger, Time::from_nanos(40));

    assert_eq!(expired.obligation_id, reply_id);
    assert_eq!(expired.failure, ServiceFailure::TimedOut);
    assert_eq!(expired.delivery_boundary, AckKind::Received);
    let ledger_entry = ledger.get(reply_id).expect("reply record exists");
    assert_eq!(ledger_entry.state, ObligationState::Aborted);
    assert_eq!(
        ledger_entry.abort_reason,
        Some(ObligationAbortReason::Explicit)
    );
    assert_eq!(ledger.pending_count(), 0);
}
/// A whitespace-only schema name is treated as empty and rejected.
///
/// The exact error is pinned (variant and echoed field name) rather than just
/// `is_err()`, so an unrelated validation failure cannot satisfy this test.
#[test]
fn payload_shape_named_schema_rejects_empty() {
    let shape = PayloadShape::NamedSchema {
        schema: " ".to_owned(),
    };
    assert_eq!(
        shape.validate("test"),
        Err(ServiceContractError::EmptyNamedSchema {
            field: "test".to_owned(),
        })
    );
}
/// A named schema with a non-blank name passes validation.
#[test]
fn payload_shape_named_schema_accepts_non_empty() {
    let named = PayloadShape::NamedSchema {
        schema: "orders.v1".to_owned(),
    };
    let outcome = named.validate("test");
    assert!(outcome.is_ok());
}
/// Every payload shape that carries no schema name validates successfully.
#[test]
fn payload_shape_non_named_variants_validate() {
    let schemaless = [
        PayloadShape::Empty,
        PayloadShape::JsonDocument,
        PayloadShape::BinaryBlob,
        PayloadShape::SubjectEncoded,
    ];
    for candidate in &schemaless {
        let outcome = candidate.validate("test");
        assert!(outcome.is_ok());
    }
}
/// `ReplyShape::None` has nothing to check and always validates.
#[test]
fn reply_shape_none_validates() {
    let no_reply = ReplyShape::None;
    let outcome = no_reply.validate("test");
    assert!(outcome.is_ok());
}
/// A unary reply wrapping a named schema with an empty name is rejected.
///
/// `ReplyShape::validate` forwards to the inner payload shape, so the exact
/// error is pinned (variant and echoed field name) instead of a bare
/// `is_err()` that any failure would satisfy.
#[test]
fn reply_shape_unary_with_empty_named_schema_rejects() {
    let shape = ReplyShape::Unary {
        shape: PayloadShape::NamedSchema {
            schema: String::new(),
        },
    };
    assert_eq!(
        shape.validate("test"),
        Err(ServiceContractError::EmptyNamedSchema {
            field: "test".to_owned(),
        })
    );
}
/// A streaming reply whose inner shape is a JSON document validates.
#[test]
fn reply_shape_stream_validates_inner_shape() {
    let streaming = ReplyShape::Stream {
        shape: PayloadShape::JsonDocument,
    };
    let outcome = streaming.validate("test");
    assert!(outcome.is_ok());
}
/// A zero default timeout is rejected with `ZeroDuration`, and the reported
/// field mentions `default_timeout`.
#[test]
fn budget_semantics_rejects_zero_timeout() {
    let budget = BudgetSemantics {
        default_timeout: Some(Duration::ZERO),
        ..BudgetSemantics::default()
    };
    let outcome = budget.validate();
    // Borrow for the match so the whole result is still printable on mismatch.
    let Err(ServiceContractError::ZeroDuration { field }) = &outcome else {
        panic!("expected ZeroDuration, got {outcome:?}");
    };
    assert!(field.contains("default_timeout"));
}
/// Leaving the default timeout unset is a valid budget configuration.
#[test]
fn budget_semantics_none_timeout_validates() {
    let unbounded = BudgetSemantics {
        default_timeout: None,
        ..BudgetSemantics::default()
    };
    let outcome = unbounded.validate();
    assert!(outcome.is_ok());
}
/// An unrestricted constraint satisfies an unrestricted requirement.
///
/// Despite the name, only the Unrestricted/Unrestricted pairing is checked.
#[test]
fn mobility_unrestricted_satisfies_any_requirement() {
    let offered = MobilityConstraint::Unrestricted;
    let required = MobilityConstraint::Unrestricted;
    assert!(offered.satisfies(&required));
}
/// A pinned constraint satisfies a pinned requirement.
#[test]
fn mobility_pinned_satisfies_pinned() {
    let offered = MobilityConstraint::Pinned;
    let required = MobilityConstraint::Pinned;
    assert!(offered.satisfies(&required));
}
/// A pinned constraint satisfies a bounded-region requirement.
#[test]
fn mobility_pinned_satisfies_bounded_region() {
    let required = MobilityConstraint::BoundedRegion {
        region: "us-east".to_owned(),
    };
    let offered = MobilityConstraint::Pinned;
    assert!(offered.satisfies(&required));
}
/// A bounded-region constraint satisfies a requirement for the same region.
#[test]
fn mobility_bounded_satisfies_same_region() {
    let bounded_to = |region: &str| MobilityConstraint::BoundedRegion {
        region: region.to_owned(),
    };
    let offered = bounded_to("eu-west");
    let required = bounded_to("eu-west");
    assert!(offered.satisfies(&required));
}
/// A bounded-region constraint does not satisfy a requirement for another
/// region.
#[test]
fn mobility_bounded_does_not_satisfy_different_region() {
    let bounded_to = |region: &str| MobilityConstraint::BoundedRegion {
        region: region.to_owned(),
    };
    let offered = bounded_to("us-east");
    let required = bounded_to("eu-west");
    assert!(!offered.satisfies(&required));
}
/// An unrestricted constraint does not satisfy a bounded-region requirement.
#[test]
fn mobility_unrestricted_does_not_satisfy_bounded() {
    let required = MobilityConstraint::BoundedRegion {
        region: "any".to_owned(),
    };
    let offered = MobilityConstraint::Unrestricted;
    assert!(!offered.satisfies(&required));
}
/// An unrestricted constraint does not satisfy a pinned requirement.
#[test]
fn mobility_unrestricted_does_not_satisfy_pinned() {
    let offered = MobilityConstraint::Unrestricted;
    let required = MobilityConstraint::Pinned;
    assert!(!offered.satisfies(&required));
}
/// A bounded-region constraint with a whitespace-only region fails validation.
#[test]
fn mobility_bounded_rejects_empty_region() {
    let blank_region = MobilityConstraint::BoundedRegion {
        region: " ".to_owned(),
    };
    let outcome = blank_region.validate("test");
    assert!(outcome.is_err());
}
/// A queueing overload policy with `max_pending: 0` is rejected with
/// `InvalidQueueCapacity`.
#[test]
fn overload_queue_rejects_zero_capacity() {
    let zero_capacity = OverloadPolicy::QueueWithinBudget { max_pending: 0 };
    let rejection = zero_capacity.validate().unwrap_err();
    let expected = ServiceContractError::InvalidQueueCapacity;
    assert_eq!(rejection, expected);
}
/// A queueing overload policy with a positive capacity validates.
#[test]
fn overload_queue_accepts_nonzero_capacity() {
    let bounded_queue = OverloadPolicy::QueueWithinBudget { max_pending: 100 };
    let outcome = bounded_queue.validate();
    assert!(outcome.is_ok());
}
/// Overload policies without a queue capacity always validate.
#[test]
fn overload_non_queue_variants_validate() {
    let queueless = [
        OverloadPolicy::RejectNew,
        OverloadPolicy::DropEphemeral,
        OverloadPolicy::FailFast,
    ];
    for candidate in queueless {
        let outcome = candidate.validate();
        assert!(outcome.is_ok());
    }
}
/// The default contract schema is itself a valid contract.
#[test]
fn default_contract_schema_validates() {
    let schema = ServiceContractSchema::default();
    let outcome = schema.validate();
    assert!(outcome.is_ok());
}
/// A contract schema carrying a zero-capacity queueing overload policy fails
/// validation.
///
/// The schema is built with struct-update syntax rather than mutating a
/// default instance, matching the other tests and avoiding clippy's
/// `field_reassign_with_default` warning.
#[test]
fn contract_schema_rejects_invalid_overload_policy() {
    let schema = ServiceContractSchema {
        overload_policy: OverloadPolicy::QueueWithinBudget { max_pending: 0 },
        ..ServiceContractSchema::default()
    };
    assert!(schema.validate().is_err());
}
/// Provider terms offering no compensation are rejected against the test
/// contract with `ProviderCompensationBelowContract`.
#[test]
fn provider_terms_reject_compensation_below_contract() {
    let uncompensated = ProviderTerms {
        compensation_policy: CompensationSemantics::None,
        ..provider_terms()
    };
    let rejection = uncompensated.validate_against(&contract()).unwrap_err();
    let is_expected_variant = matches!(
        rejection,
        ServiceContractError::ProviderCompensationBelowContract { .. }
    );
    assert!(is_expected_variant);
}
/// Provider terms offering `Standard` evidence are rejected when the contract
/// requires `Forensic` evidence.
#[test]
fn provider_terms_reject_evidence_below_contract() {
    let forensic_contract = ServiceContractSchema {
        evidence_requirements: EvidenceLevel::Forensic,
        ..contract()
    };
    let standard_provider = ProviderTerms {
        evidence_level: EvidenceLevel::Standard,
        ..provider_terms()
    };
    let rejection = standard_provider
        .validate_against(&forensic_contract)
        .unwrap_err();
    let is_expected_variant = matches!(
        rejection,
        ServiceContractError::ProviderEvidenceBelowContract { .. }
    );
    assert!(is_expected_variant);
}
/// Provider terms with unrestricted mobility are rejected when the contract
/// requires pinned mobility.
#[test]
fn provider_terms_reject_incompatible_mobility() {
    let pinned_contract = ServiceContractSchema {
        mobility_constraints: MobilityConstraint::Pinned,
        ..contract()
    };
    let roaming_provider = ProviderTerms {
        mobility_constraint: MobilityConstraint::Unrestricted,
        ..provider_terms()
    };
    let rejection = roaming_provider
        .validate_against(&pinned_contract)
        .unwrap_err();
    let is_expected_variant = matches!(
        rejection,
        ServiceContractError::ProviderMobilityIncompatible { .. }
    );
    assert!(is_expected_variant);
}
/// Pins the mapping from each service failure to its ledger abort reason.
#[test]
fn service_failure_maps_to_correct_abort_reasons() {
    // (failure, expected abort reason), checked in declaration order.
    let expectations = [
        (ServiceFailure::Cancelled, ObligationAbortReason::Cancel),
        (ServiceFailure::TimedOut, ObligationAbortReason::Explicit),
        (ServiceFailure::Rejected, ObligationAbortReason::Explicit),
        (ServiceFailure::Overloaded, ObligationAbortReason::Error),
        (ServiceFailure::TransportError, ObligationAbortReason::Error),
        (
            ServiceFailure::ApplicationError,
            ObligationAbortReason::Error,
        ),
    ];
    for (failure, expected_reason) in expectations {
        assert_eq!(failure.abort_reason(), expected_reason);
    }
}
/// Pins the `Display` text of every cleanup urgency checked here.
#[test]
fn cleanup_urgency_display() {
    assert_eq!(CleanupUrgency::Background.to_string(), "background");
    assert_eq!(CleanupUrgency::Prompt.to_string(), "prompt");
    assert_eq!(CleanupUrgency::Immediate.to_string(), "immediate");
}
/// Pins the kebab-case `Display` text of the cancellation obligations.
#[test]
fn cancellation_obligations_display() {
    let best_effort = CancellationObligations::BestEffortDrain.to_string();
    assert_eq!(best_effort, "best-effort-drain");

    let before_reply = CancellationObligations::DrainBeforeReply.to_string();
    assert_eq!(before_reply, "drain-before-reply");

    let with_compensation = CancellationObligations::DrainAndCompensate.to_string();
    assert_eq!(with_compensation, "drain-and-compensate");
}
/// Pins the `Display` text of three service failures; the remaining variants
/// are not covered by this test.
#[test]
fn service_failure_display() {
    assert_eq!(ServiceFailure::Cancelled.to_string(), "cancelled");
    assert_eq!(ServiceFailure::TimedOut.to_string(), "timed_out");
    assert_eq!(ServiceFailure::Overloaded.to_string(), "overloaded");
}
/// The default contract schema survives a JSON serialize/deserialize cycle
/// unchanged.
#[test]
fn service_contract_schema_json_round_trip() {
    let original = ServiceContractSchema::default();
    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded: ServiceContractSchema = serde_json::from_str(&encoded).expect("deserialize");
    assert_eq!(original, decoded);
}
/// Every payload shape variant survives a JSON round trip unchanged.
#[test]
fn payload_shape_all_variants_json_round_trip() {
    let variants = [
        PayloadShape::Empty,
        PayloadShape::JsonDocument,
        PayloadShape::BinaryBlob,
        PayloadShape::SubjectEncoded,
        PayloadShape::NamedSchema {
            schema: "test.v1".to_owned(),
        },
    ];
    for original in variants {
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: PayloadShape = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(original, decoded);
    }
}
/// Every mobility constraint variant survives a JSON round trip unchanged.
#[test]
fn mobility_constraint_all_variants_json_round_trip() {
    let variants = [
        MobilityConstraint::Unrestricted,
        MobilityConstraint::BoundedRegion {
            region: "us-west".to_owned(),
        },
        MobilityConstraint::Pinned,
    ];
    for original in variants {
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: MobilityConstraint = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(original, decoded);
    }
}
/// Every overload policy variant survives a JSON round trip unchanged.
#[test]
fn overload_policy_all_variants_json_round_trip() {
    let variants = [
        OverloadPolicy::RejectNew,
        OverloadPolicy::QueueWithinBudget { max_pending: 50 },
        OverloadPolicy::DropEphemeral,
        OverloadPolicy::FailFast,
    ];
    for original in variants {
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: OverloadPolicy = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(original, decoded);
    }
}
/// Pins the `Default` variant of each contract enum so a changed default is
/// caught here.
#[test]
fn default_enum_values_match_expected() {
    let payload: PayloadShape = Default::default();
    assert_eq!(payload, PayloadShape::Empty);

    let reply: ReplyShape = Default::default();
    assert_eq!(reply, ReplyShape::None);

    let urgency: CleanupUrgency = Default::default();
    assert_eq!(urgency, CleanupUrgency::Prompt);

    let cancellation: CancellationObligations = Default::default();
    assert_eq!(cancellation, CancellationObligations::DrainBeforeReply);

    let compensation: CompensationSemantics = Default::default();
    assert_eq!(compensation, CompensationSemantics::None);

    let mobility: MobilityConstraint = Default::default();
    assert_eq!(mobility, MobilityConstraint::Unrestricted);

    let evidence: EvidenceLevel = Default::default();
    assert_eq!(evidence, EvidenceLevel::Standard);

    let overload: OverloadPolicy = Default::default();
    assert_eq!(overload, OverloadPolicy::RejectNew);
}
/// A tracked obligation that is never resolved shows up in the ledger's leak
/// report under its own id.
#[test]
fn unresolved_service_obligation_is_visible_to_leak_checks() {
    let mut ledger = ObligationLedger::new();
    let unresolved = ServiceObligation::allocate(
        &mut ledger,
        "req-5",
        "caller",
        "callee",
        "svc.echo",
        DeliveryClass::ObligationBacked,
        make_task(),
        make_region(),
        Time::from_nanos(3),
        Some(Duration::from_secs(1)),
    )
    .expect("tracked request should allocate");
    let unresolved_id = unresolved.obligation_id().expect("tracked id");

    let report = ledger.check_leaks();

    assert!(!report.is_clean());
    assert_eq!(ledger.pending_count(), 1);
    let reported = report
        .leaked
        .iter()
        .any(|leak| leak.id == unresolved_id);
    assert!(reported);
    // Dropped only after the checks, so the obligation is held throughout.
    drop(unresolved);
}
/// Builds a `RequestCertificate` via `from_validated` and asserts that the
/// request id, caller, delivery class and capability fingerprint are carried
/// over and that the certificate validates.
#[test]
fn request_certificate_from_validated_roundtrip() {
    let validated = ValidatedServiceRequest {
        delivery_class: DeliveryClass::ObligationBacked,
        timeout: Some(Duration::from_secs(5)),
        priority_hint: None,
        guaranteed_durability: DeliveryClass::MobilitySafe,
        evidence_level: EvidenceLevel::Standard,
        mobility_constraint: MobilityConstraint::Unrestricted,
        compensation_policy: CompensationSemantics::None,
        overload_policy: OverloadPolicy::RejectNew,
    };
    let reply_space_rule = super::super::ir::ReplySpaceRule::CallerInbox;
    let issued_at = Time::from_nanos(1000);

    let certificate = RequestCertificate::from_validated(
        "req-1".into(),
        "caller-a".into(),
        "orders.region1.created".into(),
        &validated,
        reply_space_rule,
        "OrderService".into(),
        0xDEAD_BEEF,
        issued_at,
    );

    assert_eq!(certificate.request_id, "req-1");
    assert_eq!(certificate.caller, "caller-a");
    assert_eq!(certificate.delivery_class, DeliveryClass::ObligationBacked);
    assert_eq!(certificate.capability_fingerprint, 0xDEAD_BEEF);
    assert!(certificate.validate().is_ok());
}
/// A `RequestCertificate` whose `request_id` is the empty string must fail
/// validation.
#[test]
fn request_certificate_rejects_empty_fields() {
    let reply_space_rule = super::super::ir::ReplySpaceRule::CallerInbox;
    let blank_request_id = RequestCertificate {
        request_id: String::new(),
        caller: "caller".into(),
        subject: "sub".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        reply_space_rule,
        service_class: "svc".into(),
        capability_fingerprint: 0,
        issued_at: Time::from_nanos(1),
        timeout: None,
    };

    let outcome = blank_request_id.validate();
    assert!(outcome.is_err());
}
/// A `RequestCertificate` carrying `Some(Duration::ZERO)` as its timeout must
/// be rejected with `ServiceObligationError::ZeroTimeout`.
#[test]
fn request_certificate_rejects_zero_timeout() {
    let reply_space_rule = super::super::ir::ReplySpaceRule::CallerInbox;
    let zero_timeout_cert = RequestCertificate {
        request_id: "req-1".into(),
        caller: "caller".into(),
        subject: "sub".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        reply_space_rule,
        service_class: "svc".into(),
        capability_fingerprint: 0,
        issued_at: Time::from_nanos(1),
        timeout: Some(Duration::ZERO),
    };

    let outcome = zero_timeout_cert.validate();
    assert!(matches!(outcome, Err(ServiceObligationError::ZeroTimeout)));
}
/// A `SharedPrefix` reply-space rule whose prefix is only whitespace must be
/// rejected as an empty `reply_space_rule.prefix` field.
#[test]
fn request_certificate_rejects_empty_reply_space_prefix() {
    let blank_prefix_rule = super::super::ir::ReplySpaceRule::SharedPrefix {
        prefix: " ".into(),
    };
    let certificate = RequestCertificate {
        request_id: "req-1".into(),
        caller: "caller".into(),
        subject: "sub".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        reply_space_rule: blank_prefix_rule,
        service_class: "svc".into(),
        capability_fingerprint: 0,
        issued_at: Time::from_nanos(1),
        timeout: None,
    };

    let outcome = certificate.validate();
    assert_eq!(
        outcome,
        Err(ServiceObligationError::EmptyField {
            field: "reply_space_rule.prefix",
        })
    );
}
/// Computing the digest of the same `RequestCertificate` twice must yield
/// equal values.
#[test]
fn request_certificate_digest_is_deterministic() {
    let reply_space_rule = super::super::ir::ReplySpaceRule::CallerInbox;
    let certificate = RequestCertificate {
        request_id: "req-1".into(),
        caller: "caller-a".into(),
        subject: "orders.created".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        reply_space_rule,
        service_class: "OrderSvc".into(),
        capability_fingerprint: 42,
        issued_at: Time::from_nanos(1000),
        timeout: None,
    };

    let first = certificate.digest();
    let second = certificate.digest();
    assert_eq!(first, second);
}
/// Two certificates that differ only in their reply-space rule
/// (`SharedPrefix` vs `DedicatedPrefix`) must produce different digests.
#[test]
fn request_certificate_digest_distinguishes_reply_contract_metadata() {
    let shared_rule = super::super::ir::ReplySpaceRule::SharedPrefix {
        prefix: "_INBOX.shared".into(),
    };
    let dedicated_rule = super::super::ir::ReplySpaceRule::DedicatedPrefix {
        prefix: "_INBOX.dedicated".into(),
    };

    let with_shared = RequestCertificate {
        request_id: "req-1".into(),
        caller: "caller-a".into(),
        subject: "orders.created".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        reply_space_rule: shared_rule,
        service_class: "OrderSvc".into(),
        capability_fingerprint: 42,
        issued_at: Time::from_nanos(1000),
        timeout: Some(Duration::from_secs(5)),
    };
    // Same certificate, only the reply-space rule is swapped.
    let with_dedicated = RequestCertificate {
        reply_space_rule: dedicated_rule,
        ..with_shared.clone()
    };

    let shared_digest = with_shared.digest();
    let dedicated_digest = with_dedicated.digest();
    assert_ne!(shared_digest, dedicated_digest);
}
/// Derives a `ReplyCertificate` from an untracked `ServiceReplyCommit` and
/// asserts it is a valid, non-chunked certificate for the same request.
#[test]
fn reply_certificate_from_commit() {
    let commit = ServiceReplyCommit {
        request_id: "req-1".into(),
        service_obligation_id: None,
        payload: b"hello".to_vec(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        reply_obligation: None,
    };
    let issued_at = Time::from_nanos(2000);
    let service_latency = Duration::from_millis(50);

    let certificate =
        ReplyCertificate::from_commit(&commit, "callee-a".into(), issued_at, service_latency);

    assert_eq!(certificate.request_id, "req-1");
    assert_eq!(certificate.callee, "callee-a");
    assert!(!certificate.is_chunked);
    assert!(certificate.total_chunks.is_none());
    assert!(certificate.validate().is_ok());
}
/// A certificate flagged `is_chunked` but without `total_chunks` must fail
/// validation with `ChunkedReplyMissingCount`.
#[test]
fn reply_certificate_rejects_chunked_without_count() {
    let missing_count = ReplyCertificate {
        request_id: "req-1".into(),
        callee: "callee-a".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        service_obligation_id: None,
        payload_digest: 0,
        is_chunked: true,
        total_chunks: None,
        issued_at: Time::from_nanos(1),
        service_latency: Duration::from_millis(1),
    };

    let outcome = missing_count.validate();
    assert!(matches!(
        outcome,
        Err(ServiceObligationError::ChunkedReplyMissingCount)
    ));
}
/// A non-chunked certificate that nevertheless carries `total_chunks` must
/// fail validation with `UnaryReplyChunkCountPresent`.
#[test]
fn reply_certificate_rejects_unary_chunk_count() {
    let stray_count = ReplyCertificate {
        request_id: "req-1".into(),
        callee: "callee-a".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        service_obligation_id: None,
        payload_digest: 0,
        is_chunked: false,
        total_chunks: Some(1),
        issued_at: Time::from_nanos(1),
        service_latency: Duration::from_millis(1),
    };

    let outcome = stray_count.validate();
    assert!(matches!(
        outcome,
        Err(ServiceObligationError::UnaryReplyChunkCountPresent)
    ));
}
/// An `ObligationBacked` reply certificate without a `service_obligation_id`
/// must fail validation with `TrackedReplyMissingParentObligationId`.
#[test]
fn reply_certificate_rejects_tracked_class_without_service_obligation_id() {
    let orphaned = ReplyCertificate {
        request_id: "req-1".into(),
        callee: "callee-a".into(),
        delivery_class: DeliveryClass::ObligationBacked,
        service_obligation_id: None,
        payload_digest: 0,
        is_chunked: false,
        total_chunks: None,
        issued_at: Time::from_nanos(1),
        service_latency: Duration::from_millis(1),
    };

    let outcome = orphaned.validate();
    assert!(matches!(
        outcome,
        Err(
            ServiceObligationError::TrackedReplyMissingParentObligationId {
                delivery_class: DeliveryClass::ObligationBacked,
            }
        )
    ));
}
/// Computing the digest of the same `ReplyCertificate` twice must yield equal
/// values.
#[test]
fn reply_certificate_digest_is_deterministic() {
    let certificate = ReplyCertificate {
        request_id: "req-1".into(),
        callee: "callee-a".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        service_obligation_id: None,
        payload_digest: 0xCAFE,
        is_chunked: false,
        total_chunks: None,
        issued_at: Time::from_nanos(1000),
        service_latency: Duration::from_millis(10),
    };

    let first = certificate.digest();
    let second = certificate.digest();
    assert_eq!(first, second);
}
/// Two reply certificates that differ only in `is_chunked` / `total_chunks`
/// must produce different digests.
#[test]
fn reply_certificate_digest_distinguishes_chunk_metadata() {
    let single_reply = ReplyCertificate {
        request_id: "req-1".into(),
        callee: "callee-a".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        service_obligation_id: Some(ObligationId::new_for_test(7, 0)),
        payload_digest: 0xCAFE,
        is_chunked: false,
        total_chunks: None,
        issued_at: Time::from_nanos(1000),
        service_latency: Duration::from_millis(10),
    };
    // Same certificate, only the chunk metadata is changed.
    let three_chunks = ReplyCertificate {
        is_chunked: true,
        total_chunks: Some(3),
        ..single_reply.clone()
    };

    let single_digest = single_reply.digest();
    let chunked_digest = three_chunks.digest();
    assert_ne!(single_digest, chunked_digest);
}
/// Walks a three-chunk bounded stream through its lifecycle: chunks are
/// numbered from zero, the stream is complete only after the third chunk, a
/// fourth chunk is rejected as overflow, and finalize reports three chunks.
#[test]
fn chunked_reply_lifecycle_bounded() {
    let mut stream = ChunkedReplyObligation::new(
        "family-1".into(),
        "req-1".into(),
        None,
        Some(3),
        DeliveryClass::DurableOrdered,
        AckKind::Recoverable,
    )
    .unwrap();
    assert!(!stream.is_complete());

    let first_index = stream.receive_chunk().unwrap();
    assert_eq!(first_index, 0);
    let second_index = stream.receive_chunk().unwrap();
    assert_eq!(second_index, 1);
    assert!(!stream.is_complete());

    let third_index = stream.receive_chunk().unwrap();
    assert_eq!(third_index, 2);
    assert!(stream.is_complete());

    // One chunk beyond the declared bound.
    let overflow = stream.receive_chunk();
    assert!(matches!(
        overflow,
        Err(ServiceObligationError::ChunkedReplyOverflow {
            expected: 3,
            received: 4,
        })
    ));

    assert_eq!(stream.finalize().unwrap(), 3);
    assert!(stream.is_finalized());
}
/// A stream created without an expected chunk count accepts 100 chunks, is
/// asserted to not report completion, and finalizes with the received count.
#[test]
fn chunked_reply_unbounded_stream() {
    let mut stream = ChunkedReplyObligation::new(
        "family-2".into(),
        "req-2".into(),
        Some(ObligationId::new_for_test(21, 0)),
        None,
        DeliveryClass::ObligationBacked,
        AckKind::Served,
    )
    .unwrap();

    (0..100).for_each(|_| {
        stream.receive_chunk().unwrap();
    });

    assert!(!stream.is_complete());
    assert_eq!(stream.received_chunks(), 100);
    assert_eq!(stream.finalize().unwrap(), 100);
}
/// Constructing a chunked reply with `Some(0)` expected chunks must fail with
/// `ChunkedReplyZeroExpected`.
#[test]
fn chunked_reply_rejects_zero_expected() {
    let outcome = ChunkedReplyObligation::new(
        "family-3".into(),
        "req-3".into(),
        None,
        Some(0),
        DeliveryClass::DurableOrdered,
        AckKind::Recoverable,
    );

    assert!(matches!(
        outcome,
        Err(ServiceObligationError::ChunkedReplyZeroExpected)
    ));
}
/// After a successful finalize, a second finalize call must fail with
/// `AlreadyResolved`.
#[test]
fn chunked_reply_finalize_is_idempotent_guard() {
    let mut stream = ChunkedReplyObligation::new(
        "family-4".into(),
        "req-4".into(),
        None,
        Some(1),
        DeliveryClass::EphemeralInteractive,
        AckKind::Accepted,
    )
    .unwrap();
    stream.receive_chunk().unwrap();
    stream.finalize().unwrap();

    let second_finalize = stream.finalize();
    assert!(matches!(
        second_finalize,
        Err(ServiceObligationError::AlreadyResolved { .. })
    ));
}
/// The certificate issued for a finalized two-chunk stream must be chunked,
/// carry the chunk count and supplied payload digest, and validate.
#[test]
fn chunked_reply_certificate_carries_chunk_count() {
    let mut stream = ChunkedReplyObligation::new(
        "family-5".into(),
        "req-5".into(),
        None,
        Some(2),
        DeliveryClass::DurableOrdered,
        AckKind::Recoverable,
    )
    .unwrap();
    stream.receive_chunk().unwrap();
    stream.receive_chunk().unwrap();
    stream.finalize().unwrap();

    let issued_at = Time::from_nanos(3000);
    let service_latency = Duration::from_millis(100);
    let certificate = stream
        .certificate("callee-a".into(), 0xBEEF, issued_at, service_latency)
        .expect("finalized bounded stream should produce a certificate");

    assert!(certificate.is_chunked);
    assert_eq!(certificate.total_chunks, Some(2));
    assert_eq!(certificate.payload_digest, 0xBEEF);
    assert!(certificate.validate().is_ok());
}
/// Finalizing a two-chunk stream after only one chunk must fail with
/// `ChunkedReplyIncomplete` and leave the stream un-finalized.
#[test]
fn chunked_reply_finalize_rejects_incomplete_bounded_stream() {
    let mut stream = ChunkedReplyObligation::new(
        "family-early-finalize".into(),
        "req-early-finalize".into(),
        None,
        Some(2),
        DeliveryClass::DurableOrdered,
        AckKind::Recoverable,
    )
    .unwrap();
    stream.receive_chunk().unwrap();

    let early_finalize = stream.finalize();
    assert!(matches!(
        early_finalize,
        Err(ServiceObligationError::ChunkedReplyIncomplete {
            expected: 2,
            received: 1,
        })
    ));
    assert!(!stream.is_finalized());
}
/// Requesting a certificate from a stream that has received its chunk but was
/// never finalized must fail with `ChunkedReplyNotFinalized`.
#[test]
fn chunked_reply_certificate_requires_finalize() {
    let mut stream = ChunkedReplyObligation::new(
        "family-unfinalized-cert".into(),
        "req-unfinalized-cert".into(),
        None,
        Some(1),
        DeliveryClass::DurableOrdered,
        AckKind::Recoverable,
    )
    .unwrap();
    stream.receive_chunk().unwrap();

    let premature = stream.certificate(
        "callee-a".into(),
        0xCAFE,
        Time::from_nanos(1),
        Duration::from_millis(10),
    );
    assert!(matches!(
        premature,
        Err(ServiceObligationError::ChunkedReplyNotFinalized)
    ));
}
/// Once a stream is finalized, receiving another chunk must fail with
/// `AlreadyResolved`.
#[test]
fn chunked_reply_receive_after_finalize_fails() {
    let mut stream = ChunkedReplyObligation::new(
        "family-6".into(),
        "req-6".into(),
        Some(ObligationId::new_for_test(22, 0)),
        Some(1),
        DeliveryClass::ObligationBacked,
        AckKind::Served,
    )
    .unwrap();
    stream.receive_chunk().unwrap();
    stream.finalize().unwrap();

    let late_chunk = stream.receive_chunk();
    assert!(matches!(
        late_chunk,
        Err(ServiceObligationError::AlreadyResolved { .. })
    ));
}
/// Finalizing an `ObligationBacked` five-chunk stream after a single chunk
/// must fail with `ChunkedReplyIncomplete { expected: 5, received: 1 }`.
#[test]
fn chunked_reply_finalize_rejects_incomplete() {
    let mut stream = ChunkedReplyObligation::new(
        "family-7".into(),
        "req-7".into(),
        Some(ObligationId::new_for_test(23, 0)),
        Some(5),
        DeliveryClass::ObligationBacked,
        AckKind::Served,
    )
    .unwrap();
    stream.receive_chunk().unwrap();

    let early_finalize = stream.finalize();
    assert!(matches!(
        early_finalize,
        Err(ServiceObligationError::ChunkedReplyIncomplete {
            expected: 5,
            received: 1,
        })
    ));
}
/// Requesting the `Committed` boundary for a `DurableOrdered` chunked reply
/// must fail with `ReplyBoundaryBelowMinimum`, naming `Recoverable` as the
/// minimum boundary.
#[test]
fn chunked_reply_rejects_boundary_below_minimum() {
    let outcome = ChunkedReplyObligation::new(
        "family-below-min".into(),
        "req-below-min".into(),
        None,
        Some(1),
        DeliveryClass::DurableOrdered,
        AckKind::Committed,
    );

    assert!(matches!(
        outcome,
        Err(ServiceObligationError::ReplyBoundaryBelowMinimum {
            delivery_class: DeliveryClass::DurableOrdered,
            minimum_boundary: AckKind::Recoverable,
            requested_boundary: AckKind::Committed,
        })
    ));
}
/// Requesting the `Received` boundary for an `EphemeralInteractive` chunked
/// reply must fail with `ReplyTrackingUnavailable`.
#[test]
fn chunked_reply_rejects_untracked_follow_up_boundary() {
    let outcome = ChunkedReplyObligation::new(
        "family-untracked".into(),
        "req-untracked".into(),
        None,
        Some(1),
        DeliveryClass::EphemeralInteractive,
        AckKind::Received,
    );

    assert!(matches!(
        outcome,
        Err(ServiceObligationError::ReplyTrackingUnavailable {
            delivery_class: DeliveryClass::EphemeralInteractive,
            requested_boundary: AckKind::Received,
            receipt_required: false,
        })
    ));
}
/// Constructing an `ObligationBacked` chunked reply without a parent
/// obligation id must fail with `TrackedReplyMissingParentObligationId`.
#[test]
fn chunked_reply_rejects_tracked_class_without_parent_obligation_id() {
    let outcome = ChunkedReplyObligation::new(
        "family-tracked-missing-parent".into(),
        "req-tracked-missing-parent".into(),
        None,
        Some(1),
        DeliveryClass::ObligationBacked,
        AckKind::Served,
    );

    assert!(matches!(
        outcome,
        Err(
            ServiceObligationError::TrackedReplyMissingParentObligationId {
                delivery_class: DeliveryClass::ObligationBacked,
            }
        )
    ));
}
/// A passive, no-retry contract with a 50 ms target validates; a 30 ms latency
/// satisfies it and a 100 ms latency does not.
#[test]
fn quantitative_contract_valid_interactive_slo() {
    let slo = QuantitativeContract {
        name: "order-processing-p99".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        target_latency: Duration::from_millis(50),
        target_probability: 0.999,
        retry_law: RetryLaw::None,
        monitoring_policy: MonitoringPolicy::Passive,
        record_violations: true,
    };
    assert!(slo.validate().is_ok());

    let within_target = Duration::from_millis(30);
    let beyond_target = Duration::from_millis(100);
    assert!(slo.latency_satisfies(within_target));
    assert!(!slo.latency_satisfies(beyond_target));
}
/// A contract combining a fixed retry law with sampled monitoring validates.
#[test]
fn quantitative_contract_valid_with_fixed_retry() {
    let retry_law = RetryLaw::Fixed {
        interval: Duration::from_millis(500),
        max_attempts: 3,
    };
    let monitoring_policy = MonitoringPolicy::Sampled {
        sampling_ratio: 0.1,
        window_size: 100,
    };
    let contract = QuantitativeContract {
        name: "durable-ack".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        target_latency: Duration::from_secs(5),
        target_probability: 0.99,
        retry_law,
        monitoring_policy,
        record_violations: false,
    };

    assert!(contract.validate().is_ok());
}
/// A contract combining exponential backoff with e-process monitoring
/// validates.
#[test]
fn quantitative_contract_valid_with_exponential_backoff() {
    let retry_law = RetryLaw::ExponentialBackoff {
        initial_delay: Duration::from_millis(100),
        multiplier: 2.0,
        max_delay: Duration::from_secs(10),
        max_attempts: 5,
    };
    let monitoring_policy = MonitoringPolicy::EProcess {
        confidence: 0.99,
        max_evidence: 100.0,
    };
    let contract = QuantitativeContract {
        name: "obligation-backed-slo".into(),
        delivery_class: DeliveryClass::ObligationBacked,
        target_latency: Duration::from_secs(1),
        target_probability: 0.995,
        retry_law,
        monitoring_policy,
        record_violations: true,
    };

    assert!(contract.validate().is_ok());
}
/// A contract combining a budget-bounded retry law with conformal monitoring
/// validates.
#[test]
fn quantitative_contract_valid_with_conformal_monitoring() {
    let retry_law = RetryLaw::BudgetBounded {
        interval: Duration::from_secs(1),
    };
    let monitoring_policy = MonitoringPolicy::Conformal {
        target_coverage: 0.95,
        calibration_size: 50,
    };
    let contract = QuantitativeContract {
        name: "forensic-slo".into(),
        delivery_class: DeliveryClass::ForensicReplayable,
        target_latency: Duration::from_secs(30),
        target_probability: 0.9,
        retry_law,
        monitoring_policy,
        record_violations: true,
    };

    assert!(contract.validate().is_ok());
}
/// A contract with an empty name must fail validation with `EmptyName`.
#[test]
fn quantitative_contract_rejects_empty_name() {
    let unnamed = QuantitativeContract {
        name: String::new(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        target_latency: Duration::from_millis(50),
        target_probability: 0.999,
        retry_law: RetryLaw::None,
        monitoring_policy: MonitoringPolicy::Passive,
        record_violations: false,
    };

    let outcome = unnamed.validate();
    assert!(matches!(
        outcome,
        Err(QuantitativeContractError::EmptyName)
    ));
}
/// A contract whose target latency is `Duration::ZERO` must fail validation
/// with `ZeroLatency`.
#[test]
fn quantitative_contract_rejects_zero_latency() {
    let zero_target = QuantitativeContract {
        name: "zero-lat".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        target_latency: Duration::ZERO,
        target_probability: 0.99,
        retry_law: RetryLaw::None,
        monitoring_policy: MonitoringPolicy::Passive,
        record_violations: false,
    };

    let outcome = zero_target.validate();
    assert!(matches!(
        outcome,
        Err(QuantitativeContractError::ZeroLatency)
    ));
}
/// Target probabilities of `0.0`, `-0.1`, `1.1` and NaN must each fail
/// validation.
#[test]
fn quantitative_contract_rejects_invalid_probability() {
    let rejected_probabilities = [0.0, -0.1, 1.1, f64::NAN];
    for p in rejected_probabilities {
        let contract = QuantitativeContract {
            name: "bad-prob".into(),
            delivery_class: DeliveryClass::EphemeralInteractive,
            target_latency: Duration::from_millis(50),
            target_probability: p,
            retry_law: RetryLaw::None,
            monitoring_policy: MonitoringPolicy::Passive,
            record_violations: false,
        };
        let outcome = contract.validate();
        assert!(outcome.is_err(), "probability {p} should fail");
    }
}
/// Exercises the retry-law validation errors: a zero fixed interval, zero max
/// attempts, a backoff multiplier of `1.0`, and a max delay shorter than the
/// initial delay.
#[test]
fn quantitative_contract_rejects_bad_retry_law() {
    // Every case shares all fields except the name and the retry law.
    let contract_with = |name: &str, retry_law: RetryLaw| QuantitativeContract {
        name: name.into(),
        delivery_class: DeliveryClass::DurableOrdered,
        target_latency: Duration::from_secs(1),
        target_probability: 0.99,
        retry_law,
        monitoring_policy: MonitoringPolicy::Passive,
        record_violations: false,
    };

    let zero_interval = contract_with(
        "bad-retry",
        RetryLaw::Fixed {
            interval: Duration::ZERO,
            max_attempts: 3,
        },
    );
    assert!(matches!(
        zero_interval.validate(),
        Err(QuantitativeContractError::ZeroRetryInterval)
    ));

    let zero_attempts = contract_with(
        "bad-retry",
        RetryLaw::Fixed {
            interval: Duration::from_millis(100),
            max_attempts: 0,
        },
    );
    assert!(matches!(
        zero_attempts.validate(),
        Err(QuantitativeContractError::ZeroMaxAttempts)
    ));

    let unit_multiplier = contract_with(
        "bad-mult",
        RetryLaw::ExponentialBackoff {
            initial_delay: Duration::from_millis(100),
            multiplier: 1.0,
            max_delay: Duration::from_secs(10),
            max_attempts: 3,
        },
    );
    assert!(matches!(
        unit_multiplier.validate(),
        Err(QuantitativeContractError::InvalidMultiplier(_))
    ));

    let inverted_delays = contract_with(
        "bad-delay-order",
        RetryLaw::ExponentialBackoff {
            initial_delay: Duration::from_secs(2),
            multiplier: 2.0,
            max_delay: Duration::from_secs(1),
            max_attempts: 3,
        },
    );
    assert!(matches!(
        inverted_delays.validate(),
        Err(QuantitativeContractError::MaxDelayBelowInitialDelay {
            initial_delay,
            max_delay,
        }) if initial_delay == Duration::from_secs(2) && max_delay == Duration::from_secs(1)
    ));
}
/// Exercises the monitoring-policy validation errors: a zero sampling window,
/// an e-process confidence of `1.0`, and a zero conformal calibration size.
#[test]
fn quantitative_contract_rejects_bad_monitoring() {
    // Every case shares all fields except the name and the monitoring policy.
    let contract_with = |name: &str, monitoring_policy: MonitoringPolicy| QuantitativeContract {
        name: name.into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        target_latency: Duration::from_millis(50),
        target_probability: 0.99,
        retry_law: RetryLaw::None,
        monitoring_policy,
        record_violations: false,
    };

    let empty_window = contract_with(
        "bad-mon",
        MonitoringPolicy::Sampled {
            sampling_ratio: 0.5,
            window_size: 0,
        },
    );
    assert!(matches!(
        empty_window.validate(),
        Err(QuantitativeContractError::ZeroWindowSize)
    ));

    let full_confidence = contract_with(
        "bad-eproc",
        MonitoringPolicy::EProcess {
            confidence: 1.0,
            max_evidence: 100.0,
        },
    );
    assert!(matches!(
        full_confidence.validate(),
        Err(QuantitativeContractError::InvalidConfidence(_))
    ));

    let no_calibration = contract_with(
        "bad-conformal",
        MonitoringPolicy::Conformal {
            target_coverage: 0.95,
            calibration_size: 0,
        },
    );
    assert!(matches!(
        no_calibration.validate(),
        Err(QuantitativeContractError::ZeroCalibrationSize)
    ));
}
/// Exercises float-parameter validation: a NaN backoff multiplier, a NaN
/// sampling ratio, an infinite e-process evidence cap, and an evidence cap
/// below the alert threshold.
#[test]
fn quantitative_contract_rejects_non_finite_float_parameters() {
    let bad_multiplier = QuantitativeContract {
        name: "bad-multiplier".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        target_latency: Duration::from_secs(1),
        target_probability: 0.99,
        retry_law: RetryLaw::ExponentialBackoff {
            initial_delay: Duration::from_millis(100),
            multiplier: f64::NAN,
            max_delay: Duration::from_secs(10),
            max_attempts: 3,
        },
        monitoring_policy: MonitoringPolicy::Passive,
        record_violations: false,
    };
    assert!(matches!(
        bad_multiplier.validate(),
        Err(QuantitativeContractError::InvalidMultiplier(value)) if value.is_nan()
    ));

    // The remaining cases differ only in name and monitoring policy.
    let interactive_with = |name: &str, monitoring_policy: MonitoringPolicy| {
        QuantitativeContract {
            name: name.into(),
            delivery_class: DeliveryClass::EphemeralInteractive,
            target_latency: Duration::from_millis(50),
            target_probability: 0.99,
            retry_law: RetryLaw::None,
            monitoring_policy,
            record_violations: false,
        }
    };

    let bad_sampling_ratio = interactive_with(
        "bad-sampling",
        MonitoringPolicy::Sampled {
            sampling_ratio: f64::NAN,
            window_size: 10,
        },
    );
    assert!(matches!(
        bad_sampling_ratio.validate(),
        Err(QuantitativeContractError::InvalidSamplingRatio(value)) if value.is_nan()
    ));

    let bad_max_evidence = interactive_with(
        "bad-evidence",
        MonitoringPolicy::EProcess {
            confidence: 0.95,
            max_evidence: f64::INFINITY,
        },
    );
    assert!(matches!(
        bad_max_evidence.validate(),
        Err(QuantitativeContractError::ZeroMaxEvidence)
    ));

    let cap_below_threshold = interactive_with(
        "bad-evidence-threshold",
        MonitoringPolicy::EProcess {
            confidence: 0.80,
            max_evidence: 4.0,
        },
    );
    // The guard expects the reported threshold to be 5.0 for confidence 0.80.
    assert!(matches!(
        cap_below_threshold.validate(),
        Err(QuantitativeContractError::MaxEvidenceBelowAlertThreshold {
            max_evidence,
            threshold,
        }) if (max_evidence - 4.0).abs() < f64::EPSILON
            && (threshold - 5.0).abs() < 1e-9
    ));
}
/// With the largest `f64` below `1.0` as confidence, the e-process alpha is
/// asserted to equal `f64::EPSILON` and the threshold `1 / f64::EPSILON`.
/// An evidence cap equal to that threshold validates; half of it is rejected.
#[test]
fn quantitative_contract_uses_clamped_eprocess_threshold_near_confidence_one() {
    // Step one ULP down from 1.0.
    let near_one = f64::from_bits(1.0_f64.to_bits() - 1);
    let alert_threshold = quantitative_eprocess_threshold(near_one);
    assert_eq!(quantitative_eprocess_alpha(near_one), f64::EPSILON);
    assert_eq!(alert_threshold, 1.0 / f64::EPSILON);

    let accepted = QuantitativeContract {
        name: "near-one-confidence".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        target_latency: Duration::from_millis(50),
        target_probability: 0.99,
        retry_law: RetryLaw::None,
        monitoring_policy: MonitoringPolicy::EProcess {
            confidence: near_one,
            max_evidence: alert_threshold,
        },
        record_violations: false,
    };
    assert!(accepted.validate().is_ok());

    let halved_cap = alert_threshold / 2.0;
    let rejected = QuantitativeContract {
        monitoring_policy: MonitoringPolicy::EProcess {
            confidence: near_one,
            max_evidence: halved_cap,
        },
        ..accepted
    };
    assert!(matches!(
        rejected.validate(),
        Err(QuantitativeContractError::MaxEvidenceBelowAlertThreshold {
            max_evidence,
            threshold,
        }) if max_evidence == halved_cap && threshold == alert_threshold
    ));
}
/// Serializes a fully populated `QuantitativeContract` to JSON and asserts the
/// deserialized value equals the original.
#[test]
fn quantitative_contract_serde_roundtrip() {
    let retry_law = RetryLaw::ExponentialBackoff {
        initial_delay: Duration::from_millis(50),
        multiplier: 2.0,
        max_delay: Duration::from_secs(5),
        max_attempts: 4,
    };
    let monitoring_policy = MonitoringPolicy::EProcess {
        confidence: 0.99,
        max_evidence: 50.0,
    };
    let original = QuantitativeContract {
        name: "serde-test".into(),
        delivery_class: DeliveryClass::ObligationBacked,
        target_latency: Duration::from_millis(200),
        target_probability: 0.999,
        retry_law,
        monitoring_policy,
        record_violations: true,
    };

    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded: QuantitativeContract = serde_json::from_str(&encoded).expect("deserialize");
    assert_eq!(original, decoded);
}
/// Feeds a passive monitor one latency within the 50 ms target and two beyond
/// it, then asserts the evaluation is `Violated` with an `Escalate`
/// recommendation and that policy-change evidence records one hit in three
/// observations.
#[test]
fn quantitative_monitor_passive_records_policy_change_evidence() {
    let contract = QuantitativeContract {
        name: "passive-evidence".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        target_latency: Duration::from_millis(50),
        target_probability: 0.90,
        retry_law: RetryLaw::Fixed {
            interval: Duration::from_millis(10),
            max_attempts: 2,
        },
        monitoring_policy: MonitoringPolicy::Passive,
        record_violations: true,
    };
    let mut monitor = QuantitativeContractMonitor::new(contract).expect("valid monitor");

    for millis in [20, 70] {
        monitor.observe_latency(Duration::from_millis(millis));
    }
    let last_evaluation = monitor.observe_latency(Duration::from_millis(80));

    assert_eq!(last_evaluation.state, QuantitativeContractState::Violated);
    assert_eq!(
        last_evaluation.recommendation,
        QuantitativePolicyRecommendation::Escalate
    );

    let evidence = monitor
        .policy_change_evidence()
        .expect("violations should produce evidence");
    assert_eq!(evidence.contract_name, "passive-evidence");
    assert_eq!(evidence.hit_count, 1);
    assert_eq!(evidence.observations, 3);
}
/// Fills a four-sample window (sampling ratio `1.0`) with two latencies within
/// the 40 ms target and two beyond it, then asserts the evaluation is
/// `Violated`/`Escalate` with four sampled observations and a 0.5 hit rate.
#[test]
fn quantitative_monitor_sampled_window_detects_violation() {
    let contract = QuantitativeContract {
        name: "sampled-window".into(),
        delivery_class: DeliveryClass::DurableOrdered,
        target_latency: Duration::from_millis(40),
        target_probability: 0.75,
        retry_law: RetryLaw::BudgetBounded {
            interval: Duration::from_millis(15),
        },
        monitoring_policy: MonitoringPolicy::Sampled {
            sampling_ratio: 1.0,
            window_size: 4,
        },
        record_violations: true,
    };
    let mut monitor = QuantitativeContractMonitor::new(contract).expect("valid monitor");

    for millis in [10, 20, 60] {
        monitor.observe_latency(Duration::from_millis(millis));
    }
    let last_evaluation = monitor.observe_latency(Duration::from_millis(70));

    assert_eq!(last_evaluation.state, QuantitativeContractState::Violated);
    assert_eq!(
        last_evaluation.recommendation,
        QuantitativePolicyRecommendation::Escalate
    );

    let QuantitativeMonitorEvidence::Sampled {
        sampled_observations: observed,
        window_hit_rate: hit_rate,
        ..
    } = last_evaluation.evidence
    else {
        panic!("expected sampled monitoring evidence");
    };
    assert_eq!(observed, 4);
    assert!((hit_rate - 0.5).abs() < 1e-9);
}
/// Drives an e-process monitor with three latencies far above the 10 ms
/// target and asserts the third evaluation is `Violated`, in `Alert` state,
/// capped, and at or above the alert threshold. A subsequent in-target
/// observation is then asserted to report an e-value of at most `1.0`.
#[test]
fn quantitative_monitor_eprocess_alerts_and_auto_resets() {
    let contract = QuantitativeContract {
        name: "eprocess-slo".into(),
        delivery_class: DeliveryClass::ObligationBacked,
        target_latency: Duration::from_millis(10),
        target_probability: 0.95,
        retry_law: RetryLaw::Fixed {
            interval: Duration::from_millis(5),
            max_attempts: 3,
        },
        monitoring_policy: MonitoringPolicy::EProcess {
            confidence: 0.80,
            max_evidence: 10.0,
        },
        record_violations: true,
    };
    let mut monitor = QuantitativeContractMonitor::new(contract).expect("valid monitor");

    for millis in [100, 120] {
        monitor.observe_latency(Duration::from_millis(millis));
    }
    let alerting = monitor.observe_latency(Duration::from_millis(150));
    assert_eq!(alerting.state, QuantitativeContractState::Violated);

    let QuantitativeMonitorEvidence::EProcess {
        e_value: alert_e_value,
        threshold,
        max_evidence,
        capped,
        alert_state,
        ..
    } = alerting.evidence
    else {
        panic!("expected e-process evidence");
    };
    assert!(
        max_evidence >= threshold,
        "test config must not cap evidence below the alert threshold"
    );
    assert!(
        alert_e_value >= threshold,
        "reported evidence should clear the alert threshold once alert_state=Alert"
    );
    assert!(capped, "e-value should cap and reset");
    assert_eq!(alert_state, QuantitativeMonitorAlertState::Alert);

    let after_reset = monitor.observe_latency(Duration::from_millis(5));
    let QuantitativeMonitorEvidence::EProcess {
        e_value: reset_e_value,
        ..
    } = after_reset.evidence
    else {
        panic!("expected e-process evidence after reset");
    };
    assert!(
        reset_e_value <= 1.0,
        "fresh monitor should restart from a low evidence level"
    );
}
/// Calibrates a conformal monitor with four low latencies (each asserted
/// `Healthy`), then observes a 200 ms outlier and asserts the evaluation is
/// `AtRisk` with an `ApplyRetryLaw` recommendation, non-conforming evidence, a
/// reported threshold, and recorded policy-change evidence.
#[test]
fn quantitative_monitor_conformal_reports_drift() {
    let contract = QuantitativeContract {
        name: "conformal-slo".into(),
        delivery_class: DeliveryClass::ForensicReplayable,
        target_latency: Duration::from_millis(50),
        target_probability: 0.80,
        retry_law: RetryLaw::BudgetBounded {
            interval: Duration::from_millis(10),
        },
        monitoring_policy: MonitoringPolicy::Conformal {
            target_coverage: 0.80,
            calibration_size: 4,
        },
        record_violations: true,
    };
    let mut monitor = QuantitativeContractMonitor::new(contract).expect("valid monitor");

    let calibration_latencies = [20_u64, 22, 24, 26];
    for millis in calibration_latencies {
        let calibration_eval = monitor.observe_latency(Duration::from_millis(millis));
        assert_eq!(
            calibration_eval.state,
            QuantitativeContractState::Healthy,
            "calibration samples should not trigger drift by themselves"
        );
    }

    let drifted = monitor.observe_latency(Duration::from_millis(200));
    assert_eq!(drifted.state, QuantitativeContractState::AtRisk);
    assert_eq!(
        drifted.recommendation,
        QuantitativePolicyRecommendation::ApplyRetryLaw
    );

    let QuantitativeMonitorEvidence::Conformal {
        calibration_samples,
        latest_conforming,
        threshold_latency,
        ..
    } = drifted.evidence
    else {
        panic!("expected conformal evidence");
    };
    assert_eq!(calibration_samples, 4);
    assert_eq!(latest_conforming, Some(false));
    assert!(
        threshold_latency.is_some(),
        "conformal evaluation should expose the calibrated threshold"
    );

    let recorded = monitor.policy_change_evidence();
    assert!(
        recorded.is_some(),
        "non-healthy conformal result should emit evidence when recording is enabled"
    );
}
/// With `record_violations` set to `false`, three latencies above the 30 ms
/// target must not produce any policy-change evidence.
#[test]
fn quantitative_monitor_suppresses_policy_change_evidence_when_disabled() {
    let contract = QuantitativeContract {
        name: "no-evidence".into(),
        delivery_class: DeliveryClass::EphemeralInteractive,
        target_latency: Duration::from_millis(30),
        target_probability: 0.99,
        retry_law: RetryLaw::None,
        monitoring_policy: MonitoringPolicy::Passive,
        record_violations: false,
    };
    let mut monitor = QuantitativeContractMonitor::new(contract).expect("valid monitor");

    for millis in [100, 110, 120] {
        monitor.observe_latency(Duration::from_millis(millis));
    }

    assert!(monitor.policy_change_evidence().is_none());
}
/// Runs a two-step saga (`reserve`, then `settle`) to completion and asserts,
/// at each stage, the current step index, the step status, and which
/// obligations are still owed. The ledger must end with no pending entries.
#[test]
fn workflow_linear_execution_tracks_subject_steps() {
    let mut ledger = ObligationLedger::new();
    let task = make_task();
    let region = make_region();

    // Both steps owe the same kind of reply obligation.
    let received_reply = || WorkflowObligationRole::Reply {
        delivery_boundary: AckKind::Received,
        receipt_required: true,
    };
    let reserve = workflow_step(
        "reserve",
        "fabric.order.reserve",
        vec![
            received_reply(),
            WorkflowObligationRole::Lease {
                resource: "inventory.sku-42".to_owned(),
            },
        ],
        &[],
    );
    let settle = workflow_step(
        "settle",
        "fabric.payment.settle",
        vec![received_reply()],
        &[],
    );
    let mut saga = SagaState::new("checkout", vec![reserve, settle]).expect("valid saga");

    // First step: reply + lease are owed while it runs.
    saga.start_next_step(&mut ledger, task, region, Time::from_nanos(10))
        .expect("start first step");
    assert_eq!(saga.current_step, Some(0));
    let reserve_owed = saga
        .what_is_still_owed(&ledger)
        .expect("owed obligations for first step");
    assert_eq!(reserve_owed.len(), 2);
    assert!(reserve_owed.iter().all(|entry| entry.step_id == "reserve"));

    saga.complete_current_step(&mut ledger, Time::from_nanos(20))
        .expect("complete first step");
    assert_eq!(saga.steps[0].status, WorkflowStepStatus::Completed);
    assert_eq!(saga.current_step, Some(1));
    assert!(
        saga.what_is_still_owed(&ledger)
            .expect("no second-step obligations before start")
            .is_empty()
    );

    // Second step: only the reply is owed.
    saga.start_next_step(&mut ledger, task, region, Time::from_nanos(30))
        .expect("start second step");
    let settle_owed = saga
        .what_is_still_owed(&ledger)
        .expect("owed obligations for second step");
    assert_eq!(settle_owed.len(), 1);
    assert_eq!(settle_owed[0].step_id, "settle");

    saga.complete_current_step(&mut ledger, Time::from_nanos(40))
        .expect("complete second step");
    assert_eq!(saga.steps[1].status, WorkflowStepStatus::Completed);
    assert_eq!(saga.current_step, None);
    assert!(
        saga.what_is_still_owed(&ledger)
            .expect("all work should be resolved")
            .is_empty()
    );
    assert_eq!(ledger.pending_count(), 0);
}
/// Fails the only step of a saga that declares two compensation subjects and
/// asserts the step moves to `Compensating` with two compensation obligations
/// owed. After completing the compensation the step must be `Compensated`,
/// listed in `compensated_steps`, and nothing may remain owed or pending.
#[test]
fn workflow_failure_activates_and_commits_compensation() {
    let mut ledger = ObligationLedger::new();
    let task = make_task();
    let region = make_region();

    let step_roles = vec![
        WorkflowObligationRole::Reply {
            delivery_boundary: AckKind::Received,
            receipt_required: true,
        },
        WorkflowObligationRole::Lease {
            resource: "payment.intent.pi_123".to_owned(),
        },
    ];
    let charge = workflow_step(
        "charge",
        "fabric.payment.charge",
        step_roles,
        &["fabric.payment.refund", "fabric.inventory.restock"],
    );
    let mut saga = SagaState::new("payment-saga", vec![charge]).expect("valid saga");

    saga.start_next_step(&mut ledger, task, region, Time::from_nanos(5))
        .expect("start workflow");
    saga.fail_current_step(
        &mut ledger,
        Time::from_nanos(8),
        ServiceFailure::ApplicationError,
        task,
        region,
    )
    .expect("fail current step");
    assert_eq!(
        saga.steps[0].status,
        WorkflowStepStatus::Compensating {
            failure: ServiceFailure::ApplicationError,
        }
    );

    // Only compensation obligations should be outstanding after the failure.
    let compensation_owed = saga
        .what_is_still_owed(&ledger)
        .expect("compensation should now be owed");
    assert_eq!(compensation_owed.len(), 2);
    assert!(
        compensation_owed
            .iter()
            .all(|entry| matches!(entry.role, WorkflowObligationRole::Compensation { .. }))
    );

    saga.complete_current_compensation(&mut ledger, Time::from_nanos(13))
        .expect("commit compensation");
    assert_eq!(
        saga.steps[0].status,
        WorkflowStepStatus::Compensated {
            failure: ServiceFailure::ApplicationError,
        }
    );
    assert_eq!(saga.compensated_steps, vec!["charge".to_owned()]);
    assert!(
        saga.what_is_still_owed(&ledger)
            .expect("compensation should be resolved")
            .is_empty()
    );
    assert_eq!(ledger.pending_count(), 0);
}
/// Completes the first of two steps, starts the second, and checks that the
/// owed view lists only the second step's obligations.
///
/// The second step ("ship") declares a reply and a deadline obligation, so
/// exactly two entries are expected, one of each role, all tagged "ship".
#[test]
fn workflow_partial_progress_tracks_only_active_step_obligations() {
    let mut ledger = ObligationLedger::new();
    let task = make_task();
    let region = make_region();

    // Neither step declares compensation subjects.
    let steps = vec![
        workflow_step(
            "prepare",
            "fabric.order.prepare",
            vec![WorkflowObligationRole::Reply {
                delivery_boundary: AckKind::Received,
                receipt_required: true,
            }],
            &[],
        ),
        workflow_step(
            "ship",
            "fabric.order.ship",
            vec![
                WorkflowObligationRole::Reply {
                    delivery_boundary: AckKind::Received,
                    receipt_required: true,
                },
                WorkflowObligationRole::Deadline {
                    deadline: Time::from_nanos(500),
                },
            ],
            &[],
        ),
    ];
    let mut saga = SagaState::new("ship-flow", steps).expect("valid saga");

    // Run "prepare" to completion, then open "ship".
    let prepare_started_at = Time::from_nanos(10);
    saga.start_next_step(&mut ledger, task, region, prepare_started_at)
        .expect("start prepare");
    let prepare_done_at = Time::from_nanos(20);
    saga.complete_current_step(&mut ledger, prepare_done_at)
        .expect("complete prepare");
    let ship_started_at = Time::from_nanos(30);
    saga.start_next_step(&mut ledger, task, region, ship_started_at)
        .expect("start ship");

    let owed = saga
        .what_is_still_owed(&ledger)
        .expect("active step obligations");
    assert_eq!(owed.len(), 2);

    // Every owed entry belongs to the active step.
    for entry in owed.iter() {
        assert_eq!(entry.step_id, "ship");
    }

    // Both declared roles of the active step must be represented.
    let has_reply = owed
        .iter()
        .any(|entry| matches!(entry.role, WorkflowObligationRole::Reply { .. }));
    let has_deadline = owed
        .iter()
        .any(|entry| matches!(entry.role, WorkflowObligationRole::Deadline { .. }));
    assert!(has_reply);
    assert!(has_deadline);
}
/// Round-trips an in-flight saga through JSON, recovers it against the same
/// ledger, and checks that the owed work survives and can still be failed.
///
/// After recovery the saga must still point at step 0, its latest evidence
/// event must be `Recovered`, and both obligations of the step must still be
/// owed. Failing the step with `TimedOut` must mark it `Failed` and leave
/// nothing owed or pending.
#[test]
fn workflow_recovery_after_crash_preserves_explicit_owed_work() {
    let mut ledger = ObligationLedger::new();
    let task = make_task();
    let region = make_region();

    // Single step with a reply and a timeout obligation, no compensation subjects.
    let replicate_roles = vec![
        WorkflowObligationRole::Reply {
            delivery_boundary: AckKind::Received,
            receipt_required: true,
        },
        WorkflowObligationRole::Timeout,
    ];
    let replicate_step = workflow_step(
        "replicate",
        "fabric.repair.replicate",
        replicate_roles,
        &[],
    );
    let mut saga = SagaState::new("repair", vec![replicate_step]).expect("valid saga");
    let started_at = Time::from_nanos(11);
    saga.start_next_step(&mut ledger, task, region, started_at)
        .expect("start step");

    // Take a snapshot through serde_json and rebuild the saga from it.
    let encoded = serde_json::to_string(&saga).expect("serialize saga snapshot");
    let snapshot = serde_json::from_str::<SagaState>(&encoded).expect("deserialize snapshot");
    let recovered_at = Time::from_nanos(19);
    let mut recovered = snapshot
        .recover_from_replay(&ledger, recovered_at)
        .expect("recover from snapshot");

    assert_eq!(recovered.current_step, Some(0));

    // Recovery must be the most recent entry in the evidence trail.
    let latest_evidence = recovered
        .evidence_trail
        .last()
        .expect("recovery evidence");
    assert!(matches!(
        latest_evidence.event,
        SagaEvidenceEvent::Recovered
    ));

    // Both obligations of the in-flight step are still owed.
    let owed_after_recovery = recovered
        .what_is_still_owed(&ledger)
        .expect("recovered owed work");
    assert_eq!(owed_after_recovery.len(), 2);

    let failed_at = Time::from_nanos(25);
    recovered
        .fail_current_step(
            &mut ledger,
            failed_at,
            ServiceFailure::TimedOut,
            task,
            region,
        )
        .expect("abort by id during recovery");

    let status_after_failure = &recovered.steps[0].status;
    assert_eq!(
        *status_after_failure,
        WorkflowStepStatus::Failed {
            failure: ServiceFailure::TimedOut,
        }
    );

    // Failing the step resolves everything it owed.
    let remaining = recovered
        .what_is_still_owed(&ledger)
        .expect("all recovered work should be resolved");
    assert!(remaining.is_empty());
    let pending = ledger.pending_count();
    assert_eq!(pending, 0);
}
/// Checks that steps which have not been started contribute nothing to the
/// owed view.
///
/// While "ingest" is active its single lease obligation is owed and is
/// `Reserved` both in the owed entry and in the ledger record. Once "ingest"
/// completes, and before "compact" is started, nothing is owed.
#[test]
fn workflow_what_is_still_owed_excludes_future_pending_steps() {
    let mut ledger = ObligationLedger::new();
    let task = make_task();
    let region = make_region();

    // Two steps, neither with compensation subjects; only the first is ever started.
    let steps = vec![
        workflow_step(
            "ingest",
            "fabric.ingest.start",
            vec![WorkflowObligationRole::Lease {
                resource: String::from("shard-01"),
            }],
            &[],
        ),
        workflow_step(
            "compact",
            "fabric.ingest.compact",
            vec![WorkflowObligationRole::Reply {
                delivery_boundary: AckKind::Received,
                receipt_required: true,
            }],
            &[],
        ),
    ];
    let mut saga = SagaState::new("ingest-flow", steps).expect("valid saga");

    let started_at = Time::from_nanos(7);
    saga.start_next_step(&mut ledger, task, region, started_at)
        .expect("start ingest");

    let owed = saga
        .what_is_still_owed(&ledger)
        .expect("ingest obligation should be owed");
    assert_eq!(owed.len(), 1);

    // The only owed entry belongs to the active step and is still reserved.
    let sole_entry = &owed[0];
    assert_eq!(sole_entry.step_id, "ingest");
    assert_eq!(sole_entry.state, ObligationState::Reserved);

    // The ledger record for that obligation agrees with the owed entry.
    let record = ledger
        .get(sole_entry.obligation_id)
        .expect("obligation record must exist");
    assert_eq!(record.state, ObligationState::Reserved);

    let completed_at = Time::from_nanos(9);
    saga.complete_current_step(&mut ledger, completed_at)
        .expect("complete ingest");

    // "compact" has not been started, so it must not appear as owed work.
    let remaining = saga
        .what_is_still_owed(&ledger)
        .expect("future pending step should stay absent");
    assert!(remaining.is_empty());
}
/// Pins the empty-input behavior of the ratio helpers: `ratio(0, 0)` and
/// `bool_ratio` over an empty window must both evaluate to exactly `0.0`.
#[test]
fn quantitative_ratio_is_fail_closed_on_empty_window() {
    let zero_over_zero = ratio(0, 0);
    assert_eq!(zero_over_zero, 0.0);

    // An empty observation window; the element type is inferred from `bool_ratio`.
    let empty_window = VecDeque::new();
    let empty_window_ratio = bool_ratio(&empty_window);
    assert_eq!(empty_window_ratio, 0.0);
}
}