use crate::combinator::{
Bulkhead, BulkheadError, CircuitBreaker, CircuitBreakerError, Either, Select, SelectError,
};
use crate::error::{Error, ErrorKind};
use crate::observability::LogEntry;
use crate::time::TimeSource;
use crate::types::Time;
use crate::util::det_hash::DetHashMap;
use parking_lot::RwLock;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::task::{Context, Poll};
use std::time::Duration;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EpochId(pub u64);
impl EpochId {
pub const GENESIS: Self = Self(0);
pub const MAX: Self = Self(u64::MAX);
#[must_use]
pub const fn new(id: u64) -> Self {
Self(id)
}
#[must_use]
pub const fn next(self) -> Self {
match self.0.checked_add(1) {
Some(v) => Self(v),
None => panic!("EpochId overflow"),
}
}
#[must_use]
pub const fn saturating_next(self) -> Self {
Self(self.0.saturating_add(1))
}
#[must_use]
pub const fn prev(self) -> Option<Self> {
if self.0 == 0 {
None
} else {
Some(Self(self.0 - 1))
}
}
#[must_use]
pub const fn is_before(self, other: Self) -> bool {
self.0 < other.0
}
#[must_use]
pub const fn is_after(self, other: Self) -> bool {
self.0 > other.0
}
#[must_use]
pub const fn distance(self, other: Self) -> u64 {
self.0.abs_diff(other.0)
}
#[must_use]
pub const fn as_u64(self) -> u64 {
self.0
}
}
impl std::fmt::Display for EpochId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Epoch({})", self.0)
}
}
impl From<u64> for EpochId {
fn from(value: u64) -> Self {
Self(value)
}
}
impl From<EpochId> for u64 {
fn from(epoch: EpochId) -> Self {
epoch.0
}
}
#[derive(Debug, Clone)]
pub struct EpochConfig {
pub target_duration: Time,
pub min_duration: Time,
pub max_duration: Time,
pub grace_period: Time,
pub retention_epochs: u32,
pub require_quorum: bool,
pub quorum_size: u32,
}
impl Default for EpochConfig {
fn default() -> Self {
Self {
target_duration: Time::from_secs(60),
min_duration: Time::from_secs(30),
max_duration: Time::from_secs(120),
grace_period: Time::from_secs(10),
retention_epochs: 10,
require_quorum: false,
quorum_size: 0,
}
}
}
impl EpochConfig {
#[must_use]
pub fn short_lived() -> Self {
Self {
target_duration: Time::from_millis(100),
min_duration: Time::from_millis(50),
max_duration: Time::from_millis(200),
grace_period: Time::from_millis(20),
retention_epochs: 5,
require_quorum: false,
quorum_size: 0,
}
}
#[must_use]
pub fn long_lived() -> Self {
Self {
target_duration: Time::from_secs(300),
min_duration: Time::from_secs(120),
max_duration: Time::from_secs(600),
grace_period: Time::from_secs(30),
retention_epochs: 20,
require_quorum: true,
quorum_size: 3,
}
}
pub fn validate(&self) -> Result<(), Box<Error>> {
if self.min_duration > self.target_duration {
return Err(Box::new(
Error::new(ErrorKind::InvalidEncodingParams)
.with_message("min_duration must not exceed target_duration"),
));
}
if self.target_duration > self.max_duration {
return Err(Box::new(
Error::new(ErrorKind::InvalidEncodingParams)
.with_message("target_duration must not exceed max_duration"),
));
}
if self.require_quorum && self.quorum_size == 0 {
return Err(Box::new(
Error::new(ErrorKind::InvalidEncodingParams)
.with_message("quorum_size must be > 0 when require_quorum is true"),
));
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EpochState {
Preparing,
Active,
Ending,
Ended,
}
impl EpochState {
#[must_use]
pub const fn is_active(self) -> bool {
matches!(self, Self::Active)
}
#[must_use]
pub const fn is_terminal(self) -> bool {
matches!(self, Self::Ended)
}
#[must_use]
pub const fn allows_completion(self) -> bool {
matches!(self, Self::Active | Self::Ending)
}
}
#[derive(Debug, Clone)]
pub struct Epoch {
pub id: EpochId,
pub state: EpochState,
pub started_at: Time,
pub expected_end: Time,
pub ended_at: Option<Time>,
pub config: EpochConfig,
pub operation_count: u64,
pub metadata: DetHashMap<String, String>,
}
impl Epoch {
#[must_use]
pub fn new(id: EpochId, started_at: Time, config: EpochConfig) -> Self {
let expected_end = Time::from_nanos(
started_at
.as_nanos()
.saturating_add(config.target_duration.as_nanos()),
);
Self {
id,
state: EpochState::Active,
started_at,
expected_end,
ended_at: None,
config,
operation_count: 0,
metadata: DetHashMap::default(),
}
}
#[must_use]
pub fn genesis(config: EpochConfig) -> Self {
Self::new(EpochId::GENESIS, Time::ZERO, config)
}
#[must_use]
pub fn duration(&self, now: Time) -> Duration {
let end = self.ended_at.unwrap_or(now);
Duration::from_nanos(end.as_nanos().saturating_sub(self.started_at.as_nanos()))
}
#[must_use]
pub fn is_overdue(&self, now: Time) -> bool {
let max_end = Time::from_nanos(
self.started_at
.as_nanos()
.saturating_add(self.config.max_duration.as_nanos()),
);
now > max_end
}
#[must_use]
pub fn can_transition(&self, now: Time) -> bool {
let min_end = Time::from_nanos(
self.started_at
.as_nanos()
.saturating_add(self.config.min_duration.as_nanos()),
);
now >= min_end
}
#[must_use]
pub fn remaining(&self, now: Time) -> Option<Duration> {
if now >= self.expected_end {
None
} else {
Some(Duration::from_nanos(
self.expected_end.as_nanos() - now.as_nanos(),
))
}
}
pub fn record_operation(&mut self) {
self.operation_count = self.operation_count.saturating_add(1);
}
pub fn begin_ending(&mut self, _now: Time) -> Result<(), Box<Error>> {
if self.state != EpochState::Active {
return Err(Box::new(
Error::new(ErrorKind::InvalidStateTransition)
.with_message(format!("Cannot end epoch in state {:?}", self.state)),
));
}
self.state = EpochState::Ending;
Ok(())
}
pub fn complete(&mut self, now: Time) -> Result<(), Box<Error>> {
if !matches!(self.state, EpochState::Active | EpochState::Ending) {
return Err(Box::new(
Error::new(ErrorKind::InvalidStateTransition)
.with_message(format!("Cannot complete epoch in state {:?}", self.state)),
));
}
self.state = EpochState::Ended;
self.ended_at = Some(now);
Ok(())
}
pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.metadata.insert(key.into(), value.into());
}
#[allow(dead_code)]
fn log_created(&self) -> LogEntry {
LogEntry::info("Epoch created")
.with_field("epoch_id", format!("{}", self.id))
.with_field("started_at", format!("{}", self.started_at))
.with_field("expected_end", format!("{}", self.expected_end))
}
#[allow(dead_code)]
fn log_state_change(&self, old_state: EpochState) -> LogEntry {
LogEntry::info("Epoch state changed")
.with_field("epoch_id", format!("{}", self.id))
.with_field("from_state", format!("{old_state:?}"))
.with_field("to_state", format!("{:?}", self.state))
}
#[allow(dead_code)]
fn log_completed(&self) -> LogEntry {
LogEntry::info("Epoch completed")
.with_field("epoch_id", format!("{}", self.id))
.with_field("operations", format!("{}", self.operation_count))
.with_field("duration", format!("{:?}", self.ended_at))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SymbolValidityWindow {
pub start: EpochId,
pub end: EpochId,
}
impl SymbolValidityWindow {
#[must_use]
pub fn new(start: EpochId, end: EpochId) -> Self {
assert!(
!end.is_before(start),
"end epoch must not be before start epoch"
);
Self { start, end }
}
#[must_use]
pub fn single(epoch: EpochId) -> Self {
Self {
start: epoch,
end: epoch,
}
}
#[must_use]
pub fn infinite() -> Self {
Self {
start: EpochId::GENESIS,
end: EpochId::MAX,
}
}
#[must_use]
pub fn from_epoch(start: EpochId) -> Self {
Self {
start,
end: EpochId::MAX,
}
}
#[must_use]
pub fn until_epoch(end: EpochId) -> Self {
Self {
start: EpochId::GENESIS,
end,
}
}
#[must_use]
pub fn contains(&self, epoch: EpochId) -> bool {
epoch >= self.start && epoch <= self.end
}
#[must_use]
pub fn overlaps(&self, other: &Self) -> bool {
self.start <= other.end && other.start <= self.end
}
#[must_use]
pub fn intersection(&self, other: &Self) -> Option<Self> {
let start = std::cmp::max(self.start, other.start);
let end = std::cmp::min(self.end, other.end);
if start <= end {
Some(Self { start, end })
} else {
None
}
}
#[must_use]
pub fn span(&self) -> u64 {
(self.end.0 - self.start.0).saturating_add(1)
}
#[must_use]
pub fn extend_to(&self, epoch: EpochId) -> Self {
Self {
start: std::cmp::min(self.start, epoch),
end: std::cmp::max(self.end, epoch),
}
}
}
impl Default for SymbolValidityWindow {
fn default() -> Self {
Self::infinite()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BarrierTrigger {
AllArrived,
Timeout,
Cancelled,
Forced,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BarrierResult {
pub trigger: BarrierTrigger,
pub arrived: u32,
pub expected: u32,
pub triggered_at: Time,
}
#[derive(Debug)]
pub struct EpochBarrier {
epoch: EpochId,
expected: u32,
arrived: AtomicU64,
participants: RwLock<Vec<String>>,
triggered: RwLock<Option<BarrierResult>>,
timeout: Option<Time>,
created_at: Time,
}
impl EpochBarrier {
#[must_use]
pub fn new(epoch: EpochId, expected: u32, created_at: Time) -> Self {
Self {
epoch,
expected,
arrived: AtomicU64::new(0),
participants: RwLock::new(Vec::with_capacity(expected as usize)),
triggered: RwLock::new(None),
timeout: None,
created_at,
}
}
#[must_use]
pub fn with_timeout(mut self, timeout: Time) -> Self {
self.timeout = Some(timeout);
self
}
#[must_use]
pub fn epoch(&self) -> EpochId {
self.epoch
}
#[must_use]
pub fn expected(&self) -> u32 {
self.expected
}
#[must_use]
pub fn arrived(&self) -> u32 {
let val = self.arrived.load(Ordering::Acquire);
u32::try_from(val).unwrap_or(u32::MAX)
}
#[must_use]
pub fn remaining(&self) -> u32 {
self.expected.saturating_sub(self.arrived())
}
#[must_use]
pub fn is_triggered(&self) -> bool {
self.triggered.read().is_some()
}
#[must_use]
pub fn result(&self) -> Option<BarrierResult> {
self.triggered.read().clone()
}
pub fn arrive(
&self,
participant_id: &str,
now: Time,
) -> Result<Option<BarrierResult>, Box<Error>> {
if self.is_triggered() {
return Err(Box::new(
Error::new(ErrorKind::InvalidStateTransition)
.with_message("Barrier already triggered"),
));
}
if let Some(timeout) = self.timeout {
let deadline = Time::from_nanos(
self.created_at
.as_nanos()
.saturating_add(timeout.as_nanos()),
);
if now > deadline {
let result = BarrierResult {
trigger: BarrierTrigger::Timeout,
arrived: self.arrived(),
expected: self.expected,
triggered_at: now,
};
let mut triggered = self.triggered.write();
if triggered.is_some() {
return Err(Box::new(
Error::new(ErrorKind::InvalidStateTransition)
.with_message("Barrier already triggered"),
));
}
*triggered = Some(result.clone());
drop(triggered);
return Ok(Some(result));
}
}
let arrived = {
let mut participants = self.participants.write();
if participants.contains(&participant_id.to_string()) {
return Err(Box::new(
Error::new(ErrorKind::InvalidStateTransition)
.with_message("Participant already arrived"),
));
}
participants.push(participant_id.to_string());
let count = self.arrived.fetch_add(1, Ordering::AcqRel) + 1;
drop(participants);
count
};
if arrived >= u64::from(self.expected) {
let result = BarrierResult {
trigger: BarrierTrigger::AllArrived,
arrived: u32::try_from(arrived).unwrap_or(u32::MAX),
expected: self.expected,
triggered_at: now,
};
let mut triggered = self.triggered.write();
if triggered.is_some() {
return Err(Box::new(
Error::new(ErrorKind::InvalidStateTransition)
.with_message("Barrier already triggered"),
));
}
*triggered = Some(result.clone());
drop(triggered);
Ok(Some(result))
} else {
Ok(None)
}
}
pub fn force_trigger(&self, now: Time) -> BarrierResult {
let result = BarrierResult {
trigger: BarrierTrigger::Forced,
arrived: self.arrived(),
expected: self.expected,
triggered_at: now,
};
*self.triggered.write() = Some(result.clone());
result
}
pub fn cancel(&self, now: Time) -> BarrierResult {
let result = BarrierResult {
trigger: BarrierTrigger::Cancelled,
arrived: self.arrived(),
expected: self.expected,
triggered_at: now,
};
*self.triggered.write() = Some(result.clone());
result
}
#[must_use]
pub fn participants(&self) -> Vec<String> {
self.participants.read().clone()
}
#[allow(dead_code)]
fn log_arrival(&self, participant: &str) -> LogEntry {
LogEntry::debug("Epoch barrier arrival")
.with_field("epoch_id", format!("{}", self.epoch))
.with_field("participant", participant)
.with_field("arrived", format!("{}", self.arrived()))
.with_field("expected", format!("{}", self.expected))
}
#[allow(dead_code)]
fn log_triggered(&self, result: &BarrierResult) -> LogEntry {
LogEntry::info("Epoch barrier triggered")
.with_field("epoch_id", format!("{}", self.epoch))
.with_field("trigger", format!("{:?}", result.trigger))
.with_field("arrived", format!("{}", result.arrived))
.with_field("expected", format!("{}", result.expected))
}
}
#[derive(Debug)]
pub struct EpochClock {
current: AtomicU64,
config: EpochConfig,
history: RwLock<Vec<Epoch>>,
active_epoch: RwLock<Option<Epoch>>,
}
impl EpochClock {
#[must_use]
pub fn new(config: EpochConfig) -> Self {
Self {
current: AtomicU64::new(0),
config,
history: RwLock::new(Vec::new()),
active_epoch: RwLock::new(None),
}
}
pub fn initialize(&self, started_at: Time) {
let epoch = Epoch::new(EpochId::GENESIS, started_at, self.config.clone());
*self.active_epoch.write() = Some(epoch);
}
#[must_use]
pub fn current(&self) -> EpochId {
EpochId(self.current.load(Ordering::Acquire))
}
#[must_use]
pub fn active_epoch(&self) -> Option<Epoch> {
self.active_epoch.read().clone()
}
pub fn advance(&self, now: Time) -> Result<EpochId, Box<Error>> {
let mut active = self.active_epoch.write();
if let Some(ref mut epoch) = *active {
if !epoch.can_transition(now) && !epoch.is_overdue(now) {
return Err(Box::new(
Error::new(ErrorKind::InvalidStateTransition)
.with_message("Epoch has not met minimum duration"),
));
}
epoch.complete(now)?;
let mut history = self.history.write();
history.push(epoch.clone());
let retention = self.config.retention_epochs as usize;
let len = history.len();
if len > retention {
history.drain(0..len - retention);
}
}
let prev = self
.current
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
Some(v.saturating_add(1))
})
.unwrap_or_else(|v| v); let new_id = EpochId(prev.saturating_add(1));
let new_epoch = Epoch::new(new_id, now, self.config.clone());
*active = Some(new_epoch);
drop(active);
Ok(new_id)
}
#[must_use]
pub fn history(&self) -> Vec<Epoch> {
self.history.read().clone()
}
#[must_use]
pub fn get_epoch(&self, id: EpochId) -> Option<Epoch> {
if let Some(ref active) = *self.active_epoch.read() {
if active.id == id {
return Some(active.clone());
}
}
self.history.read().iter().find(|e| e.id == id).cloned()
}
}
pub trait EpochSource: Send + Sync {
fn current(&self) -> EpochId;
}
impl EpochSource for EpochClock {
fn current(&self) -> EpochId {
self.current()
}
}
impl EpochSource for EpochId {
fn current(&self) -> EpochId {
*self
}
}
#[derive(Debug, Clone)]
pub struct EpochContext {
pub epoch_id: EpochId,
pub started_at: Time,
pub deadline: Time,
pub operation_budget: Option<u32>,
operations_used: Arc<AtomicU32>,
}
impl EpochContext {
#[must_use]
pub fn new(epoch_id: EpochId, started_at: Time, deadline: Time) -> Self {
Self {
epoch_id,
started_at,
deadline,
operation_budget: None,
operations_used: Arc::new(AtomicU32::new(0)),
}
}
#[must_use]
pub fn with_operation_budget(mut self, budget: u32) -> Self {
self.operation_budget = Some(budget);
self
}
#[must_use]
pub fn is_expired(&self, now: Time) -> bool {
now >= self.deadline
}
#[must_use]
pub fn is_budget_exhausted(&self) -> bool {
self.operation_budget
.is_some_and(|limit| self.operations_used.load(Ordering::Acquire) >= limit)
}
#[must_use]
pub fn record_operation(&self) -> bool {
if let Some(limit) = self.operation_budget {
let mut current = self.operations_used.load(Ordering::Acquire);
loop {
if current >= limit {
return false;
}
match self.operations_used.compare_exchange_weak(
current,
current + 1,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return true,
Err(actual) => current = actual,
}
}
} else {
self.operations_used.fetch_add(1, Ordering::AcqRel);
true
}
}
#[must_use]
pub fn operations_used(&self) -> u32 {
self.operations_used.load(Ordering::Acquire)
}
#[must_use]
pub fn remaining_time(&self, now: Time) -> Option<Duration> {
if now >= self.deadline {
None
} else {
Some(Duration::from_nanos(
self.deadline.as_nanos() - now.as_nanos(),
))
}
}
#[allow(dead_code)] fn log_created(&self) -> LogEntry {
LogEntry::debug("Epoch context created")
.with_field("epoch_id", format!("{}", self.epoch_id))
.with_field("deadline_ms", format!("{}", self.deadline.as_millis()))
.with_field("operation_budget", format!("{:?}", self.operation_budget))
}
#[allow(dead_code)]
fn log_expired(&self, now: Time) -> LogEntry {
LogEntry::warn("Epoch expired")
.with_field("epoch_id", format!("{}", self.epoch_id))
.with_field("deadline_ms", format!("{}", self.deadline.as_millis()))
.with_field("current_time_ms", format!("{}", now.as_millis()))
}
#[allow(dead_code)]
fn log_budget_exhausted(&self) -> LogEntry {
LogEntry::info("Epoch operation budget exhausted")
.with_field("epoch_id", format!("{}", self.epoch_id))
.with_field("operations_used", format!("{}", self.operations_used()))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EpochTransitionBehavior {
#[default]
AbortAll,
DrainExecuting,
Fail,
Ignore,
}
#[derive(Debug, Clone)]
pub struct EpochPolicy {
pub on_transition: EpochTransitionBehavior,
pub check_on_poll: bool,
pub propagate_to_children: bool,
pub grace_period: Option<Time>,
}
impl Default for EpochPolicy {
fn default() -> Self {
Self {
on_transition: EpochTransitionBehavior::AbortAll,
check_on_poll: true,
propagate_to_children: true,
grace_period: None,
}
}
}
impl EpochPolicy {
#[must_use]
pub fn strict() -> Self {
Self::default()
}
#[must_use]
pub fn lenient() -> Self {
Self {
on_transition: EpochTransitionBehavior::DrainExecuting,
check_on_poll: false,
propagate_to_children: true,
grace_period: Some(Time::from_millis(100)),
}
}
#[must_use]
pub fn ignore() -> Self {
Self {
on_transition: EpochTransitionBehavior::Ignore,
check_on_poll: false,
propagate_to_children: false,
grace_period: None,
}
}
}
pub struct EpochScoped<F, TS: TimeSource, ES: EpochSource> {
inner: F,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
started: bool,
}
impl<F, TS: TimeSource, ES: EpochSource> EpochScoped<F, TS, ES> {
#[must_use]
pub fn new(
inner: F,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
) -> Self {
Self {
inner,
epoch_ctx,
policy,
time_source,
epoch_source,
started: false,
}
}
#[must_use]
pub fn epoch_context(&self) -> &EpochContext {
&self.epoch_ctx
}
#[must_use]
pub fn policy(&self) -> &EpochPolicy {
&self.policy
}
}
fn effective_deadline(deadline: Time, grace: Option<Time>) -> Time {
grace.map_or(deadline, |grace| {
Time::from_nanos(deadline.as_nanos().saturating_add(grace.as_nanos()))
})
}
fn check_epoch<TS: TimeSource, ES: EpochSource>(
epoch_ctx: &EpochContext,
policy: &EpochPolicy,
time_source: &TS,
epoch_source: &ES,
started: bool,
) -> Result<(), EpochError> {
let now = time_source.now();
if now >= effective_deadline(epoch_ctx.deadline, policy.grace_period) {
return Err(EpochError::Expired {
epoch: epoch_ctx.epoch_id,
});
}
if !policy.check_on_poll && started {
return Ok(());
}
let current = epoch_source.current();
if current == epoch_ctx.epoch_id {
Ok(())
} else {
match policy.on_transition {
EpochTransitionBehavior::Ignore => Ok(()),
EpochTransitionBehavior::DrainExecuting => {
if started {
Ok(())
} else {
Err(EpochError::TransitionOccurred {
from: epoch_ctx.epoch_id,
to: current,
})
}
}
EpochTransitionBehavior::Fail | EpochTransitionBehavior::AbortAll => {
Err(EpochError::TransitionOccurred {
from: epoch_ctx.epoch_id,
to: current,
})
}
}
}
}
impl<F, TS, ES> Future for EpochScoped<F, TS, ES>
where
F: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
type Output = Result<F::Output, EpochError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.started {
if let Err(err) = check_epoch(
&self.epoch_ctx,
&self.policy,
self.time_source.as_ref(),
self.epoch_source.as_ref(),
false,
) {
return Poll::Ready(Err(err));
}
self.started = true;
if !self.epoch_ctx.record_operation() {
let budget = self.epoch_ctx.operation_budget.unwrap_or(0);
return Poll::Ready(Err(EpochError::BudgetExhausted {
epoch: self.epoch_ctx.epoch_id,
budget,
used: self.epoch_ctx.operations_used(),
}));
}
}
if let Err(err) = check_epoch(
&self.epoch_ctx,
&self.policy,
self.time_source.as_ref(),
self.epoch_source.as_ref(),
true,
) {
return Poll::Ready(Err(err));
}
Pin::new(&mut self.inner).poll(cx).map(Ok)
}
}
pub struct EpochSelect<A, B, TS: TimeSource, ES: EpochSource> {
inner: Select<EpochScoped<A, TS, ES>, EpochScoped<B, TS, ES>>,
}
impl<A, B, TS: TimeSource, ES: EpochSource> EpochSelect<A, B, TS, ES> {
#[must_use]
pub fn new(
a: A,
b: B,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
) -> Self {
let scoped_a = EpochScoped::new(
a,
epoch_ctx.clone(),
policy.clone(),
Arc::clone(&time_source),
Arc::clone(&epoch_source),
);
let scoped_b = EpochScoped::new(b, epoch_ctx, policy, time_source, epoch_source);
Self {
inner: Select::new(scoped_a, scoped_b),
}
}
}
impl<A, B, TS, ES> Future for EpochSelect<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
type Output =
Result<Either<Result<A::Output, EpochError>, Result<B::Output, EpochError>>, SelectError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.inner).poll(cx)
}
}
pub struct EpochJoin2<A, B, TS: TimeSource, ES: EpochSource>
where
A: Future + Unpin,
B: Future + Unpin,
{
a: EpochScoped<A, TS, ES>,
b: EpochScoped<B, TS, ES>,
a_done: Option<Result<A::Output, EpochError>>,
b_done: Option<Result<B::Output, EpochError>>,
}
impl<A, B, TS, ES> Unpin for EpochJoin2<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
}
impl<A, B, TS, ES> EpochJoin2<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
#[must_use]
pub fn new(
a: A,
b: B,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
) -> Self {
Self {
a: EpochScoped::new(
a,
epoch_ctx.clone(),
policy.clone(),
Arc::clone(&time_source),
Arc::clone(&epoch_source),
),
b: EpochScoped::new(b, epoch_ctx, policy, time_source, epoch_source),
a_done: None,
b_done: None,
}
}
}
impl<A, B, TS, ES> Future for EpochJoin2<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
type Output = (Result<A::Output, EpochError>, Result<B::Output, EpochError>);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = Pin::get_mut(self);
if this.a_done.is_none() {
if let Poll::Ready(val) = Pin::new(&mut this.a).poll(cx) {
this.a_done = Some(val);
}
}
if this.b_done.is_none() {
if let Poll::Ready(val) = Pin::new(&mut this.b).poll(cx) {
this.b_done = Some(val);
}
}
match (&this.a_done, &this.b_done) {
(Some(_), Some(_)) => Poll::Ready((
this.a_done.take().expect("a_done missing"),
this.b_done.take().expect("b_done missing"),
)),
_ => Poll::Pending,
}
}
}
pub struct EpochRace2<A, B, TS: TimeSource, ES: EpochSource> {
a: EpochScoped<A, TS, ES>,
b: EpochScoped<B, TS, ES>,
}
impl<A, B, TS: TimeSource, ES: EpochSource> EpochRace2<A, B, TS, ES> {
#[must_use]
pub fn new(
a: A,
b: B,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
) -> Self {
Self {
a: EpochScoped::new(
a,
epoch_ctx.clone(),
policy.clone(),
Arc::clone(&time_source),
Arc::clone(&epoch_source),
),
b: EpochScoped::new(b, epoch_ctx, policy, time_source, epoch_source),
}
}
}
impl<A, B, TS, ES> Future for EpochRace2<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
type Output = Either<Result<A::Output, EpochError>, Result<B::Output, EpochError>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Poll::Ready(val) = Pin::new(&mut self.a).poll(cx) {
return Poll::Ready(Either::Left(val));
}
if let Poll::Ready(val) = Pin::new(&mut self.b).poll(cx) {
return Poll::Ready(Either::Right(val));
}
Poll::Pending
}
}
#[must_use]
pub fn epoch_select<A, B, TS, ES>(
a: A,
b: B,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
) -> EpochSelect<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
EpochSelect::new(a, b, epoch_ctx, policy, time_source, epoch_source)
}
#[must_use]
pub fn epoch_join2<A, B, TS, ES>(
a: A,
b: B,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
) -> EpochJoin2<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
EpochJoin2::new(a, b, epoch_ctx, policy, time_source, epoch_source)
}
#[must_use]
pub fn epoch_race2<A, B, TS, ES>(
a: A,
b: B,
epoch_ctx: EpochContext,
policy: EpochPolicy,
time_source: Arc<TS>,
epoch_source: Arc<ES>,
) -> EpochRace2<A, B, TS, ES>
where
A: Future + Unpin,
B: Future + Unpin,
TS: TimeSource,
ES: EpochSource,
{
EpochRace2::new(a, b, epoch_ctx, policy, time_source, epoch_source)
}
#[derive(Debug, Clone)]
pub enum EpochBulkheadError<E> {
Epoch(EpochError),
Bulkhead(BulkheadError<E>),
}
impl<E: fmt::Display> fmt::Display for EpochBulkheadError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Epoch(e) => write!(f, "{e}"),
Self::Bulkhead(e) => write!(f, "{e}"),
}
}
}
impl<E: fmt::Debug + fmt::Display> std::error::Error for EpochBulkheadError<E> {}
#[derive(Debug, Clone)]
pub enum EpochCircuitBreakerError<E> {
Epoch(EpochError),
CircuitBreaker(CircuitBreakerError<E>),
}
impl<E: fmt::Display> fmt::Display for EpochCircuitBreakerError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Epoch(e) => write!(f, "{e}"),
Self::CircuitBreaker(e) => write!(f, "{e}"),
}
}
}
impl<E: fmt::Debug + fmt::Display> std::error::Error for EpochCircuitBreakerError<E> {}
fn ensure_epoch_ready<TS: TimeSource, ES: EpochSource>(
epoch_ctx: &EpochContext,
policy: &EpochPolicy,
time_source: &TS,
epoch_source: &ES,
) -> Result<(), EpochError> {
check_epoch(epoch_ctx, policy, time_source, epoch_source, false)
}
pub fn bulkhead_call_in_epoch<T, E, F, TS, ES>(
bulkhead: &Bulkhead,
epoch_ctx: &EpochContext,
policy: &EpochPolicy,
time_source: &TS,
epoch_source: &ES,
op: F,
) -> Result<T, EpochBulkheadError<E>>
where
F: FnOnce() -> Result<T, E>,
E: fmt::Display,
TS: TimeSource,
ES: EpochSource,
{
ensure_epoch_ready(epoch_ctx, policy, time_source, epoch_source)
.map_err(EpochBulkheadError::Epoch)?;
bulkhead.call(op).map_err(EpochBulkheadError::Bulkhead)
}
pub fn bulkhead_call_weighted_in_epoch<T, E, F, TS, ES>(
bulkhead: &Bulkhead,
weight: u32,
epoch_ctx: &EpochContext,
policy: &EpochPolicy,
time_source: &TS,
epoch_source: &ES,
op: F,
) -> Result<T, EpochBulkheadError<E>>
where
F: FnOnce() -> Result<T, E>,
E: fmt::Display,
TS: TimeSource,
ES: EpochSource,
{
ensure_epoch_ready(epoch_ctx, policy, time_source, epoch_source)
.map_err(EpochBulkheadError::Epoch)?;
bulkhead
.call_weighted(weight, op)
.map_err(EpochBulkheadError::Bulkhead)
}
pub fn circuit_breaker_call_in_epoch<T, E, F, TS, ES>(
breaker: &CircuitBreaker,
epoch_ctx: &EpochContext,
policy: &EpochPolicy,
time_source: &TS,
epoch_source: &ES,
op: F,
) -> Result<T, EpochCircuitBreakerError<E>>
where
F: FnOnce() -> Result<T, E>,
E: fmt::Display,
TS: TimeSource,
ES: EpochSource,
{
ensure_epoch_ready(epoch_ctx, policy, time_source, epoch_source)
.map_err(EpochCircuitBreakerError::Epoch)?;
let now = time_source.now();
breaker
.call(now, op)
.map_err(EpochCircuitBreakerError::CircuitBreaker)
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EpochError {
Expired {
epoch: EpochId,
},
BudgetExhausted {
epoch: EpochId,
budget: u32,
used: u32,
},
TransitionOccurred {
from: EpochId,
to: EpochId,
},
Mismatch {
expected: EpochId,
actual: EpochId,
},
ValidityViolation {
symbol_epoch: EpochId,
window: SymbolValidityWindow,
},
BarrierTimeout {
epoch: EpochId,
arrived: u32,
expected: u32,
},
}
impl std::fmt::Display for EpochError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Expired { epoch } => write!(f, "epoch {epoch} expired"),
Self::BudgetExhausted {
epoch,
budget,
used,
} => write!(f, "epoch {epoch} budget exhausted: used {used}/{budget}"),
Self::TransitionOccurred { from, to } => {
write!(f, "epoch transition from {from} to {to}")
}
Self::Mismatch { expected, actual } => {
write!(f, "epoch mismatch: expected {expected}, got {actual}")
}
Self::ValidityViolation {
symbol_epoch,
window,
} => {
write!(
f,
"symbol epoch {symbol_epoch} outside validity window [{}, {}]",
window.start, window.end
)
}
Self::BarrierTimeout {
epoch,
arrived,
expected,
} => {
write!(
f,
"barrier timeout for epoch {epoch}: {arrived}/{expected} arrived"
)
}
}
}
}
impl std::error::Error for EpochError {}
impl From<EpochError> for Error {
fn from(e: EpochError) -> Self {
match e {
EpochError::Expired { .. } => {
Self::new(ErrorKind::LeaseExpired).with_message(e.to_string())
}
EpochError::BudgetExhausted { .. } => {
Self::new(ErrorKind::PollQuotaExhausted).with_message(e.to_string())
}
EpochError::TransitionOccurred { .. } => {
Self::new(ErrorKind::Cancelled).with_message(e.to_string())
}
EpochError::Mismatch { .. } => {
Self::new(ErrorKind::InvalidStateTransition).with_message(e.to_string())
}
EpochError::ValidityViolation { .. } => {
Self::new(ErrorKind::ObjectMismatch).with_message(e.to_string())
}
EpochError::BarrierTimeout { .. } => {
Self::new(ErrorKind::ThresholdTimeout).with_message(e.to_string())
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::combinator::{BulkheadPolicy, CircuitBreakerPolicy};
use crate::time::VirtualClock;
use futures_lite::future::block_on;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[derive(Debug)]
struct TestEpochSource {
current: AtomicU64,
}
impl TestEpochSource {
fn new(epoch: EpochId) -> Self {
Self {
current: AtomicU64::new(epoch.as_u64()),
}
}
fn set(&self, epoch: EpochId) {
self.current.store(epoch.as_u64(), Ordering::Release);
}
}
impl EpochSource for TestEpochSource {
fn current(&self) -> EpochId {
EpochId(self.current.load(Ordering::Acquire))
}
}
#[test]
fn test_epoch_id_ordering() {
init_test("test_epoch_id_ordering");
let e1 = EpochId(5);
let e2 = EpochId(10);
let before = e1.is_before(e2);
crate::assert_with_log!(before, "e1 before e2", true, before);
let after = e2.is_after(e1);
crate::assert_with_log!(after, "e2 after e1", true, after);
let same_before = e1.is_before(e1);
crate::assert_with_log!(!same_before, "e1 before e1", false, same_before);
let dist12 = e1.distance(e2);
crate::assert_with_log!(dist12 == 5, "distance 1->2", 5, dist12);
let dist21 = e2.distance(e1);
crate::assert_with_log!(dist21 == 5, "distance 2->1", 5, dist21);
crate::test_complete!("test_epoch_id_ordering");
}
#[test]
fn test_epoch_id_navigation() {
init_test("test_epoch_id_navigation");
let e = EpochId(5);
crate::assert_with_log!(e.next() == EpochId(6), "next", EpochId(6), e.next());
crate::assert_with_log!(
e.prev() == Some(EpochId(4)),
"prev",
Some(EpochId(4)),
e.prev()
);
let genesis_prev_none = EpochId::GENESIS.prev().is_none();
crate::assert_with_log!(
genesis_prev_none,
"genesis prev none",
true,
genesis_prev_none
);
crate::assert_with_log!(
EpochId::MAX.saturating_next() == EpochId::MAX,
"max saturating_next",
EpochId::MAX,
EpochId::MAX.saturating_next()
);
crate::test_complete!("test_epoch_id_navigation");
}
#[test]
fn test_epoch_config_validation() {
init_test("test_epoch_config_validation");
let valid = EpochConfig::default();
let valid_ok = valid.validate().is_ok();
crate::assert_with_log!(valid_ok, "valid ok", true, valid_ok);
let invalid_min = EpochConfig {
min_duration: Time::from_secs(100),
target_duration: Time::from_secs(60),
..EpochConfig::default()
};
let invalid_min_err = invalid_min.validate().is_err();
crate::assert_with_log!(invalid_min_err, "invalid min err", true, invalid_min_err);
let invalid_quorum = EpochConfig {
require_quorum: true,
quorum_size: 0,
..EpochConfig::default()
};
let invalid_quorum_err = invalid_quorum.validate().is_err();
crate::assert_with_log!(
invalid_quorum_err,
"invalid quorum err",
true,
invalid_quorum_err
);
crate::test_complete!("test_epoch_config_validation");
}
#[test]
fn test_epoch_lifecycle() {
init_test("test_epoch_lifecycle");
let config = EpochConfig::default();
let mut epoch = Epoch::new(EpochId(1), Time::from_millis(0), config);
crate::assert_with_log!(
epoch.state == EpochState::Active,
"state active",
EpochState::Active,
epoch.state
);
let active = epoch.state.is_active();
crate::assert_with_log!(active, "is_active", true, active);
epoch.begin_ending(Time::from_secs(60)).unwrap();
crate::assert_with_log!(
epoch.state == EpochState::Ending,
"state ending",
EpochState::Ending,
epoch.state
);
let allows = epoch.state.allows_completion();
crate::assert_with_log!(allows, "allows completion", true, allows);
epoch.complete(Time::from_secs(70)).unwrap();
crate::assert_with_log!(
epoch.state == EpochState::Ended,
"state ended",
EpochState::Ended,
epoch.state
);
let terminal = epoch.state.is_terminal();
crate::assert_with_log!(terminal, "terminal", true, terminal);
crate::test_complete!("test_epoch_lifecycle");
}
#[test]
fn test_epoch_transition_timing() {
init_test("test_epoch_transition_timing");
let config = EpochConfig {
min_duration: Time::from_secs(30),
target_duration: Time::from_secs(60),
max_duration: Time::from_secs(120),
..EpochConfig::default()
};
let epoch = Epoch::new(EpochId(1), Time::from_secs(0), config);
let can = epoch.can_transition(Time::from_secs(20));
crate::assert_with_log!(!can, "before min", false, can);
let can = epoch.can_transition(Time::from_secs(40));
crate::assert_with_log!(can, "after min", true, can);
let overdue = epoch.is_overdue(Time::from_secs(100));
crate::assert_with_log!(!overdue, "not overdue", false, overdue);
let overdue = epoch.is_overdue(Time::from_secs(130));
crate::assert_with_log!(overdue, "overdue", true, overdue);
crate::test_complete!("test_epoch_transition_timing");
}
#[test]
fn test_validity_window_contains() {
init_test("test_validity_window_contains");
let window = SymbolValidityWindow::new(EpochId(4), EpochId(10));
let contains4 = window.contains(EpochId(4));
crate::assert_with_log!(contains4, "contains 4", true, contains4);
let contains5 = window.contains(EpochId(5));
crate::assert_with_log!(contains5, "contains 5", true, contains5);
let contains7 = window.contains(EpochId(7));
crate::assert_with_log!(contains7, "contains 7", true, contains7);
let contains10 = window.contains(EpochId(10));
crate::assert_with_log!(contains10, "contains 10", true, contains10);
let contains11 = window.contains(EpochId(11));
crate::assert_with_log!(!contains11, "contains 11", false, contains11);
crate::test_complete!("test_validity_window_contains");
}
#[test]
fn test_validity_window_overlap() {
init_test("test_validity_window_overlap");
let w1 = SymbolValidityWindow::new(EpochId(1), EpochId(5));
let w2 = SymbolValidityWindow::new(EpochId(4), EpochId(8));
let w3 = SymbolValidityWindow::new(EpochId(6), EpochId(10));
let w1_w2 = w1.overlaps(&w2);
crate::assert_with_log!(w1_w2, "w1 overlaps w2", true, w1_w2);
let w2_w1 = w2.overlaps(&w1);
crate::assert_with_log!(w2_w1, "w2 overlaps w1", true, w2_w1);
let w1_w3 = w1.overlaps(&w3);
crate::assert_with_log!(!w1_w3, "w1 overlaps w3", false, w1_w3);
let intersection = w1.intersection(&w2);
crate::assert_with_log!(
intersection == Some(SymbolValidityWindow::new(EpochId(4), EpochId(5))),
"intersection",
Some(SymbolValidityWindow::new(EpochId(4), EpochId(5))),
intersection
);
crate::test_complete!("test_validity_window_overlap");
}
#[test]
fn test_validity_window_constructors() {
init_test("test_validity_window_constructors");
let single = SymbolValidityWindow::single(EpochId(5));
let span = single.span();
crate::assert_with_log!(span == 1, "single span", 1, span);
let contains5 = single.contains(EpochId(5));
crate::assert_with_log!(contains5, "contains 5", true, contains5);
let contains4 = single.contains(EpochId(4));
crate::assert_with_log!(!contains4, "contains 4", false, contains4);
let infinite = SymbolValidityWindow::infinite();
let contains_genesis = infinite.contains(EpochId::GENESIS);
crate::assert_with_log!(contains_genesis, "contains genesis", true, contains_genesis);
let contains_max = infinite.contains(EpochId::MAX);
crate::assert_with_log!(contains_max, "contains max", true, contains_max);
let from = SymbolValidityWindow::from_epoch(EpochId(5));
let contains4 = from.contains(EpochId(4));
crate::assert_with_log!(!contains4, "from contains 4", false, contains4);
let contains5 = from.contains(EpochId(5));
crate::assert_with_log!(contains5, "from contains 5", true, contains5);
let contains_max = from.contains(EpochId::MAX);
crate::assert_with_log!(contains_max, "from contains max", true, contains_max);
crate::test_complete!("test_validity_window_constructors");
}
#[test]
fn test_epoch_barrier_basic() {
init_test("test_epoch_barrier_basic");
let barrier = EpochBarrier::new(EpochId(1), 3, Time::ZERO);
let remaining = barrier.remaining();
crate::assert_with_log!(remaining == 3, "remaining", 3, remaining);
let triggered = barrier.is_triggered();
crate::assert_with_log!(!triggered, "triggered", false, triggered);
barrier.arrive("node1", Time::from_secs(1)).unwrap();
let arrived = barrier.arrived();
crate::assert_with_log!(arrived == 1, "arrived", 1, arrived);
let remaining = barrier.remaining();
crate::assert_with_log!(remaining == 2, "remaining", 2, remaining);
barrier.arrive("node2", Time::from_secs(2)).unwrap();
let arrived = barrier.arrived();
crate::assert_with_log!(arrived == 2, "arrived", 2, arrived);
let result = barrier.arrive("node3", Time::from_secs(3)).unwrap();
let some = result.is_some();
crate::assert_with_log!(some, "result some", true, some);
let trigger = result.unwrap().trigger;
crate::assert_with_log!(
trigger == BarrierTrigger::AllArrived,
"trigger",
BarrierTrigger::AllArrived,
trigger
);
let triggered = barrier.is_triggered();
crate::assert_with_log!(triggered, "triggered", true, triggered);
crate::test_complete!("test_epoch_barrier_basic");
}
#[test]
fn test_epoch_barrier_duplicate() {
init_test("test_epoch_barrier_duplicate");
let barrier = EpochBarrier::new(EpochId(1), 2, Time::ZERO);
barrier.arrive("node1", Time::from_secs(1)).unwrap();
let result = barrier.arrive("node1", Time::from_secs(2));
let err = result.is_err();
crate::assert_with_log!(err, "duplicate err", true, err);
crate::test_complete!("test_epoch_barrier_duplicate");
}
#[test]
fn test_epoch_barrier_timeout() {
init_test("test_epoch_barrier_timeout");
let barrier =
EpochBarrier::new(EpochId(1), 3, Time::ZERO).with_timeout(Time::from_secs(10));
barrier.arrive("node1", Time::from_secs(1)).unwrap();
let result = barrier.arrive("node2", Time::from_secs(15)).unwrap();
let some = result.is_some();
crate::assert_with_log!(some, "result some", true, some);
let trigger = result.unwrap().trigger;
crate::assert_with_log!(
trigger == BarrierTrigger::Timeout,
"trigger",
BarrierTrigger::Timeout,
trigger
);
crate::test_complete!("test_epoch_barrier_timeout");
}
#[test]
fn test_epoch_clock_advance() {
init_test("test_epoch_clock_advance");
let config = EpochConfig::short_lived();
let clock = EpochClock::new(config);
clock.initialize(Time::ZERO);
crate::assert_with_log!(
clock.current() == EpochId::GENESIS,
"current genesis",
EpochId::GENESIS,
clock.current()
);
let new_epoch = clock.advance(Time::from_millis(100)).unwrap();
crate::assert_with_log!(new_epoch == EpochId(1), "new epoch", EpochId(1), new_epoch);
crate::assert_with_log!(
clock.current() == EpochId(1),
"current",
EpochId(1),
clock.current()
);
crate::test_complete!("test_epoch_clock_advance");
}
#[test]
fn test_epoch_clock_history() {
init_test("test_epoch_clock_history");
let config = EpochConfig {
min_duration: Time::from_millis(10),
target_duration: Time::from_millis(50),
max_duration: Time::from_millis(100),
retention_epochs: 3,
..EpochConfig::default()
};
let clock = EpochClock::new(config);
clock.initialize(Time::ZERO);
for i in 1..=5 {
clock.advance(Time::from_millis(i * 20)).unwrap();
}
let history = clock.history();
let within = history.len() <= 3;
crate::assert_with_log!(within, "history len <= 3", true, within);
crate::test_complete!("test_epoch_clock_history");
}
#[test]
fn test_epoch_error_display() {
init_test("test_epoch_error_display");
let expired = EpochError::Expired { epoch: EpochId(5) };
let expired_str = expired.to_string();
let has_num = expired_str.contains('5');
crate::assert_with_log!(has_num, "contains 5", true, has_num);
let has_expired = expired_str.contains("expired");
crate::assert_with_log!(has_expired, "contains expired", true, has_expired);
let transition = EpochError::TransitionOccurred {
from: EpochId(1),
to: EpochId(2),
};
let transition_str = transition.to_string();
let has_transition = transition_str.contains("transition");
crate::assert_with_log!(has_transition, "contains transition", true, has_transition);
crate::test_complete!("test_epoch_error_display");
}
#[test]
fn test_epoch_metadata() {
init_test("test_epoch_metadata");
let config = EpochConfig::default();
let mut epoch = Epoch::new(EpochId(1), Time::ZERO, config);
epoch.set_metadata("version", "1.0.0");
epoch.set_metadata("leader", "node-1");
let expected_version = "1.0.0".to_string();
let expected_leader = "node-1".to_string();
let version = epoch.metadata.get("version");
crate::assert_with_log!(
version == Some(&expected_version),
"version",
Some(&expected_version),
version
);
let leader = epoch.metadata.get("leader");
crate::assert_with_log!(
leader == Some(&expected_leader),
"leader",
Some(&expected_leader),
leader
);
crate::test_complete!("test_epoch_metadata");
}
#[test]
fn test_epoch_context_budget() {
init_test("test_epoch_context_budget");
let ctx =
EpochContext::new(EpochId(1), Time::ZERO, Time::from_secs(10)).with_operation_budget(1);
let first = ctx.record_operation();
crate::assert_with_log!(first, "first record", true, first);
let second = ctx.record_operation();
crate::assert_with_log!(!second, "second record", false, second);
let exhausted = ctx.is_budget_exhausted();
crate::assert_with_log!(exhausted, "exhausted", true, exhausted);
let used = ctx.operations_used();
crate::assert_with_log!(used == 1, "operations used", 1, used);
crate::test_complete!("test_epoch_context_budget");
}
#[test]
fn test_epoch_scoped_expired() {
init_test("test_epoch_scoped_expired");
let clock = Arc::new(VirtualClock::starting_at(Time::from_secs(5)));
let epoch = EpochContext::new(EpochId(1), Time::ZERO, Time::from_secs(1));
let source = Arc::new(TestEpochSource::new(EpochId(1)));
let policy = EpochPolicy::strict();
let fut = EpochScoped::new(Box::pin(async { 42u32 }), epoch, policy, clock, source);
let result = block_on(fut);
let expired = matches!(result, Err(EpochError::Expired { .. }));
crate::assert_with_log!(expired, "expired", true, expired);
crate::test_complete!("test_epoch_scoped_expired");
}
#[test]
fn test_epoch_scoped_transition() {
init_test("test_epoch_scoped_transition");
let clock = Arc::new(VirtualClock::starting_at(Time::ZERO));
let source = Arc::new(TestEpochSource::new(EpochId(1)));
source.set(EpochId(2));
let epoch = EpochContext::new(EpochId(1), Time::ZERO, Time::from_secs(10));
let policy = EpochPolicy::strict();
let fut = EpochScoped::new(Box::pin(async { 7u8 }), epoch, policy, clock, source);
let result = block_on(fut);
let transitioned = matches!(result, Err(EpochError::TransitionOccurred { .. }));
crate::assert_with_log!(transitioned, "transitioned", true, transitioned);
crate::test_complete!("test_epoch_scoped_transition");
}
#[test]
fn test_epoch_select_left() {
init_test("test_epoch_select_left");
let clock = Arc::new(VirtualClock::starting_at(Time::ZERO));
let source = Arc::new(TestEpochSource::new(EpochId(1)));
let epoch = EpochContext::new(EpochId(1), Time::ZERO, Time::from_secs(10));
let policy = EpochPolicy::strict();
let fut = epoch_select(
Box::pin(async { 1u8 }),
Box::pin(async { 2u8 }),
epoch,
policy,
clock,
source,
);
let result = block_on(fut);
let ok = matches!(result, Ok(Either::Left(Ok(1))));
crate::assert_with_log!(ok, "epoch_select left result", true, ok);
crate::test_complete!("test_epoch_select_left");
}
#[test]
fn test_epoch_wrapped_bulkhead_and_circuit_breaker() {
init_test("test_epoch_wrapped_bulkhead_and_circuit_breaker");
let clock = Arc::new(VirtualClock::starting_at(Time::ZERO));
let epoch_source = Arc::new(TestEpochSource::new(EpochId(1)));
let policy = EpochPolicy::strict();
let epoch_ctx = EpochContext::new(EpochId(1), Time::ZERO, Time::from_secs(10));
let bulkhead = Bulkhead::new(BulkheadPolicy {
max_concurrent: 1,
..Default::default()
});
let breaker = CircuitBreaker::new(CircuitBreakerPolicy::default());
let bulkhead_result = bulkhead_call_in_epoch(
&bulkhead,
&epoch_ctx,
&policy,
&*clock,
&*epoch_source,
|| Ok::<_, &str>(5u32),
)
.expect("bulkhead call should succeed");
crate::assert_with_log!(bulkhead_result == 5, "bulkhead result", 5, bulkhead_result);
let breaker_result = circuit_breaker_call_in_epoch(
&breaker,
&epoch_ctx,
&policy,
&*clock,
&*epoch_source,
|| Ok::<_, &str>(9u32),
)
.expect("circuit breaker call should succeed");
crate::assert_with_log!(breaker_result == 9, "breaker result", 9, breaker_result);
crate::test_complete!("test_epoch_wrapped_bulkhead_and_circuit_breaker");
}
#[test]
fn test_epoch_id_overflow_saturation() {
init_test("test_epoch_id_overflow_saturation");
let max_epoch = EpochId::MAX;
let saturated = max_epoch.saturating_next();
crate::assert_with_log!(
saturated == EpochId::MAX,
"saturating_next at MAX",
EpochId::MAX,
saturated
);
let double_saturated = saturated.saturating_next();
crate::assert_with_log!(
double_saturated == EpochId::MAX,
"double saturating_next",
EpochId::MAX,
double_saturated
);
crate::test_complete!("test_epoch_id_overflow_saturation");
}
#[test]
#[should_panic(expected = "EpochId overflow")]
fn test_epoch_id_next_overflow_panics() {
init_test("test_epoch_id_next_overflow_panics");
let max_epoch = EpochId::MAX;
let _ = max_epoch.next();
}
#[test]
fn test_epoch_barrier_force_trigger() {
init_test("test_epoch_barrier_force_trigger");
let barrier = EpochBarrier::new(EpochId(1), 5, Time::ZERO);
barrier.arrive("node1", Time::from_secs(1)).unwrap();
let arrived = barrier.arrived();
crate::assert_with_log!(arrived == 1, "arrived before force", 1, arrived);
let result = barrier.force_trigger(Time::from_secs(2));
crate::assert_with_log!(
result.trigger == BarrierTrigger::Forced,
"trigger type",
BarrierTrigger::Forced,
result.trigger
);
crate::assert_with_log!(result.arrived == 1, "arrived in result", 1, result.arrived);
crate::assert_with_log!(
result.expected == 5,
"expected in result",
5,
result.expected
);
let triggered = barrier.is_triggered();
crate::assert_with_log!(triggered, "is_triggered", true, triggered);
crate::test_complete!("test_epoch_barrier_force_trigger");
}
#[test]
fn test_epoch_barrier_cancel() {
init_test("test_epoch_barrier_cancel");
let barrier = EpochBarrier::new(EpochId(2), 3, Time::ZERO);
barrier.arrive("node1", Time::from_secs(1)).unwrap();
barrier.arrive("node2", Time::from_secs(2)).unwrap();
let result = barrier.cancel(Time::from_secs(3));
crate::assert_with_log!(
result.trigger == BarrierTrigger::Cancelled,
"trigger type",
BarrierTrigger::Cancelled,
result.trigger
);
crate::assert_with_log!(result.arrived == 2, "arrived", 2, result.arrived);
let late_arrival = barrier.arrive("node3", Time::from_secs(4));
let err = late_arrival.is_err();
crate::assert_with_log!(err, "late arrival err", true, err);
crate::test_complete!("test_epoch_barrier_cancel");
}
#[test]
fn test_epoch_clock_advance_too_early() {
init_test("test_epoch_clock_advance_too_early");
let config = EpochConfig {
min_duration: Time::from_secs(30),
target_duration: Time::from_secs(60),
max_duration: Time::from_secs(120),
..EpochConfig::default()
};
let clock = EpochClock::new(config);
clock.initialize(Time::ZERO);
let result = clock.advance(Time::from_secs(10));
let err = result.is_err();
crate::assert_with_log!(err, "advance too early err", true, err);
crate::assert_with_log!(
clock.current() == EpochId::GENESIS,
"still genesis",
EpochId::GENESIS,
clock.current()
);
crate::test_complete!("test_epoch_clock_advance_too_early");
}
#[test]
fn test_epoch_clock_advance_overdue() {
init_test("test_epoch_clock_advance_overdue");
let config = EpochConfig {
min_duration: Time::from_secs(30),
target_duration: Time::from_secs(60),
max_duration: Time::from_secs(120),
..EpochConfig::default()
};
let clock = EpochClock::new(config);
clock.initialize(Time::ZERO);
let result = clock.advance(Time::from_secs(150));
let ok = result.is_ok();
crate::assert_with_log!(ok, "advance when overdue ok", true, ok);
crate::assert_with_log!(
clock.current() == EpochId(1),
"advanced to epoch 1",
EpochId(1),
clock.current()
);
crate::test_complete!("test_epoch_clock_advance_overdue");
}
#[test]
fn test_epoch_context_expiry() {
init_test("test_epoch_context_expiry");
let ctx = EpochContext::new(EpochId(1), Time::ZERO, Time::from_secs(10));
let expired_before = ctx.is_expired(Time::from_secs(5));
crate::assert_with_log!(!expired_before, "not expired at t=5", false, expired_before);
let expired_at = ctx.is_expired(Time::from_secs(10));
crate::assert_with_log!(expired_at, "expired at t=10", true, expired_at);
let expired_after = ctx.is_expired(Time::from_secs(15));
crate::assert_with_log!(expired_after, "expired at t=15", true, expired_after);
let remaining = ctx.remaining_time(Time::from_secs(3));
crate::assert_with_log!(
remaining == Some(Duration::from_secs(7)),
"remaining at t=3",
Some(Duration::from_secs(7)),
remaining
);
let no_remaining = ctx.remaining_time(Time::from_secs(12));
crate::assert_with_log!(
no_remaining.is_none(),
"no remaining at t=12",
true,
no_remaining.is_none()
);
crate::test_complete!("test_epoch_context_expiry");
}
#[test]
fn test_validity_window_extend() {
init_test("test_validity_window_extend");
let window = SymbolValidityWindow::new(EpochId(5), EpochId(10));
let extended_earlier = window.extend_to(EpochId(2));
crate::assert_with_log!(
extended_earlier.start == EpochId(2),
"extended start",
EpochId(2),
extended_earlier.start
);
crate::assert_with_log!(
extended_earlier.end == EpochId(10),
"extended end unchanged",
EpochId(10),
extended_earlier.end
);
let extended_later = window.extend_to(EpochId(15));
crate::assert_with_log!(
extended_later.start == EpochId(5),
"extended start unchanged",
EpochId(5),
extended_later.start
);
crate::assert_with_log!(
extended_later.end == EpochId(15),
"extended end",
EpochId(15),
extended_later.end
);
let no_change = window.extend_to(EpochId(7));
crate::assert_with_log!(
no_change == window,
"no change for contained epoch",
window,
no_change
);
crate::test_complete!("test_validity_window_extend");
}
#[test]
fn test_epoch_state_predicates_comprehensive() {
init_test("test_epoch_state_predicates_comprehensive");
let preparing = EpochState::Preparing;
crate::assert_with_log!(
!preparing.is_active(),
"preparing not active",
false,
preparing.is_active()
);
crate::assert_with_log!(
!preparing.is_terminal(),
"preparing not terminal",
false,
preparing.is_terminal()
);
crate::assert_with_log!(
!preparing.allows_completion(),
"preparing not allows_completion",
false,
preparing.allows_completion()
);
let active = EpochState::Active;
crate::assert_with_log!(
active.is_active(),
"active is_active",
true,
active.is_active()
);
crate::assert_with_log!(
!active.is_terminal(),
"active not terminal",
false,
active.is_terminal()
);
crate::assert_with_log!(
active.allows_completion(),
"active allows_completion",
true,
active.allows_completion()
);
let ending = EpochState::Ending;
crate::assert_with_log!(
!ending.is_active(),
"ending not active",
false,
ending.is_active()
);
crate::assert_with_log!(
!ending.is_terminal(),
"ending not terminal",
false,
ending.is_terminal()
);
crate::assert_with_log!(
ending.allows_completion(),
"ending allows_completion",
true,
ending.allows_completion()
);
let ended = EpochState::Ended;
crate::assert_with_log!(
!ended.is_active(),
"ended not active",
false,
ended.is_active()
);
crate::assert_with_log!(
ended.is_terminal(),
"ended is_terminal",
true,
ended.is_terminal()
);
crate::assert_with_log!(
!ended.allows_completion(),
"ended not allows_completion",
false,
ended.allows_completion()
);
crate::test_complete!("test_epoch_state_predicates_comprehensive");
}
#[test]
fn test_epoch_operation_counting() {
init_test("test_epoch_operation_counting");
let config = EpochConfig::default();
let mut epoch = Epoch::new(EpochId(1), Time::ZERO, config);
crate::assert_with_log!(
epoch.operation_count == 0,
"initial count",
0,
epoch.operation_count
);
epoch.record_operation();
crate::assert_with_log!(
epoch.operation_count == 1,
"after first",
1,
epoch.operation_count
);
epoch.record_operation();
epoch.record_operation();
crate::assert_with_log!(
epoch.operation_count == 3,
"after three",
3,
epoch.operation_count
);
crate::test_complete!("test_epoch_operation_counting");
}
#[test]
fn test_epoch_remaining_time() {
init_test("test_epoch_remaining_time");
let config = EpochConfig {
target_duration: Time::from_secs(100),
..EpochConfig::default()
};
let mut epoch = Epoch::new(EpochId(1), Time::from_secs(10), config);
let duration_active = epoch.duration(Time::from_secs(25));
crate::assert_with_log!(
duration_active == Duration::from_secs(15),
"active duration",
Duration::from_secs(15),
duration_active
);
epoch.begin_ending(Time::from_secs(50)).unwrap();
epoch.complete(Time::from_secs(60)).unwrap();
let duration_ended = epoch.duration(Time::from_secs(100));
crate::assert_with_log!(
duration_ended == Duration::from_secs(50),
"ended duration",
Duration::from_secs(50),
duration_ended
);
crate::test_complete!("test_epoch_remaining_time");
}
#[test]
fn test_epoch_policy_variants() {
init_test("test_epoch_policy_variants");
let strict = EpochPolicy::strict();
crate::assert_with_log!(
strict.on_transition == EpochTransitionBehavior::AbortAll,
"strict aborts",
EpochTransitionBehavior::AbortAll,
strict.on_transition
);
crate::assert_with_log!(
strict.check_on_poll,
"strict checks",
true,
strict.check_on_poll
);
let lenient = EpochPolicy::lenient();
crate::assert_with_log!(
lenient.on_transition == EpochTransitionBehavior::DrainExecuting,
"lenient drains",
EpochTransitionBehavior::DrainExecuting,
lenient.on_transition
);
crate::assert_with_log!(
!lenient.check_on_poll,
"lenient no check",
false,
lenient.check_on_poll
);
crate::assert_with_log!(
lenient.grace_period.is_some(),
"lenient has grace",
true,
lenient.grace_period.is_some()
);
let ignore = EpochPolicy::ignore();
crate::assert_with_log!(
ignore.on_transition == EpochTransitionBehavior::Ignore,
"ignore ignores",
EpochTransitionBehavior::Ignore,
ignore.on_transition
);
crate::assert_with_log!(
!ignore.propagate_to_children,
"ignore no propagate",
false,
ignore.propagate_to_children
);
crate::test_complete!("test_epoch_policy_variants");
}
#[test]
fn epoch_id_debug_format() {
let id = EpochId::new(42);
let dbg = format!("{id:?}");
assert!(dbg.contains("42"), "Debug should show value: {dbg}");
}
#[test]
fn epoch_id_display_format() {
let id = EpochId::new(7);
let disp = format!("{id}");
assert_eq!(disp, "Epoch(7)");
}
#[test]
fn epoch_id_from_conversions() {
let id: EpochId = 99u64.into();
assert_eq!(id, EpochId::new(99));
let raw: u64 = id.into();
assert_eq!(raw, 99);
assert_eq!(id.as_u64(), 99);
}
#[test]
fn epoch_id_hash_consistency() {
use std::collections::HashSet;
let mut set = HashSet::new();
set.insert(EpochId::new(1));
set.insert(EpochId::new(2));
set.insert(EpochId::new(1)); assert_eq!(set.len(), 2);
}
#[test]
fn epoch_id_constants() {
assert_eq!(EpochId::GENESIS, EpochId::new(0));
assert_eq!(EpochId::MAX, EpochId::new(u64::MAX));
assert!(EpochId::GENESIS.is_before(EpochId::MAX));
}
#[test]
fn epoch_config_debug_clone() {
let cfg = EpochConfig::default();
let dbg = format!("{cfg:?}");
assert!(dbg.contains("EpochConfig"));
let cfg2 = cfg.clone();
assert_eq!(cfg2.target_duration, cfg.target_duration);
assert_eq!(cfg2.retention_epochs, cfg.retention_epochs);
}
#[test]
fn epoch_config_short_lived_values() {
let cfg = EpochConfig::short_lived();
assert_eq!(cfg.target_duration, Time::from_millis(100));
assert_eq!(cfg.min_duration, Time::from_millis(50));
assert_eq!(cfg.max_duration, Time::from_millis(200));
assert_eq!(cfg.retention_epochs, 5);
assert!(!cfg.require_quorum);
}
#[test]
fn epoch_config_long_lived_values() {
let cfg = EpochConfig::long_lived();
assert_eq!(cfg.target_duration, Time::from_secs(300));
assert_eq!(cfg.min_duration, Time::from_secs(120));
assert!(cfg.require_quorum);
assert_eq!(cfg.quorum_size, 3);
}
#[test]
fn epoch_config_validate_target_exceeds_max() {
let cfg = EpochConfig {
target_duration: Time::from_secs(200),
max_duration: Time::from_secs(100),
..EpochConfig::default()
};
assert!(cfg.validate().is_err());
}
#[test]
fn epoch_state_debug_clone_copy() {
let s = EpochState::Active;
let s2 = s; let s3 = s; assert_eq!(s2, s3);
let dbg = format!("{s:?}");
assert!(dbg.contains("Active"));
}
#[test]
fn epoch_debug_clone() {
let cfg = EpochConfig::default();
let epoch = Epoch::new(EpochId::new(5), Time::from_secs(10), cfg);
let dbg = format!("{epoch:?}");
assert!(dbg.contains("Epoch"));
let epoch2 = epoch;
assert_eq!(epoch2.id, EpochId::new(5));
assert_eq!(epoch2.state, EpochState::Active);
}
#[test]
fn epoch_genesis_constructor() {
let cfg = EpochConfig::default();
let epoch = Epoch::genesis(cfg);
assert_eq!(epoch.id, EpochId::GENESIS);
assert_eq!(epoch.started_at, Time::ZERO);
assert_eq!(epoch.state, EpochState::Active);
assert_eq!(epoch.operation_count, 0);
}
#[test]
fn symbol_validity_window_default_is_infinite() {
let w = SymbolValidityWindow::default();
assert_eq!(w.start, EpochId::GENESIS);
assert_eq!(w.end, EpochId::MAX);
assert!(w.contains(EpochId::new(1_000_000)));
}
#[test]
fn symbol_validity_window_until_epoch() {
let w = SymbolValidityWindow::until_epoch(EpochId::new(10));
assert_eq!(w.start, EpochId::GENESIS);
assert_eq!(w.end, EpochId::new(10));
assert!(w.contains(EpochId::new(0)));
assert!(w.contains(EpochId::new(10)));
assert!(!w.contains(EpochId::new(11)));
}
#[test]
fn symbol_validity_window_span_multi() {
let w = SymbolValidityWindow::new(EpochId::new(3), EpochId::new(7));
assert_eq!(w.span(), 5); }
#[test]
fn barrier_trigger_debug_clone_eq() {
let t = BarrierTrigger::AllArrived;
let t2 = t.clone();
assert_eq!(t, t2);
assert!(format!("{t:?}").contains("AllArrived"));
assert_ne!(BarrierTrigger::Timeout, BarrierTrigger::Cancelled);
assert_ne!(BarrierTrigger::Forced, BarrierTrigger::AllArrived);
}
#[test]
fn barrier_result_debug_clone_eq() {
let r = BarrierResult {
trigger: BarrierTrigger::AllArrived,
arrived: 3,
expected: 3,
triggered_at: Time::from_secs(10),
};
let r2 = r.clone();
assert_eq!(r, r2);
assert!(format!("{r:?}").contains("BarrierResult"));
assert_eq!(r.arrived, 3);
}
#[test]
fn epoch_transition_behavior_default() {
let b = EpochTransitionBehavior::default();
assert_eq!(b, EpochTransitionBehavior::AbortAll);
let b2 = b; assert_eq!(b, b2);
assert!(format!("{b:?}").contains("AbortAll"));
}
#[test]
fn epoch_error_display_all_variants() {
let e1 = EpochError::BudgetExhausted {
epoch: EpochId::new(1),
budget: 100,
used: 100,
};
let s1 = e1.to_string();
assert!(s1.contains("budget"), "BudgetExhausted: {s1}");
assert!(s1.contains("100"));
let e2 = EpochError::Mismatch {
expected: EpochId::new(1),
actual: EpochId::new(2),
};
assert!(e2.to_string().contains("mismatch"));
let e3 = EpochError::ValidityViolation {
symbol_epoch: EpochId::new(5),
window: SymbolValidityWindow::new(EpochId::new(1), EpochId::new(3)),
};
assert!(e3.to_string().contains("validity"));
let e4 = EpochError::BarrierTimeout {
epoch: EpochId::new(1),
arrived: 2,
expected: 5,
};
let s4 = e4.to_string();
assert!(s4.contains("barrier") || s4.contains("timeout"), "{s4}");
}
#[test]
fn epoch_error_is_std_error() {
let e = EpochError::Expired {
epoch: EpochId::new(1),
};
let err: &dyn std::error::Error = &e;
assert!(!err.to_string().is_empty());
}
#[test]
fn epoch_error_clone_eq() {
let e1 = EpochError::Expired {
epoch: EpochId::new(3),
};
let e2 = e1.clone();
assert_eq!(e1, e2);
}
#[test]
fn epoch_context_debug_clone() {
let ctx = EpochContext::new(EpochId::new(1), Time::ZERO, Time::from_secs(10));
let dbg = format!("{ctx:?}");
assert!(dbg.contains("EpochContext"));
let ctx2 = ctx;
assert_eq!(ctx2.epoch_id, EpochId::new(1));
}
#[test]
fn epoch_policy_debug_clone_default() {
let p = EpochPolicy::default();
let dbg = format!("{p:?}");
assert!(dbg.contains("EpochPolicy"));
let p2 = p;
assert_eq!(p2.on_transition, EpochTransitionBehavior::AbortAll);
assert!(p2.check_on_poll);
assert!(p2.propagate_to_children);
assert!(p2.grace_period.is_none());
}
}