marshal-daemon 0.9.0

Coordination daemon for the marshal multi-agent service. Owns the live roster and routes messages between Claude Code sessions.
Documentation
//! Integration tests for the `SendMessage` server command (pull model).
//!
//! Spins up a real `CellServer` (no WS listener), pre-populates the
//! Session store, then invokes `SendMessage::execute` directly through a
//! synthetic `CommandContext`. Under the pull model the handler should:
//!   - resolve the sender from the WS `client_id` OR an explicit
//!     `as_session` (self-identify, for connectionless HTTP-MCP callers),
//!   - reject an unknown recipient and an unresolvable sender with a clear
//!     `CommandError` (these are the only hard failures),
//!   - **persist the `Message` regardless of whether the recipient is
//!     online** — delivery is decoupled from acceptance; an offline
//!     recipient pulls it on its next hook turn,
//!   - report `delivered_live` = whether a best-effort live push to a
//!     connected recipient landed (always `false` in these fixtures, which
//!     have no live `client_registry`).

use std::sync::Arc;

use hyphae::Gettable;
use marshal_entities::{Message, SendMessage, Session, SessionId};
use myko::{
    command::{CommandContext, CommandHandler},
    core::item::Eventable,
    entities::client::ClientId,
    request::RequestContext,
    server::{CellServerCtx, Persister},
    wire::{MEvent, MEventType},
};
use myko_server::{BlackholePersister, CellServer};
use uuid::Uuid;

/// Calls `marshal_entities::link()` and `daemon::link()`, builds a
/// `CellServer` whose default persister is a `BlackholePersister`, and
/// returns that server's context.
///
/// The server value itself is deliberately leaked instead of dropped.
/// NOTE(review): presumably this keeps whatever the server owns alive for
/// the rest of the test process — confirm against `CellServer`'s drop
/// behaviour.
fn setup() -> CellServerCtx {
    marshal_entities::link();
    daemon::link();

    let persister: Arc<dyn Persister> = Arc::new(BlackholePersister);
    let built = CellServer::builder()
        .with_default_persister(persister)
        .build();
    let ctx = built.ctx();

    // Intentionally never freed; only the context is handed to the caller.
    let _leaked: &'static CellServer = Box::leak(Box::new(built));
    ctx
}

/// Builds a `Session` fixture with the given id, nickname and optional
/// client binding. Every other field gets a fixed placeholder value
/// (`pid` 0, `cwd` "/repo", `connected_at` 100, all optional fields `None`).
fn session(id: &str, nickname: &str, client_id: Option<&str>) -> Session {
    let session_id = SessionId(Arc::from(id));
    let bound_client = client_id.map(|raw| ClientId(Arc::from(raw)));

    Session {
        id: session_id,
        client_id: bound_client,
        nickname: nickname.into(),
        pid: 0,
        cwd: "/repo".into(),
        git_branch: None,
        current_task: None,
        connected_at: 100,
        last_activity_at: None,
        last_tool: None,
        last_tool_at: None,
        operator: None,
        host: None,
        project: None,
    }
}

/// Wraps `s` in a `SET` event tagged with a fresh random UUID string and
/// applies it to `ctx` as a single-event batch.
///
/// # Panics
/// Panics if applying the batch fails.
fn set_session(ctx: &CellServerCtx, s: &Session) {
    let event_tag = Uuid::new_v4().to_string();
    let set_event = MEvent::from_item(s, MEventType::SET, &event_tag);
    let applied = ctx.apply_event_batch(vec![set_event]);
    applied.expect("apply Session SET");
}

/// Construct the `CommandContext` a `SendMessage` dispatch would receive.
///
/// `caller_client_id` is the client connection the command is treated as
/// coming from; pass `None` to model a caller with no connection identity
/// (the HTTP-MCP / hook path).
fn cmd_ctx(ctx: &CellServerCtx, caller_client_id: Option<&str>) -> CommandContext {
    // Fresh random identifier for this synthetic request.
    let fresh_id: Arc<str> = Uuid::new_v4().to_string().into();
    let caller: Option<Arc<str>> = caller_client_id.map(Arc::<str>::from);

    let request = RequestContext::new(
        fresh_id,
        caller,
        vec![Arc::<str>::from("test")],
        Uuid::new_v4(),
        chrono::Utc::now().to_rfc3339(),
    );

    let command_name: Arc<str> = Arc::from("SendMessage");
    CommandContext::new(command_name, Arc::new(request), Arc::new(ctx.clone()))
}

/// Number of entries currently in the `Message` store, or `0` when the
/// registry lookup for that store yields nothing.
fn message_count(ctx: &CellServerCtx) -> usize {
    let message_store = ctx.registry.get(Message::ENTITY_NAME_STATIC);
    message_store.map_or(0, |store| store.entries().get().len())
}

/// Returns the single `Message` held in the store.
///
/// # Panics
/// Panics if the `Message` store is missing, if it does not hold exactly
/// one entry, or if that entry cannot be downcast to a `Message`.
fn only_message(ctx: &CellServerCtx) -> Message {
    let message_store = ctx
        .registry
        .get(Message::ENTITY_NAME_STATIC)
        .expect("Message store exists");

    let snapshot = message_store.entries().get();
    assert_eq!(snapshot.len(), 1, "expected exactly one Message");

    // The length check above guarantees a first entry; its second
    // component is the stored item.
    let first = snapshot.into_iter().next().unwrap();
    myko::utils::downcast_item::<Message>(&first.1).expect("entry is a Message")
}

#[test]
fn offline_recipient_succeeds_and_persists_for_pull() {
    // Under the pull model a recipient with no client binding is a normal
    // case, not a failure: the send is accepted and the message is stored
    // for the recipient to pull later.
    let ctx = setup();
    let sender = session("sender", "sender", Some("c-sender"));
    set_session(&ctx, &sender);
    let recipient = session("recipient", "recipient", None);
    set_session(&ctx, &recipient);

    let send = SendMessage {
        to_session_id: SessionId(Arc::from("recipient")),
        body: "hi".into(),
        as_session: None,
    };
    let caller_ctx = cmd_ctx(&ctx, Some("c-sender"));
    let outcome = send
        .execute(caller_ctx)
        .expect("offline recipient is success under the pull model");

    assert!(
        !outcome.delivered_live,
        "no live client → not delivered live"
    );
    assert_eq!(message_count(&ctx), 1, "message must persist for pull");

    let stored = only_message(&ctx);
    assert_eq!(stored.body, "hi");
}

#[test]
fn stale_binding_succeeds_and_persists() {
    // The recipient still carries a `client_id`, but it refers to a
    // connection that is no longer live (e.g. after a bounce). The send
    // must still be accepted: `delivered_live` is `false` and the message
    // is stored.
    let ctx = setup();
    let sender = session("sender", "sender", Some("c-sender"));
    set_session(&ctx, &sender);
    let stale_recipient = session("recipient", "recipient", Some("c-stale"));
    set_session(&ctx, &stale_recipient);

    let send = SendMessage {
        to_session_id: SessionId(Arc::from("recipient")),
        body: "hi".into(),
        as_session: None,
    };
    let caller_ctx = cmd_ctx(&ctx, Some("c-sender"));
    let outcome = send
        .execute(caller_ctx)
        .expect("stale binding is not a hard failure under the pull model");

    assert!(!outcome.delivered_live);
    assert_eq!(message_count(&ctx), 1);
}

#[test]
fn self_identified_sender_via_as_session_succeeds() {
    // HTTP-MCP style call: the request carries no connection `client_id`,
    // so the caller identifies itself through `as_session`. The stored
    // message must be attributed to that session.
    let ctx = setup();
    let sender = session("sender", "sender", None);
    set_session(&ctx, &sender);
    let recipient = session("recipient", "recipient", None);
    set_session(&ctx, &recipient);

    let send = SendMessage {
        to_session_id: SessionId(Arc::from("recipient")),
        body: "from http".into(),
        as_session: Some(SessionId(Arc::from("sender"))),
    };
    let connectionless_ctx = cmd_ctx(&ctx, None);
    let outcome = send
        .execute(connectionless_ctx)
        .expect("self-identified send succeeds");

    assert!(!outcome.delivered_live);

    let stored = only_message(&ctx);
    assert_eq!(stored.from_session_id, SessionId(Arc::from("sender")));
    assert_eq!(stored.body, "from http");
}

#[test]
fn unknown_recipient_errors_and_does_not_persist() {
    // Only the sender exists; the target session id was never stored, so
    // the send must fail and leave the Message store empty.
    let ctx = setup();
    let sender = session("sender", "sender", Some("c-sender"));
    set_session(&ctx, &sender);

    let send = SendMessage {
        to_session_id: SessionId(Arc::from("does-not-exist")),
        body: "?".into(),
        as_session: None,
    };
    let caller_ctx = cmd_ctx(&ctx, Some("c-sender"));
    let failure = send
        .execute(caller_ctx)
        .expect_err("missing recipient should error");

    assert!(
        failure.message.contains("does-not-exist"),
        "error should name the missing session id, got: {}",
        failure.message,
    );
    assert_eq!(message_count(&ctx), 0, "no Message should be persisted");
}

#[test]
fn caller_without_session_errors() {
    // The calling connection's client id is not bound to any stored
    // session, and the command supplies no `as_session` fallback.
    let ctx = setup();
    let recipient = session("recipient", "recipient", Some("c-recipient"));
    set_session(&ctx, &recipient);

    let send = SendMessage {
        to_session_id: SessionId(Arc::from("recipient")),
        body: "?".into(),
        as_session: None,
    };
    let orphan_ctx = cmd_ctx(&ctx, Some("c-orphan"));
    let failure = send
        .execute(orphan_ctx)
        .expect_err("orphan caller should error");

    assert!(
        failure.message.contains("c-orphan"),
        "error should name the unbound client id, got: {}",
        failure.message,
    );
    assert_eq!(message_count(&ctx), 0);
}

#[test]
fn unidentified_caller_errors() {
    // Neither a connection client id nor an `as_session` is supplied, so
    // there is nothing to resolve a sender from.
    let ctx = setup();
    let recipient = session("recipient", "recipient", None);
    set_session(&ctx, &recipient);

    let send = SendMessage {
        to_session_id: SessionId(Arc::from("recipient")),
        body: "?".into(),
        as_session: None,
    };
    let anonymous_ctx = cmd_ctx(&ctx, None);
    let failure = send
        .execute(anonymous_ctx)
        .expect_err("no identity should error");

    // Compare case-insensitively; "assession" is what a camelCase
    // `asSession` mention lowercases to.
    let lowered = failure.message.to_lowercase();
    let explains_identity =
        lowered.contains("assession") || lowered.contains("connected client");
    assert!(
        explains_identity,
        "error should explain the missing identity, got: {}",
        failure.message,
    );
    assert_eq!(message_count(&ctx), 0);
}