Skip to main content

capo_agent/extensions/
dispatcher.rs

1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3//! Spawn-per-event dispatcher.
4//!
5//! The contract per spawn (mirrors §4 of the spec):
6//!   1. spawn(command, args, env), stdin/stdout/stderr = piped
7//!   2. write ONE JSON line + '\n' to stdin, close stdin
8//!   3. read up to ONE line from stdout, with timeout_ms deadline
9//!   4. parse Action, surface errors as `Continue`
10
11use std::time::Duration;
12
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::Command;
15use tracing::warn;
16
17use crate::extensions::{
18    registry::RegisteredExtension,
19    wire::{Action, Event, EventName},
20    ExtensionRegistry,
21};
22
/// Verdict produced by running a hook event through the extension chain.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HookOutcome {
    /// No extension vetoed the event; the caller may go ahead.
    Continue,
    /// An extension vetoed the event.
    Cancelled {
        /// Name of the extension that returned `cancel`.
        extension_name: String,
        /// Explanation supplied by that extension, if it gave one.
        reason: Option<String>,
    },
}
32
33/// Result of dispatching `before_user_message` through the extension chain.
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub enum BeforeUserMessageOutcome {
36    /// All extensions returned `continue` (or `transform_text`). The
37    /// `text` field is the final-transformed text (or the original if
38    /// no extension transformed). `attachments` is unchanged from input
39    /// — extensions don't modify attachments in `before_user_message`.
40    Proceed {
41        text: String,
42        attachments: Vec<crate::user_message::Attachment>,
43    },
44    /// An extension returned `cancel`. The turn does not start.
45    Cancelled {
46        extension_name: String,
47        reason: Option<String>,
48    },
49}
50
51/// Result of dispatching a `command` event to its owner extension.
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum CommandOutcome {
54    /// Owner returned `continue` or any wrong-event action; treat as
55    /// no-op. The slash-command invocation produced nothing.
56    NoOp,
57    /// Owner returned `reply` — TUI emits a Notice with `text`.
58    Reply { text: String },
59    /// Owner returned `send` — TUI synthesizes a
60    /// `Command::SendUserMessage` with `text` + `attachments`.
61    Send {
62        text: String,
63        attachments: Vec<crate::user_message::Attachment>,
64    },
65    /// Owner returned `cancel` — TUI emits a Notice with the reason.
66    /// (Cancel from a command event is unusual but spec-permitted —
67    /// degrades to a user-visible message.)
68    Cancelled {
69        extension_name: String,
70        reason: Option<String>,
71    },
72    /// No extension owns this command name.
73    Unknown,
74}
75
76/// Spawn one extension, send the event, await the action.
77///
78/// Returns `Action::Continue` on any error path — spawn failure, timeout,
79/// non-zero exit, garbage stdout, unknown action. All such cases log at
80/// `warn` level via `tracing`; capo never crashes for an extension fault.
81pub(crate) async fn spawn_one(extension: &RegisteredExtension, event: &Event) -> Action {
82    let entry = &extension.entry;
83    let name = entry.name.as_str();
84    let timeout_ms = extension.effective_timeout_ms;
85    let event_json = match serde_json::to_string(event) {
86        Ok(s) => s,
87        Err(err) => {
88            warn!("[ext:{name}] failed to serialize event: {err}");
89            return Action::Continue;
90        }
91    };
92
93    let mut cmd = Command::new(&entry.command);
94    cmd.args(&entry.args)
95        .stdin(std::process::Stdio::piped())
96        .stdout(std::process::Stdio::piped())
97        .stderr(std::process::Stdio::piped())
98        .kill_on_drop(true);
99    for (key, value) in &entry.env {
100        if std::env::var_os(key).is_none() {
101            cmd.env(key, value);
102        }
103    }
104
105    let mut child = match cmd.spawn() {
106        Ok(c) => c,
107        Err(err) => {
108            warn!("[ext:{name}] spawn failed: {err}");
109            return Action::Continue;
110        }
111    };
112
113    if let Some(mut stdin) = child.stdin.take() {
114        let line = format!("{event_json}\n");
115        if let Err(err) = stdin.write_all(line.as_bytes()).await {
116            warn!("[ext:{name}] writing stdin failed: {err}");
117        }
118        drop(stdin);
119    }
120
121    let stdout = match child.stdout.take() {
122        Some(out) => out,
123        None => {
124            warn!("[ext:{name}] no stdout pipe");
125            return Action::Continue;
126        }
127    };
128
129    let deadline = Duration::from_millis(timeout_ms);
130    let read_future = async {
131        let mut reader = BufReader::new(stdout);
132        let mut line = String::new();
133        reader.read_line(&mut line).await.map(|_| line)
134    };
135
136    let line = match tokio::time::timeout(deadline, read_future).await {
137        Ok(Ok(line)) => line,
138        Ok(Err(err)) => {
139            warn!("[ext:{name}] reading stdout failed: {err}");
140            let _ = child.kill().await;
141            return Action::Continue;
142        }
143        Err(_) => {
144            warn!("[ext:{name}] timed out after {timeout_ms}ms");
145            let _ = child.kill().await;
146            return Action::Continue;
147        }
148    };
149
150    if let Some(stderr) = child.stderr.take() {
151        let name = entry.name.clone();
152        tokio::spawn(async move {
153            let mut reader = BufReader::new(stderr);
154            let mut buf = String::new();
155            while let Ok(n) = reader.read_line(&mut buf).await {
156                if n == 0 {
157                    break;
158                }
159                let line = buf.trim_end();
160                warn!("[ext:{name}] stderr: {line}");
161                buf.clear();
162            }
163        });
164    }
165    match tokio::time::timeout(Duration::from_millis(100), child.wait()).await {
166        Ok(Ok(status)) if !status.success() => {
167            warn!("[ext:{name}] exited with {status}");
168            return Action::Continue;
169        }
170        Ok(Ok(_)) => {}
171        Ok(Err(err)) => warn!("[ext:{name}] checking exit status failed: {err}"),
172        Err(_) => {
173            warn!("[ext:{name}] did not exit promptly after stdout; killed");
174            let _ = child.kill().await;
175            return Action::Continue;
176        }
177    }
178
179    let trimmed = line.trim();
180    if trimmed.is_empty() {
181        return Action::Continue;
182    }
183    match serde_json::from_str::<Action>(trimmed) {
184        Ok(action) => action,
185        Err(err) => {
186            let preview: String = trimmed.chars().take(200).collect();
187            warn!("[ext:{name}] could not parse response (`{preview}`): {err}");
188            Action::Continue
189        }
190    }
191}
192
193/// Dispatch `session_before_switch` through every subscribed extension
194/// in manifest order. First `Cancel` short-circuits.
195pub async fn dispatch_session_before_switch(
196    registry: &ExtensionRegistry,
197    reason: &str,
198    session_id: Option<&str>,
199) -> HookOutcome {
200    let subscribed_indices = match registry.hook_index.get(&EventName::SessionBeforeSwitch) {
201        Some(v) => v.clone(),
202        None => return HookOutcome::Continue,
203    };
204    let event = Event::SessionBeforeSwitch {
205        reason: reason.to_string(),
206        session_id: session_id.map(|s| s.to_string()),
207    };
208
209    for idx in subscribed_indices {
210        let extension = &registry.extensions[idx];
211        match spawn_one(extension, &event).await {
212            Action::Continue => continue,
213            Action::Cancel { reason } => {
214                return HookOutcome::Cancelled {
215                    extension_name: extension.entry.name.clone(),
216                    reason,
217                };
218            }
219            Action::TransformText { .. } | Action::Reply { .. } | Action::Send { .. } => {
220                tracing::warn!(
221                    target: "extensions",
222                    "[ext:{}] returned action not valid for session_before_switch; treating as continue",
223                    extension.entry.name
224                );
225                continue;
226            }
227        }
228    }
229    HookOutcome::Continue
230}
231
232/// Dispatch `before_user_message` through every subscribed extension in
233/// manifest order. `TransformText` updates the text and continues the
234/// chain. `Cancel` short-circuits.
235pub async fn dispatch_before_user_message(
236    registry: &ExtensionRegistry,
237    text: String,
238    attachments: Vec<crate::user_message::Attachment>,
239) -> BeforeUserMessageOutcome {
240    let subscribed_indices = match registry.hook_index.get(&EventName::BeforeUserMessage) {
241        Some(v) => v.clone(),
242        None => return BeforeUserMessageOutcome::Proceed { text, attachments },
243    };
244
245    let mut current_text = text;
246    for idx in subscribed_indices {
247        let extension = &registry.extensions[idx];
248        let event = Event::BeforeUserMessage {
249            text: current_text.clone(),
250            attachments: attachments.clone(),
251        };
252        match spawn_one(extension, &event).await {
253            Action::Continue => continue,
254            Action::Cancel { reason } => {
255                return BeforeUserMessageOutcome::Cancelled {
256                    extension_name: extension.entry.name.clone(),
257                    reason,
258                };
259            }
260            Action::TransformText { text } => {
261                current_text = text;
262            }
263            // Wrong-event actions degrade to continue per spec §4.2.
264            Action::Reply { .. } | Action::Send { .. } => {
265                tracing::warn!(
266                    target: "extensions",
267                    "[ext:{}] returned reply/send for before_user_message; treating as continue",
268                    extension.entry.name
269                );
270                continue;
271            }
272        }
273    }
274    BeforeUserMessageOutcome::Proceed {
275        text: current_text,
276        attachments,
277    }
278}
279
280/// Dispatch a `command` event to the single owning extension.
281pub async fn dispatch_command(
282    registry: &ExtensionRegistry,
283    command_name: &str,
284    args: &str,
285) -> CommandOutcome {
286    let idx = match registry.command_index.get(command_name) {
287        Some(i) => *i,
288        None => return CommandOutcome::Unknown,
289    };
290    let extension = &registry.extensions[idx];
291    let event = Event::Command {
292        name: command_name.to_string(),
293        args: args.to_string(),
294    };
295    match spawn_one(extension, &event).await {
296        Action::Continue => CommandOutcome::NoOp,
297        Action::Cancel { reason } => CommandOutcome::Cancelled {
298            extension_name: extension.entry.name.clone(),
299            reason,
300        },
301        Action::Reply { text } => CommandOutcome::Reply { text },
302        Action::Send { text, attachments } => CommandOutcome::Send { text, attachments },
303        // Wrong-event action degrades to NoOp per spec §4.2.
304        Action::TransformText { .. } => {
305            tracing::warn!(
306                target: "extensions",
307                "[ext:{}] returned transform_text for command event; treating as no-op",
308                extension.entry.name
309            );
310            CommandOutcome::NoOp
311        }
312    }
313}