//! strontium-palladium 0.5.0
//!
//! Deterministic fault-bridge contracts for Palladium actor simulations under Strontium control.
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;

/// Identifies an actor by its path, optionally qualified with a node name.
///
/// The derived ordering compares `path` first and `node` second, following
/// field declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ActorId {
    /// Actor path, e.g. `/user/a`.
    pub path: String,
    /// Node name, when the id is node-qualified. Left out of the serialized
    /// form entirely when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub node: Option<String>,
}

impl ActorId {
    /// Builds an id for `path` with no node attached.
    pub fn new(path: impl Into<String>) -> Self {
        let path = path.into();
        Self { path, node: None }
    }

    /// Consumes the id and returns it with `node` attached, replacing any
    /// node that was set before. The path is carried over untouched.
    pub fn on_node(self, node: impl Into<String>) -> Self {
        Self {
            node: Some(node.into()),
            ..self
        }
    }
}

/// An [`ActorId`] paired with an incarnation number.
///
/// NOTE(review): the incarnation presumably goes up each time the actor is
/// restarted (the tests in this file use 0 for the crashed instance and 1 for
/// the restarted one) — confirm against the runtime that produces it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActorIncarnation {
    /// The actor this incarnation belongs to.
    pub actor_id: ActorId,
    /// Incarnation number of that actor.
    pub incarnation: u64,
}

impl ActorIncarnation {
    pub fn new(actor_id: ActorId, incarnation: u64) -> Self {
        Self {
            actor_id,
            incarnation,
        }
    }
}

/// What a fault is aimed at: a single actor, or a link described by its
/// `from` and `to` actors.
///
/// Variant names serialize in kebab-case (`actor`, `link`).
///
/// NOTE(review): nothing else visible in this file refers to this type;
/// presumably it is consumed by code elsewhere — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum FaultTarget {
    /// A single actor.
    Actor(ActorId),
    /// A pair of actors named `from` and `to`.
    Link { from: ActorId, to: ActorId },
}

/// Scope carried by [`FaultCommand::Partition`].
///
/// Variant names serialize in kebab-case (`symmetric`, `from-source-only`,
/// `to-destination-only`).
///
/// NOTE(review): the per-variant meanings below are read off the names only;
/// the code that applies a partition is not in this file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum PartitionScope {
    /// Presumably both directions between the two actors.
    Symmetric,
    /// Presumably only the direction leaving the source side.
    FromSourceOnly,
    /// Presumably only the direction arriving at the destination side.
    ToDestinationOnly,
}

/// Scope carried by [`FaultCommand::Delay`].
///
/// Variant names serialize in kebab-case (`delivery`, `restart`).
///
/// NOTE(review): the per-variant meanings below are read off the names only;
/// the code that applies a delay is not in this file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum DelayScope {
    /// Presumably delays message delivery.
    Delivery,
    /// Presumably delays an actor restart.
    Restart,
}

/// A fault command that can be scheduled on, and reported by, a
/// [`DeterministicFaultBridge`].
///
/// Serialized as an internally tagged enum: the kebab-case variant name
/// (`kill`, `partition`, `delay`) is stored under the `kind` key next to the
/// variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", tag = "kind")]
pub enum FaultCommand {
    /// Kill command addressed to `actor_id`.
    Kill {
        actor_id: ActorId,
    },
    /// Partition command naming two actors and the [`PartitionScope`] to apply.
    Partition {
        actor_a: ActorId,
        actor_b: ActorId,
        scope: PartitionScope,
    },
    /// Delay command addressed to `actor_id`, with a tick count and the
    /// [`DelayScope`] to apply.
    /// NOTE(review): `ticks` is presumably the length of the delay — confirm.
    Delay {
        actor_id: ActorId,
        ticks: u64,
        scope: DelayScope,
    },
}

/// A [`FaultCommand`] together with the tick it was scheduled for and the
/// sequence number it was given when scheduled.
///
/// Built by [`DeterministicFaultBridge::schedule`] and handed back by
/// [`DeterministicFaultBridge::commands_ready_at`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FaultCommandEnvelope {
    /// Command sequence number assigned at scheduling time.
    pub sequence: u64,
    /// Tick the command was scheduled for.
    pub tick: u64,
    /// The command itself.
    pub command: FaultCommand,
}

/// Lifecycle state reported in a [`BridgeEventKind::Lifecycle`] event.
///
/// Variant names serialize in kebab-case (`started`, `crashed`, `restarted`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum LifecycleState {
    /// The actor started.
    Started,
    /// The actor crashed.
    Crashed,
    /// The actor was restarted.
    Restarted,
}

/// Payload of a [`BridgeEvent`].
///
/// Variant names serialize in kebab-case (`lifecycle`, `command-applied`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum BridgeEventKind {
    /// A lifecycle report for one actor incarnation, as recorded through
    /// [`DeterministicFaultBridge::record_lifecycle`].
    Lifecycle {
        actor: ActorIncarnation,
        state: LifecycleState,
        restart_count: u32,
    },
    /// A fault command reported as applied, as recorded through
    /// [`DeterministicFaultBridge::record_command_applied`].
    CommandApplied {
        command: FaultCommand,
    },
}

/// One entry of the event log kept by [`DeterministicFaultBridge`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BridgeEvent {
    /// Tick supplied by the caller when the event was recorded.
    pub tick: u64,
    /// Event sequence number assigned by the bridge when the event was recorded.
    pub sequence: u64,
    /// What happened.
    pub kind: BridgeEventKind,
}

/// In-memory queue of scheduled fault commands plus a log of recorded events.
///
/// Scheduled commands are kept ordered by tick and, within a tick, by the
/// order in which they were scheduled. Events are kept in the order they were
/// recorded. Commands and events are numbered by two independent counters,
/// both starting at 0 in a default-constructed bridge.
#[derive(Debug, Default)]
pub struct DeterministicFaultBridge {
    /// Sequence number the next scheduled command will receive.
    next_command_sequence: u64,
    /// Sequence number the next recorded event will receive.
    next_event_sequence: u64,
    /// Commands scheduled but not yet handed out.
    scheduled: VecDeque<FaultCommandEnvelope>,
    /// Events recorded but not yet drained.
    events: VecDeque<BridgeEvent>,
}

impl DeterministicFaultBridge {
    /// Creates an empty bridge: nothing scheduled, no events recorded, and
    /// both sequence counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Queues `command` for `tick` and returns the command sequence number
    /// assigned to it.
    ///
    /// Sequence numbers start at 0 and grow by one per call. The queue stays
    /// sorted by tick; commands that share a tick stay in the order in which
    /// they were scheduled.
    pub fn schedule(&mut self, tick: u64, command: FaultCommand) -> u64 {
        let sequence = self.next_command_sequence;
        self.next_command_sequence += 1;

        // `scheduled` is always sorted by tick, so the slot can be found by
        // binary search. Landing after every entry whose tick is <= `tick`
        // also keeps sequence order within a tick, because the fresh sequence
        // number is larger than any that is already queued.
        let insert_at = self
            .scheduled
            .partition_point(|queued| queued.tick <= tick);
        self.scheduled.insert(
            insert_at,
            FaultCommandEnvelope {
                sequence,
                tick,
                command,
            },
        );
        sequence
    }

    /// Removes and returns every queued command whose tick is at or before
    /// `tick`, ordered by tick and then by sequence number.
    ///
    /// Commands scheduled for later ticks stay queued. Returns an empty
    /// vector when nothing is ready.
    pub fn commands_ready_at(&mut self, tick: u64) -> Vec<FaultCommandEnvelope> {
        // The queue is tick-sorted, so the ready commands form a prefix.
        let ready_len = self
            .scheduled
            .partition_point(|queued| queued.tick <= tick);
        self.scheduled.drain(..ready_len).collect()
    }

    /// Appends a `CommandApplied` event for `command` at `tick` and returns
    /// the event sequence number assigned to it.
    pub fn record_command_applied(&mut self, tick: u64, command: FaultCommand) -> u64 {
        self.push_event(tick, BridgeEventKind::CommandApplied { command })
    }

    /// Appends a `Lifecycle` event for `actor` at `tick` and returns the
    /// event sequence number assigned to it.
    ///
    /// `state` and `restart_count` are stored in the event as given.
    pub fn record_lifecycle(
        &mut self,
        tick: u64,
        actor: ActorIncarnation,
        state: LifecycleState,
        restart_count: u32,
    ) -> u64 {
        self.push_event(
            tick,
            BridgeEventKind::Lifecycle {
                actor,
                state,
                restart_count,
            },
        )
    }

    /// Removes and returns all recorded events in the order they were
    /// recorded. The event sequence counter is not reset.
    pub fn drain_events(&mut self) -> Vec<BridgeEvent> {
        self.events.drain(..).collect()
    }

    /// Stamps `kind` with the next event sequence number, appends it to the
    /// log, and returns that sequence number.
    fn push_event(&mut self, tick: u64, kind: BridgeEventKind) -> u64 {
        let sequence = self.next_event_sequence;
        self.next_event_sequence += 1;
        self.events.push_back(BridgeEvent {
            tick,
            sequence,
            kind,
        });
        sequence
    }
}

#[cfg(test)]
mod tests {
    use super::{
        ActorId, ActorIncarnation, BridgeEventKind, DelayScope, DeterministicFaultBridge,
        FaultCommand, LifecycleState, PartitionScope,
    };

    /// Commands scheduled out of tick order are handed back sorted by tick,
    /// and commands sharing a tick keep the order in which they were scheduled.
    #[test]
    fn schedules_faults_in_tick_then_sequence_order() {
        let kill_a = FaultCommand::Kill {
            actor_id: ActorId::new("/user/a"),
        };
        let partition_a_b = FaultCommand::Partition {
            actor_a: ActorId::new("/user/a"),
            actor_b: ActorId::new("/user/b"),
            scope: PartitionScope::Symmetric,
        };
        let delay_c = FaultCommand::Delay {
            actor_id: ActorId::new("/user/c"),
            ticks: 4,
            scope: DelayScope::Delivery,
        };

        let mut bridge = DeterministicFaultBridge::new();
        for (tick, command) in [(5, kill_a), (3, partition_a_b), (5, delay_c)] {
            bridge.schedule(tick, command);
        }

        let ready = bridge.commands_ready_at(5);
        assert_eq!(ready.len(), 3);
        // Tick 3 first, then the two tick-5 commands in scheduling order.
        assert!(matches!(ready[0].command, FaultCommand::Partition { .. }));
        assert!(matches!(ready[1].command, FaultCommand::Kill { .. }));
        assert!(matches!(ready[2].command, FaultCommand::Delay { .. }));
    }

    /// Recorded events receive consecutive sequence numbers starting at 0 and
    /// come back from `drain_events` with their payload intact.
    #[test]
    fn records_lifecycle_events_with_monotonic_event_sequence() {
        let coordinator = ActorId::new("/user/task-coordinator").on_node("node-a");
        let crashed = ActorIncarnation::new(coordinator.clone(), 0);
        let restarted = ActorIncarnation::new(coordinator, 1);

        let mut bridge = DeterministicFaultBridge::new();
        bridge.record_lifecycle(10, crashed, LifecycleState::Crashed, 0);
        bridge.record_lifecycle(14, restarted, LifecycleState::Restarted, 1);

        let events = bridge.drain_events();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].sequence, 0);
        assert_eq!(events[1].sequence, 1);

        match &events[1].kind {
            BridgeEventKind::Lifecycle {
                actor,
                state,
                restart_count,
            } => {
                assert_eq!(actor.incarnation, 1);
                assert_eq!(*state, LifecycleState::Restarted);
                assert_eq!(*restart_count, 1);
            }
            other => panic!("unexpected event kind: {other:?}"),
        }
    }
}