agentmux 0.2.0

Multi-agent coordination runtime with inter-agent messaging across CLI, MCP, tmux, and ACP.
Documentation
use std::{
    thread,
    time::{Duration, Instant},
};

use serde_json::{Value, json};
use time::format_description::well_known::Rfc3339;

use crate::runtime::signals::shutdown_requested;

use super::super::stream::{RelayStreamEvent, StreamEventSendOutcome, send_event_to_registered_ui};
use super::super::{AsyncDeliveryTask, ChatOutcome, ChatResult};

/// Human-readable reason attached to the result (and to the `delivery_outcome`
/// event) when a delivery is abandoned because shutdown was requested.
const DROPPED_ON_SHUTDOWN_REASON: &str = "relay shutdown requested before delivery";
/// Machine-readable reason code paired with [`DROPPED_ON_SHUTDOWN_REASON`].
const DROPPED_ON_SHUTDOWN_REASON_CODE: &str = "dropped_on_shutdown";
/// Milliseconds slept between delivery attempts while no UI stream accepts
/// the event.
const UI_RECONNECT_POLL_INTERVAL_MS: u64 = 100;

pub(super) fn deliver_one_target_ui(
    task: &AsyncDeliveryTask,
    sender_session: &str,
    cc_sessions: &[String],
    target_session: String,
    message_id: String,
    message: &str,
) -> ChatResult {
    let bundle_name = task.bundle.bundle_name.as_str();
    let timeout = task.quiescence.quiescence_timeout;
    let start = Instant::now();
    loop {
        if shutdown_requested() {
            let _ = emit_delivery_outcome_event(
                bundle_name,
                target_session.as_str(),
                message_id.as_str(),
                "failed",
                Some("failed"),
                Some(DROPPED_ON_SHUTDOWN_REASON_CODE),
                Some(DROPPED_ON_SHUTDOWN_REASON),
            );
            return ChatResult {
                target_session,
                message_id,
                outcome: ChatOutcome::DroppedOnShutdown,
                reason_code: Some(DROPPED_ON_SHUTDOWN_REASON_CODE.to_string()),
                reason: Some(DROPPED_ON_SHUTDOWN_REASON.to_string()),
                details: None,
            };
        }

        let incoming_event = RelayStreamEvent {
            event_type: "incoming_message".to_string(),
            bundle_name: bundle_name.to_string(),
            target_session: target_session.clone(),
            created_at: timestamp_rfc3339(),
            payload: json!({
                "message_id": message_id.clone(),
                "sender_session": sender_session,
                "body": message,
                "cc_sessions": if cc_sessions.is_empty() {
                    Value::Null
                } else {
                    json!(cc_sessions)
                },
            }),
        };
        let routed_outcome = emit_delivery_outcome_event(
            bundle_name,
            target_session.as_str(),
            message_id.as_str(),
            "routed",
            None,
            None,
            None,
        );
        match routed_outcome {
            Ok(StreamEventSendOutcome::Delivered) => {}
            Ok(StreamEventSendOutcome::NoUiEndpoint) | Ok(StreamEventSendOutcome::Disconnected) => {
                if timeout.is_some_and(|value| start.elapsed() >= value) {
                    return ChatResult {
                        target_session,
                        message_id,
                        outcome: ChatOutcome::Timeout,
                        reason_code: None,
                        reason: Some(format!(
                            "ui relay stream was disconnected for {}ms",
                            start.elapsed().as_millis()
                        )),
                        details: None,
                    };
                }
                thread::sleep(Duration::from_millis(UI_RECONNECT_POLL_INTERVAL_MS));
                continue;
            }
            Err(source) => {
                return ChatResult {
                    target_session,
                    message_id,
                    outcome: ChatOutcome::Failed,
                    reason_code: None,
                    reason: Some(format!("failed to emit relay stream event: {}", source)),
                    details: None,
                };
            }
        }
        match send_event_to_registered_ui(bundle_name, target_session.as_str(), &incoming_event) {
            Ok(StreamEventSendOutcome::Delivered) => {
                let _ = emit_delivery_outcome_event(
                    bundle_name,
                    target_session.as_str(),
                    message_id.as_str(),
                    "delivered",
                    Some("success"),
                    None,
                    None,
                );
                return ChatResult {
                    target_session,
                    message_id,
                    outcome: ChatOutcome::Delivered,
                    reason_code: None,
                    reason: None,
                    details: None,
                };
            }
            Ok(StreamEventSendOutcome::NoUiEndpoint) | Ok(StreamEventSendOutcome::Disconnected) => {
            }
            Err(source) => {
                return ChatResult {
                    target_session,
                    message_id,
                    outcome: ChatOutcome::Failed,
                    reason_code: None,
                    reason: Some(format!("failed to emit relay stream event: {}", source)),
                    details: None,
                };
            }
        }
        if timeout.is_some_and(|value| start.elapsed() >= value) {
            return ChatResult {
                target_session,
                message_id,
                outcome: ChatOutcome::Timeout,
                reason_code: None,
                reason: Some(format!(
                    "ui relay stream was disconnected for {}ms",
                    start.elapsed().as_millis()
                )),
                details: None,
            };
        }
        thread::sleep(Duration::from_millis(UI_RECONNECT_POLL_INTERVAL_MS));
    }
}

/// Builds a `delivery_outcome` stream event and sends it to the UI registered
/// for `target_session` in `bundle_name`.
///
/// The payload always contains `message_id`, `phase`, and `outcome` (JSON
/// `null` when `outcome` is `None`). `reason_code` and `reason` are included
/// only when provided.
///
/// Returns whatever `send_event_to_registered_ui` returns for the event.
fn emit_delivery_outcome_event(
    bundle_name: &str,
    target_session: &str,
    message_id: &str,
    phase: &str,
    outcome: Option<&str>,
    reason_code: Option<&str>,
    reason: Option<&str>,
) -> Result<StreamEventSendOutcome, std::io::Error> {
    let outcome_value = match outcome {
        Some(text) => Value::from(text),
        None => Value::Null,
    };

    let mut fields = serde_json::Map::new();
    fields.insert(String::from("message_id"), Value::from(message_id));
    fields.insert(String::from("phase"), Value::from(phase));
    fields.insert(String::from("outcome"), outcome_value);
    // Optional fields are omitted entirely (not set to null) when absent.
    for (key, maybe_text) in [("reason_code", reason_code), ("reason", reason)] {
        if let Some(text) = maybe_text {
            fields.insert(String::from(key), Value::from(text));
        }
    }

    send_event_to_registered_ui(
        bundle_name,
        target_session,
        &RelayStreamEvent {
            event_type: String::from("delivery_outcome"),
            bundle_name: String::from(bundle_name),
            target_session: String::from(target_session),
            created_at: timestamp_rfc3339(),
            payload: Value::Object(fields),
        },
    )
}

/// Returns the current UTC time formatted as RFC 3339.
///
/// If formatting fails, the fixed string `"1970-01-01T00:00:00Z"` is returned
/// instead of propagating the error.
fn timestamp_rfc3339() -> String {
    let now = time::OffsetDateTime::now_utc();
    match now.format(&Rfc3339) {
        Ok(stamp) => stamp,
        Err(_) => String::from("1970-01-01T00:00:00Z"),
    }
}