meerkat-mob 0.5.2

Multi-agent orchestration runtime for Meerkat
Documentation
#![allow(clippy::expect_used, clippy::panic, clippy::unwrap_used)]

use crate::{FlowRunConfig, MobBuilder, MobDefinition, MobError, MobStorage, SpawnMemberSpec};

/// Pipeline definition with flows, topology, supervisor, and limits.
/// Used for tests that exercise FlowRunConfig derivation.
const PIPELINE_TOML: &str = r#"
[mob]
id = "pipeline"
orchestrator = "lead"

[profiles.lead]
model = "claude-opus-4-6"
skills = ["orchestrator"]
peer_description = "Orchestrator"
external_addressable = true

[profiles.lead.tools]
builtins = true
comms = true
mob = true
mob_tasks = true

[profiles.worker]
model = "claude-sonnet-4-5"
skills = ["worker"]
peer_description = "Worker"
external_addressable = false

[profiles.worker.tools]
builtins = true
shell = true
comms = true
mob_tasks = true

[wiring]
auto_wire_orchestrator = true

[skills.orchestrator]
source = "inline"
content = "Drive staged pipeline execution."

[skills.worker]
source = "inline"
content = "Execute your stage deterministically and emit handoff artifacts."

[flows.pipeline]
description = "pipeline flow"

[flows.pipeline.steps.start]
role = "lead"
message = "go"
dispatch_mode = "one_to_one"
depends_on_mode = "all"

[flows.pipeline.steps.branch_a]
role = "worker"
message = "a"
depends_on = ["start"]
branch = "choose"
condition = { op = "eq", path = "params.choice", value = "a" }

[flows.pipeline.steps.branch_b]
role = "worker"
message = "b"
depends_on = ["start"]
branch = "choose"
condition = { op = "eq", path = "params.choice", value = "b" }

[flows.pipeline.steps.join]
role = "lead"
message = "join"
depends_on = ["branch_a", "branch_b"]
depends_on_mode = "any"
collection_policy = { type = "quorum", n = 1 }
timeout_ms = 1000
expected_schema_ref = "schemas/join.json"

[topology]
mode = "strict"
rules = [{ from_role = "lead", to_role = "worker", allowed = true }]

[supervisor]
role = "lead"
escalation_threshold = 2

[limits]
max_flow_duration_ms = 30000
max_step_retries = 1
max_orphaned_turns = 8
"#;
use meerkat_core::comms::TrustedPeerSpec;
use meerkat_core::ops_lifecycle::{
    OperationKind, OperationPeerHandle, OperationProgressUpdate, OperationResult, OperationSpec,
    OperationStatus, OperationTerminalOutcome, OpsLifecycleRegistry,
};
use meerkat_core::types::SessionId;
use meerkat_runtime::RuntimeOpsLifecycleRegistry;

fn mob_spec(name: &str) -> OperationSpec {
    OperationSpec {
        id: meerkat_core::ops_lifecycle::OperationId::new(),
        kind: OperationKind::MobMemberChild,
        owner_session_id: SessionId::new(),
        display_name: name.into(),
        source_label: "phase1-mob".into(),
        child_session_id: Some(SessionId::new()),
        expect_peer_channel: true,
    }
}

fn background_spec(name: &str) -> OperationSpec {
    OperationSpec {
        id: meerkat_core::ops_lifecycle::OperationId::new(),
        kind: OperationKind::BackgroundToolOp,
        owner_session_id: SessionId::new(),
        display_name: name.into(),
        source_label: "phase1-tool".into(),
        child_session_id: None,
        expect_peer_channel: false,
    }
}

#[tokio::test]
#[ignore = "Phase 1 red-ok mob decomposition suite"]
async fn mob_decomposition_red_ok_builder_mode_guards_keep_create_and_resume_paths_distinct() {
    let definition = MobDefinition::from_toml(PIPELINE_TOML).expect("pipeline toml");

    let resume_on_create = match MobBuilder::new(definition.clone(), MobStorage::in_memory())
        .resume()
        .await
    {
        Ok(_) => panic!("create-mode builders should reject resume()"),
        Err(err) => err,
    };
    assert!(
        matches!(resume_on_create, MobError::Internal(message) if message.contains("requires MobBuilder::for_resume")),
        "builder mode boundary should keep create and resume paths distinct"
    );

    let create_on_resume = match MobBuilder::for_resume(MobStorage::in_memory())
        .create()
        .await
    {
        Ok(_) => panic!("resume-mode builders should reject create()"),
        Err(err) => err,
    };
    assert!(
        matches!(create_on_resume, MobError::Internal(message) if message.contains("cannot be used with for_resume")),
        "resume builders should not become a second create path"
    );
}

#[test]
#[ignore = "Phase 1 red-ok mob decomposition suite"]
fn mob_decomposition_red_ok_flow_run_config_carries_orchestrator_and_flow_truth() {
    let definition = MobDefinition::from_toml(PIPELINE_TOML).expect("pipeline toml");
    let flow_id = definition
        .flows
        .keys()
        .next()
        .cloned()
        .expect("pipeline definition should define at least one flow");
    let config = FlowRunConfig::from_definition(flow_id.clone(), &definition)
        .expect("flow config should be derivable from durable mob definition");

    assert_eq!(config.flow_id, flow_id);
    assert!(
        !config.flow_spec.steps.is_empty(),
        "durable flow truth should include concrete step ownership"
    );
    assert!(
        config.orchestrator_role.is_some(),
        "flow truth should preserve orchestrator ownership when present"
    );
    assert_eq!(config.topology, definition.topology);
    assert_eq!(config.supervisor, definition.supervisor);
    assert_eq!(config.limits, definition.limits);
}

#[tokio::test]
#[ignore = "Phase 1 red-ok shared lifecycle integration suite"]
async fn ops_registry_integration_red_ok_tracks_mob_member_peer_ready_and_completion() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = mob_spec("member-alpha");
    let op_id = spec.id.clone();

    registry
        .register_operation(spec.clone())
        .expect("register operation");
    let watch = registry.register_watcher(&op_id).expect("register watcher");
    registry
        .provisioning_succeeded(&op_id)
        .expect("provision success");
    registry
        .report_progress(
            &op_id,
            OperationProgressUpdate {
                message: "member booted".into(),
                percent: Some(0.5),
            },
        )
        .expect("report progress");
    registry
        .peer_ready(
            &op_id,
            OperationPeerHandle {
                peer_name: "member-alpha".into(),
                trusted_peer: TrustedPeerSpec::new(
                    "member-alpha",
                    "member-alpha-id",
                    "inproc://member-alpha",
                )
                .expect("trusted peer"),
            },
        )
        .expect("peer ready");
    registry
        .complete_operation(
            &op_id,
            OperationResult {
                id: op_id.clone(),
                content: "member complete".into(),
                is_error: false,
                duration_ms: 42,
                tokens_used: 8,
            },
        )
        .expect("complete operation");

    assert_eq!(
        watch.wait().await,
        OperationTerminalOutcome::Completed(OperationResult {
            id: op_id.clone(),
            content: "member complete".into(),
            is_error: false,
            duration_ms: 42,
            tokens_used: 8,
        })
    );

    let snapshot = registry.snapshot(&op_id).expect("snapshot");
    assert_eq!(snapshot.status, OperationStatus::Completed);
    assert!(snapshot.peer_ready);
    assert_eq!(snapshot.child_session_id, spec.child_session_id);
}

#[tokio::test]
#[ignore = "Phase 1 red-ok shared lifecycle integration suite"]
async fn ops_registry_integration_red_ok_background_ops_retire_without_peer_handoff() {
    let registry = RuntimeOpsLifecycleRegistry::new();
    let spec = background_spec("shell-job");
    let op_id = spec.id.clone();

    registry
        .register_operation(spec)
        .expect("register background op");
    let watch = registry.register_watcher(&op_id).expect("register watcher");
    registry
        .provisioning_succeeded(&op_id)
        .expect("provision success");
    registry
        .report_progress(
            &op_id,
            OperationProgressUpdate {
                message: "job draining".into(),
                percent: Some(0.9),
            },
        )
        .expect("report progress");
    registry.request_retire(&op_id).expect("request retire");
    registry.mark_retired(&op_id).expect("mark retired");

    assert_eq!(watch.wait().await, OperationTerminalOutcome::Retired);
    assert_eq!(
        registry.snapshot(&op_id).expect("snapshot").status,
        OperationStatus::Retired
    );
}

#[test]
#[ignore = "Phase 1 red-ok shared lifecycle integration suite"]
fn ops_registry_integration_red_ok_existing_session_attach_reuses_spawn_control_plane() {
    let session_id = SessionId::new();
    let spec = SpawnMemberSpec::new("orchestrator", "member-alpha")
        .with_resume_session_id(session_id.clone());

    assert_eq!(spec.resume_session_id(), Some(&session_id));
    assert_eq!(spec.role_name.as_str(), "orchestrator");
}