meerkat-runtime 0.7.4

v9 runtime control-plane for Meerkat agent lifecycle
Documentation
//! MobRuntimeAdapter — bridges mob provisioning to v9 RuntimeDriver lifecycle.
//!
//! When a mob member is spawned, the adapter registers a RuntimeDriver for that
//! session. When retired, the adapter retires/unregisters the driver. Flow steps
//! are delivered as FlowStepInput through accept_input().
//!
//! This adapter is optional — mob works without it (existing SessionService path).
//! When present, it enables v9 input lifecycle tracking for mob members.

use meerkat_core::lifecycle::InputId;
use meerkat_core::types::ContentInput;
use meerkat_core::types::SessionId;

use crate::MeerkatMachine;
use crate::input::{
    FlowStepInput, Input, InputDurability, InputHeader, InputOrigin, InputVisibility,
};
#[allow(unused_imports)]
use crate::service_ext::SessionServiceRuntimeExt as _;
use crate::traits::{RuntimeControlPlaneError, RuntimeDriverError};

/// Create a FlowStepInput for a mob flow step.
pub fn create_flow_step_input(
    step_id: &str,
    instructions: ContentInput,
    flow_id: &str,
    step_index: usize,
    turn_metadata: Option<meerkat_core::lifecycle::run_primitive::RuntimeTurnMetadata>,
) -> Input {
    Input::FlowStep(FlowStepInput {
        header: InputHeader {
            id: InputId::new(),
            timestamp: chrono::Utc::now(),
            source: InputOrigin::Flow {
                flow_id: flow_id.into(),
                step_index,
            },
            durability: InputDurability::Durable,
            visibility: InputVisibility::default(),
            idempotency_key: None,
            supersession_key: None,
            correlation_id: None,
        },
        step_id: step_id.into(),
        content: instructions,
        turn_metadata,
    })
}

/// Register a mob member's session with the runtime adapter.
///
/// Registration is a control-plane prerequisite. A failed register is propagated
/// as a typed error rather than swallowed so the mob provisioning caller can abort
/// instead of proceeding as if the member's runtime exists.
pub async fn register_mob_member(
    adapter: &MeerkatMachine,
    session_id: SessionId,
) -> Result<(), RuntimeControlPlaneError> {
    adapter.register_session(session_id).await
}

/// Unregister a mob member's session from the runtime adapter.
pub async fn unregister_mob_member(adapter: &MeerkatMachine, session_id: &SessionId) {
    adapter.unregister_session(session_id).await;
}

/// Deliver a flow step to a mob member through the runtime path.
pub async fn deliver_flow_step(
    adapter: &MeerkatMachine,
    session_id: &SessionId,
    step_id: &str,
    instructions: impl Into<ContentInput>,
    flow_id: &str,
    step_index: usize,
) -> Result<crate::AcceptOutcome, RuntimeDriverError> {
    let input = create_flow_step_input(step_id, instructions.into(), flow_id, step_index, None);
    adapter.accept_input(session_id, input).await
}

/// Retire a mob member's runtime.
///
/// If the session is attached to a live `RuntimeLoop`, queued inputs remain
/// pending for drain. For plain registered sessions without a loop, retirement
/// abandons queued work because nothing can execute the drain path.
pub async fn retire_mob_member(
    adapter: &MeerkatMachine,
    session_id: &SessionId,
) -> Result<crate::traits::RetireReport, RuntimeDriverError> {
    adapter.retire_runtime(session_id).await
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::policy_table::DefaultPolicyTable;
    use std::sync::Arc;

    #[tokio::test]
    async fn spawn_creates_runtime_driver_session() {
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();

        register_mob_member(&adapter, sid.clone()).await.unwrap();

        // Session should have a runtime driver
        let state = adapter.runtime_state(&sid).await.unwrap();
        assert_eq!(state, crate::RuntimeState::Idle);
    }

    #[tokio::test]
    async fn flow_step_delivered_as_input() {
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();
        register_mob_member(&adapter, sid.clone()).await.unwrap();

        let outcome = deliver_flow_step(&adapter, &sid, "step-1", "analyze the data", "flow-1", 0)
            .await
            .unwrap();

        assert!(outcome.is_accepted());

        // Verify policy: flow_step → StageRunStart + WakeIfIdle
        let input = create_flow_step_input("s", "i".into(), "f", 0, None);
        let policy = DefaultPolicyTable::resolve(&input, true);
        assert_eq!(policy.apply_mode, crate::ApplyMode::StageRunStart);
        assert_eq!(policy.wake_mode, crate::WakeMode::WakeIfIdle);
    }

    #[tokio::test]
    async fn retire_without_runtime_loop_abandons_pending_inputs() {
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();
        register_mob_member(&adapter, sid.clone()).await.unwrap();

        // Accept an input first
        deliver_flow_step(&adapter, &sid, "s1", "do it", "f1", 0)
            .await
            .unwrap();

        // No RuntimeLoop is attached for plain registration, so retirement
        // abandons queued work instead of leaving it pending forever.
        let report = retire_mob_member(&adapter, &sid).await.unwrap();
        assert_eq!(report.inputs_abandoned, 1);
        assert_eq!(report.inputs_pending_drain, 0);
    }

    #[tokio::test]
    async fn create_flow_step_input_preserves_multimodal_blocks() -> Result<(), String> {
        let input = create_flow_step_input(
            "s",
            ContentInput::Blocks(vec![
                meerkat_core::types::ContentBlock::Text {
                    text: "inspect image".into(),
                },
                meerkat_core::types::ContentBlock::Image {
                    media_type: "image/png".into(),
                    data: "abc123".into(),
                },
            ]),
            "f",
            0,
            None,
        );

        let flow_step = match input {
            Input::FlowStep(flow_step) => flow_step,
            other => return Err(format!("expected flow step input, got {other:?}")),
        };
        assert_eq!(
            flow_step.content.text_content(),
            "inspect image\n[image: image/png]"
        );
        assert!(matches!(
            &flow_step.content,
            meerkat_core::types::ContentInput::Blocks(blocks) if blocks.len() == 2
        ));
        Ok(())
    }

    #[tokio::test]
    async fn unregister_removes_driver() {
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();
        register_mob_member(&adapter, sid.clone()).await.unwrap();

        unregister_mob_member(&adapter, &sid).await;

        // Should fail now
        let result = adapter.runtime_state(&sid).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn register_exposes_driver_state() {
        // Mob-member registration is owned by the runtime control plane:
        // the registered session must have a live driver entry that surfaces
        // through typed runtime-state queries.
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();
        register_mob_member(&adapter, sid.clone()).await.unwrap();

        let active = adapter.list_active_inputs(&sid).await.unwrap();
        assert!(active.is_empty()); // No inputs yet
    }

    /// Gate (#99/#277): a control-plane registration that fails (the session was
    /// destroyed, so the RegisterSession command returns `Destroyed`) must surface
    /// a typed `Err` from `register_session` rather than being laundered to success.
    /// Pre-fix the helper dropped the result with `let _ = ...` and returned `()`,
    /// so the failure was invisible to callers.
    #[tokio::test]
    async fn register_session_surfaces_failure_on_destroyed_session() {
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();

        // Establish the session, then destroy it so re-registration must fail.
        adapter.register_session(sid.clone()).await.unwrap();
        let runtime_id = MeerkatMachine::logical_runtime_id(&sid);
        crate::traits::RuntimeControlPlane::destroy(&*adapter, &runtime_id)
            .await
            .unwrap();

        let result = adapter.register_session(sid.clone()).await;
        assert!(
            matches!(result, Err(RuntimeControlPlaneError::Internal(_))),
            "register_session on a destroyed session must surface a typed control-plane error, got {result:?}"
        );
    }

    /// Gate (#99): `register_mob_member` must propagate the typed registration
    /// failure rather than swallow it — a mob member whose runtime cannot be
    /// registered must not be reported as provisioned.
    #[tokio::test]
    async fn register_mob_member_propagates_failure_on_destroyed_session() {
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();

        register_mob_member(&adapter, sid.clone()).await.unwrap();
        let runtime_id = MeerkatMachine::logical_runtime_id(&sid);
        crate::traits::RuntimeControlPlane::destroy(&*adapter, &runtime_id)
            .await
            .unwrap();

        let result = register_mob_member(&adapter, sid.clone()).await;
        assert!(
            result.is_err(),
            "register_mob_member must propagate the typed registration failure, got {result:?}"
        );
    }

    /// Gate (#41): `set_session_silent_intents` must surface the inner command's
    /// typed `Err` to the caller instead of dropping it with `let _ = ...`. A
    /// destroyed session makes the SetSilentIntents command return `Destroyed`.
    #[tokio::test]
    async fn set_session_silent_intents_surfaces_failure_on_destroyed_session() {
        let adapter = Arc::new(MeerkatMachine::ephemeral());
        let sid = SessionId::new();

        adapter.register_session(sid.clone()).await.unwrap();
        let runtime_id = MeerkatMachine::logical_runtime_id(&sid);
        crate::traits::RuntimeControlPlane::destroy(&*adapter, &runtime_id)
            .await
            .unwrap();

        let result = adapter
            .set_session_silent_intents(&sid, vec!["status".to_string()])
            .await;
        assert!(
            matches!(result, Err(RuntimeDriverError::Destroyed)),
            "set_session_silent_intents must surface the inner command failure, got {result:?}"
        );
    }
}