use super::fabric::{CellEpoch, CellId, SubjectCell, SubjectPattern};
use super::jetstream::{AckPolicy, DeliverPolicy};
use crate::obligation::ledger::{LedgerStats, ObligationLedger, ObligationToken};
use crate::record::{ObligationAbortReason, ObligationKind, SourceLocation};
use crate::remote::NodeId;
use crate::types::{ObligationId, RegionId, TaskId, Time};
use crate::util::DetHasher;
use franken_decision::{
DecisionAuditEntry, DecisionContract, EvalContext, FallbackPolicy, LossMatrix, Posterior,
evaluate,
};
use franken_kernel::{DecisionId, TraceId};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::hash::{Hash, Hasher};
use std::panic::Location;
use std::time::Duration;
use thiserror::Error;
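/// A closed (inclusive) range of stream sequence numbers, displayed as
/// `start..=end`. Construction rejects inverted ranges, so every value in
/// circulation satisfies `start <= end`.
///
/// A minimal sketch with hypothetical values (marked `ignore`, so it is not
/// compiled as a doc-test):
/// ```ignore
/// let window = SequenceWindow::new(10, 20)?;
/// assert!(window.contains_sequence(15));
/// assert!(window.contains_window(SequenceWindow::new(12, 18)?));
/// assert_eq!(window.to_string(), "10..=20");
/// ```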
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SequenceWindow {
start: u64,
end: u64,
}
impl SequenceWindow {
pub fn new(start: u64, end: u64) -> Result<Self, ConsumerCursorError> {
if start > end {
return Err(ConsumerCursorError::InvalidSequenceWindow { start, end });
}
Ok(Self { start, end })
}
#[must_use]
pub const fn start(self) -> u64 {
self.start
}
#[must_use]
pub const fn end(self) -> u64 {
self.end
}
#[must_use]
pub const fn contains_window(self, other: Self) -> bool {
self.start <= other.start && self.end >= other.end
}
#[must_use]
pub const fn contains_sequence(self, sequence: u64) -> bool {
self.start <= sequence && sequence <= self.end
}
}
impl fmt::Display for SequenceWindow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}..={}", self.start, self.end)
}
}
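/// Demand classes in dispatch-priority order: `Tail` (live data) outranks
/// `CatchUp`, which outranks `Replay`; lower `priority_rank` wins.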
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConsumerDemandClass {
Tail,
CatchUp,
Replay,
}
impl ConsumerDemandClass {
#[must_use]
const fn priority_rank(self) -> u8 {
match self {
Self::Tail => 0,
Self::CatchUp => 1,
Self::Replay => 2,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CursorRequest {
Sequence(u64),
Window(SequenceWindow),
DemandClass(ConsumerDemandClass),
}
impl CursorRequest {
#[must_use]
fn requested_window(self) -> Option<SequenceWindow> {
match self {
Self::Sequence(sequence) => Some(SequenceWindow {
start: sequence,
end: sequence,
}),
Self::Window(window) => Some(window),
Self::DemandClass(_) => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CursorDeliveryMode {
Pull(CursorRequest),
Push {
window: SequenceWindow,
},
}
impl CursorDeliveryMode {
#[must_use]
fn requested_window(self) -> Option<SequenceWindow> {
match self {
Self::Pull(request) => request.requested_window(),
Self::Push { window } => Some(window),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CursorLeaseScope {
ControlCapsule,
DelegatedCursorPartition {
partition: u16,
},
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CursorLeaseHolder {
Steward(NodeId),
Relay(NodeId),
}
impl CursorLeaseHolder {
#[must_use]
pub fn node(&self) -> &NodeId {
match self {
Self::Steward(node) | Self::Relay(node) => node,
}
}
}
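/// The authoritative cursor lease for a cell: which holder may sequence
/// deliveries, under which scope, and at which `lease_generation`. Every
/// generation bump (failover or contested transfer) fences out attempt
/// certificates issued under earlier generations.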
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorAuthorityLease {
pub cell_id: CellId,
pub epoch: CellEpoch,
pub scope: CursorLeaseScope,
pub holder: CursorLeaseHolder,
pub lease_generation: u64,
pub policy_revision: u64,
}
impl CursorAuthorityLease {
pub fn from_subject_cell(cell: &SubjectCell) -> Result<Self, ConsumerCursorError> {
let Some(active) = cell.control_capsule.active_sequencer.clone() else {
return Err(ConsumerCursorError::NoActiveSequencer {
cell_id: cell.cell_id,
});
};
Ok(Self {
cell_id: cell.cell_id,
epoch: cell.epoch,
scope: CursorLeaseScope::ControlCapsule,
holder: CursorLeaseHolder::Steward(active),
lease_generation: cell.control_capsule.sequencer_lease_generation,
policy_revision: cell.control_capsule.policy_revision,
})
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorLeaseRef {
pub scope: CursorLeaseScope,
pub holder: CursorLeaseHolder,
pub lease_generation: u64,
}
impl CursorLeaseRef {
#[must_use]
pub fn from_authority_lease(lease: &CursorAuthorityLease) -> Self {
Self {
scope: lease.scope,
holder: lease.holder.clone(),
lease_generation: lease.lease_generation,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorPartitionLease {
pub partition: u16,
pub leader: CursorLeaseHolder,
pub lease_generation: u64,
}
impl CursorPartitionLease {
#[must_use]
pub fn from_authority_lease(lease: &CursorAuthorityLease) -> Option<Self> {
match lease.scope {
CursorLeaseScope::ControlCapsule => None,
CursorLeaseScope::DelegatedCursorPartition { partition } => Some(Self {
partition,
leader: lease.holder.clone(),
lease_generation: lease.lease_generation,
}),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CursorPartitionSelector {
ConsumerGroup(String),
SubjectSubRange {
start: String,
end: String,
},
HashBucket {
bucket: u16,
buckets: u16,
},
}
impl CursorPartitionSelector {
fn validate(&self) -> Result<(), ConsumerCursorError> {
match self {
Self::ConsumerGroup(group) if group.trim().is_empty() => {
Err(ConsumerCursorError::EmptyCursorPartitionSelector {
field: "consumer_group",
})
}
            Self::SubjectSubRange { start, end: _ } if start.trim().is_empty() => {
Err(ConsumerCursorError::EmptyCursorPartitionSelector { field: "start" })
}
Self::SubjectSubRange { start: _, end } if end.trim().is_empty() => {
Err(ConsumerCursorError::EmptyCursorPartitionSelector { field: "end" })
}
Self::SubjectSubRange { start, end } if start > end => {
Err(ConsumerCursorError::InvalidCursorPartitionSubRange {
start: start.clone(),
end: end.clone(),
})
}
Self::HashBucket { buckets, .. } if *buckets == 0 => {
Err(ConsumerCursorError::InvalidCursorPartitionBucket {
bucket: 0,
buckets: *buckets,
})
}
Self::HashBucket { bucket, buckets } if *bucket >= *buckets => {
Err(ConsumerCursorError::InvalidCursorPartitionBucket {
bucket: *bucket,
buckets: *buckets,
})
}
Self::ConsumerGroup(_) | Self::SubjectSubRange { .. } | Self::HashBucket { .. } => {
Ok(())
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorPartitionAssignment {
pub partition: u16,
pub leader: CursorLeaseHolder,
pub selector: CursorPartitionSelector,
pub consumers: BTreeSet<String>,
}
impl CursorPartitionAssignment {
fn validate(&self) -> Result<(), ConsumerCursorError> {
self.selector.validate()?;
if self.consumers.is_empty() {
return Err(ConsumerCursorError::EmptyCursorPartitionConsumers {
partition: self.partition,
});
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CursorPartitionCheckpoint {
pub partition: u16,
pub lease_generation: u64,
pub ack_floor: u64,
pub delivered_through: u64,
pub pending_count: u64,
pub consumer_count: u32,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorPartitionSummary {
pub partition: u16,
pub selector: CursorPartitionSelector,
pub leader: CursorLeaseHolder,
pub lease_generation: u64,
pub ack_floor: u64,
pub delivered_through: u64,
pub pending_count: u64,
pub consumer_count: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CacheabilityRule {
NoCache,
Private {
max_age_ticks: u64,
},
Shared {
max_age_ticks: u64,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ReadDelegationRevocationHandle(u64);
impl ReadDelegationRevocationHandle {
#[must_use]
pub const fn raw(self) -> u64 {
self.0
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ReadDelegationExpiry {
pub issued_at_tick: u64,
pub not_after_tick: u64,
}
impl ReadDelegationExpiry {
pub fn new(issued_at_tick: u64, ttl_ticks: u64) -> Result<Self, ConsumerCursorError> {
if ttl_ticks == 0 {
return Err(ConsumerCursorError::InvalidReadDelegationTtl { ttl_ticks });
}
Ok(Self {
issued_at_tick,
not_after_tick: issued_at_tick.saturating_add(ttl_ticks),
})
}
#[must_use]
fn is_expired(self, current_tick: u64) -> bool {
current_tick > self.not_after_tick
}
}
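/// A revocable, tick-bounded grant allowing a non-steward relay to serve
/// reads for a segment window on behalf of the current lease. Tickets are
/// checked against the live lease (cell, epoch, holder, generation), the
/// revocation set, and the ticket clock before any relay delivery is planned.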
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadDelegationTicket {
pub cell_id: CellId,
pub epoch: CellEpoch,
pub cursor_lease_ref: CursorLeaseRef,
pub relay: NodeId,
pub segment_window: SequenceWindow,
pub expiry: ReadDelegationExpiry,
pub cacheability_rules: CacheabilityRule,
pub revocation_handle: ReadDelegationRevocationHandle,
}
impl ReadDelegationTicket {
pub fn new(
lease: &CursorAuthorityLease,
relay: NodeId,
segment_window: SequenceWindow,
issued_at_tick: u64,
ttl_ticks: u64,
cacheability_rules: CacheabilityRule,
revocation_handle: ReadDelegationRevocationHandle,
) -> Result<Self, ConsumerCursorError> {
Ok(Self {
cell_id: lease.cell_id,
epoch: lease.epoch,
cursor_lease_ref: CursorLeaseRef::from_authority_lease(lease),
relay,
segment_window,
expiry: ReadDelegationExpiry::new(issued_at_tick, ttl_ticks)?,
cacheability_rules,
revocation_handle,
})
}
fn validate(
&self,
lease: &CursorAuthorityLease,
relay: &NodeId,
window: SequenceWindow,
current_tick: u64,
revoked_tickets: &BTreeMap<ReadDelegationRevocationHandle, u64>,
) -> Result<(), ConsumerCursorError> {
if self.cell_id != lease.cell_id || self.epoch != lease.epoch {
return Err(ConsumerCursorError::StaleReadDelegationEpoch {
relay: relay.clone(),
ticket_cell: self.cell_id,
ticket_epoch: self.epoch,
current_cell: lease.cell_id,
current_epoch: lease.epoch,
});
}
if revoked_tickets.contains_key(&self.revocation_handle) {
return Err(ConsumerCursorError::RevokedReadDelegationTicket {
relay: relay.clone(),
revocation_handle: self.revocation_handle,
});
}
if self.expiry.is_expired(current_tick) {
return Err(ConsumerCursorError::ExpiredReadDelegationTicket {
relay: relay.clone(),
expired_at_tick: self.expiry.not_after_tick,
current_tick,
});
}
if self.cursor_lease_ref != CursorLeaseRef::from_authority_lease(lease)
|| &self.relay != relay
|| !self.segment_window.contains_window(window)
{
return Err(ConsumerCursorError::InvalidReadDelegationTicket {
relay: relay.clone(),
requested_window: window,
});
}
Ok(())
}
}
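/// A fenced delivery attempt: the lease it was issued under, the delivery
/// mode, the attempt number, and the obligation that must eventually be
/// acked, nacked, redelivered, or dead-lettered.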
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttemptCertificate {
pub cell_id: CellId,
pub epoch: CellEpoch,
pub cursor_authority_lease: CursorAuthorityLease,
pub delivery_mode: CursorDeliveryMode,
pub delivery_attempt: u32,
pub obligation_id: ObligationId,
pub supersedes_obligation_id: Option<ObligationId>,
}
impl AttemptCertificate {
#[must_use]
pub fn cursor_partition_lease(&self) -> Option<CursorPartitionLease> {
CursorPartitionLease::from_authority_lease(&self.cursor_authority_lease)
}
}
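/// Per-node coverage of still-recoverable sequence windows, consulted when
/// deciding whether the steward, a leased relay, or a multi-node
/// reconstruction can serve a requested window.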
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct RecoverableCapsule {
coverage: BTreeMap<NodeId, Vec<SequenceWindow>>,
}
impl RecoverableCapsule {
#[must_use]
pub fn with_window(mut self, node: NodeId, window: SequenceWindow) -> Self {
self.insert_window(node, window);
self
}
pub fn insert_window(&mut self, node: NodeId, window: SequenceWindow) {
self.coverage.entry(node).or_default().push(window);
}
#[must_use]
fn node_covers(&self, node: &NodeId, window: SequenceWindow) -> bool {
self.coverage.get(node).is_some_and(|ranges| {
ranges
.iter()
.any(|candidate| candidate.contains_window(window))
})
}
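    /// Greedy interval cover: starting at `window.start()`, repeatedly pick
    /// the covering range that extends furthest (ties broken by smaller node
    /// id), record its node, and jump one past that range's end. Returns
    /// `None` as soon as some sequence in the window has no covering range.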
#[must_use]
fn reconstruction_contributors(&self, window: SequenceWindow) -> Option<Vec<NodeId>> {
let mut current = window.start();
let mut contributors = Vec::new();
while current <= window.end() {
let mut best: Option<(u64, NodeId)> = None;
for (node, ranges) in &self.coverage {
for range in ranges {
if !range.contains_sequence(current) {
continue;
}
let candidate = (range.end(), node.clone());
if best.as_ref().is_none_or(|(best_end, best_node)| {
candidate.0 > *best_end
|| (candidate.0 == *best_end
&& candidate.1.as_str() < best_node.as_str())
}) {
best = Some(candidate);
}
}
}
let (best_end, best_node) = best?;
if contributors.last() != Some(&best_node) {
contributors.push(best_node);
}
if best_end >= window.end() {
break;
}
current = best_end.saturating_add(1);
}
Some(contributors)
}
}
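/// How a requested window will actually be served, in order of preference:
/// directly from the current steward, from a relay holding a valid
/// [`ReadDelegationTicket`], or reconstructed from several nodes' coverage.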
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeliveryPlan {
CurrentSteward(NodeId),
LeasedRelay {
relay: NodeId,
ticket: ReadDelegationTicket,
},
Reconstructed {
contributors: Vec<NodeId>,
},
}
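/// Outcome of an acknowledgement against the current lease: committed under
/// the live generation, or a stale no-op when the attempt certificate was
/// issued under an older lease.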
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AckResolution {
Committed {
obligation_id: ObligationId,
against: CursorLeaseHolder,
},
StaleNoOp {
obligation_id: ObligationId,
current_generation: u64,
current_holder: CursorLeaseHolder,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorTransferProposal {
pub proposed_holder: CursorLeaseHolder,
pub proposed_scope: CursorLeaseScope,
pub expected_generation: u64,
pub transfer_obligation: ObligationId,
}
impl CursorTransferProposal {
#[must_use]
pub fn control_capsule(
proposed_holder: CursorLeaseHolder,
expected_generation: u64,
transfer_obligation: ObligationId,
) -> Self {
Self {
proposed_holder,
proposed_scope: CursorLeaseScope::ControlCapsule,
expected_generation,
transfer_obligation,
}
}
#[must_use]
pub fn delegated_partition(
proposed_holder: CursorLeaseHolder,
partition: u16,
expected_generation: u64,
transfer_obligation: ObligationId,
) -> Self {
Self {
proposed_holder,
proposed_scope: CursorLeaseScope::DelegatedCursorPartition { partition },
expected_generation,
transfer_obligation,
}
}
fn validate(&self) -> Result<(), ConsumerCursorError> {
match (&self.proposed_holder, self.proposed_scope) {
(CursorLeaseHolder::Relay(relay), CursorLeaseScope::ControlCapsule) => {
Err(ConsumerCursorError::RelayTransferRequiresPartition {
relay: relay.clone(),
})
}
_ => Ok(()),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContestedTransferResolution {
Accepted {
new_lease: CursorAuthorityLease,
winning_obligation: ObligationId,
},
StaleNoOp {
current_lease: CursorAuthorityLease,
},
}
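/// Lease-fenced cursor authority for one subject cell: tracks the current
/// [`CursorAuthorityLease`], delegated partition assignments and summaries,
/// the ticket clock, and revoked read tickets. This is the single component
/// that plans deliveries and decides whether an ack commits or no-ops.
///
/// A minimal failover sketch (assumes a populated `SubjectCell` named `cell`
/// whose steward pool contains `next`; marked `ignore`, so it is not compiled
/// as a doc-test):
/// ```ignore
/// let mut cursor = FabricConsumerCursor::new(&cell)?;
/// let before = cursor.current_lease().lease_generation;
/// let lease = cursor.failover(next)?;
/// assert_eq!(lease.lease_generation, before + 1);
/// ```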
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FabricConsumerCursor {
steward_pool: Vec<NodeId>,
current_lease: CursorAuthorityLease,
partition_assignments: BTreeMap<u16, CursorPartitionAssignment>,
partition_summaries: BTreeMap<u16, CursorPartitionSummary>,
ticket_clock: u64,
next_revocation_handle: u64,
revoked_tickets: BTreeMap<ReadDelegationRevocationHandle, u64>,
}
impl FabricConsumerCursor {
pub fn new(cell: &SubjectCell) -> Result<Self, ConsumerCursorError> {
Ok(Self {
steward_pool: cell.control_capsule.steward_pool.clone(),
current_lease: CursorAuthorityLease::from_subject_cell(cell)?,
partition_assignments: BTreeMap::new(),
partition_summaries: BTreeMap::new(),
ticket_clock: 0,
next_revocation_handle: 1,
revoked_tickets: BTreeMap::new(),
})
}
#[must_use]
pub fn current_lease(&self) -> &CursorAuthorityLease {
&self.current_lease
}
#[must_use]
pub fn partition_assignment(&self, partition: u16) -> Option<&CursorPartitionAssignment> {
self.partition_assignments.get(&partition)
}
#[must_use]
pub fn partition_summary(&self, partition: u16) -> Option<&CursorPartitionSummary> {
self.partition_summaries.get(&partition)
}
#[must_use]
pub const fn ticket_clock(&self) -> u64 {
self.ticket_clock
}
pub fn advance_ticket_clock(&mut self, ticks: u64) -> u64 {
self.ticket_clock = self.ticket_clock.saturating_add(ticks);
self.ticket_clock
}
pub fn assign_partition(
&mut self,
assignment: CursorPartitionAssignment,
) -> Result<&CursorPartitionAssignment, ConsumerCursorError> {
assignment.validate()?;
let partition = assignment.partition;
self.partition_assignments.insert(partition, assignment);
self.partition_summaries.remove(&partition);
Ok(self
.partition_assignments
.get(&partition)
.expect("assignment inserted"))
}
pub fn issue_attempt(
&self,
delivery_mode: CursorDeliveryMode,
delivery_attempt: u32,
obligation_id: ObligationId,
) -> Result<AttemptCertificate, ConsumerCursorError> {
if delivery_attempt == 0 {
return Err(ConsumerCursorError::InvalidDeliveryAttempt);
}
Ok(AttemptCertificate {
cell_id: self.current_lease.cell_id,
epoch: self.current_lease.epoch,
cursor_authority_lease: self.current_lease.clone(),
delivery_mode,
delivery_attempt,
obligation_id,
supersedes_obligation_id: None,
})
}
pub fn grant_read_ticket(
&mut self,
relay: NodeId,
segment_window: SequenceWindow,
ttl_ticks: u64,
cacheability_rules: CacheabilityRule,
) -> Result<ReadDelegationTicket, ConsumerCursorError> {
if self.steward_pool.iter().any(|node| node == &relay) {
return Err(ConsumerCursorError::RelayMustNotBeSteward { relay });
}
let revocation_handle = ReadDelegationRevocationHandle(self.next_revocation_handle);
self.next_revocation_handle = self.next_revocation_handle.saturating_add(1);
ReadDelegationTicket::new(
&self.current_lease,
relay,
segment_window,
self.ticket_clock,
ttl_ticks,
cacheability_rules,
revocation_handle,
)
}
pub fn revoke_read_ticket(
&mut self,
handle: ReadDelegationRevocationHandle,
not_after_tick: u64,
) {
self.revoked_tickets.insert(handle, not_after_tick);
}
pub fn prune_expired_revocations(&mut self) {
let clock = self.ticket_clock;
self.revoked_tickets
.retain(|_, &mut expiry| clock <= expiry);
}
pub fn plan_delivery(
&self,
delivery_mode: CursorDeliveryMode,
capsule: &RecoverableCapsule,
ticket: Option<&ReadDelegationTicket>,
) -> Result<DeliveryPlan, ConsumerCursorError> {
let Some(window) = delivery_mode.requested_window() else {
return Ok(DeliveryPlan::CurrentSteward(
self.current_lease.holder.node().clone(),
));
};
match &self.current_lease.holder {
CursorLeaseHolder::Steward(node) if capsule.node_covers(node, window) => {
Ok(DeliveryPlan::CurrentSteward(node.clone()))
}
CursorLeaseHolder::Relay(node) => {
let Some(ticket) = ticket else {
return Err(ConsumerCursorError::MissingReadDelegationTicket {
relay: node.clone(),
});
};
ticket.validate(
&self.current_lease,
node,
window,
self.ticket_clock,
&self.revoked_tickets,
)?;
if capsule.node_covers(node, window) {
Ok(DeliveryPlan::LeasedRelay {
relay: node.clone(),
ticket: ticket.clone(),
})
} else {
capsule
.reconstruction_contributors(window)
.map(|contributors| DeliveryPlan::Reconstructed { contributors })
.ok_or(ConsumerCursorError::UnrecoverableWindow { window })
}
}
CursorLeaseHolder::Steward(_) => capsule
.reconstruction_contributors(window)
.map(|contributors| DeliveryPlan::Reconstructed { contributors })
.ok_or(ConsumerCursorError::UnrecoverableWindow { window }),
}
}
pub fn failover(
&mut self,
next_steward: NodeId,
) -> Result<&CursorAuthorityLease, ConsumerCursorError> {
if !self.steward_pool.iter().any(|node| node == &next_steward) {
return Err(ConsumerCursorError::UnknownSteward {
cell_id: self.current_lease.cell_id,
steward: next_steward,
});
}
self.current_lease.holder = CursorLeaseHolder::Steward(next_steward);
self.current_lease.lease_generation = self.current_lease.lease_generation.saturating_add(1);
self.current_lease.scope = CursorLeaseScope::ControlCapsule;
Ok(&self.current_lease)
}
pub fn resolve_contested_transfer(
&mut self,
proposals: &[CursorTransferProposal],
) -> Result<ContestedTransferResolution, ConsumerCursorError> {
let valid = proposals
.iter()
.map(|proposal| proposal.validate().map(|()| proposal))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.filter(|proposal| proposal.expected_generation == self.current_lease.lease_generation)
.filter_map(|proposal| {
self.transfer_rank(&proposal.proposed_holder)
.map(|rank| (rank, scope_rank(proposal.proposed_scope), proposal))
})
.min_by(|left, right| {
left.0
.cmp(&right.0)
.then_with(|| left.1.cmp(&right.1))
.then_with(|| left.2.transfer_obligation.cmp(&right.2.transfer_obligation))
});
let Some((_, _, winner)) = valid else {
return Ok(ContestedTransferResolution::StaleNoOp {
current_lease: self.current_lease.clone(),
});
};
if let CursorLeaseScope::DelegatedCursorPartition { partition } = self.current_lease.scope {
self.partition_summaries.remove(&partition);
}
if let CursorLeaseScope::DelegatedCursorPartition { partition } = winner.proposed_scope {
let Some(assignment) = self.partition_assignments.get_mut(&partition) else {
return Err(ConsumerCursorError::UnknownCursorPartition { partition });
};
assignment.leader = winner.proposed_holder.clone();
self.partition_summaries.remove(&partition);
}
self.current_lease.holder = winner.proposed_holder.clone();
self.current_lease.lease_generation = self.current_lease.lease_generation.saturating_add(1);
self.current_lease.scope = winner.proposed_scope;
Ok(ContestedTransferResolution::Accepted {
new_lease: self.current_lease.clone(),
winning_obligation: winner.transfer_obligation,
})
}
pub fn report_partition_checkpoint(
&mut self,
checkpoint: CursorPartitionCheckpoint,
) -> Result<&CursorPartitionSummary, ConsumerCursorError> {
let CursorLeaseScope::DelegatedCursorPartition { partition } = self.current_lease.scope
else {
return Err(
ConsumerCursorError::PartitionCheckpointRequiresDelegatedLease {
partition: checkpoint.partition,
current_scope: self.current_lease.scope,
},
);
};
if checkpoint.partition != partition {
return Err(
ConsumerCursorError::PartitionCheckpointRequiresDelegatedLease {
partition: checkpoint.partition,
current_scope: self.current_lease.scope,
},
);
}
if checkpoint.lease_generation != self.current_lease.lease_generation {
return Err(ConsumerCursorError::StaleCursorPartitionCheckpoint {
partition: checkpoint.partition,
report_generation: checkpoint.lease_generation,
current_generation: self.current_lease.lease_generation,
});
}
let Some(assignment) = self.partition_assignments.get(&partition) else {
return Err(ConsumerCursorError::UnknownCursorPartition { partition });
};
if usize::try_from(checkpoint.consumer_count).ok() != Some(assignment.consumers.len()) {
return Err(
ConsumerCursorError::PartitionCheckpointConsumerCountMismatch {
partition,
reported_consumer_count: checkpoint.consumer_count,
assigned_consumer_count: assignment.consumers.len(),
},
);
}
let summary = CursorPartitionSummary {
partition,
selector: assignment.selector.clone(),
leader: self.current_lease.holder.clone(),
lease_generation: checkpoint.lease_generation,
ack_floor: checkpoint.ack_floor,
delivered_through: checkpoint.delivered_through,
pending_count: checkpoint.pending_count,
consumer_count: checkpoint.consumer_count,
};
self.partition_summaries.insert(partition, summary);
Ok(self
.partition_summaries
.get(&partition)
.expect("partition summary inserted"))
}
pub fn rebalance_partition(
&mut self,
partition: u16,
next_leader: CursorLeaseHolder,
expected_generation: u64,
transfer_obligation: ObligationId,
) -> Result<ContestedTransferResolution, ConsumerCursorError> {
if !self.partition_assignments.contains_key(&partition) {
return Err(ConsumerCursorError::UnknownCursorPartition { partition });
}
self.resolve_contested_transfer(&[CursorTransferProposal::delegated_partition(
next_leader,
partition,
expected_generation,
transfer_obligation,
)])
}
pub fn acknowledge(
&self,
attempt: &AttemptCertificate,
) -> Result<AckResolution, ConsumerCursorError> {
if attempt.cell_id != self.current_lease.cell_id
|| attempt.epoch != self.current_lease.epoch
{
return Err(ConsumerCursorError::AttemptScopeMismatch {
certificate_cell: attempt.cell_id,
certificate_epoch: attempt.epoch,
current_cell: self.current_lease.cell_id,
current_epoch: self.current_lease.epoch,
});
}
if attempt.cursor_authority_lease.lease_generation == self.current_lease.lease_generation {
Ok(AckResolution::Committed {
obligation_id: attempt.obligation_id,
against: self.current_lease.holder.clone(),
})
} else {
Ok(AckResolution::StaleNoOp {
obligation_id: attempt.obligation_id,
current_generation: self.current_lease.lease_generation,
current_holder: self.current_lease.holder.clone(),
})
}
}
fn transfer_rank(&self, holder: &CursorLeaseHolder) -> Option<(u8, usize, String)> {
match holder {
CursorLeaseHolder::Steward(node) => self
.steward_pool
.iter()
.position(|candidate| candidate == node)
.map(|index| (0, index, node.as_str().to_owned())),
CursorLeaseHolder::Relay(node) => Some((1, usize::MAX, node.as_str().to_owned())),
}
}
}
fn scope_rank(scope: CursorLeaseScope) -> (u8, u16) {
match scope {
CursorLeaseScope::ControlCapsule => (0, 0),
CursorLeaseScope::DelegatedCursorPartition { partition } => (1, partition),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ConsumerReplayPolicy {
#[default]
Instant,
Original,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ConsumerDispatchMode {
#[default]
Push,
Pull,
}
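/// Consumer tuning surface: ack policy and wait, delivery and pending caps,
/// waiting-pull limits, replay and deliver policies, flow control,
/// heartbeats, the adaptive decision kernel, and the overflow policy.
/// `validate` rejects zero-valued limits and durations.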
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FabricConsumerConfig {
pub durable_name: Option<String>,
pub filter_subject: Option<SubjectPattern>,
pub ack_policy: AckPolicy,
pub max_deliver: u16,
pub max_ack_pending: usize,
pub max_waiting: usize,
pub ack_wait: Duration,
pub replay_policy: ConsumerReplayPolicy,
pub deliver_policy: DeliverPolicy,
pub flow_control: bool,
pub adaptive_kernel: AdaptiveConsumerKernel,
pub overflow_policy: ConsumerOverflowPolicy,
pub heartbeat: Option<Duration>,
pub idle_heartbeat: Option<Duration>,
}
impl Default for FabricConsumerConfig {
fn default() -> Self {
Self {
durable_name: None,
filter_subject: None,
ack_policy: AckPolicy::Explicit,
max_deliver: 1,
max_ack_pending: 256,
max_waiting: 64,
ack_wait: Duration::from_secs(30),
replay_policy: ConsumerReplayPolicy::Instant,
deliver_policy: DeliverPolicy::All,
flow_control: false,
adaptive_kernel: AdaptiveConsumerKernel::Stable,
overflow_policy: ConsumerOverflowPolicy::RejectNew,
heartbeat: None,
idle_heartbeat: None,
}
}
}
impl FabricConsumerConfig {
pub fn validate(&self) -> Result<(), FabricConsumerError> {
if self.max_deliver == 0 {
return Err(FabricConsumerError::InvalidMaxDeliver);
}
if self.max_ack_pending == 0 {
return Err(FabricConsumerError::InvalidMaxAckPending);
}
if self.max_waiting == 0 {
return Err(FabricConsumerError::InvalidMaxWaiting);
}
if self.ack_wait.is_zero() {
return Err(FabricConsumerError::InvalidAckWait);
}
if self.heartbeat.is_some_and(|duration| duration.is_zero()) {
return Err(FabricConsumerError::InvalidHeartbeat { field: "heartbeat" });
}
if self
.idle_heartbeat
.is_some_and(|duration| duration.is_zero())
{
return Err(FabricConsumerError::InvalidHeartbeat {
field: "idle_heartbeat",
});
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum AdaptiveConsumerKernel {
#[default]
Stable,
AuditBacked,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum ConsumerOverflowPolicy {
#[default]
RejectNew,
ReplaceLowestPriority,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FabricConsumerDeliveryPolicy {
pub mode: ConsumerDispatchMode,
pub paused: bool,
}
impl Default for FabricConsumerDeliveryPolicy {
fn default() -> Self {
Self {
mode: ConsumerDispatchMode::Push,
paused: false,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FabricConsumerOwner {
pub holder: TaskId,
pub region: RegionId,
}
impl Default for FabricConsumerOwner {
fn default() -> Self {
Self {
holder: TaskId::new_ephemeral(),
region: RegionId::new_ephemeral(),
}
}
}
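/// A single pull fetch: batch size, demand class, optional byte and expiry
/// bounds, `no_wait` semantics, and an optional pinned relay client.
///
/// A minimal builder sketch with hypothetical values (marked `ignore`, so it
/// is not compiled as a doc-test):
/// ```ignore
/// let request = PullRequest::new(32, ConsumerDemandClass::CatchUp)?
///     .with_max_bytes(64 * 1024)
///     .with_expires(500)
///     .with_no_wait();
/// ```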
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PullRequest {
pub batch_size: u32,
pub demand_class: ConsumerDemandClass,
pub max_bytes: Option<u32>,
pub expires: Option<u64>,
pub no_wait: bool,
pub pinned_client: Option<NodeId>,
}
impl PullRequest {
pub fn new(
batch_size: u32,
demand_class: ConsumerDemandClass,
) -> Result<Self, FabricConsumerError> {
if batch_size == 0 {
return Err(FabricConsumerError::InvalidPullBatchSize);
}
Ok(Self {
batch_size,
demand_class,
max_bytes: None,
expires: None,
no_wait: false,
pinned_client: None,
})
}
#[must_use]
pub fn with_max_bytes(mut self, max_bytes: u32) -> Self {
self.max_bytes = Some(max_bytes);
self
}
#[must_use]
pub fn with_expires(mut self, ticks: u64) -> Self {
self.expires = Some(ticks);
self
}
#[must_use]
pub fn with_no_wait(mut self) -> Self {
self.no_wait = true;
self
}
#[must_use]
pub fn with_pinned_client(mut self, client: NodeId) -> Self {
self.pinned_client = Some(client);
self
}
fn effective_batch_size(&self) -> Result<u64, FabricConsumerError> {
if self.max_bytes == Some(0) {
return Err(FabricConsumerError::InvalidPullMaxBytes);
}
if self.expires == Some(0) {
return Err(FabricConsumerError::InvalidPullExpiry);
}
let batch_size = u64::from(self.batch_size);
let byte_bound = self.max_bytes.map_or(batch_size, u64::from);
Ok(batch_size.min(byte_bound).max(1))
}
fn is_expired(&self, enqueued_at_tick: u64, current_tick: u64) -> bool {
self.expires
.is_some_and(|ttl| current_tick > enqueued_at_tick.saturating_add(ttl))
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueuedPullRequest {
request: PullRequest,
enqueued_at_tick: u64,
enqueue_order: u64,
}
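/// One in-flight, unacknowledged delivery: the originating request, its
/// window, delivery mode, attempt number, and any superseded obligation.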
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingAckState {
pub request: ScheduledConsumerRequest,
pub window: SequenceWindow,
pub delivery_mode: CursorDeliveryMode,
pub delivery_attempt: u32,
pub supersedes_obligation_id: Option<ObligationId>,
}
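/// Aggregate consumer progress: delivered and pending counters, the ack
/// floor, the highest dispatched sequence, and pending acks keyed by
/// obligation id.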
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FabricConsumerState {
pub delivered_count: u64,
pub pending_count: u64,
pub ack_floor: u64,
pub highest_dispatched: u64,
pub pending_acks: BTreeMap<ObligationId, PendingAckState>,
next_delivery_attempt: u32,
}
impl FabricConsumerState {
fn next_attempt(&mut self) -> u32 {
self.next_delivery_attempt = self.next_delivery_attempt.saturating_add(1).max(1);
self.next_delivery_attempt
}
    fn advance_ack_floor(&mut self, candidate: u64) {
        if self.pending_acks.is_empty() {
            self.ack_floor = self.ack_floor.max(candidate);
            return;
        }
        // With acks still outstanding, the floor may only advance to just
        // below the earliest pending window, however far `candidate` reaches.
        let min_pending_start = self
            .pending_acks
            .values()
            .map(|pending| pending.window.start())
            .filter(|&start| start > 0)
            .min()
            .unwrap_or(u64::MAX);
        let safe_floor = candidate.min(min_pending_start.saturating_sub(1));
        self.ack_floor = self.ack_floor.max(safe_floor);
    }
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScheduledConsumerRequest {
Push(SequenceWindow),
Pull(PullRequest),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledConsumerDelivery {
pub request: ScheduledConsumerRequest,
pub window: SequenceWindow,
pub attempt: AttemptCertificate,
pub plan: DeliveryPlan,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PullDispatchOutcome {
Scheduled(Box<ScheduledConsumerDelivery>),
Waiting(PullRequest),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsumerNackReason {
Explicit,
Error,
Cancel,
}
impl ConsumerNackReason {
const fn abort_reason(self) -> ObligationAbortReason {
match self {
Self::Explicit => ObligationAbortReason::Explicit,
Self::Error => ObligationAbortReason::Error,
Self::Cancel => ObligationAbortReason::Cancel,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NackResolution {
Aborted {
obligation_id: ObligationId,
window: SequenceWindow,
reason: ConsumerNackReason,
},
StaleNoOp {
obligation_id: ObligationId,
current_generation: u64,
current_holder: CursorLeaseHolder,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeadLetterTransfer {
pub obligation_id: ObligationId,
pub window: SequenceWindow,
pub delivery_attempt: u32,
pub reason: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConsumerDecisionKind {
PullScheduling,
Overflow,
Redelivery,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConsumerRedeliveryAction {
RetryNow,
Delay,
DeadLetter,
}
impl ConsumerRedeliveryAction {
#[must_use]
const fn label(self) -> &'static str {
match self {
Self::RetryNow => "retry_now",
Self::Delay => "delay",
Self::DeadLetter => "dead_letter",
}
}
}
#[derive(Debug, Clone)]
pub struct ConsumerDecisionRecord {
pub kind: ConsumerDecisionKind,
pub action_name: String,
pub demand_class: Option<ConsumerDemandClass>,
pub obligation_id: Option<ObligationId>,
pub pinned_client: Option<NodeId>,
pub audit: DecisionAuditEntry,
}
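/// A fabric consumer: cursor authority plus configuration, an obligation
/// ledger, dispatch policy, pending-ack state, and a bounded audit log of
/// kernel decisions.
///
/// A minimal pull-mode sketch (assumes a populated `SubjectCell` named `cell`
/// and a `RecoverableCapsule` named `capsule`; marked `ignore`, so it is not
/// compiled as a doc-test):
/// ```ignore
/// let mut consumer = FabricConsumer::new(&cell, FabricConsumerConfig::default())?;
/// consumer.switch_mode(ConsumerDispatchMode::Pull);
/// consumer.queue_pull_request(PullRequest::new(8, ConsumerDemandClass::Tail)?)?;
/// if let PullDispatchOutcome::Scheduled(delivery) =
///     consumer.dispatch_next_pull(100, &capsule, None)?
/// {
///     consumer.acknowledge_delivery(&delivery.attempt)?;
/// }
/// ```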
#[derive(Debug)]
pub struct FabricConsumer {
cursor: FabricConsumerCursor,
config: FabricConsumerConfig,
owner: FabricConsumerOwner,
ledger: ObligationLedger,
policy: FabricConsumerDeliveryPolicy,
state: FabricConsumerState,
pending_ack_tokens: BTreeMap<ObligationId, ObligationToken>,
decision_log: Vec<ConsumerDecisionRecord>,
decision_log_capacity: usize,
next_event_nanos: u64,
next_pull_enqueue_order: u64,
waiting_pull_requests: Vec<QueuedPullRequest>,
}
impl FabricConsumer {
pub fn new(
cell: &SubjectCell,
config: FabricConsumerConfig,
) -> Result<Self, FabricConsumerError> {
Self::new_owned(cell, config, FabricConsumerOwner::default())
}
pub fn new_owned(
cell: &SubjectCell,
config: FabricConsumerConfig,
owner: FabricConsumerOwner,
) -> Result<Self, FabricConsumerError> {
config.validate()?;
Ok(Self {
cursor: FabricConsumerCursor::new(cell)?,
config,
owner,
ledger: ObligationLedger::new(),
policy: FabricConsumerDeliveryPolicy::default(),
state: FabricConsumerState::default(),
pending_ack_tokens: BTreeMap::new(),
decision_log: Vec::new(),
decision_log_capacity: 4096,
next_event_nanos: 0,
next_pull_enqueue_order: 0,
waiting_pull_requests: Vec::new(),
})
}
#[must_use]
pub fn config(&self) -> &FabricConsumerConfig {
&self.config
}
#[must_use]
pub fn policy(&self) -> &FabricConsumerDeliveryPolicy {
&self.policy
}
#[must_use]
pub fn state(&self) -> &FabricConsumerState {
&self.state
}
#[must_use]
pub const fn owner(&self) -> FabricConsumerOwner {
self.owner
}
#[must_use]
pub fn obligation_stats(&self) -> LedgerStats {
self.ledger.stats()
}
#[must_use]
pub fn decision_log(&self) -> &[ConsumerDecisionRecord] {
&self.decision_log
}
fn push_decision(&mut self, record: ConsumerDecisionRecord) {
if self.decision_log.len() >= self.decision_log_capacity {
self.decision_log.remove(0);
}
self.decision_log.push(record);
}
#[must_use]
pub fn waiting_pull_request_count(&self) -> usize {
self.waiting_pull_requests.len()
}
#[must_use]
pub fn current_lease(&self) -> &CursorAuthorityLease {
self.cursor.current_lease()
}
pub fn advance_clock(&mut self, ticks: u64) -> u64 {
self.cursor.advance_ticket_clock(ticks)
}
pub fn switch_mode(&mut self, mode: ConsumerDispatchMode) {
self.policy.mode = mode;
if mode == ConsumerDispatchMode::Push {
self.waiting_pull_requests.clear();
}
}
pub fn pause(&mut self) -> Result<(), FabricConsumerError> {
if !self.config.flow_control {
return Err(FabricConsumerError::FlowControlDisabled);
}
self.policy.paused = true;
Ok(())
}
pub fn resume(&mut self) {
self.policy.paused = false;
}
pub fn queue_pull_request(&mut self, request: PullRequest) -> Result<(), FabricConsumerError> {
if self.policy.mode != ConsumerDispatchMode::Pull {
return Err(FabricConsumerError::PullModeRequired);
}
let _ = request.effective_batch_size()?;
        if self.waiting_pull_requests.len() >= self.config.max_waiting
            && !self.resolve_pull_overflow(&request)
        {
            return Err(FabricConsumerError::MaxWaitingExceeded {
                limit: self.config.max_waiting,
            });
        }
let enqueued_at_tick = self.cursor.ticket_clock();
let enqueue_order = self.allocate_pull_enqueue_order();
self.insert_pull_request(QueuedPullRequest {
request,
enqueued_at_tick,
enqueue_order,
});
Ok(())
}
#[track_caller]
pub fn dispatch_push(
&mut self,
window: SequenceWindow,
capsule: &RecoverableCapsule,
ticket: Option<&ReadDelegationTicket>,
) -> Result<ScheduledConsumerDelivery, FabricConsumerError> {
if self.policy.mode != ConsumerDispatchMode::Push {
return Err(FabricConsumerError::PushModeRequired);
}
let delivery_mode = CursorDeliveryMode::Push { window };
self.schedule_delivery(
ScheduledConsumerRequest::Push(window),
delivery_mode,
window,
capsule,
ticket,
None,
)
}
#[track_caller]
pub fn dispatch_next_pull(
&mut self,
available_tail: u64,
capsule: &RecoverableCapsule,
ticket: Option<&ReadDelegationTicket>,
) -> Result<PullDispatchOutcome, FabricConsumerError> {
if self.policy.mode != ConsumerDispatchMode::Pull {
return Err(FabricConsumerError::PullModeRequired);
}
let Some(queued) = self.pop_next_live_pull_request() else {
return Err(FabricConsumerError::NoQueuedPullRequests);
};
let request = queued.request.clone();
let Some(window) = self.resolve_pull_window(&request, available_tail)? else {
if request.no_wait {
return Err(FabricConsumerError::NoDataAvailable {
demand_class: request.demand_class,
available_tail,
});
}
self.insert_pull_request(queued);
return Ok(PullDispatchOutcome::Waiting(request));
};
if let Some(pinned_client) = &request.pinned_client
&& let Some(ticket) = ticket
&& &ticket.relay != pinned_client
{
self.insert_pull_request(queued);
return Err(FabricConsumerError::PinnedClientTicketMismatch {
pinned_client: pinned_client.clone(),
ticket_relay: ticket.relay.clone(),
});
}
let scheduled_request = ScheduledConsumerRequest::Pull(request);
match self.schedule_delivery(
scheduled_request,
CursorDeliveryMode::Pull(CursorRequest::Window(window)),
window,
capsule,
ticket,
None,
) {
Ok(delivery) => Ok(PullDispatchOutcome::Scheduled(Box::new(delivery))),
Err(err) => {
self.insert_pull_request(queued);
Err(err)
}
}
}
pub fn acknowledge_delivery(
&mut self,
attempt: &AttemptCertificate,
) -> Result<AckResolution, FabricConsumerError> {
if !self.state.pending_acks.contains_key(&attempt.obligation_id) {
return Ok(self.stale_attempt_noop(attempt.obligation_id));
}
let resolution = self.cursor.acknowledge(attempt)?;
match resolution {
AckResolution::Committed { .. } => {
if let Some(pending) = self.state.pending_acks.remove(&attempt.obligation_id) {
let token = self
.pending_ack_tokens
.remove(&attempt.obligation_id)
.ok_or(FabricConsumerError::MissingPendingAckToken {
obligation_id: attempt.obligation_id,
})?;
let resolved_at = self.next_event_time();
self.ledger.commit(token, resolved_at);
self.state.pending_count = self
.state
.pending_count
.saturating_sub(window_len(pending.window));
self.state.advance_ack_floor(pending.window.end());
}
}
AckResolution::StaleNoOp { .. } => {
if let Some(pending) = self.state.pending_acks.remove(&attempt.obligation_id) {
let token = self
.pending_ack_tokens
.remove(&attempt.obligation_id)
.ok_or(FabricConsumerError::MissingPendingAckToken {
obligation_id: attempt.obligation_id,
})?;
let aborted_at = self.next_event_time();
self.ledger
.abort(token, aborted_at, ObligationAbortReason::Cancel);
self.state.pending_count = self
.state
.pending_count
.saturating_sub(window_len(pending.window));
}
}
}
Ok(resolution)
}
pub fn nack_delivery(
&mut self,
attempt: &AttemptCertificate,
reason: ConsumerNackReason,
) -> Result<NackResolution, FabricConsumerError> {
let Some(pending) = self.state.pending_acks.remove(&attempt.obligation_id) else {
return Ok(self.stale_nack_noop(attempt.obligation_id));
};
let token = self
.pending_ack_tokens
.remove(&attempt.obligation_id)
.ok_or(FabricConsumerError::MissingPendingAckToken {
obligation_id: attempt.obligation_id,
})?;
let aborted_at = self.next_event_time();
self.ledger.abort(token, aborted_at, reason.abort_reason());
self.state.pending_count = self
.state
.pending_count
.saturating_sub(window_len(pending.window));
Ok(NackResolution::Aborted {
obligation_id: attempt.obligation_id,
window: pending.window,
reason,
})
}
#[track_caller]
pub fn redeliver_delivery(
&mut self,
attempt: &AttemptCertificate,
capsule: &RecoverableCapsule,
ticket: Option<&ReadDelegationTicket>,
) -> Result<ScheduledConsumerDelivery, FabricConsumerError> {
let Some(pending) = self.state.pending_acks.get(&attempt.obligation_id).cloned() else {
return Err(FabricConsumerError::PendingAckNotFound {
obligation_id: attempt.obligation_id,
});
};
let _ = self
.cursor
.plan_delivery(pending.delivery_mode, capsule, ticket)?;
if self.policy.paused {
return Err(FabricConsumerError::ConsumerPaused);
}
let (redelivery_action, decision_record) =
self.decide_redelivery_action(&pending, attempt.obligation_id);
if let Some(record) = decision_record {
self.push_decision(record);
}
match redelivery_action {
ConsumerRedeliveryAction::RetryNow => {}
ConsumerRedeliveryAction::Delay => {
return Err(FabricConsumerError::RedeliveryDeferred {
obligation_id: attempt.obligation_id,
delivery_attempt: pending.delivery_attempt.saturating_add(1),
});
}
ConsumerRedeliveryAction::DeadLetter => {
return Err(FabricConsumerError::RedeliveryRequiresDeadLetter {
obligation_id: attempt.obligation_id,
delivery_attempt: pending.delivery_attempt.saturating_add(1),
});
}
}
let removed = self
.state
.pending_acks
.remove(&attempt.obligation_id)
.expect("pending state must still exist after preflight");
let token = self
.pending_ack_tokens
.remove(&attempt.obligation_id)
.ok_or(FabricConsumerError::MissingPendingAckToken {
obligation_id: attempt.obligation_id,
})?;
let aborted_at = self.next_event_time();
self.ledger
.abort(token, aborted_at, ObligationAbortReason::Explicit);
self.state.pending_count = self
.state
.pending_count
.saturating_sub(window_len(removed.window));
self.schedule_delivery(
removed.request,
removed.delivery_mode,
removed.window,
capsule,
ticket,
Some(attempt.obligation_id),
)
}
pub fn dead_letter_delivery(
&mut self,
attempt: &AttemptCertificate,
reason: impl Into<String>,
) -> Result<DeadLetterTransfer, FabricConsumerError> {
let reason = reason.into();
if reason.trim().is_empty() {
return Err(FabricConsumerError::EmptyDeadLetterReason);
}
let pending = self
.state
.pending_acks
.remove(&attempt.obligation_id)
.ok_or(FabricConsumerError::PendingAckNotFound {
obligation_id: attempt.obligation_id,
})?;
let token = self
.pending_ack_tokens
.remove(&attempt.obligation_id)
.ok_or(FabricConsumerError::MissingPendingAckToken {
obligation_id: attempt.obligation_id,
})?;
let dead_lettered_at = self.next_event_time();
self.ledger
.abort(token, dead_lettered_at, ObligationAbortReason::Error);
self.state.pending_count = self
.state
.pending_count
.saturating_sub(window_len(pending.window));
Ok(DeadLetterTransfer {
obligation_id: attempt.obligation_id,
window: pending.window,
delivery_attempt: pending.delivery_attempt,
reason,
})
}
fn pop_next_live_pull_request(&mut self) -> Option<QueuedPullRequest> {
let current_tick = self.cursor.ticket_clock();
while !self.waiting_pull_requests.is_empty() {
let queued = self.waiting_pull_requests.remove(0);
if !queued
.request
.is_expired(queued.enqueued_at_tick, current_tick)
{
return Some(queued);
}
}
None
}
fn allocate_pull_enqueue_order(&mut self) -> u64 {
let order = self.next_pull_enqueue_order;
self.next_pull_enqueue_order = self.next_pull_enqueue_order.saturating_add(1);
order
}
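    /// Stable priority insert: lower `priority_rank` sorts first, and
    /// `enqueue_order` preserves FIFO order within a rank.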
fn insert_pull_request(&mut self, queued: QueuedPullRequest) {
let insert_at = self
.waiting_pull_requests
.iter()
.position(|existing| {
queued.request.demand_class.priority_rank()
< existing.request.demand_class.priority_rank()
|| (queued.request.demand_class.priority_rank()
== existing.request.demand_class.priority_rank()
&& queued.enqueue_order < existing.enqueue_order)
})
.unwrap_or(self.waiting_pull_requests.len());
self.waiting_pull_requests.insert(insert_at, queued);
}
fn resolve_pull_overflow(&mut self, request: &PullRequest) -> bool {
let Some(worst_index) = self
.waiting_pull_requests
.iter()
.enumerate()
.max_by_key(|(_, queued)| {
(
queued.request.demand_class.priority_rank(),
queued.enqueue_order,
)
})
.map(|(index, _)| index)
else {
return true;
};
let incoming_rank = request.demand_class.priority_rank();
let evicted = self.waiting_pull_requests[worst_index].clone();
let mut replaced = self.config.overflow_policy
== ConsumerOverflowPolicy::ReplaceLowestPriority
&& incoming_rank < evicted.request.demand_class.priority_rank();
if self.config.adaptive_kernel == AdaptiveConsumerKernel::AuditBacked {
let snapshot = ConsumerOverflowDecisionSnapshot {
incoming_demand: request.demand_class,
evicted_demand: evicted.request.demand_class,
replaced,
};
let action = if replaced {
ConsumerOverflowDecisionAction::ReplaceLowestPriority
} else {
ConsumerOverflowDecisionAction::RejectNew
};
let contract = ConsumerOverflowDecisionContract::new(action);
let posterior = snapshot.posterior();
let ctx = self.decision_context(
&snapshot,
snapshot.calibration_score(),
snapshot.e_process(),
snapshot.ci_width(),
);
let outcome = evaluate(&contract, &posterior, &ctx);
replaced = outcome.action_name
== ConsumerOverflowDecisionAction::ReplaceLowestPriority.label();
self.push_decision(ConsumerDecisionRecord {
kind: ConsumerDecisionKind::Overflow,
action_name: outcome.action_name,
demand_class: Some(request.demand_class),
obligation_id: None,
pinned_client: request.pinned_client.clone(),
audit: outcome.audit_entry,
});
}
if replaced {
self.waiting_pull_requests.remove(worst_index);
}
replaced
}
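    /// Maps a pull request onto a concrete window: `Tail` takes the newest
    /// `batch` messages not yet acknowledged, `CatchUp` resumes just past the
    /// ack floor, and `Replay` starts from the configured [`DeliverPolicy`].
    /// Returns `Ok(None)` when nothing is currently available.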
fn resolve_pull_window(
&self,
request: &PullRequest,
available_tail: u64,
) -> Result<Option<SequenceWindow>, FabricConsumerError> {
let batch = request.effective_batch_size()?;
if available_tail == 0 {
return Ok(None);
}
let next_unacked = self.state.ack_floor.saturating_add(1).max(1);
let resolve = match request.demand_class {
ConsumerDemandClass::Tail => {
if next_unacked > available_tail {
None
} else {
let start = available_tail
.saturating_sub(batch.saturating_sub(1))
.max(next_unacked);
Some((start, available_tail))
}
}
ConsumerDemandClass::CatchUp => {
if next_unacked > available_tail {
None
} else {
Some((
next_unacked,
available_tail.min(next_unacked.saturating_add(batch).saturating_sub(1)),
))
}
}
ConsumerDemandClass::Replay => {
let start = self.replay_start_sequence(available_tail);
if start > available_tail {
None
} else {
Some((
start,
available_tail.min(start.saturating_add(batch).saturating_sub(1)),
))
}
}
};
match resolve {
Some((start, end)) => Ok(Some(SequenceWindow::new(start, end)?)),
None => Ok(None),
}
}
fn replay_start_sequence(&self, available_tail: u64) -> u64 {
match self.config.deliver_policy {
DeliverPolicy::All => 1,
DeliverPolicy::New => available_tail.saturating_add(1),
DeliverPolicy::ByStartSequence(sequence) => sequence.max(1),
DeliverPolicy::Last | DeliverPolicy::LastPerSubject => available_tail.max(1),
}
}
fn next_event_time(&mut self) -> Time {
let now = Time::from_nanos(self.next_event_nanos);
self.next_event_nanos = self.next_event_nanos.saturating_add(1);
now
}
fn decision_context<T: Hash>(
&mut self,
seed: &T,
calibration_score: f64,
e_process: f64,
ci_width: f64,
) -> EvalContext {
let when = self.next_event_time();
let mut hasher = DetHasher::default();
self.owner.holder.hash(&mut hasher);
self.owner.region.hash(&mut hasher);
self.cursor
.current_lease()
.lease_generation
.hash(&mut hasher);
self.state.pending_count.hash(&mut hasher);
seed.hash(&mut hasher);
let fingerprint = u128::from(hasher.finish());
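        // Deterministic stand-in timestamp: the synthetic event clock's nanos
        // are reused as `ts_unix_ms` rather than wall-clock time.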
let ts_unix_ms = when.as_nanos();
EvalContext {
calibration_score,
e_process,
ci_width,
decision_id: DecisionId::from_parts(ts_unix_ms, fingerprint),
trace_id: TraceId::from_parts(ts_unix_ms, fingerprint ^ 0xC0DE_C011_5EED_5100),
ts_unix_ms,
}
}
fn stale_attempt_noop(&self, obligation_id: ObligationId) -> AckResolution {
AckResolution::StaleNoOp {
obligation_id,
current_generation: self.cursor.current_lease().lease_generation,
current_holder: self.cursor.current_lease().holder.clone(),
}
}
fn stale_nack_noop(&self, obligation_id: ObligationId) -> NackResolution {
NackResolution::StaleNoOp {
obligation_id,
current_generation: self.cursor.current_lease().lease_generation,
current_holder: self.cursor.current_lease().holder.clone(),
}
}
#[track_caller]
fn acquire_ack_token(
&mut self,
window: SequenceWindow,
delivery_attempt: u32,
supersedes_obligation_id: Option<ObligationId>,
) -> ObligationToken {
let description = supersedes_obligation_id.map_or_else(
|| {
format!(
"consumer ack attempt {} for window {}-{}",
delivery_attempt,
window.start(),
window.end()
)
},
|previous| {
format!(
"consumer ack attempt {} for window {}-{} superseding {:?}",
delivery_attempt,
window.start(),
window.end(),
previous
)
},
);
let acquired_at = self.next_event_time();
self.ledger.acquire_with_context(
ObligationKind::Ack,
self.owner.holder,
self.owner.region,
acquired_at,
SourceLocation::from_panic_location(Location::caller()),
None,
Some(description),
)
}
#[track_caller]
fn schedule_delivery(
&mut self,
request: ScheduledConsumerRequest,
delivery_mode: CursorDeliveryMode,
window: SequenceWindow,
capsule: &RecoverableCapsule,
ticket: Option<&ReadDelegationTicket>,
supersedes_obligation_id: Option<ObligationId>,
) -> Result<ScheduledConsumerDelivery, FabricConsumerError> {
if self.policy.paused {
return Err(FabricConsumerError::ConsumerPaused);
}
let window_messages = window_len(window);
if self.state.pending_count.saturating_add(window_messages)
> self.config.max_ack_pending as u64
{
return Err(FabricConsumerError::MaxAckPendingExceeded {
limit: self.config.max_ack_pending,
pending: self.state.pending_count,
});
}
let plan = self.cursor.plan_delivery(delivery_mode, capsule, ticket)?;
let delivery_attempt = self.state.next_attempt();
let token = self.acquire_ack_token(window, delivery_attempt, supersedes_obligation_id);
let obligation_id = token.id();
let mut attempt =
self.cursor
.issue_attempt(delivery_mode, delivery_attempt, obligation_id)?;
attempt.supersedes_obligation_id = supersedes_obligation_id;
self.state.delivered_count = self.state.delivered_count.saturating_add(window_messages);
self.state.pending_count = self.state.pending_count.saturating_add(window_messages);
self.state.highest_dispatched = self.state.highest_dispatched.max(window.end());
self.state.pending_acks.insert(
obligation_id,
PendingAckState {
request: request.clone(),
window,
delivery_mode,
delivery_attempt,
supersedes_obligation_id,
},
);
self.pending_ack_tokens.insert(obligation_id, token);
if let ScheduledConsumerRequest::Pull(pull_request) = &request
&& let Some(record) = self.make_pull_decision_record(pull_request, &plan, obligation_id)
{
self.push_decision(record);
}
Ok(ScheduledConsumerDelivery {
request,
window,
attempt,
plan,
})
}
fn make_pull_decision_record(
&mut self,
request: &PullRequest,
plan: &DeliveryPlan,
obligation_id: ObligationId,
) -> Option<ConsumerDecisionRecord> {
if self.config.adaptive_kernel != AdaptiveConsumerKernel::AuditBacked {
return None;
}
let snapshot = ConsumerPullDecisionSnapshot {
demand_class: request.demand_class,
pinned_requested: request.pinned_client.is_some(),
pending_ratio_permille: pending_ratio_permille(
self.state.pending_count,
self.config.max_ack_pending,
),
};
let chosen_action = ConsumerPullDecisionAction::from_plan(plan, request);
let contract = ConsumerPullDecisionContract::new(chosen_action);
let posterior = snapshot.posterior();
let ctx = self.decision_context(
&snapshot,
snapshot.calibration_score(),
snapshot.e_process(),
snapshot.ci_width(),
);
let outcome = evaluate(&contract, &posterior, &ctx);
Some(ConsumerDecisionRecord {
kind: ConsumerDecisionKind::PullScheduling,
action_name: outcome.action_name,
demand_class: Some(request.demand_class),
obligation_id: Some(obligation_id),
pinned_client: request.pinned_client.clone(),
audit: outcome.audit_entry,
})
}
fn decide_redelivery_action(
&mut self,
pending: &PendingAckState,
obligation_id: ObligationId,
) -> (ConsumerRedeliveryAction, Option<ConsumerDecisionRecord>) {
let next_attempt = pending.delivery_attempt.saturating_add(1);
let pending_ratio =
pending_ratio_permille(self.state.pending_count, self.config.max_ack_pending);
let action = if next_attempt > u32::from(self.config.max_deliver) {
ConsumerRedeliveryAction::DeadLetter
} else if pending_ratio >= 850 && next_attempt > 1 {
ConsumerRedeliveryAction::Delay
} else {
ConsumerRedeliveryAction::RetryNow
};
if self.config.adaptive_kernel != AdaptiveConsumerKernel::AuditBacked {
return (action, None);
}
let snapshot = ConsumerRedeliveryDecisionSnapshot {
next_attempt,
max_deliver: self.config.max_deliver,
pending_ratio_permille: pending_ratio,
};
let contract = ConsumerRedeliveryDecisionContract::new(action);
let posterior = snapshot.posterior();
let ctx = self.decision_context(
&snapshot,
snapshot.calibration_score(),
snapshot.e_process(),
snapshot.ci_width(),
);
let outcome = evaluate(&contract, &posterior, &ctx);
(
action,
Some(ConsumerDecisionRecord {
kind: ConsumerDecisionKind::Redelivery,
action_name: outcome.action_name,
demand_class: None,
obligation_id: Some(obligation_id),
pinned_client: None,
audit: outcome.audit_entry,
}),
)
}
}
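/// Inclusive length of a window in messages: `end - start + 1`, saturating.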
fn window_len(window: SequenceWindow) -> u64 {
window
.end()
.saturating_sub(window.start())
.saturating_add(1)
}
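/// Pending load as a permille (0..=1000) of `max_ack_pending`, clamping the
/// limit to at least one to avoid division by zero.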
fn pending_ratio_permille(pending_count: u64, max_ack_pending: usize) -> u16 {
let limit = max_ack_pending.max(1) as u64;
let ratio = pending_count
.saturating_mul(1000)
.checked_div(limit)
.unwrap_or(0)
.min(1000);
ratio as u16
}
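// The three audit-backed decision kernels below share one shape: a snapshot
// of observable state yields a posterior over a small state space, a contract
// fixes a loss matrix over the candidate actions, and `evaluate` records the
// decision together with calibration, e-process, and CI-width telemetry.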
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ConsumerPullDecisionAction {
CurrentSteward,
LeasedRelay,
Reconstructed,
}
impl ConsumerPullDecisionAction {
fn from_plan(plan: &DeliveryPlan, request: &PullRequest) -> Self {
match plan {
DeliveryPlan::CurrentSteward(_) => {
let _ = request;
Self::CurrentSteward
}
DeliveryPlan::LeasedRelay { .. } => Self::LeasedRelay,
DeliveryPlan::Reconstructed { .. } => Self::Reconstructed,
}
}
const fn label(self) -> &'static str {
match self {
Self::CurrentSteward => "current_steward",
Self::LeasedRelay => "leased_relay",
Self::Reconstructed => "reconstructed",
}
}
const fn index(self) -> usize {
match self {
Self::CurrentSteward => 0,
Self::LeasedRelay => 1,
Self::Reconstructed => 2,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerPullDecisionSnapshot {
demand_class: ConsumerDemandClass,
pinned_requested: bool,
pending_ratio_permille: u16,
}
impl ConsumerPullDecisionSnapshot {
fn posterior(self) -> Posterior {
let backpressure = f64::from(self.pending_ratio_permille) / 1000.0;
let mut weights = [0.05; 4];
let state_index = match self.demand_class {
ConsumerDemandClass::Tail => 0,
ConsumerDemandClass::CatchUp => 1,
ConsumerDemandClass::Replay => 2,
};
weights[state_index] = 0.72 - (backpressure * 0.2);
weights[3] = 0.08 + (backpressure * 0.55);
if self.pinned_requested {
weights[1] += 0.08;
}
normalize_posterior(weights)
}
fn calibration_score(self) -> f64 {
if self.pending_ratio_permille >= 850 {
0.74
} else {
0.93
}
}
fn e_process(self) -> f64 {
1.0 + f64::from(self.pending_ratio_permille) / 650.0
}
fn ci_width(self) -> f64 {
0.08 + f64::from(self.pending_ratio_permille) / 3000.0
}
}
#[derive(Debug, Clone)]
struct ConsumerPullDecisionContract {
states: Vec<String>,
actions: Vec<String>,
losses: LossMatrix,
chosen_action: ConsumerPullDecisionAction,
fallback: FallbackPolicy,
}
impl ConsumerPullDecisionContract {
fn new(chosen_action: ConsumerPullDecisionAction) -> Self {
let states = vec![
"tail_priority".into(),
"catchup_priority".into(),
"replay_priority".into(),
"backpressured".into(),
];
let actions = vec![
ConsumerPullDecisionAction::CurrentSteward.label().into(),
ConsumerPullDecisionAction::LeasedRelay.label().into(),
ConsumerPullDecisionAction::Reconstructed.label().into(),
];
let losses = LossMatrix::new(
states.clone(),
actions.clone(),
            vec![
                1.0, 2.0, 7.0, // tail_priority
                4.0, 2.0, 5.0, // catchup_priority
                6.0, 4.0, 1.0, // replay_priority
                8.0, 5.0, 3.0, // backpressured
            ],
)
.expect("consumer pull decision losses should be valid");
Self {
states,
actions,
losses,
chosen_action,
fallback: FallbackPolicy::default(),
}
}
}
impl DecisionContract for ConsumerPullDecisionContract {
fn name(&self) -> &'static str {
"fabric_consumer_pull_scheduler"
}
fn state_space(&self) -> &[String] {
&self.states
}
fn action_set(&self) -> &[String] {
&self.actions
}
fn loss_matrix(&self) -> &LossMatrix {
&self.losses
}
fn update_posterior(&self, posterior: &mut Posterior, observation: usize) {
let mut likelihoods = [0.1; 4];
if let Some(slot) = likelihoods.get_mut(observation) {
*slot = 0.9;
}
posterior.bayesian_update(&likelihoods);
}
fn choose_action(&self, _posterior: &Posterior) -> usize {
self.chosen_action.index()
}
fn fallback_action(&self) -> usize {
self.chosen_action.index()
}
fn fallback_policy(&self) -> &FallbackPolicy {
&self.fallback
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ConsumerOverflowDecisionAction {
RejectNew,
ReplaceLowestPriority,
}
impl ConsumerOverflowDecisionAction {
const fn label(self) -> &'static str {
match self {
Self::RejectNew => "reject_new",
Self::ReplaceLowestPriority => "replace_low_priority",
}
}
const fn index(self) -> usize {
match self {
Self::RejectNew => 0,
Self::ReplaceLowestPriority => 1,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerOverflowDecisionSnapshot {
incoming_demand: ConsumerDemandClass,
evicted_demand: ConsumerDemandClass,
replaced: bool,
}
impl ConsumerOverflowDecisionSnapshot {
fn posterior(self) -> Posterior {
let mut weights = [0.05; 4];
weights[self.incoming_demand.priority_rank() as usize] = 0.68;
weights[3] = if self.replaced { 0.12 } else { 0.34 };
normalize_posterior(weights)
}
fn calibration_score(self) -> f64 {
if self.replaced { 0.91 } else { 0.79 }
}
fn e_process(self) -> f64 {
1.6 + f64::from(self.evicted_demand.priority_rank())
}
fn ci_width(self) -> f64 {
if self.replaced { 0.15 } else { 0.31 }
}
}
#[derive(Debug, Clone)]
struct ConsumerOverflowDecisionContract {
states: Vec<String>,
actions: Vec<String>,
losses: LossMatrix,
chosen_action: ConsumerOverflowDecisionAction,
fallback: FallbackPolicy,
}
impl ConsumerOverflowDecisionContract {
fn new(chosen_action: ConsumerOverflowDecisionAction) -> Self {
let states = vec![
"tail_pressure".into(),
"catchup_pressure".into(),
"replay_pressure".into(),
"queue_saturated".into(),
];
let actions = vec![
ConsumerOverflowDecisionAction::RejectNew.label().into(),
ConsumerOverflowDecisionAction::ReplaceLowestPriority
.label()
.into(),
];
let losses = LossMatrix::new(
states.clone(),
actions.clone(),
vec![
9.0, 1.0,
5.0, 3.0,
1.0, 8.0,
4.0, 2.0,
],
)
.expect("consumer overflow decision losses should be valid");
Self {
states,
actions,
losses,
chosen_action,
fallback: FallbackPolicy::default(),
}
}
}
impl DecisionContract for ConsumerOverflowDecisionContract {
fn name(&self) -> &'static str {
"fabric_consumer_overflow_policy"
}
fn state_space(&self) -> &[String] {
&self.states
}
fn action_set(&self) -> &[String] {
&self.actions
}
fn loss_matrix(&self) -> &LossMatrix {
&self.losses
}
fn update_posterior(&self, posterior: &mut Posterior, observation: usize) {
let mut likelihoods = [0.1; 4];
if let Some(slot) = likelihoods.get_mut(observation) {
*slot = 0.9;
}
posterior.bayesian_update(&likelihoods);
}
fn choose_action(&self, _posterior: &Posterior) -> usize {
self.chosen_action.index()
}
fn fallback_action(&self) -> usize {
self.chosen_action.index()
}
fn fallback_policy(&self) -> &FallbackPolicy {
&self.fallback
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct ConsumerRedeliveryDecisionSnapshot {
next_attempt: u32,
max_deliver: u16,
pending_ratio_permille: u16,
}
impl ConsumerRedeliveryDecisionSnapshot {
fn posterior(self) -> Posterior {
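// Posterior over [transient_failure, pressure, exhausted].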
let exhausted = self.next_attempt > u32::from(self.max_deliver);
let pressured = self.pending_ratio_permille >= 850;
let weights = if exhausted {
[0.05, 0.1, 0.85]
} else if pressured {
[0.18, 0.67, 0.15]
} else {
[0.82, 0.12, 0.06]
};
normalize_posterior(weights)
}
fn calibration_score(self) -> f64 {
if self.next_attempt > u32::from(self.max_deliver) {
0.88
} else if self.pending_ratio_permille >= 850 {
0.77
} else {
0.94
}
}
fn e_process(self) -> f64 {
1.0 + f64::from(self.next_attempt) / 3.0 + f64::from(self.pending_ratio_permille) / 900.0
}
fn ci_width(self) -> f64 {
0.09 + f64::from(self.pending_ratio_permille) / 4000.0
}
}
#[derive(Debug, Clone)]
struct ConsumerRedeliveryDecisionContract {
states: Vec<String>,
actions: Vec<String>,
losses: LossMatrix,
chosen_action: ConsumerRedeliveryAction,
fallback: FallbackPolicy,
}
impl ConsumerRedeliveryDecisionContract {
fn new(chosen_action: ConsumerRedeliveryAction) -> Self {
let states = vec![
"transient_failure".into(),
"pressure".into(),
"exhausted".into(),
];
let actions = vec![
ConsumerRedeliveryAction::RetryNow.label().into(),
ConsumerRedeliveryAction::Delay.label().into(),
ConsumerRedeliveryAction::DeadLetter.label().into(),
];
let losses = LossMatrix::new(
states.clone(),
actions.clone(),
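// Rows follow `states` order.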
vec![
1.0, 4.0, 12.0,
7.0, 2.0, 5.0,
18.0, 6.0, 1.0,
],
)
.expect("consumer redelivery decision losses should be valid");
Self {
states,
actions,
losses,
chosen_action,
fallback: FallbackPolicy::default(),
}
}
}
impl DecisionContract for ConsumerRedeliveryDecisionContract {
fn name(&self) -> &'static str {
"fabric_consumer_redelivery_policy"
}
fn state_space(&self) -> &[String] {
&self.states
}
fn action_set(&self) -> &[String] {
&self.actions
}
fn loss_matrix(&self) -> &LossMatrix {
&self.losses
}
fn update_posterior(&self, posterior: &mut Posterior, observation: usize) {
let mut likelihoods = [0.1; 3];
if let Some(slot) = likelihoods.get_mut(observation) {
*slot = 0.9;
}
posterior.bayesian_update(&likelihoods);
}
fn choose_action(&self, _posterior: &Posterior) -> usize {
match self.chosen_action {
ConsumerRedeliveryAction::RetryNow => 0,
ConsumerRedeliveryAction::Delay => 1,
ConsumerRedeliveryAction::DeadLetter => 2,
}
}
fn fallback_action(&self) -> usize {
match self.chosen_action {
ConsumerRedeliveryAction::RetryNow => 0,
ConsumerRedeliveryAction::Delay => 1,
ConsumerRedeliveryAction::DeadLetter => 2,
}
}
fn fallback_policy(&self) -> &FallbackPolicy {
&self.fallback
}
}
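/// Floors non-positive weights to a small epsilon (0.01) and rescales so the
/// result is a valid probability vector before handing it to `Posterior::new`.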
fn normalize_posterior<const N: usize>(mut weights: [f64; N]) -> Posterior {
for weight in &mut weights {
if *weight <= 0.0 {
*weight = 0.01;
}
}
let total = weights.iter().sum::<f64>().max(f64::EPSILON);
Posterior::new(weights.into_iter().map(|weight| weight / total).collect())
.expect("consumer decision posterior should normalize")
}
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum FabricConsumerError {
#[error("consumer max_deliver must be greater than zero")]
InvalidMaxDeliver,
#[error("consumer max_ack_pending must be greater than zero")]
InvalidMaxAckPending,
#[error("consumer max_waiting must be greater than zero")]
InvalidMaxWaiting,
#[error("consumer ack_wait must be greater than zero")]
InvalidAckWait,
#[error("consumer {field} must be greater than zero when configured")]
InvalidHeartbeat {
field: &'static str,
},
#[error("pull request batch_size must be greater than zero")]
InvalidPullBatchSize,
#[error("pull request max_bytes must be greater than zero when configured")]
InvalidPullMaxBytes,
#[error("pull request expires must be greater than zero when configured")]
InvalidPullExpiry,
#[error("consumer is not in push mode")]
PushModeRequired,
#[error("consumer is not in pull mode")]
PullModeRequired,
#[error("consumer flow control is disabled")]
FlowControlDisabled,
#[error("consumer dispatch is paused")]
ConsumerPaused,
#[error("consumer pending messages `{pending}` exceed or meet max_ack_pending `{limit}`")]
MaxAckPendingExceeded {
limit: usize,
pending: u64,
},
#[error("consumer already has max_waiting `{limit}` queued pull requests")]
MaxWaitingExceeded {
limit: usize,
},
#[error("consumer has no queued pull requests")]
NoQueuedPullRequests,
#[error(
"no data available for pull request class `{demand_class:?}` at tail `{available_tail}`"
)]
NoDataAvailable {
demand_class: ConsumerDemandClass,
available_tail: u64,
},
#[error(
"pinned client `{pinned_client}` does not match supplied ticket relay `{ticket_relay}`"
)]
PinnedClientTicketMismatch {
pinned_client: NodeId,
ticket_relay: NodeId,
},
#[error("consumer obligation `{obligation_id}` is not pending")]
PendingAckNotFound {
obligation_id: ObligationId,
},
#[error("consumer obligation `{obligation_id}` is pending but its ledger token is missing")]
MissingPendingAckToken {
obligation_id: ObligationId,
},
#[error("dead-letter reason must not be empty")]
EmptyDeadLetterReason,
#[error(
"consumer deferred redelivery for obligation `{obligation_id}` at attempt `{delivery_attempt}`"
)]
RedeliveryDeferred {
obligation_id: ObligationId,
delivery_attempt: u32,
},
#[error(
"consumer requires dead-letter handling for obligation `{obligation_id}` at attempt `{delivery_attempt}`"
)]
RedeliveryRequiresDeadLetter {
obligation_id: ObligationId,
delivery_attempt: u32,
},
#[error(transparent)]
Cursor(#[from] ConsumerCursorError),
}
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ConsumerCursorError {
#[error("invalid sequence window `{start}..={end}`")]
InvalidSequenceWindow {
start: u64,
end: u64,
},
#[error("subject cell `{cell_id}` has no active sequencer")]
NoActiveSequencer {
cell_id: CellId,
},
#[error("delivery attempt must be greater than zero")]
InvalidDeliveryAttempt,
#[error("read-delegation ticket ttl must be greater than zero, got `{ttl_ticks}`")]
InvalidReadDelegationTtl {
ttl_ticks: u64,
},
#[error("steward `{steward}` is not in the steward pool for cell `{cell_id}`")]
UnknownSteward {
cell_id: CellId,
steward: NodeId,
},
#[error("relay `{relay}` is already a steward and does not need a read ticket")]
RelayMustNotBeSteward {
relay: NodeId,
},
#[error("relay `{relay}` transfer must target a delegated cursor partition")]
RelayTransferRequiresPartition {
relay: NodeId,
},
#[error("cursor partition selector field `{field}` must not be empty")]
EmptyCursorPartitionSelector {
field: &'static str,
},
#[error("cursor partition subject sub-range `{start}`..=`{end}` is invalid")]
InvalidCursorPartitionSubRange {
start: String,
end: String,
},
#[error("cursor partition bucket `{bucket}` is invalid for bucket set size `{buckets}`")]
InvalidCursorPartitionBucket {
bucket: u16,
buckets: u16,
},
#[error("cursor partition `{partition}` must own at least one consumer")]
EmptyCursorPartitionConsumers {
partition: u16,
},
#[error("cursor partition `{partition}` is unknown to the current cursor state")]
UnknownCursorPartition {
partition: u16,
},
#[error(
"cursor partition `{partition}` checkpoint does not match the current delegated lease scope `{current_scope:?}`"
)]
PartitionCheckpointRequiresDelegatedLease {
partition: u16,
current_scope: CursorLeaseScope,
},
#[error(
"cursor partition `{partition}` checkpoint generation `{report_generation}` is stale; current generation is `{current_generation}`"
)]
StaleCursorPartitionCheckpoint {
partition: u16,
report_generation: u64,
current_generation: u64,
},
#[error(
"cursor partition `{partition}` reported consumer_count `{reported_consumer_count}` but assignment owns `{assigned_consumer_count}` consumers"
)]
PartitionCheckpointConsumerCountMismatch {
partition: u16,
reported_consumer_count: u32,
assigned_consumer_count: usize,
},
#[error("relay `{relay}` is missing a read-delegation ticket")]
MissingReadDelegationTicket {
relay: NodeId,
},
#[error(
"read-delegation ticket for relay `{relay}` is stale for `{ticket_cell}`@{ticket_epoch:?}; current lease is `{current_cell}`@{current_epoch:?}"
)]
StaleReadDelegationEpoch {
relay: NodeId,
ticket_cell: CellId,
ticket_epoch: CellEpoch,
current_cell: CellId,
current_epoch: CellEpoch,
},
#[error(
"read-delegation ticket for relay `{relay}` expired at tick `{expired_at_tick}` (current `{current_tick}`)"
)]
ExpiredReadDelegationTicket {
relay: NodeId,
expired_at_tick: u64,
current_tick: u64,
},
#[error(
"read-delegation ticket for relay `{relay}` was revoked via handle `{revocation_handle:?}`"
)]
RevokedReadDelegationTicket {
relay: NodeId,
revocation_handle: ReadDelegationRevocationHandle,
},
#[error(
"read-delegation ticket for relay `{relay}` does not match the current lease/window `{requested_window}`"
)]
InvalidReadDelegationTicket {
relay: NodeId,
requested_window: SequenceWindow,
},
#[error("requested delivery window `{window}` is not recoverable from the capsule")]
UnrecoverableWindow {
window: SequenceWindow,
},
#[error("attempt certificate scope does not match the current cursor lease")]
AttemptScopeMismatch {
certificate_cell: CellId,
certificate_epoch: CellEpoch,
current_cell: CellId,
current_epoch: CellEpoch,
},
}
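/// Call-style messages; each expects a matching `ConsumerReply`.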
#[derive(Debug)]
pub enum ConsumerCall {
Pull {
available_tail: u64,
capsule: RecoverableCapsule,
ticket: Option<ReadDelegationTicket>,
},
Ack {
attempt: AttemptCertificate,
},
Nack {
attempt: AttemptCertificate,
reason: ConsumerNackReason,
},
State,
}
#[derive(Debug)]
pub enum ConsumerReply {
Pull(Result<PullDispatchOutcome, FabricConsumerError>),
Ack(Result<AckResolution, FabricConsumerError>),
Nack(Result<NackResolution, FabricConsumerError>),
State(FabricConsumerState),
}
#[derive(Debug)]
pub enum ConsumerCast {
Pause,
Resume,
}
#[derive(Debug)]
pub enum ConsumerInfo {
Heartbeat,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsumerActorLifecycle {
Running,
Paused,
Stopping,
}
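/// `GenServer` wrapper that owns a `FabricConsumer` and gates dispatch on the
/// pause/resume lifecycle.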
#[derive(Debug)]
pub struct ConsumerActor {
consumer: FabricConsumer,
lifecycle: ConsumerActorLifecycle,
}
impl ConsumerActor {
#[must_use]
pub fn new(consumer: FabricConsumer) -> Self {
Self {
consumer,
lifecycle: ConsumerActorLifecycle::Running,
}
}
#[must_use]
pub fn lifecycle(&self) -> ConsumerActorLifecycle {
self.lifecycle
}
#[must_use]
pub fn consumer(&self) -> &FabricConsumer {
&self.consumer
}
pub fn consumer_mut(&mut self) -> &mut FabricConsumer {
&mut self.consumer
}
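/// Display name used in trace events; falls back to "anonymous" for
/// consumers without a durable name.
fn consumer_name(&self) -> String {
self.consumer
.config()
.durable_name
.clone()
.unwrap_or_else(|| "anonymous".to_owned())
}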
}
impl crate::gen_server::GenServer for ConsumerActor {
type Call = ConsumerCall;
type Reply = ConsumerReply;
type Cast = ConsumerCast;
type Info = ConsumerInfo;
fn handle_call(
&mut self,
_cx: &crate::cx::Cx,
request: Self::Call,
reply: crate::gen_server::Reply<Self::Reply>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
Box::pin(async move {
match request {
ConsumerCall::Pull {
available_tail,
capsule,
ticket,
} => {
if self.lifecycle == ConsumerActorLifecycle::Paused {
reply.send(ConsumerReply::Pull(Err(
FabricConsumerError::ConsumerPaused,
)));
} else {
let result = self.consumer.dispatch_next_pull(
available_tail,
&capsule,
ticket.as_ref(),
);
reply.send(ConsumerReply::Pull(result));
}
}
ConsumerCall::Ack { attempt } => {
let result = self.consumer.acknowledge_delivery(&attempt);
reply.send(ConsumerReply::Ack(result));
}
ConsumerCall::Nack { attempt, reason } => {
let result = self.consumer.nack_delivery(&attempt, reason);
reply.send(ConsumerReply::Nack(result));
}
ConsumerCall::State => {
reply.send(ConsumerReply::State(self.consumer.state().clone()));
}
}
})
}
fn handle_cast(
&mut self,
cx: &crate::cx::Cx,
msg: Self::Cast,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
let cx = cx.clone();
Box::pin(async move {
let name = self.consumer_name();
match msg {
ConsumerCast::Pause => {
if self.lifecycle == ConsumerActorLifecycle::Running {
let _ = self.consumer.pause();
self.lifecycle = ConsumerActorLifecycle::Paused;
cx.trace_with_fields(
"fabric.consumer_actor.pause",
&[("event", "consumer_pause"), ("consumer", name.as_str())],
);
}
}
ConsumerCast::Resume => {
if self.lifecycle == ConsumerActorLifecycle::Paused {
self.consumer.resume();
self.lifecycle = ConsumerActorLifecycle::Running;
cx.trace_with_fields(
"fabric.consumer_actor.resume",
&[("event", "consumer_resume"), ("consumer", name.as_str())],
);
}
}
}
})
}
fn handle_info(
&mut self,
cx: &crate::cx::Cx,
msg: Self::Info,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
let cx = cx.clone();
Box::pin(async move {
let name = self.consumer_name();
match msg {
ConsumerInfo::Heartbeat => {
cx.trace_with_fields(
"fabric.consumer_actor.heartbeat",
&[("event", "consumer_heartbeat"), ("consumer", name.as_str())],
);
}
}
})
}
fn on_start(
&mut self,
cx: &crate::cx::Cx,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
let cx = cx.clone();
Box::pin(async move {
let name = self.consumer_name();
cx.trace_with_fields(
"fabric.consumer_actor.start",
&[
("event", "consumer_actor_start"),
("consumer", name.as_str()),
],
);
})
}
fn on_stop(
&mut self,
cx: &crate::cx::Cx,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
let cx = cx.clone();
Box::pin(async move {
self.lifecycle = ConsumerActorLifecycle::Stopping;
let name = self.consumer_name();
cx.trace_with_fields(
"fabric.consumer_actor.stop",
&[
("event", "consumer_actor_stop"),
("consumer", name.as_str()),
],
);
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::messaging::fabric::{
CellTemperature, DataCapsule, NodeRole, PlacementPolicy, RepairPolicy, StewardCandidate,
StorageClass, SubjectPattern,
};
fn candidate(name: &str, domain: &str) -> StewardCandidate {
StewardCandidate::new(NodeId::new(name), domain)
.with_role(NodeRole::Steward)
.with_role(NodeRole::RepairWitness)
.with_storage_class(StorageClass::Durable)
}
fn test_cell() -> SubjectCell {
SubjectCell::new(
&SubjectPattern::parse("orders.created").expect("pattern"),
CellEpoch::new(7, 11),
&[
candidate("node-a", "rack-a"),
candidate("node-b", "rack-b"),
candidate("node-c", "rack-c"),
],
&PlacementPolicy {
cold_stewards: 3,
warm_stewards: 3,
hot_stewards: 3,
..PlacementPolicy::default()
},
RepairPolicy::default(),
DataCapsule {
temperature: CellTemperature::Warm,
retained_message_blocks: 4,
},
)
.expect("cell")
}
fn obligation(index: u32) -> ObligationId {
ObligationId::new_for_test(index, 0)
}
fn partition_consumers(partition: u16) -> BTreeSet<String> {
[
format!("consumer-{partition}-a"),
format!("consumer-{partition}-b"),
]
.into_iter()
.collect()
}
fn partition_assignment(
partition: u16,
selector: CursorPartitionSelector,
) -> CursorPartitionAssignment {
CursorPartitionAssignment {
partition,
leader: CursorLeaseHolder::Steward(NodeId::new("node-a")),
selector,
consumers: partition_consumers(partition),
}
}
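// Assigns `partition` if it is not registered yet, then delegates its cursor
// lease to `holder` via a contested-transfer proposal.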
fn delegate_partition_to_holder(
cursor: &mut FabricConsumerCursor,
partition: u16,
holder: CursorLeaseHolder,
transfer_obligation: ObligationId,
) -> ContestedTransferResolution {
if cursor.partition_assignment(partition).is_none() {
cursor
.assign_partition(partition_assignment(
partition,
CursorPartitionSelector::ConsumerGroup(format!("group-{partition}")),
))
.expect("assign partition");
}
cursor
.resolve_contested_transfer(&[CursorTransferProposal::delegated_partition(
holder,
partition,
cursor.current_lease().lease_generation,
transfer_obligation,
)])
.expect("delegate partition")
}
#[test]
fn cursor_lease_starts_from_the_control_capsule() {
let cell = test_cell();
let cursor = FabricConsumerCursor::new(&cell).expect("cursor");
assert_eq!(cursor.current_lease().cell_id, cell.cell_id);
assert_eq!(cursor.current_lease().epoch, cell.epoch);
assert_eq!(
cursor.current_lease().holder,
CursorLeaseHolder::Steward(NodeId::new("node-a"))
);
assert_eq!(
cursor.current_lease().lease_generation,
cell.control_capsule.sequencer_lease_generation
);
}
#[test]
fn delivery_attempts_commit_against_the_current_lease_holder() {
let cell = test_cell();
let cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(10, 12).expect("window");
let attempt = cursor
.issue_attempt(CursorDeliveryMode::Push { window }, 1, obligation(10))
.expect("attempt");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
assert_eq!(
cursor.plan_delivery(attempt.delivery_mode, &capsule, None),
Ok(DeliveryPlan::CurrentSteward(NodeId::new("node-a")))
);
assert_eq!(
cursor.acknowledge(&attempt),
Ok(AckResolution::Committed {
obligation_id: obligation(10),
against: CursorLeaseHolder::Steward(NodeId::new("node-a")),
})
);
}
#[test]
fn pull_demand_class_attempts_preserve_the_named_request() {
let cell = test_cell();
let cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let attempt = cursor
.issue_attempt(
CursorDeliveryMode::Pull(CursorRequest::DemandClass(ConsumerDemandClass::Tail)),
2,
obligation(11),
)
.expect("attempt");
assert_eq!(
attempt.delivery_mode,
CursorDeliveryMode::Pull(CursorRequest::DemandClass(ConsumerDemandClass::Tail))
);
assert_eq!(
cursor.plan_delivery(attempt.delivery_mode, &RecoverableCapsule::default(), None),
Ok(DeliveryPlan::CurrentSteward(NodeId::new("node-a")))
);
}
#[test]
fn failover_bumps_generation_and_turns_old_acks_into_stale_noops() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(20, 20).expect("window");
let first_attempt = cursor
.issue_attempt(
CursorDeliveryMode::Pull(CursorRequest::Window(window)),
1,
obligation(12),
)
.expect("attempt");
cursor.failover(NodeId::new("node-b")).expect("failover");
assert_eq!(
cursor.acknowledge(&first_attempt),
Ok(AckResolution::StaleNoOp {
obligation_id: obligation(12),
current_generation: cell.control_capsule.sequencer_lease_generation + 1,
current_holder: CursorLeaseHolder::Steward(NodeId::new("node-b")),
})
);
let second_attempt = cursor
.issue_attempt(
CursorDeliveryMode::Pull(CursorRequest::Sequence(20)),
2,
obligation(13),
)
.expect("attempt");
assert_eq!(
cursor.acknowledge(&second_attempt),
Ok(AckResolution::Committed {
obligation_id: obligation(13),
against: CursorLeaseHolder::Steward(NodeId::new("node-b")),
})
);
}
#[test]
fn relay_delivery_requires_a_matching_read_ticket() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(30, 35).expect("window");
let resolution = delegate_partition_to_holder(
&mut cursor,
7,
CursorLeaseHolder::Relay(NodeId::new("relay-1")),
obligation(14),
);
assert!(matches!(
resolution,
ContestedTransferResolution::Accepted { .. }
));
let ticket = cursor
.grant_read_ticket(
NodeId::new("relay-1"),
window,
4,
CacheabilityRule::Private { max_age_ticks: 2 },
)
.expect("ticket");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("relay-1"), window);
assert_eq!(
ticket.cursor_lease_ref.lease_generation,
cursor.current_lease().lease_generation
);
assert_eq!(ticket.segment_window, window);
assert_eq!(
ticket.cacheability_rules,
CacheabilityRule::Private { max_age_ticks: 2 }
);
assert_eq!(ticket.expiry.issued_at_tick, 0);
assert_eq!(ticket.expiry.not_after_tick, 4);
assert_eq!(
cursor.plan_delivery(CursorDeliveryMode::Push { window }, &capsule, Some(&ticket)),
Ok(DeliveryPlan::LeasedRelay {
relay: NodeId::new("relay-1"),
ticket,
})
);
}
#[test]
fn relay_delivery_rejects_missing_ticket_when_authority_is_delegated() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(36, 38).expect("window");
let relay = NodeId::new("relay-2");
delegate_partition_to_holder(
&mut cursor,
8,
CursorLeaseHolder::Relay(relay.clone()),
obligation(15),
);
let capsule = RecoverableCapsule::default().with_window(relay.clone(), window);
assert_eq!(
cursor.plan_delivery(CursorDeliveryMode::Push { window }, &capsule, None),
Err(ConsumerCursorError::MissingReadDelegationTicket { relay })
);
}
#[test]
fn read_delegation_ticket_expiry_is_enforced() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(46, 49).expect("window");
let relay = NodeId::new("relay-expiring");
delegate_partition_to_holder(
&mut cursor,
9,
CursorLeaseHolder::Relay(relay.clone()),
obligation(16),
);
let ticket = cursor
.grant_read_ticket(relay.clone(), window, 1, CacheabilityRule::NoCache)
.expect("ticket");
cursor.advance_ticket_clock(2);
let capsule = RecoverableCapsule::default().with_window(relay.clone(), window);
assert_eq!(
cursor.plan_delivery(CursorDeliveryMode::Push { window }, &capsule, Some(&ticket)),
Err(ConsumerCursorError::ExpiredReadDelegationTicket {
relay,
expired_at_tick: 1,
current_tick: 2,
})
);
}
#[test]
fn read_delegation_ticket_revocation_is_enforced() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(50, 52).expect("window");
let relay = NodeId::new("relay-revoked");
delegate_partition_to_holder(
&mut cursor,
10,
CursorLeaseHolder::Relay(relay.clone()),
obligation(17),
);
let ticket = cursor
.grant_read_ticket(
relay.clone(),
window,
5,
CacheabilityRule::Shared { max_age_ticks: 1 },
)
.expect("ticket");
cursor.revoke_read_ticket(ticket.revocation_handle, ticket.expiry.not_after_tick);
let capsule = RecoverableCapsule::default().with_window(relay.clone(), window);
assert_eq!(
cursor.plan_delivery(CursorDeliveryMode::Push { window }, &capsule, Some(&ticket)),
Err(ConsumerCursorError::RevokedReadDelegationTicket {
relay,
revocation_handle: ticket.revocation_handle,
})
);
}
#[test]
fn prune_expired_revocations_removes_stale_entries() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(50, 55).expect("window");
let relay = NodeId::new("relay-prune");
delegate_partition_to_holder(
&mut cursor,
10,
CursorLeaseHolder::Relay(relay.clone()),
obligation(30),
);
let ticket = cursor
.grant_read_ticket(
relay,
window,
5,
CacheabilityRule::Private { max_age_ticks: 1 },
)
.expect("ticket");
cursor.revoke_read_ticket(ticket.revocation_handle, ticket.expiry.not_after_tick);
assert_eq!(cursor.revoked_tickets.len(), 1);
cursor.ticket_clock = 6;
cursor.prune_expired_revocations();
assert_eq!(
cursor.revoked_tickets.len(),
0,
"expired revocation entries must be pruned"
);
}
#[test]
fn stale_epoch_read_delegation_ticket_is_rejected() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(60, 63).expect("window");
let relay = NodeId::new("relay-stale");
delegate_partition_to_holder(
&mut cursor,
11,
CursorLeaseHolder::Relay(relay.clone()),
obligation(18),
);
let mut ticket = cursor
.grant_read_ticket(relay.clone(), window, 5, CacheabilityRule::NoCache)
.expect("ticket");
ticket.epoch = CellEpoch::new(6, 99);
let capsule = RecoverableCapsule::default().with_window(relay.clone(), window);
assert_eq!(
cursor.plan_delivery(CursorDeliveryMode::Push { window }, &capsule, Some(&ticket)),
Err(ConsumerCursorError::StaleReadDelegationEpoch {
relay,
ticket_cell: cell.cell_id,
ticket_epoch: CellEpoch::new(6, 99),
current_cell: cell.cell_id,
current_epoch: cell.epoch,
})
);
}
#[test]
fn partition_assignments_preserve_selector_strategies_and_consumers() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let by_group = partition_assignment(
20,
CursorPartitionSelector::ConsumerGroup("orders-tail".to_owned()),
);
let by_range = partition_assignment(
21,
CursorPartitionSelector::SubjectSubRange {
start: "orders.a".to_owned(),
end: "orders.m".to_owned(),
},
);
let by_bucket = partition_assignment(
22,
CursorPartitionSelector::HashBucket {
bucket: 1,
buckets: 4,
},
);
cursor
.assign_partition(by_group.clone())
.expect("assign group partition");
cursor
.assign_partition(by_range.clone())
.expect("assign range partition");
cursor
.assign_partition(by_bucket.clone())
.expect("assign bucket partition");
assert_eq!(cursor.partition_assignment(20), Some(&by_group));
assert_eq!(cursor.partition_assignment(21), Some(&by_range));
assert_eq!(cursor.partition_assignment(22), Some(&by_bucket));
assert_eq!(
cursor
.partition_assignment(21)
.map(|entry| entry.consumers.len()),
Some(2)
);
}
#[test]
fn delegated_attempts_expose_partition_lease_metadata() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let relay = NodeId::new("relay-partition");
let resolution = delegate_partition_to_holder(
&mut cursor,
23,
CursorLeaseHolder::Relay(relay.clone()),
obligation(181),
);
assert!(matches!(
resolution,
ContestedTransferResolution::Accepted { .. }
));
let attempt = cursor
.issue_attempt(
CursorDeliveryMode::Push {
window: SequenceWindow::new(64, 66).expect("window"),
},
1,
obligation(182),
)
.expect("attempt");
assert_eq!(
attempt.cursor_partition_lease(),
Some(CursorPartitionLease {
partition: 23,
leader: CursorLeaseHolder::Relay(relay),
lease_generation: cursor.current_lease().lease_generation,
})
);
}
#[test]
fn relay_transfer_requires_partition_scope_and_registered_partition() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let relay = NodeId::new("relay-invalid");
let generation = cursor.current_lease().lease_generation;
assert_eq!(
cursor.resolve_contested_transfer(&[CursorTransferProposal::control_capsule(
CursorLeaseHolder::Relay(relay.clone()),
generation,
obligation(183),
)]),
Err(ConsumerCursorError::RelayTransferRequiresPartition {
relay: relay.clone()
})
);
assert_eq!(
cursor.resolve_contested_transfer(&[CursorTransferProposal::delegated_partition(
CursorLeaseHolder::Relay(relay),
99,
generation,
obligation(184),
)]),
Err(ConsumerCursorError::UnknownCursorPartition { partition: 99 })
);
}
#[test]
fn partition_checkpoint_reporting_records_summary_for_active_partition() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let relay = NodeId::new("relay-summary");
delegate_partition_to_holder(
&mut cursor,
24,
CursorLeaseHolder::Relay(relay.clone()),
obligation(185),
);
let summary = cursor
.report_partition_checkpoint(CursorPartitionCheckpoint {
partition: 24,
lease_generation: cursor.current_lease().lease_generation,
ack_floor: 120,
delivered_through: 127,
pending_count: 3,
consumer_count: 2,
})
.expect("checkpoint")
.clone();
assert_eq!(
summary,
CursorPartitionSummary {
partition: 24,
selector: CursorPartitionSelector::ConsumerGroup("group-24".to_owned()),
leader: CursorLeaseHolder::Relay(relay),
lease_generation: cursor.current_lease().lease_generation,
ack_floor: 120,
delivered_through: 127,
pending_count: 3,
consumer_count: 2,
}
);
assert_eq!(cursor.partition_summary(24), Some(&summary));
}
#[test]
fn partition_checkpoint_reporting_requires_matching_scope_and_generation() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
assert_eq!(
cursor.report_partition_checkpoint(CursorPartitionCheckpoint {
partition: 25,
lease_generation: cursor.current_lease().lease_generation,
ack_floor: 0,
delivered_through: 0,
pending_count: 0,
consumer_count: 2,
}),
Err(
ConsumerCursorError::PartitionCheckpointRequiresDelegatedLease {
partition: 25,
current_scope: CursorLeaseScope::ControlCapsule,
}
)
);
let relay = NodeId::new("relay-fenced");
delegate_partition_to_holder(
&mut cursor,
25,
CursorLeaseHolder::Relay(relay),
obligation(186),
);
let current_generation = cursor.current_lease().lease_generation;
assert_eq!(
cursor.report_partition_checkpoint(CursorPartitionCheckpoint {
partition: 25,
lease_generation: current_generation.saturating_sub(1),
ack_floor: 0,
delivered_through: 0,
pending_count: 0,
consumer_count: 2,
}),
Err(ConsumerCursorError::StaleCursorPartitionCheckpoint {
partition: 25,
report_generation: current_generation.saturating_sub(1),
current_generation,
})
);
assert_eq!(
cursor.report_partition_checkpoint(CursorPartitionCheckpoint {
partition: 25,
lease_generation: current_generation,
ack_floor: 0,
delivered_through: 0,
pending_count: 0,
consumer_count: 1,
}),
Err(
ConsumerCursorError::PartitionCheckpointConsumerCountMismatch {
partition: 25,
reported_consumer_count: 1,
assigned_consumer_count: 2,
}
)
);
}
#[test]
fn replacing_partition_assignment_invalidates_stale_summary() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
delegate_partition_to_holder(
&mut cursor,
26,
CursorLeaseHolder::Relay(NodeId::new("relay-a")),
obligation(187),
);
cursor
.report_partition_checkpoint(CursorPartitionCheckpoint {
partition: 26,
lease_generation: cursor.current_lease().lease_generation,
ack_floor: 200,
delivered_through: 208,
pending_count: 4,
consumer_count: 2,
})
.expect("checkpoint");
assert!(cursor.partition_summary(26).is_some());
cursor
.assign_partition(partition_assignment(
26,
CursorPartitionSelector::SubjectSubRange {
start: "orders.n".to_owned(),
end: "orders.z".to_owned(),
},
))
.expect("replace assignment");
assert_eq!(
cursor
.partition_assignment(26)
.map(|assignment| &assignment.selector),
Some(&CursorPartitionSelector::SubjectSubRange {
start: "orders.n".to_owned(),
end: "orders.z".to_owned(),
})
);
assert_eq!(cursor.partition_summary(26), None);
}
#[test]
fn partition_rebalance_updates_assignment_and_invalidates_stale_summary() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let relay_a = NodeId::new("relay-a");
let relay_b = NodeId::new("relay-b");
delegate_partition_to_holder(
&mut cursor,
26,
CursorLeaseHolder::Relay(relay_a),
obligation(187),
);
cursor
.report_partition_checkpoint(CursorPartitionCheckpoint {
partition: 26,
lease_generation: cursor.current_lease().lease_generation,
ack_floor: 200,
delivered_through: 208,
pending_count: 4,
consumer_count: 2,
})
.expect("checkpoint");
let stale_generation = cursor.current_lease().lease_generation.saturating_sub(1);
assert_eq!(
cursor
.rebalance_partition(
26,
CursorLeaseHolder::Relay(relay_b.clone()),
stale_generation,
obligation(188),
)
.expect("stale rebalance result"),
ContestedTransferResolution::StaleNoOp {
current_lease: cursor.current_lease().clone(),
}
);
let accepted = cursor
.rebalance_partition(
26,
CursorLeaseHolder::Relay(relay_b.clone()),
cursor.current_lease().lease_generation,
obligation(189),
)
.expect("rebalance");
assert!(matches!(
accepted,
ContestedTransferResolution::Accepted { .. }
));
assert_eq!(
cursor
.partition_assignment(26)
.map(|assignment| &assignment.leader),
Some(&CursorLeaseHolder::Relay(relay_b.clone()))
);
assert_eq!(cursor.partition_summary(26), None);
let checkpoint = cursor
.report_partition_checkpoint(CursorPartitionCheckpoint {
partition: 26,
lease_generation: cursor.current_lease().lease_generation,
ack_floor: 209,
delivered_through: 214,
pending_count: 1,
consumer_count: 2,
})
.expect("post-rebalance checkpoint");
assert_eq!(checkpoint.leader, CursorLeaseHolder::Relay(relay_b));
}
#[test]
fn reconstruction_is_used_when_no_single_peer_covers_the_full_window() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let window = SequenceWindow::new(40, 45).expect("window");
cursor
.failover(NodeId::new("node-b"))
.expect("make node-b current");
let capsule = RecoverableCapsule::default()
.with_window(
NodeId::new("node-a"),
SequenceWindow::new(40, 42).expect("left window"),
)
.with_window(
NodeId::new("node-c"),
SequenceWindow::new(43, 45).expect("right window"),
);
assert_eq!(
cursor.plan_delivery(
CursorDeliveryMode::Pull(CursorRequest::Window(window)),
&capsule,
None
),
Ok(DeliveryPlan::Reconstructed {
contributors: vec![NodeId::new("node-a"), NodeId::new("node-c")],
})
);
}
#[test]
fn contested_transfer_prefers_steward_order_and_filters_stale_proposals() {
let cell = test_cell();
let mut cursor = FabricConsumerCursor::new(&cell).expect("cursor");
let current_generation = cursor.current_lease().lease_generation;
let resolution = cursor
.resolve_contested_transfer(&[
CursorTransferProposal::control_capsule(
CursorLeaseHolder::Steward(NodeId::new("node-c")),
current_generation,
obligation(20),
),
CursorTransferProposal::control_capsule(
CursorLeaseHolder::Steward(NodeId::new("node-b")),
current_generation,
obligation(21),
),
CursorTransferProposal::delegated_partition(
CursorLeaseHolder::Relay(NodeId::new("relay-2")),
27,
current_generation.saturating_sub(1),
obligation(22),
),
])
.expect("resolve contested transfer");
assert_eq!(
resolution,
ContestedTransferResolution::Accepted {
new_lease: cursor.current_lease().clone(),
winning_obligation: obligation(21),
}
);
assert_eq!(
cursor.current_lease().holder,
CursorLeaseHolder::Steward(NodeId::new("node-b"))
);
assert_eq!(
cursor.current_lease().lease_generation,
current_generation + 1
);
}
#[test]
fn fabric_consumer_creation_preserves_config_and_starts_clean() {
let cell = test_cell();
let config = FabricConsumerConfig {
durable_name: Some("orders-durable".to_owned()),
filter_subject: Some(SubjectPattern::parse("orders.*").expect("pattern")),
flow_control: true,
heartbeat: Some(std::time::Duration::from_secs(5)),
idle_heartbeat: Some(std::time::Duration::from_secs(15)),
..FabricConsumerConfig::default()
};
let consumer = FabricConsumer::new(&cell, config.clone()).expect("consumer");
assert_eq!(consumer.config(), &config);
assert_eq!(consumer.policy().mode, ConsumerDispatchMode::Push);
assert!(!consumer.policy().paused);
assert_eq!(consumer.state().delivered_count, 0);
assert_eq!(consumer.state().pending_count, 0);
assert_eq!(consumer.state().ack_floor, 0);
assert_eq!(consumer.waiting_pull_request_count(), 0);
assert_eq!(
consumer.current_lease().holder,
CursorLeaseHolder::Steward(NodeId::new("node-a"))
);
}
#[test]
fn fabric_consumer_config_rejects_zero_heartbeat_values() {
let heartbeat = FabricConsumerConfig {
heartbeat: Some(Duration::ZERO),
..FabricConsumerConfig::default()
};
assert_eq!(
heartbeat.validate(),
Err(FabricConsumerError::InvalidHeartbeat { field: "heartbeat" })
);
let idle_heartbeat = FabricConsumerConfig {
idle_heartbeat: Some(Duration::ZERO),
..FabricConsumerConfig::default()
};
assert_eq!(
idle_heartbeat.validate(),
Err(FabricConsumerError::InvalidHeartbeat {
field: "idle_heartbeat",
})
);
}
#[test]
fn fabric_consumer_mode_switching_clears_waiting_pull_requests() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(
PullRequest::new(2, ConsumerDemandClass::CatchUp).expect("pull request"),
)
.expect("queue pull request");
assert_eq!(consumer.waiting_pull_request_count(), 1);
consumer.switch_mode(ConsumerDispatchMode::Push);
assert_eq!(consumer.policy().mode, ConsumerDispatchMode::Push);
assert_eq!(consumer.waiting_pull_request_count(), 0);
}
#[test]
fn fabric_consumer_pull_queue_respects_max_waiting() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
max_waiting: 1,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(
PullRequest::new(1, ConsumerDemandClass::CatchUp).expect("first request"),
)
.expect("queue first");
assert_eq!(
consumer.queue_pull_request(
PullRequest::new(1, ConsumerDemandClass::Tail).expect("second request")
),
Err(FabricConsumerError::MaxWaitingExceeded { limit: 1 })
);
}
#[test]
fn fabric_consumer_stable_kernel_replaces_priority_overflow_without_audit_log() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
max_waiting: 1,
overflow_policy: ConsumerOverflowPolicy::ReplaceLowestPriority,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(PullRequest::new(1, ConsumerDemandClass::Replay).expect("replay"))
.expect("queue replay");
consumer
.queue_pull_request(PullRequest::new(1, ConsumerDemandClass::Tail).expect("tail"))
.expect("replace replay with tail");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 20).expect("window"),
);
let delivery = match consumer
.dispatch_next_pull(20, &capsule, None)
.expect("dispatch tail")
{
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("tail request should dispatch"),
};
assert!(matches!(
delivery.request,
ScheduledConsumerRequest::Pull(PullRequest {
demand_class: ConsumerDemandClass::Tail,
..
})
));
assert!(consumer.decision_log().is_empty());
}
#[test]
fn fabric_consumer_priority_groups_dispatch_tail_before_replay() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 20).expect("window"),
);
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(PullRequest::new(2, ConsumerDemandClass::Replay).expect("replay"))
.expect("queue replay");
consumer
.queue_pull_request(PullRequest::new(2, ConsumerDemandClass::Tail).expect("tail"))
.expect("queue tail");
let first = match consumer
.dispatch_next_pull(20, &capsule, None)
.expect("dispatch first")
{
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("tail request should schedule first"),
};
assert!(matches!(
&first.request,
ScheduledConsumerRequest::Pull(request)
if request.demand_class == ConsumerDemandClass::Tail
));
assert_eq!(
first.window,
SequenceWindow::new(19, 20).expect("tail window")
);
let second = match consumer
.dispatch_next_pull(20, &capsule, None)
.expect("dispatch second")
{
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("replay request should schedule second"),
};
assert!(matches!(
&second.request,
ScheduledConsumerRequest::Pull(request)
if request.demand_class == ConsumerDemandClass::Replay
));
assert_eq!(
second.window,
SequenceWindow::new(1, 2).expect("replay window")
);
}
#[test]
fn fabric_consumer_audit_backed_overflow_replaces_replay_with_tail_and_records_evidence() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
max_waiting: 1,
adaptive_kernel: AdaptiveConsumerKernel::AuditBacked,
overflow_policy: ConsumerOverflowPolicy::ReplaceLowestPriority,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(PullRequest::new(1, ConsumerDemandClass::Replay).expect("replay"))
.expect("queue replay");
consumer
.queue_pull_request(PullRequest::new(1, ConsumerDemandClass::Tail).expect("tail"))
.expect("queue tail replacement");
assert_eq!(consumer.waiting_pull_request_count(), 1);
assert_eq!(consumer.decision_log().len(), 1);
let overflow = &consumer.decision_log()[0];
assert_eq!(overflow.kind, ConsumerDecisionKind::Overflow);
assert_eq!(overflow.action_name, "replace_low_priority");
assert_eq!(overflow.demand_class, Some(ConsumerDemandClass::Tail));
}
#[test]
fn fabric_consumer_pull_dispatches_audited_pinned_client_leased_delivery() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
adaptive_kernel: AdaptiveConsumerKernel::AuditBacked,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
let relay = NodeId::new("relay-1");
let window = SequenceWindow::new(1, 2).expect("window");
let capsule = RecoverableCapsule::default().with_window(relay.clone(), window);
let transfer = delegate_partition_to_holder(
&mut consumer.cursor,
28,
CursorLeaseHolder::Relay(relay.clone()),
ObligationId::new_for_test(88, 0),
);
assert!(matches!(
transfer,
ContestedTransferResolution::Accepted { .. }
));
let ticket = consumer
.cursor
.grant_read_ticket(
relay.clone(),
window,
8,
CacheabilityRule::Private { max_age_ticks: 4 },
)
.expect("ticket");
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(
PullRequest::new(2, ConsumerDemandClass::CatchUp)
.expect("pull request")
.with_pinned_client(relay.clone()),
)
.expect("queue pull");
let delivery = match consumer
.dispatch_next_pull(2, &capsule, Some(&ticket))
.expect("dispatch pinned")
{
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("pinned request should schedule"),
};
assert!(matches!(
&delivery.plan,
DeliveryPlan::LeasedRelay { relay: chosen, .. } if chosen == &relay
));
let decision = consumer.decision_log().last().expect("decision record");
assert_eq!(decision.kind, ConsumerDecisionKind::PullScheduling);
assert_eq!(decision.action_name, "leased_relay");
assert_eq!(decision.pinned_client.as_ref(), Some(&relay));
assert_eq!(decision.obligation_id, Some(delivery.attempt.obligation_id));
}
#[test]
fn fabric_consumer_rejects_pinned_client_ticket_mismatch() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
adaptive_kernel: AdaptiveConsumerKernel::AuditBacked,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
let pinned = NodeId::new("relay-pinned");
let wrong_relay = NodeId::new("relay-wrong");
let window = SequenceWindow::new(1, 1).expect("window");
let capsule = RecoverableCapsule::default().with_window(wrong_relay.clone(), window);
let transfer = delegate_partition_to_holder(
&mut consumer.cursor,
29,
CursorLeaseHolder::Relay(wrong_relay.clone()),
ObligationId::new_for_test(89, 0),
);
assert!(matches!(
transfer,
ContestedTransferResolution::Accepted { .. }
));
let wrong_ticket = consumer
.cursor
.grant_read_ticket(
wrong_relay.clone(),
window,
8,
CacheabilityRule::Private { max_age_ticks: 4 },
)
.expect("ticket");
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(
PullRequest::new(1, ConsumerDemandClass::CatchUp)
.expect("pull request")
.with_pinned_client(pinned.clone()),
)
.expect("queue pull");
assert_eq!(
consumer.dispatch_next_pull(1, &capsule, Some(&wrong_ticket)),
Err(FabricConsumerError::PinnedClientTicketMismatch {
pinned_client: pinned,
ticket_relay: wrong_relay,
})
);
assert_eq!(consumer.waiting_pull_request_count(), 1);
}
#[test]
fn fabric_consumer_keeps_pull_request_queued_when_dispatch_is_paused() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
flow_control: true,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 4).expect("window"),
);
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(PullRequest::new(2, ConsumerDemandClass::CatchUp).expect("pull"))
.expect("queue pull");
consumer.pause().expect("pause");
assert_eq!(
consumer.dispatch_next_pull(4, &capsule, None),
Err(FabricConsumerError::ConsumerPaused)
);
assert_eq!(consumer.waiting_pull_request_count(), 1);
}
#[test]
fn fabric_consumer_pull_dispatches_catchup_then_tail_windows() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 12).expect("window"),
);
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(
PullRequest::new(3, ConsumerDemandClass::CatchUp).expect("catchup request"),
)
.expect("queue catchup");
let first_outcome = consumer
.dispatch_next_pull(12, &capsule, None)
.expect("dispatch catchup");
let first = match first_outcome {
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("catchup request should schedule"),
};
assert_eq!(first.window, SequenceWindow::new(1, 3).expect("window"));
assert_eq!(consumer.state().pending_count, 3);
assert_eq!(
consumer.acknowledge_delivery(&first.attempt),
Ok(AckResolution::Committed {
obligation_id: first.attempt.obligation_id,
against: CursorLeaseHolder::Steward(NodeId::new("node-a")),
})
);
assert_eq!(consumer.state().pending_count, 0);
assert_eq!(consumer.state().ack_floor, 3);
consumer
.queue_pull_request(PullRequest::new(2, ConsumerDemandClass::Tail).expect("tail"))
.expect("queue tail");
let tail_outcome = consumer
.dispatch_next_pull(12, &capsule, None)
.expect("dispatch tail");
let tail = match tail_outcome {
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("tail request should schedule"),
};
assert_eq!(tail.window, SequenceWindow::new(11, 12).expect("window"));
}
#[test]
fn fabric_consumer_tail_waits_when_fully_caught_up() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 10).expect("window"),
);
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(PullRequest::new(10, ConsumerDemandClass::CatchUp).expect("req"))
.expect("queue catchup");
let catchup = match consumer
.dispatch_next_pull(10, &capsule, None)
.expect("dispatch")
{
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("catchup request should schedule"),
};
consumer
.acknowledge_delivery(&catchup.attempt)
.expect("ack catchup");
assert_eq!(consumer.state().ack_floor, 10);
consumer
.queue_pull_request(PullRequest::new(2, ConsumerDemandClass::Tail).expect("tail"))
.expect("queue tail");
let outcome = consumer
.dispatch_next_pull(10, &capsule, None)
.expect("tail should wait for fresh data");
assert!(matches!(outcome, PullDispatchOutcome::Waiting(_)));
assert_eq!(consumer.waiting_pull_request_count(), 1);
}
#[test]
fn fabric_consumer_tail_clamps_to_unacked_suffix() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 10).expect("window"),
);
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(PullRequest::new(9, ConsumerDemandClass::CatchUp).expect("req"))
.expect("queue catchup");
let catchup = match consumer
.dispatch_next_pull(10, &capsule, None)
.expect("dispatch")
{
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("catchup request should schedule"),
};
consumer
.acknowledge_delivery(&catchup.attempt)
.expect("ack catchup");
assert_eq!(consumer.state().ack_floor, 9);
consumer
.queue_pull_request(PullRequest::new(3, ConsumerDemandClass::Tail).expect("tail"))
.expect("queue tail");
let tail = match consumer
.dispatch_next_pull(10, &capsule, None)
.expect("dispatch tail")
{
PullDispatchOutcome::Scheduled(delivery) => *delivery,
PullDispatchOutcome::Waiting(_) => panic!("tail should schedule newest unacked suffix"),
};
assert_eq!(tail.window, SequenceWindow::new(10, 10).expect("suffix"));
}
#[test]
fn fabric_consumer_pause_and_resume_gate_dispatch() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
flow_control: true,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
let window = SequenceWindow::new(1, 1).expect("window");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
consumer.pause().expect("pause");
assert_eq!(
consumer.dispatch_push(window, &capsule, None),
Err(FabricConsumerError::ConsumerPaused)
);
consumer.resume();
let delivery = consumer
.dispatch_push(window, &capsule, None)
.expect("dispatch after resume");
assert_eq!(delivery.window, window);
}
#[test]
fn fabric_consumer_max_ack_pending_blocks_until_ack_commit() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
max_ack_pending: 2,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
let first_window = SequenceWindow::new(1, 2).expect("window");
let second_window = SequenceWindow::new(3, 3).expect("window");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 3).expect("capsule"),
);
let first = consumer
.dispatch_push(first_window, &capsule, None)
.expect("first dispatch");
assert_eq!(consumer.state().pending_count, 2);
assert_eq!(
consumer.dispatch_push(second_window, &capsule, None),
Err(FabricConsumerError::MaxAckPendingExceeded {
limit: 2,
pending: 2,
})
);
assert!(matches!(
consumer.acknowledge_delivery(&first.attempt),
Ok(AckResolution::Committed { .. })
));
assert_eq!(consumer.state().pending_count, 0);
let second = consumer
.dispatch_push(second_window, &capsule, None)
.expect("second dispatch");
assert_eq!(second.window, second_window);
}
#[test]
fn fabric_consumer_ack_commits_obligation_backed_state() {
let cell = test_cell();
let owner = FabricConsumerOwner {
holder: TaskId::new_for_test(41, 0),
region: RegionId::new_for_test(7, 0),
};
let mut consumer = FabricConsumer::new_owned(&cell, FabricConsumerConfig::default(), owner)
.expect("consumer");
let window = SequenceWindow::new(5, 6).expect("window");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
let delivery = consumer
.dispatch_push(window, &capsule, None)
.expect("dispatch");
let reserved = consumer
.ledger
.get(delivery.attempt.obligation_id)
.expect("reserved record");
assert_eq!(reserved.kind, ObligationKind::Ack);
assert_eq!(reserved.holder, owner.holder);
assert_eq!(reserved.region, owner.region);
assert_eq!(consumer.obligation_stats().pending, 1);
assert_eq!(consumer.obligation_stats().total_acquired, 1);
assert!(matches!(
consumer.acknowledge_delivery(&delivery.attempt),
Ok(AckResolution::Committed { .. })
));
let committed = consumer
.ledger
.get(delivery.attempt.obligation_id)
.expect("committed record");
assert_eq!(committed.state, crate::record::ObligationState::Committed);
assert_eq!(consumer.obligation_stats().pending, 0);
assert_eq!(consumer.obligation_stats().total_committed, 1);
}
#[test]
fn fabric_consumer_nack_aborts_obligation_and_old_ack_is_stale() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let window = SequenceWindow::new(7, 7).expect("window");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
let delivery = consumer
.dispatch_push(window, &capsule, None)
.expect("dispatch");
assert_eq!(
consumer.nack_delivery(&delivery.attempt, ConsumerNackReason::Explicit),
Ok(NackResolution::Aborted {
obligation_id: delivery.attempt.obligation_id,
window,
reason: ConsumerNackReason::Explicit,
})
);
let record = consumer
.ledger
.get(delivery.attempt.obligation_id)
.expect("aborted record");
assert_eq!(record.state, crate::record::ObligationState::Aborted);
assert_eq!(record.abort_reason, Some(ObligationAbortReason::Explicit));
assert_eq!(consumer.obligation_stats().total_aborted, 1);
assert_eq!(
consumer.acknowledge_delivery(&delivery.attempt),
Ok(AckResolution::StaleNoOp {
obligation_id: delivery.attempt.obligation_id,
current_generation: consumer.current_lease().lease_generation,
current_holder: CursorLeaseHolder::Steward(NodeId::new("node-a")),
})
);
}
#[test]
fn fabric_consumer_redelivery_mints_new_obligation_and_supersedes_old_attempt() {
let cell = test_cell();
let config = FabricConsumerConfig {
max_deliver: 3,
..FabricConsumerConfig::default()
};
let mut consumer = FabricConsumer::new(&cell, config).expect("consumer");
let window = SequenceWindow::new(8, 9).expect("window");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
let first = consumer
.dispatch_push(window, &capsule, None)
.expect("first dispatch");
let redelivery = consumer
.redeliver_delivery(&first.attempt, &capsule, None)
.expect("redelivery");
assert_ne!(
first.attempt.obligation_id,
redelivery.attempt.obligation_id
);
assert!(consumer.decision_log().is_empty());
assert_eq!(
redelivery.attempt.supersedes_obligation_id,
Some(first.attempt.obligation_id)
);
let first_record = consumer
.ledger
.get(first.attempt.obligation_id)
.expect("first record");
assert_eq!(first_record.state, crate::record::ObligationState::Aborted);
assert_eq!(
first_record.abort_reason,
Some(ObligationAbortReason::Explicit)
);
assert_eq!(
consumer.acknowledge_delivery(&first.attempt),
Ok(AckResolution::StaleNoOp {
obligation_id: first.attempt.obligation_id,
current_generation: consumer.current_lease().lease_generation,
current_holder: CursorLeaseHolder::Steward(NodeId::new("node-a")),
})
);
assert!(matches!(
consumer.acknowledge_delivery(&redelivery.attempt),
Ok(AckResolution::Committed { obligation_id, .. })
if obligation_id == redelivery.attempt.obligation_id
));
let stats = consumer.obligation_stats();
assert_eq!(stats.total_acquired, 2);
assert_eq!(stats.total_aborted, 1);
assert_eq!(stats.total_committed, 1);
assert_eq!(stats.pending, 0);
}
#[test]
fn fabric_consumer_audit_backed_redelivery_requires_dead_letter_at_retry_limit() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
adaptive_kernel: AdaptiveConsumerKernel::AuditBacked,
max_deliver: 1,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
let window = SequenceWindow::new(12, 12).expect("window");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
let first = consumer
.dispatch_push(window, &capsule, None)
.expect("dispatch");
assert_eq!(
consumer.redeliver_delivery(&first.attempt, &capsule, None),
Err(FabricConsumerError::RedeliveryRequiresDeadLetter {
obligation_id: first.attempt.obligation_id,
delivery_attempt: 2,
})
);
let decision = consumer.decision_log().last().expect("redelivery decision");
assert_eq!(decision.kind, ConsumerDecisionKind::Redelivery);
assert_eq!(decision.action_name, "dead_letter");
assert_eq!(decision.obligation_id, Some(first.attempt.obligation_id));
}
#[test]
fn fabric_consumer_audit_backed_redelivery_defers_under_pending_pressure() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
adaptive_kernel: AdaptiveConsumerKernel::AuditBacked,
max_deliver: 3,
max_ack_pending: 1,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
let window = SequenceWindow::new(13, 13).expect("window");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
let first = consumer
.dispatch_push(window, &capsule, None)
.expect("dispatch");
assert_eq!(
consumer.redeliver_delivery(&first.attempt, &capsule, None),
Err(FabricConsumerError::RedeliveryDeferred {
obligation_id: first.attempt.obligation_id,
delivery_attempt: 2,
})
);
let decision = consumer.decision_log().last().expect("redelivery decision");
assert_eq!(decision.kind, ConsumerDecisionKind::Redelivery);
assert_eq!(decision.action_name, "delay");
assert_eq!(decision.obligation_id, Some(first.attempt.obligation_id));
}
#[test]
fn fabric_consumer_dead_letter_records_reason_and_aborts_obligation() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let window = SequenceWindow::new(10, 10).expect("window");
let capsule = RecoverableCapsule::default().with_window(NodeId::new("node-a"), window);
let delivery = consumer
.dispatch_push(window, &capsule, None)
.expect("dispatch");
let transfer = consumer
.dead_letter_delivery(&delivery.attempt, "poison payload")
.expect("dead letter");
assert_eq!(transfer.obligation_id, delivery.attempt.obligation_id);
assert_eq!(transfer.window, window);
assert_eq!(transfer.reason, "poison payload");
let record = consumer
.ledger
.get(delivery.attempt.obligation_id)
.expect("dead-letter record");
assert_eq!(record.state, crate::record::ObligationState::Aborted);
assert_eq!(record.abort_reason, Some(ObligationAbortReason::Error));
assert_eq!(consumer.obligation_stats().pending, 0);
assert_eq!(consumer.obligation_stats().total_aborted, 1);
}
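/// A waiting pull request's expiry deadline is anchored at its original
/// enqueue time; re-queueing after a `Waiting` outcome must not reset it.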
#[test]
fn pull_request_expiry_measured_from_original_enqueue_not_re_enqueue() {
let cell = test_cell();
let mut consumer = FabricConsumer::new(
&cell,
FabricConsumerConfig {
max_waiting: 4,
..FabricConsumerConfig::default()
},
)
.expect("consumer");
consumer.switch_mode(ConsumerDispatchMode::Pull);
let request = PullRequest::new(1, ConsumerDemandClass::Tail).expect("request");
let request = request.with_expires(10);
consumer.queue_pull_request(request).expect("enqueue");
consumer.advance_clock(5);
let capsule = RecoverableCapsule::default();
let outcome = consumer
.dispatch_next_pull(0, &capsule, None)
.expect("dispatch with no data");
assert!(matches!(outcome, PullDispatchOutcome::Waiting(_)));
assert_eq!(consumer.waiting_pull_request_count(), 1);
consumer.advance_clock(6);
let result = consumer.dispatch_next_pull(0, &capsule, None);
assert!(
result.is_err(),
"request should have expired, measured from its original enqueue time"
);
}
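/// Acks arriving out of order must never advance the ack floor past a
/// still-pending window: with [6, 10] unacked, acking [11, 15] leaves the
/// floor at or below 5.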
#[test]
fn out_of_order_ack_does_not_advance_floor_past_pending() {
let cell = test_cell();
let mut consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let capsule = RecoverableCapsule::default().with_window(
NodeId::new("node-a"),
SequenceWindow::new(1, 20).expect("window"),
);
consumer.switch_mode(ConsumerDispatchMode::Pull);
consumer
.queue_pull_request(PullRequest::new(5, ConsumerDemandClass::CatchUp).expect("request"))
.expect("queue");
let d1 = match consumer.dispatch_next_pull(20, &capsule, None).expect("dispatch d1") {
PullDispatchOutcome::Scheduled(d) => *d,
other => panic!("expected Scheduled, got {other:?}"),
};
consumer.acknowledge_delivery(&d1.attempt).expect("ack");
assert_eq!(consumer.state().ack_floor, 5);
consumer
.queue_pull_request(PullRequest::new(5, ConsumerDemandClass::CatchUp).expect("request"))
.expect("queue");
let d2 = match consumer.dispatch_next_pull(20, &capsule, None).expect("dispatch d2") {
PullDispatchOutcome::Scheduled(d) => *d,
other => panic!("expected Scheduled, got {other:?}"),
};
assert_eq!(d2.window, SequenceWindow::new(6, 10).expect("window"));
consumer
.queue_pull_request(PullRequest::new(5, ConsumerDemandClass::CatchUp).expect("request"))
.expect("queue");
let d3 = match consumer.dispatch_next_pull(20, &capsule, None).expect("dispatch d3") {
PullDispatchOutcome::Scheduled(d) => *d,
other => panic!("expected Scheduled, got {other:?}"),
};
consumer.acknowledge_delivery(&d3.attempt).expect("ack d3");
assert!(
consumer.state().ack_floor <= 5,
"ack_floor must not advance past pending [6,10]; got {}",
consumer.state().ack_floor
);
consumer.acknowledge_delivery(&d2.attempt).expect("ack d2");
assert_eq!(
consumer.state().ack_floor,
10,
"ack_floor should advance to 10 once all windows up to 10 are acked"
);
}
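/// A freshly constructed `ConsumerActor` starts in the `Running` lifecycle
/// state and exposes its consumer's configuration.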
#[test]
fn consumer_actor_lifecycle_transitions() {
let cell = test_cell();
let config = FabricConsumerConfig {
durable_name: Some("test-actor".to_owned()),
..FabricConsumerConfig::default()
};
let consumer = FabricConsumer::new(&cell, config).expect("consumer");
let actor = ConsumerActor::new(consumer);
assert_eq!(actor.lifecycle(), ConsumerActorLifecycle::Running);
assert_eq!(
actor.consumer().config().durable_name.as_deref(),
Some("test-actor")
);
}
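/// `ConsumerCast::Pause` and `ConsumerCast::Resume` move the actor between
/// the `Paused` and `Running` lifecycle states via its `GenServer` cast
/// handler.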
#[test]
fn consumer_actor_pause_and_resume() {
let cell = test_cell();
let consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let mut actor = ConsumerActor::new(consumer);
let cx = crate::cx::Cx::for_testing();
assert_eq!(actor.lifecycle(), ConsumerActorLifecycle::Running);
futures_lite::future::block_on(
<ConsumerActor as crate::gen_server::GenServer>::handle_cast(
&mut actor,
&cx,
ConsumerCast::Pause,
),
);
assert_eq!(actor.lifecycle(), ConsumerActorLifecycle::Paused);
futures_lite::future::block_on(
<ConsumerActor as crate::gen_server::GenServer>::handle_cast(
&mut actor,
&cx,
ConsumerCast::Resume,
),
);
assert_eq!(actor.lifecycle(), ConsumerActorLifecycle::Running);
}
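/// The `GenServer::on_stop` hook transitions the actor into the `Stopping`
/// lifecycle state.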
#[test]
fn consumer_actor_stopping_lifecycle() {
let cell = test_cell();
let consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let mut actor = ConsumerActor::new(consumer);
let cx = crate::cx::Cx::for_testing();
futures_lite::future::block_on(<ConsumerActor as crate::gen_server::GenServer>::on_stop(
&mut actor, &cx,
));
assert_eq!(actor.lifecycle(), ConsumerActorLifecycle::Stopping);
}
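/// A new consumer reports zeroed state: no deliveries, nothing pending, and
/// an ack floor of 0.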
#[test]
fn consumer_actor_state_query() {
let cell = test_cell();
let consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let actor = ConsumerActor::new(consumer);
let state = actor.consumer().state();
assert_eq!(state.delivered_count, 0);
assert_eq!(state.pending_count, 0);
assert_eq!(state.ack_floor, 0);
}
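/// `consumer_mut` grants mutable access to the wrapped consumer without
/// disturbing its observable state.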
#[test]
fn consumer_actor_mutable_consumer_access() {
let cell = test_cell();
let consumer =
FabricConsumer::new(&cell, FabricConsumerConfig::default()).expect("consumer");
let mut actor = ConsumerActor::new(consumer);
let consumer_ref = actor.consumer_mut();
assert_eq!(consumer_ref.state().delivered_count, 0);
}
}