capo-agent 0.9.0

Coding-agent library built on motosan-agent-loop. Composable, embeddable.
Documentation
#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]

//! Spawn-per-event dispatcher.
//!
//! The contract per spawn (mirrors §4 of the spec):
//!   1. spawn(command, args, env), stdin/stdout/stderr = piped
//!   2. write ONE JSON line + '\n' to stdin, close stdin
//!   3. read up to ONE line from stdout, with timeout_ms deadline
//!   4. parse Action, surface errors as `Continue`

use std::time::Duration;

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tracing::warn;

use crate::extensions::{
    registry::RegisteredExtension,
    wire::{Action, Event, EventName},
    ExtensionRegistry,
};

/// Result of dispatching a hook event through the extension chain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HookOutcome {
    Continue,
    Cancelled {
        extension_name: String,
        reason: Option<String>,
    },
}

/// Result of dispatching `before_user_message` through the extension chain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BeforeUserMessageOutcome {
    /// All extensions returned `continue` (or `transform_text`). The
    /// `text` field is the final-transformed text (or the original if
    /// no extension transformed). `attachments` is unchanged from input
    /// — extensions don't modify attachments in `before_user_message`.
    Proceed {
        text: String,
        attachments: Vec<crate::user_message::Attachment>,
    },
    /// An extension returned `cancel`. The turn does not start.
    Cancelled {
        extension_name: String,
        reason: Option<String>,
    },
}

/// Result of dispatching a `command` event to its owner extension.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CommandOutcome {
    /// Owner returned `continue` or any wrong-event action; treat as
    /// no-op. The slash-command invocation produced nothing.
    NoOp,
    /// Owner returned `reply` — TUI emits a Notice with `text`.
    Reply { text: String },
    /// Owner returned `send` — TUI synthesizes a
    /// `Command::SendUserMessage` with `text` + `attachments`.
    Send {
        text: String,
        attachments: Vec<crate::user_message::Attachment>,
    },
    /// Owner returned `cancel` — TUI emits a Notice with the reason.
    /// (Cancel from a command event is unusual but spec-permitted —
    /// degrades to a user-visible message.)
    Cancelled {
        extension_name: String,
        reason: Option<String>,
    },
    /// No extension owns this command name.
    Unknown,
}

/// Spawn one extension, send the event, await the action.
///
/// Returns `Action::Continue` on any error path — spawn failure, timeout,
/// non-zero exit, garbage stdout, unknown action. All such cases log at
/// `warn` level via `tracing`; capo never crashes for an extension fault.
pub(crate) async fn spawn_one(extension: &RegisteredExtension, event: &Event) -> Action {
    let entry = &extension.entry;
    let name = entry.name.as_str();
    let timeout_ms = extension.effective_timeout_ms;
    let event_json = match serde_json::to_string(event) {
        Ok(s) => s,
        Err(err) => {
            warn!("[ext:{name}] failed to serialize event: {err}");
            return Action::Continue;
        }
    };

    let mut cmd = Command::new(&entry.command);
    cmd.args(&entry.args)
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .kill_on_drop(true);
    for (key, value) in &entry.env {
        if std::env::var_os(key).is_none() {
            cmd.env(key, value);
        }
    }

    let mut child = match cmd.spawn() {
        Ok(c) => c,
        Err(err) => {
            warn!("[ext:{name}] spawn failed: {err}");
            return Action::Continue;
        }
    };

    if let Some(mut stdin) = child.stdin.take() {
        let line = format!("{event_json}\n");
        if let Err(err) = stdin.write_all(line.as_bytes()).await {
            warn!("[ext:{name}] writing stdin failed: {err}");
        }
        drop(stdin);
    }

    let stdout = match child.stdout.take() {
        Some(out) => out,
        None => {
            warn!("[ext:{name}] no stdout pipe");
            return Action::Continue;
        }
    };

    let deadline = Duration::from_millis(timeout_ms);
    let read_future = async {
        let mut reader = BufReader::new(stdout);
        let mut line = String::new();
        reader.read_line(&mut line).await.map(|_| line)
    };

    let line = match tokio::time::timeout(deadline, read_future).await {
        Ok(Ok(line)) => line,
        Ok(Err(err)) => {
            warn!("[ext:{name}] reading stdout failed: {err}");
            let _ = child.kill().await;
            return Action::Continue;
        }
        Err(_) => {
            warn!("[ext:{name}] timed out after {timeout_ms}ms");
            let _ = child.kill().await;
            return Action::Continue;
        }
    };

    if let Some(stderr) = child.stderr.take() {
        let name = entry.name.clone();
        tokio::spawn(async move {
            let mut reader = BufReader::new(stderr);
            let mut buf = String::new();
            while let Ok(n) = reader.read_line(&mut buf).await {
                if n == 0 {
                    break;
                }
                let line = buf.trim_end();
                warn!("[ext:{name}] stderr: {line}");
                buf.clear();
            }
        });
    }
    match tokio::time::timeout(Duration::from_millis(100), child.wait()).await {
        Ok(Ok(status)) if !status.success() => {
            warn!("[ext:{name}] exited with {status}");
            return Action::Continue;
        }
        Ok(Ok(_)) => {}
        Ok(Err(err)) => warn!("[ext:{name}] checking exit status failed: {err}"),
        Err(_) => {
            warn!("[ext:{name}] did not exit promptly after stdout; killed");
            let _ = child.kill().await;
            return Action::Continue;
        }
    }

    let trimmed = line.trim();
    if trimmed.is_empty() {
        return Action::Continue;
    }
    match serde_json::from_str::<Action>(trimmed) {
        Ok(action) => action,
        Err(err) => {
            let preview: String = trimmed.chars().take(200).collect();
            warn!("[ext:{name}] could not parse response (`{preview}`): {err}");
            Action::Continue
        }
    }
}

/// Dispatch `session_before_switch` through every subscribed extension
/// in manifest order. First `Cancel` short-circuits.
pub async fn dispatch_session_before_switch(
    registry: &ExtensionRegistry,
    reason: &str,
    session_id: Option<&str>,
) -> HookOutcome {
    let subscribed_indices = match registry.hook_index.get(&EventName::SessionBeforeSwitch) {
        Some(v) => v.clone(),
        None => return HookOutcome::Continue,
    };
    let event = Event::SessionBeforeSwitch {
        reason: reason.to_string(),
        session_id: session_id.map(|s| s.to_string()),
    };

    for idx in subscribed_indices {
        let extension = &registry.extensions[idx];
        match spawn_one(extension, &event).await {
            Action::Continue => continue,
            Action::Cancel { reason } => {
                return HookOutcome::Cancelled {
                    extension_name: extension.entry.name.clone(),
                    reason,
                };
            }
            Action::TransformText { .. } | Action::Reply { .. } | Action::Send { .. } => {
                tracing::warn!(
                    target: "extensions",
                    "[ext:{}] returned action not valid for session_before_switch; treating as continue",
                    extension.entry.name
                );
                continue;
            }
        }
    }
    HookOutcome::Continue
}

/// Dispatch `before_user_message` through every subscribed extension in
/// manifest order. `TransformText` updates the text and continues the
/// chain. `Cancel` short-circuits.
pub async fn dispatch_before_user_message(
    registry: &ExtensionRegistry,
    text: String,
    attachments: Vec<crate::user_message::Attachment>,
) -> BeforeUserMessageOutcome {
    let subscribed_indices = match registry.hook_index.get(&EventName::BeforeUserMessage) {
        Some(v) => v.clone(),
        None => return BeforeUserMessageOutcome::Proceed { text, attachments },
    };

    let mut current_text = text;
    for idx in subscribed_indices {
        let extension = &registry.extensions[idx];
        let event = Event::BeforeUserMessage {
            text: current_text.clone(),
            attachments: attachments.clone(),
        };
        match spawn_one(extension, &event).await {
            Action::Continue => continue,
            Action::Cancel { reason } => {
                return BeforeUserMessageOutcome::Cancelled {
                    extension_name: extension.entry.name.clone(),
                    reason,
                };
            }
            Action::TransformText { text } => {
                current_text = text;
            }
            // Wrong-event actions degrade to continue per spec §4.2.
            Action::Reply { .. } | Action::Send { .. } => {
                tracing::warn!(
                    target: "extensions",
                    "[ext:{}] returned reply/send for before_user_message; treating as continue",
                    extension.entry.name
                );
                continue;
            }
        }
    }
    BeforeUserMessageOutcome::Proceed {
        text: current_text,
        attachments,
    }
}

/// Dispatch a `command` event to the single owning extension.
pub async fn dispatch_command(
    registry: &ExtensionRegistry,
    command_name: &str,
    args: &str,
) -> CommandOutcome {
    let idx = match registry.command_index.get(command_name) {
        Some(i) => *i,
        None => return CommandOutcome::Unknown,
    };
    let extension = &registry.extensions[idx];
    let event = Event::Command {
        name: command_name.to_string(),
        args: args.to_string(),
    };
    match spawn_one(extension, &event).await {
        Action::Continue => CommandOutcome::NoOp,
        Action::Cancel { reason } => CommandOutcome::Cancelled {
            extension_name: extension.entry.name.clone(),
            reason,
        },
        Action::Reply { text } => CommandOutcome::Reply { text },
        Action::Send { text, attachments } => CommandOutcome::Send { text, attachments },
        // Wrong-event action degrades to NoOp per spec §4.2.
        Action::TransformText { .. } => {
            tracing::warn!(
                target: "extensions",
                "[ext:{}] returned transform_text for command event; treating as no-op",
                extension.entry.name
            );
            CommandOutcome::NoOp
        }
    }
}