use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
/// Identifies an actor by its path, optionally qualified by a node name.
///
/// The derived ordering compares `path` first and then `node`, following the
/// field declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ActorId {
/// Actor path (the tests in this file use values such as `/user/a`).
pub path: String,
/// Optional node name; left out of the serialized form when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub node: Option<String>,
}
impl ActorId {
    /// Builds an identifier for the actor at `path` with no node attached.
    ///
    /// Accepts anything convertible into a `String`, so both `&str` and owned
    /// strings work.
    pub fn new(path: impl Into<String>) -> Self {
        let path = path.into();
        Self { path, node: None }
    }

    /// Consumes the identifier and returns it with `node` set to the given
    /// name, replacing any node that was set before. The path is unchanged.
    pub fn on_node(self, node: impl Into<String>) -> Self {
        Self {
            node: Some(node.into()),
            ..self
        }
    }
}
/// An [`ActorId`] paired with an incarnation number.
// NOTE(review): the tests in this file use incarnation 0 for the crashed
// instance and 1 for the restarted one, so the number presumably increases
// across restarts — confirm with the code that produces these values.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorIncarnation {
/// The actor this incarnation belongs to.
pub actor_id: ActorId,
/// Incarnation number supplied by the caller; not validated here.
pub incarnation: u64,
}
impl ActorIncarnation {
pub fn new(actor_id: ActorId, incarnation: u64) -> Self {
Self {
actor_id,
incarnation,
}
}
}
/// What a fault applies to: a single actor, or a pair of actors named `from`
/// and `to`.
///
/// Variant names serialize in kebab-case (`actor`, `link`).
// NOTE(review): nothing else in the visible part of this file uses this type;
// `Link` presumably denotes a directed connection — confirm with its consumers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum FaultTarget {
/// A single actor.
Actor(ActorId),
/// A pair of actors identified as `from` and `to`.
Link { from: ActorId, to: ActorId },
}
/// Scope selector carried by [`FaultCommand::Partition`].
///
/// Variant names serialize in kebab-case (`symmetric`, `from-source-only`,
/// `to-destination-only`).
// NOTE(review): this file only stores and transports the value. The variant
// names suggest they pick which direction(s) of traffic are cut, but the
// actual semantics live in whatever applies the command — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PartitionScope {
Symmetric,
FromSourceOnly,
ToDestinationOnly,
}
/// Scope selector carried by [`FaultCommand::Delay`].
///
/// Variant names serialize in kebab-case (`delivery`, `restart`).
// NOTE(review): presumably chooses whether the delay applies to message
// delivery or to actor restarts; the interpretation is not visible in this
// file — confirm with the code that applies the command.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DelayScope {
Delivery,
Restart,
}
/// A fault to inject, as queued by [`DeterministicFaultBridge::schedule`].
///
/// Serialized as an internally tagged enum: the variant name is written in
/// kebab-case (`kill`, `partition`, `delay`) under a `kind` field alongside
/// the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", tag = "kind")]
pub enum FaultCommand {
/// Kill command naming a single actor.
Kill {
actor_id: ActorId,
},
/// Partition command naming two actors and a [`PartitionScope`].
Partition {
actor_a: ActorId,
actor_b: ActorId,
scope: PartitionScope,
},
/// Delay command naming an actor, a tick count, and a [`DelayScope`].
// NOTE(review): `ticks` is presumably the delay length in the same tick
// unit used for scheduling — confirm with the code that applies it.
Delay {
actor_id: ActorId,
ticks: u64,
scope: DelayScope,
},
}
/// A scheduled [`FaultCommand`] together with its ordering metadata.
///
/// Created by [`DeterministicFaultBridge::schedule`] and handed back by
/// [`DeterministicFaultBridge::commands_ready_at`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FaultCommandEnvelope {
/// Command sequence number assigned when the command was scheduled.
pub sequence: u64,
/// Tick the command was scheduled for.
pub tick: u64,
/// The command itself.
pub command: FaultCommand,
}
/// Lifecycle state reported in a [`BridgeEventKind::Lifecycle`] event.
///
/// Variant names serialize in kebab-case (`started`, `crashed`, `restarted`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum LifecycleState {
Started,
Crashed,
Restarted,
}
/// Payload of a [`BridgeEvent`].
///
/// Variant names serialize in kebab-case (`lifecycle`, `command-applied`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum BridgeEventKind {
/// An actor lifecycle report, recorded through
/// [`DeterministicFaultBridge::record_lifecycle`].
Lifecycle {
actor: ActorIncarnation,
state: LifecycleState,
restart_count: u32,
},
/// A command reported as applied, recorded through
/// [`DeterministicFaultBridge::record_command_applied`].
CommandApplied {
command: FaultCommand,
},
}
/// An event recorded by [`DeterministicFaultBridge`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BridgeEvent {
/// Tick supplied by the caller when the event was recorded.
pub tick: u64,
/// Event sequence number assigned by the bridge, in recording order.
pub sequence: u64,
/// What happened.
pub kind: BridgeEventKind,
}
/// Holds a queue of scheduled fault commands and a log of recorded events,
/// each numbered by its own counter.
///
/// The derived `Default` yields empty queues and both counters at zero.
#[derive(Debug, Default)]
pub struct DeterministicFaultBridge {
// Sequence number the next scheduled command will receive.
next_command_sequence: u64,
// Sequence number the next recorded event will receive.
next_event_sequence: u64,
// Pending commands; `schedule` keeps them ordered by tick, then sequence.
scheduled: VecDeque<FaultCommandEnvelope>,
// Recorded events in recording order, until `drain_events` removes them.
events: VecDeque<BridgeEvent>,
}
impl DeterministicFaultBridge {
    /// Creates an empty bridge: nothing scheduled, nothing recorded, and both
    /// sequence counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Queues `command` for `tick` and returns the command sequence number
    /// assigned to it.
    ///
    /// Sequence numbers are handed out in call order starting from zero. The
    /// queue stays ordered by `(tick, sequence)`, so commands for the same
    /// tick come back in the order they were scheduled.
    pub fn schedule(&mut self, tick: u64, command: FaultCommand) -> u64 {
        let sequence = self.next_command_sequence;
        self.next_command_sequence += 1;

        // Every insertion in this impl lands just past the last entry ordered
        // at or before the new one, which keeps `scheduled` sorted by
        // `(tick, sequence)`. That makes the slot findable by binary search
        // rather than a front-to-back scan.
        let insert_at = self
            .scheduled
            .partition_point(|queued| (queued.tick, queued.sequence) <= (tick, sequence));
        self.scheduled.insert(
            insert_at,
            FaultCommandEnvelope {
                sequence,
                tick,
                command,
            },
        );
        sequence
    }

    /// Removes and returns every queued command whose tick is less than or
    /// equal to `tick`, in queue order (tick, then sequence).
    ///
    /// Returns an empty vector when nothing is due. Returned commands are no
    /// longer in the queue.
    pub fn commands_ready_at(&mut self, tick: u64) -> Vec<FaultCommandEnvelope> {
        // Count the due prefix first, then take it out in a single drain.
        let ready_count = self
            .scheduled
            .iter()
            .take_while(|queued| queued.tick <= tick)
            .count();
        self.scheduled.drain(..ready_count).collect()
    }

    /// Records a `CommandApplied` event for `command` at `tick` and returns
    /// the event sequence number assigned to it.
    pub fn record_command_applied(&mut self, tick: u64, command: FaultCommand) -> u64 {
        self.push_event(tick, BridgeEventKind::CommandApplied { command })
    }

    /// Records a `Lifecycle` event for `actor` at `tick` and returns the event
    /// sequence number assigned to it.
    ///
    /// `state` and `restart_count` are stored as supplied; the bridge does not
    /// validate them.
    pub fn record_lifecycle(
        &mut self,
        tick: u64,
        actor: ActorIncarnation,
        state: LifecycleState,
        restart_count: u32,
    ) -> u64 {
        self.push_event(
            tick,
            BridgeEventKind::Lifecycle {
                actor,
                state,
                restart_count,
            },
        )
    }

    /// Removes and returns all recorded events in recording order.
    ///
    /// The event sequence counter is not reset, so later events continue the
    /// numbering.
    pub fn drain_events(&mut self) -> Vec<BridgeEvent> {
        self.events.drain(..).collect()
    }

    /// Appends an event with the next event sequence number and returns that
    /// number.
    fn push_event(&mut self, tick: u64, kind: BridgeEventKind) -> u64 {
        let sequence = self.next_event_sequence;
        self.next_event_sequence += 1;
        self.events.push_back(BridgeEvent {
            tick,
            sequence,
            kind,
        });
        sequence
    }
}
#[cfg(test)]
mod tests {
    use super::{
        ActorId, ActorIncarnation, BridgeEventKind, DelayScope, DeterministicFaultBridge,
        FaultCommand, LifecycleState, PartitionScope,
    };

    /// Commands scheduled out of tick order come back sorted by tick, and
    /// commands sharing a tick keep their scheduling order.
    #[test]
    fn schedules_faults_in_tick_then_sequence_order() {
        let kill_a = FaultCommand::Kill {
            actor_id: ActorId::new("/user/a"),
        };
        let partition_a_b = FaultCommand::Partition {
            actor_a: ActorId::new("/user/a"),
            actor_b: ActorId::new("/user/b"),
            scope: PartitionScope::Symmetric,
        };
        let delay_c = FaultCommand::Delay {
            actor_id: ActorId::new("/user/c"),
            ticks: 4,
            scope: DelayScope::Delivery,
        };

        let mut bridge = DeterministicFaultBridge::new();
        bridge.schedule(5, kill_a);
        bridge.schedule(3, partition_a_b);
        bridge.schedule(5, delay_c);

        let ready = bridge.commands_ready_at(5);
        assert_eq!(ready.len(), 3);

        let commands: Vec<&FaultCommand> =
            ready.iter().map(|envelope| &envelope.command).collect();
        assert!(matches!(commands[0], FaultCommand::Partition { .. }));
        assert!(matches!(commands[1], FaultCommand::Kill { .. }));
        assert!(matches!(commands[2], FaultCommand::Delay { .. }));
    }

    /// Recorded events receive consecutive sequence numbers and keep the
    /// lifecycle payload they were recorded with.
    #[test]
    fn records_lifecycle_events_with_monotonic_event_sequence() {
        let coordinator = ActorId::new("/user/task-coordinator").on_node("node-a");
        let crashed = ActorIncarnation::new(coordinator.clone(), 0);
        let restarted = ActorIncarnation::new(coordinator, 1);

        let mut bridge = DeterministicFaultBridge::new();
        bridge.record_lifecycle(10, crashed, LifecycleState::Crashed, 0);
        bridge.record_lifecycle(14, restarted, LifecycleState::Restarted, 1);

        let events = bridge.drain_events();
        assert_eq!(events.len(), 2);

        let sequences: Vec<u64> = events.iter().map(|event| event.sequence).collect();
        assert_eq!(sequences[0], 0);
        assert_eq!(sequences[1], 1);

        let last = &events[1];
        match &last.kind {
            BridgeEventKind::Lifecycle {
                actor,
                state,
                restart_count,
            } => {
                assert_eq!(actor.incarnation, 1);
                assert_eq!(*state, LifecycleState::Restarted);
                assert_eq!(*restart_count, 1);
            }
            other => panic!("unexpected event kind: {other:?}"),
        }
    }
}