use crate::error::MobError;
use crate::ids::MeerkatId;
use crate::roster::RosterEntry;
use meerkat_core::agent::CommsRuntime as CoreCommsRuntime;
use std::sync::Arc;
/// One stage of disposing of ("retiring") a member.
///
/// The canonical order of the stages is `DisposalStep::ORDERED`. The code that
/// actually performs each stage is not in this file, so the per-variant notes
/// below are inferred from the variant names and should be confirmed against
/// the step implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum DisposalStep {
/// Presumably stops the member's host loop. Listed first in `ORDERED`.
StopHostLoop,
/// Presumably tells peers that the member is going away. Classified as a
/// peer step by `is_peer_step`.
NotifyPeers,
/// Presumably removes trust edges involving the member. Classified as a
/// peer step by `is_peer_step`.
RemoveTrustEdges,
/// Presumably archives the member's session. Listed last in `ORDERED`.
ArchiveSession,
}
impl DisposalStep {
    /// Every disposal step in canonical order.
    ///
    /// The unit tests in this file pin the relative order:
    /// `StopHostLoop`, then `NotifyPeers`, then `RemoveTrustEdges`, then
    /// `ArchiveSession`.
    pub(super) const ORDERED: [Self; 4] = [
        Self::StopHostLoop,
        Self::NotifyPeers,
        Self::RemoveTrustEdges,
        Self::ArchiveSession,
    ];

    /// Returns `true` for the two peer-related steps (`NotifyPeers` and
    /// `RemoveTrustEdges`) and `false` for every other step.
    ///
    /// The match is deliberately exhaustive so that adding a variant forces a
    /// decision here instead of silently defaulting to `false`.
    pub(super) fn is_peer_step(self) -> bool {
        match self {
            Self::NotifyPeers | Self::RemoveTrustEdges => true,
            Self::StopHostLoop | Self::ArchiveSession => false,
        }
    }
}
/// Renders a step as its variant name, e.g. `"StopHostLoop"`.
///
/// The name is written through [`std::fmt::Formatter::pad`], so width, fill,
/// alignment and precision flags (`{:>16}`, `{:*^18}`, ...) are honored the same
/// way they are for a plain `&str`. With no flags (`{}` / `to_string()`) the
/// output is exactly the variant name.
impl std::fmt::Display for DisposalStep {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::StopHostLoop => "StopHostLoop",
            Self::NotifyPeers => "NotifyPeers",
            Self::RemoveTrustEdges => "RemoveTrustEdges",
            Self::ArchiveSession => "ArchiveSession",
        };
        // `pad` applies the caller's formatting flags; `write_str` would ignore them.
        f.pad(name)
    }
}
/// Data about the member being disposed of.
///
/// A shared reference to this struct is passed to
/// `ErrorPolicy::on_step_error` whenever a disposal step fails.
pub(super) struct DisposalContext {
/// Identifier of the member; the logging policies in this file emit it as
/// the `meerkat_id` field.
pub meerkat_id: MeerkatId,
/// Roster entry for the member.
/// NOTE(review): presumably a snapshot captured when disposal began - confirm
/// against the code that builds this context.
pub entry: RosterEntry,
/// Optional comms runtime handle.
/// NOTE(review): presumably the retiring member's own comms runtime, used by
/// the peer steps - not visible in this file, confirm.
pub retiring_comms: Option<Arc<dyn CoreCommsRuntime>>,
/// Optional key string.
/// NOTE(review): its meaning is not visible in this file - confirm against
/// the code that consumes it.
pub retiring_key: Option<String>,
}
/// Outcome record for a disposal run.
///
/// `DisposalReport::new` starts with everything empty / `false`; the code that
/// fills the report in is not in this file.
pub(super) struct DisposalReport {
/// Steps recorded as completed. Entries here do not affect `is_clean`.
pub completed: Vec<DisposalStep>,
/// Steps recorded as skipped, each paired with the error involved. Any
/// entry makes the report not clean.
pub skipped: Vec<(DisposalStep, MobError)>,
/// The step (and its error) at which the run was aborted, if any. `Some`
/// makes the report not clean.
pub aborted_at: Option<(DisposalStep, MobError)>,
/// Starts as `false`; not consulted by `is_clean`.
/// NOTE(review): presumably set once the member has been removed from the
/// roster - confirm against the caller.
pub roster_removed: bool,
}
impl DisposalReport {
    /// Builds an empty report: no completed or skipped steps, no abort
    /// recorded, and `roster_removed` set to `false`.
    pub(super) fn new() -> Self {
        let completed = Vec::new();
        let skipped = Vec::new();
        Self {
            completed,
            skipped,
            aborted_at: None,
            roster_removed: false,
        }
    }

    /// Returns `true` when no step was skipped and the run was not aborted.
    ///
    /// `completed` and `roster_removed` play no part in the result.
    // The attribute silences `dead_code` in non-test builds; within this file
    // only the unit tests call this method.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn is_clean(&self) -> bool {
        let aborted = self.aborted_at.is_some();
        let any_skipped = !self.skipped.is_empty();
        !(aborted || any_skipped)
    }
}
/// Strategy consulted when a disposal step fails.
///
/// Implementors must be `Send`. The driver that calls the policy is not in
/// this file.
pub(super) trait ErrorPolicy: Send {
/// Reacts to `error` raised by `step` for the member described by `ctx`.
///
/// Returns `true` to carry on with the remaining steps and `false` to stop,
/// matching the policies in this file: the "continuing" policies return
/// `true` and `AbortOnError` returns `false`.
fn on_step_error(
&mut self,
step: DisposalStep,
error: &MobError,
ctx: &DisposalContext,
) -> bool;
}
/// Policy that reports every failed step at `warn` level and never aborts.
pub(super) struct WarnAndContinue;

impl ErrorPolicy for WarnAndContinue {
    /// Emits a `warn` event carrying the member id, the step and the error,
    /// then returns `true` unconditionally.
    fn on_step_error(
        &mut self,
        step: DisposalStep,
        error: &MobError,
        ctx: &DisposalContext,
    ) -> bool {
        let member = &ctx.meerkat_id;
        tracing::warn!(
            meerkat_id = %member,
            step = %step,
            error = %error,
            "retire: step failed (continuing)"
        );
        true
    }
}
/// Policy for bulk retirement: never aborts, and downgrades peer-step failures
/// to `debug` level.
pub(super) struct BulkBestEffort;

impl ErrorPolicy for BulkBestEffort {
    /// Logs the failure and returns `true` unconditionally.
    ///
    /// Failures of peer steps (see `DisposalStep::is_peer_step`) are logged at
    /// `debug` level, since the log message treats them as expected during
    /// concurrent teardown. Every other step failure is logged at `warn`.
    fn on_step_error(
        &mut self,
        step: DisposalStep,
        error: &MobError,
        ctx: &DisposalContext,
    ) -> bool {
        let member = &ctx.meerkat_id;

        // Non-peer steps are not expected to fail, so they get the louder level.
        if !step.is_peer_step() {
            tracing::warn!(
                meerkat_id = %member,
                step = %step,
                error = %error,
                "retire(bulk): step failed (continuing)"
            );
            return true;
        }

        tracing::debug!(
            meerkat_id = %member,
            step = %step,
            error = %error,
            "retire(bulk): step failed (expected during concurrent teardown)"
        );
        true
    }
}
/// Policy that stops at the first failed step without logging anything.
// The attribute silences `dead_code` in non-test builds; within this file the
// type is only constructed by the unit tests.
#[cfg_attr(not(test), allow(dead_code))]
pub(super) struct AbortOnError;
impl ErrorPolicy for AbortOnError {
/// Ignores all arguments and returns `false` unconditionally.
fn on_step_error(
&mut self,
_step: DisposalStep,
_error: &MobError,
_ctx: &DisposalContext,
) -> bool {
false
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::event::MemberRef;
    use crate::ids::ProfileName;
    use crate::roster::MemberState;
    use meerkat_core::types::SessionId;
    use std::collections::BTreeSet;

    /// Builds a minimal context: a `Retiring` member named `test-member` with
    /// no wiring, no labels, no comms runtime and no key.
    fn test_ctx() -> DisposalContext {
        let entry = RosterEntry {
            meerkat_id: MeerkatId::from("test-member"),
            profile: ProfileName::from("worker"),
            member_ref: MemberRef::from_session_id(SessionId::new()),
            runtime_mode: crate::MobRuntimeMode::TurnDriven,
            state: MemberState::Retiring,
            wired_to: BTreeSet::new(),
            labels: std::collections::BTreeMap::new(),
        };
        DisposalContext {
            meerkat_id: MeerkatId::from("test-member"),
            entry,
            retiring_comms: None,
            retiring_key: None,
        }
    }

    /// A throwaway error value to feed into the policies.
    fn test_error() -> MobError {
        MobError::Internal(String::from("test error"))
    }

    /// Asks `policy` about a failure of every step, in canonical order, and
    /// collects its continue/stop answers.
    fn decisions(policy: &mut dyn ErrorPolicy) -> Vec<bool> {
        let ctx = test_ctx();
        DisposalStep::ORDERED
            .iter()
            .map(|&step| policy.on_step_error(step, &test_error(), &ctx))
            .collect()
    }

    #[test]
    fn test_warn_and_continue_always_continues() {
        let answers = decisions(&mut WarnAndContinue);
        assert_eq!(answers, vec![true; DisposalStep::ORDERED.len()]);
    }

    #[test]
    fn test_bulk_best_effort_always_continues() {
        let answers = decisions(&mut BulkBestEffort);
        assert_eq!(answers, vec![true; DisposalStep::ORDERED.len()]);
    }

    #[test]
    fn test_bulk_best_effort_uses_is_peer_step() {
        // Pins the classification that `BulkBestEffort` bases its log level on.
        let expected = [
            (DisposalStep::StopHostLoop, false),
            (DisposalStep::NotifyPeers, true),
            (DisposalStep::RemoveTrustEdges, true),
            (DisposalStep::ArchiveSession, false),
        ];
        for (step, is_peer) in expected {
            assert_eq!(step.is_peer_step(), is_peer);
        }
    }

    #[test]
    fn test_abort_on_error_stops() {
        let answers = decisions(&mut AbortOnError);
        assert_eq!(answers, vec![false; DisposalStep::ORDERED.len()]);
    }

    #[test]
    fn test_disposal_report_is_clean() {
        let mut report = DisposalReport::new();
        assert!(report.is_clean());

        // Completed steps alone must not make the report dirty.
        report.completed.push(DisposalStep::StopHostLoop);
        assert!(report.is_clean());
    }

    #[test]
    fn test_disposal_report_tracks_skipped_steps() {
        let mut report = DisposalReport::new();
        let skipped_entry = (DisposalStep::NotifyPeers, test_error());
        report.skipped.push(skipped_entry);
        assert!(!report.is_clean());
    }

    #[test]
    fn test_disposal_report_tracks_abort() {
        let mut report = DisposalReport::new();
        let abort = (DisposalStep::ArchiveSession, test_error());
        report.aborted_at = Some(abort);
        assert!(!report.is_clean());
    }

    #[test]
    fn test_disposal_step_ordering_invariants() {
        // Position of a step inside the canonical ordering.
        let index_of = |wanted: DisposalStep| {
            DisposalStep::ORDERED
                .iter()
                .position(|step| *step == wanted)
                .unwrap()
        };

        let stop_idx = index_of(DisposalStep::StopHostLoop);
        let notify_idx = index_of(DisposalStep::NotifyPeers);
        let trust_idx = index_of(DisposalStep::RemoveTrustEdges);
        let archive_idx = index_of(DisposalStep::ArchiveSession);

        assert!(
            stop_idx < notify_idx,
            "StopHostLoop must precede NotifyPeers"
        );
        assert!(
            notify_idx < trust_idx,
            "NotifyPeers must precede RemoveTrustEdges"
        );
        assert!(
            trust_idx < archive_idx,
            "RemoveTrustEdges must precede ArchiveSession"
        );
    }

    #[test]
    fn test_disposal_step_display() {
        let expected = [
            (DisposalStep::StopHostLoop, "StopHostLoop"),
            (DisposalStep::NotifyPeers, "NotifyPeers"),
            (DisposalStep::RemoveTrustEdges, "RemoveTrustEdges"),
            (DisposalStep::ArchiveSession, "ArchiveSession"),
        ];
        for (step, text) in expected {
            assert_eq!(step.to_string(), text);
        }
    }
}