meerkat-mobkit 0.6.52

Companion orchestration platform for the Meerkat multi-agent runtime
Documentation
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendState {
    Requested,
    AcceptedPersisted,
    Dispatching,
    Delivered,
    DeliveryFailed,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendTransition {
    PersistAccepted,
    StartDispatch,
    MarkDelivered,
    MarkDeliveryFailed,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendEffect {
    AppendFrame,
    UpdateFrameStatus,
    EmitFrame,
    DispatchSend,
}

impl SendState {
    pub fn apply(
        self,
        transition: SendTransition,
    ) -> Result<(Self, Vec<SendEffect>), &'static str> {
        match (self, transition) {
            (Self::Requested, SendTransition::PersistAccepted) => Ok((
                Self::AcceptedPersisted,
                vec![SendEffect::AppendFrame, SendEffect::EmitFrame],
            )),
            (Self::AcceptedPersisted, SendTransition::StartDispatch) => Ok((
                Self::Dispatching,
                vec![SendEffect::UpdateFrameStatus, SendEffect::DispatchSend],
            )),
            (Self::Dispatching, SendTransition::MarkDelivered) => Ok((
                Self::Delivered,
                vec![SendEffect::UpdateFrameStatus, SendEffect::EmitFrame],
            )),
            (Self::Dispatching, SendTransition::MarkDeliveryFailed) => Ok((
                Self::DeliveryFailed,
                vec![SendEffect::UpdateFrameStatus, SendEffect::EmitFrame],
            )),
            _ => Err("illegal send state transition"),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceIngestionState {
    Registered,
    Backfilling,
    CaughtUp,
    Live,
    GapDetected,
    ReplayUnavailable,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceIngestionTransition {
    StartBackfill,
    BackfillComplete,
    StartLive,
    DetectGap,
    MarkReplayUnavailable,
    Recover,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceIngestionEffect {
    QuerySourceHistory,
    AppendFrame,
    SubscribeLive,
    MarkReplayUnavailable,
}

impl SourceIngestionState {
    pub fn apply(
        self,
        transition: SourceIngestionTransition,
    ) -> Result<(Self, Vec<SourceIngestionEffect>), &'static str> {
        match (self, transition) {
            (Self::Registered, SourceIngestionTransition::StartBackfill) => Ok((
                Self::Backfilling,
                vec![SourceIngestionEffect::QuerySourceHistory],
            )),
            (Self::Backfilling, SourceIngestionTransition::BackfillComplete) => {
                Ok((Self::CaughtUp, vec![SourceIngestionEffect::AppendFrame]))
            }
            (Self::CaughtUp, SourceIngestionTransition::StartLive) => {
                Ok((Self::Live, vec![SourceIngestionEffect::SubscribeLive]))
            }
            (Self::Live, SourceIngestionTransition::DetectGap) => {
                Ok((Self::GapDetected, Vec::new()))
            }
            (Self::GapDetected, SourceIngestionTransition::Recover) => Ok((
                Self::Backfilling,
                vec![SourceIngestionEffect::QuerySourceHistory],
            )),
            (Self::GapDetected, SourceIngestionTransition::MarkReplayUnavailable) => Ok((
                Self::ReplayUnavailable,
                vec![SourceIngestionEffect::MarkReplayUnavailable],
            )),
            (Self::ReplayUnavailable, SourceIngestionTransition::Recover) => Ok((
                Self::Backfilling,
                vec![SourceIngestionEffect::QuerySourceHistory],
            )),
            _ => Err("illegal source ingestion state transition"),
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplaySubscriptionState {
    Snapshotting,
    Live,
    Lagged,
    Closed,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplaySubscriptionTransition {
    SnapshotComplete,
    MarkLagged,
    RecoverLive,
    Close,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplaySubscriptionEffect {
    EmitSnapshotStarted,
    EmitReplayFrames,
    EmitSnapshotComplete,
    EmitLagged,
    CloseStream,
}

impl ReplaySubscriptionState {
    pub fn apply(
        self,
        transition: ReplaySubscriptionTransition,
    ) -> Result<(Self, Vec<ReplaySubscriptionEffect>), &'static str> {
        match (self, transition) {
            (Self::Snapshotting, ReplaySubscriptionTransition::SnapshotComplete) => Ok((
                Self::Live,
                vec![
                    ReplaySubscriptionEffect::EmitSnapshotStarted,
                    ReplaySubscriptionEffect::EmitReplayFrames,
                    ReplaySubscriptionEffect::EmitSnapshotComplete,
                ],
            )),
            (Self::Live, ReplaySubscriptionTransition::MarkLagged) => {
                Ok((Self::Lagged, vec![ReplaySubscriptionEffect::EmitLagged]))
            }
            (Self::Lagged, ReplaySubscriptionTransition::RecoverLive) => {
                Ok((Self::Live, Vec::new()))
            }
            (_, ReplaySubscriptionTransition::Close) if self != Self::Closed => {
                Ok((Self::Closed, vec![ReplaySubscriptionEffect::CloseStream]))
            }
            _ => Err("illegal replay subscription state transition"),
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used)]
mod tests {
    use super::*;

    #[test]
    fn send_reducer_allows_happy_path() {
        let (state, effects) = SendState::Requested
            .apply(SendTransition::PersistAccepted)
            .expect("requested -> accepted should be legal");
        assert_eq!(state, SendState::AcceptedPersisted);
        assert!(effects.contains(&SendEffect::AppendFrame));

        let (state, effects) = state
            .apply(SendTransition::StartDispatch)
            .expect("accepted -> dispatching should be legal");
        assert_eq!(state, SendState::Dispatching);
        assert!(effects.contains(&SendEffect::DispatchSend));

        let (state, effects) = state
            .apply(SendTransition::MarkDelivered)
            .expect("dispatching -> delivered should be legal");
        assert_eq!(state, SendState::Delivered);
        assert!(effects.contains(&SendEffect::UpdateFrameStatus));
    }

    #[test]
    fn send_reducer_rejects_terminal_to_dispatch() {
        assert_eq!(
            SendState::Delivered.apply(SendTransition::StartDispatch),
            Err("illegal send state transition")
        );
    }

    #[test]
    fn source_ingestion_reducer_models_backfill_to_live() {
        let (state, _) = SourceIngestionState::Registered
            .apply(SourceIngestionTransition::StartBackfill)
            .expect("registered -> backfilling");
        let (state, _) = state
            .apply(SourceIngestionTransition::BackfillComplete)
            .expect("backfilling -> caught up");
        let (state, effects) = state
            .apply(SourceIngestionTransition::StartLive)
            .expect("caught up -> live");
        assert_eq!(state, SourceIngestionState::Live);
        assert!(effects.contains(&SourceIngestionEffect::SubscribeLive));
    }

    #[test]
    fn replay_subscription_reducer_emits_snapshot_boundary() {
        let (state, effects) = ReplaySubscriptionState::Snapshotting
            .apply(ReplaySubscriptionTransition::SnapshotComplete)
            .expect("snapshotting -> live");
        assert_eq!(state, ReplaySubscriptionState::Live);
        assert_eq!(
            effects,
            vec![
                ReplaySubscriptionEffect::EmitSnapshotStarted,
                ReplaySubscriptionEffect::EmitReplayFrames,
                ReplaySubscriptionEffect::EmitSnapshotComplete,
            ]
        );
    }
}