1#![cfg_attr(test, allow(clippy::expect_used, clippy::unwrap_used))]
2
3use std::time::Duration;
12
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::Command;
15use tracing::warn;
16
17use crate::extensions::{
18 registry::RegisteredExtension,
19 wire::{Action, Event, EventName},
20 ExtensionRegistry,
21};
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub enum HookOutcome {
26 Continue,
27 Cancelled {
28 extension_name: String,
29 reason: Option<String>,
30 },
31}
32
33#[derive(Debug, Clone, PartialEq, Eq)]
35pub enum BeforeUserMessageOutcome {
36 Proceed {
41 text: String,
42 attachments: Vec<crate::user_message::Attachment>,
43 },
44 Cancelled {
46 extension_name: String,
47 reason: Option<String>,
48 },
49}
50
51#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum CommandOutcome {
54 NoOp,
57 Reply { text: String },
59 Send {
62 text: String,
63 attachments: Vec<crate::user_message::Attachment>,
64 },
65 Cancelled {
69 extension_name: String,
70 reason: Option<String>,
71 },
72 Unknown,
74}
75
76pub(crate) async fn spawn_one(extension: &RegisteredExtension, event: &Event) -> Action {
82 let entry = &extension.entry;
83 let name = entry.name.as_str();
84 let timeout_ms = extension.effective_timeout_ms;
85 let event_json = match serde_json::to_string(event) {
86 Ok(s) => s,
87 Err(err) => {
88 warn!("[ext:{name}] failed to serialize event: {err}");
89 return Action::Continue;
90 }
91 };
92
93 let mut cmd = Command::new(&entry.command);
94 cmd.args(&entry.args)
95 .stdin(std::process::Stdio::piped())
96 .stdout(std::process::Stdio::piped())
97 .stderr(std::process::Stdio::piped())
98 .kill_on_drop(true);
99 for (key, value) in &entry.env {
100 if std::env::var_os(key).is_none() {
101 cmd.env(key, value);
102 }
103 }
104
105 let mut child = match cmd.spawn() {
106 Ok(c) => c,
107 Err(err) => {
108 warn!("[ext:{name}] spawn failed: {err}");
109 return Action::Continue;
110 }
111 };
112
113 if let Some(mut stdin) = child.stdin.take() {
114 let line = format!("{event_json}\n");
115 if let Err(err) = stdin.write_all(line.as_bytes()).await {
116 warn!("[ext:{name}] writing stdin failed: {err}");
117 }
118 drop(stdin);
119 }
120
121 let stdout = match child.stdout.take() {
122 Some(out) => out,
123 None => {
124 warn!("[ext:{name}] no stdout pipe");
125 return Action::Continue;
126 }
127 };
128
129 let deadline = Duration::from_millis(timeout_ms);
130 let read_future = async {
131 let mut reader = BufReader::new(stdout);
132 let mut line = String::new();
133 reader.read_line(&mut line).await.map(|_| line)
134 };
135
136 let line = match tokio::time::timeout(deadline, read_future).await {
137 Ok(Ok(line)) => line,
138 Ok(Err(err)) => {
139 warn!("[ext:{name}] reading stdout failed: {err}");
140 let _ = child.kill().await;
141 return Action::Continue;
142 }
143 Err(_) => {
144 warn!("[ext:{name}] timed out after {timeout_ms}ms");
145 let _ = child.kill().await;
146 return Action::Continue;
147 }
148 };
149
150 if let Some(stderr) = child.stderr.take() {
151 let name = entry.name.clone();
152 tokio::spawn(async move {
153 let mut reader = BufReader::new(stderr);
154 let mut buf = String::new();
155 while let Ok(n) = reader.read_line(&mut buf).await {
156 if n == 0 {
157 break;
158 }
159 let line = buf.trim_end();
160 warn!("[ext:{name}] stderr: {line}");
161 buf.clear();
162 }
163 });
164 }
165 match tokio::time::timeout(Duration::from_millis(100), child.wait()).await {
166 Ok(Ok(status)) if !status.success() => {
167 warn!("[ext:{name}] exited with {status}");
168 return Action::Continue;
169 }
170 Ok(Ok(_)) => {}
171 Ok(Err(err)) => warn!("[ext:{name}] checking exit status failed: {err}"),
172 Err(_) => {
173 warn!("[ext:{name}] did not exit promptly after stdout; killed");
174 let _ = child.kill().await;
175 return Action::Continue;
176 }
177 }
178
179 let trimmed = line.trim();
180 if trimmed.is_empty() {
181 return Action::Continue;
182 }
183 match serde_json::from_str::<Action>(trimmed) {
184 Ok(action) => action,
185 Err(err) => {
186 let preview: String = trimmed.chars().take(200).collect();
187 warn!("[ext:{name}] could not parse response (`{preview}`): {err}");
188 Action::Continue
189 }
190 }
191}
192
193pub async fn dispatch_session_before_switch(
196 registry: &ExtensionRegistry,
197 reason: &str,
198 session_id: Option<&str>,
199) -> HookOutcome {
200 let subscribed_indices = match registry.hook_index.get(&EventName::SessionBeforeSwitch) {
201 Some(v) => v.clone(),
202 None => return HookOutcome::Continue,
203 };
204 let event = Event::SessionBeforeSwitch {
205 reason: reason.to_string(),
206 session_id: session_id.map(|s| s.to_string()),
207 };
208
209 for idx in subscribed_indices {
210 let extension = ®istry.extensions[idx];
211 match spawn_one(extension, &event).await {
212 Action::Continue => continue,
213 Action::Cancel { reason } => {
214 return HookOutcome::Cancelled {
215 extension_name: extension.entry.name.clone(),
216 reason,
217 };
218 }
219 Action::TransformText { .. } | Action::Reply { .. } | Action::Send { .. } => {
220 tracing::warn!(
221 target: "extensions",
222 "[ext:{}] returned action not valid for session_before_switch; treating as continue",
223 extension.entry.name
224 );
225 continue;
226 }
227 }
228 }
229 HookOutcome::Continue
230}
231
232pub async fn dispatch_before_user_message(
236 registry: &ExtensionRegistry,
237 text: String,
238 attachments: Vec<crate::user_message::Attachment>,
239) -> BeforeUserMessageOutcome {
240 let subscribed_indices = match registry.hook_index.get(&EventName::BeforeUserMessage) {
241 Some(v) => v.clone(),
242 None => return BeforeUserMessageOutcome::Proceed { text, attachments },
243 };
244
245 let mut current_text = text;
246 for idx in subscribed_indices {
247 let extension = ®istry.extensions[idx];
248 let event = Event::BeforeUserMessage {
249 text: current_text.clone(),
250 attachments: attachments.clone(),
251 };
252 match spawn_one(extension, &event).await {
253 Action::Continue => continue,
254 Action::Cancel { reason } => {
255 return BeforeUserMessageOutcome::Cancelled {
256 extension_name: extension.entry.name.clone(),
257 reason,
258 };
259 }
260 Action::TransformText { text } => {
261 current_text = text;
262 }
263 Action::Reply { .. } | Action::Send { .. } => {
265 tracing::warn!(
266 target: "extensions",
267 "[ext:{}] returned reply/send for before_user_message; treating as continue",
268 extension.entry.name
269 );
270 continue;
271 }
272 }
273 }
274 BeforeUserMessageOutcome::Proceed {
275 text: current_text,
276 attachments,
277 }
278}
279
280pub async fn dispatch_command(
282 registry: &ExtensionRegistry,
283 command_name: &str,
284 args: &str,
285) -> CommandOutcome {
286 let idx = match registry.command_index.get(command_name) {
287 Some(i) => *i,
288 None => return CommandOutcome::Unknown,
289 };
290 let extension = ®istry.extensions[idx];
291 let event = Event::Command {
292 name: command_name.to_string(),
293 args: args.to_string(),
294 };
295 match spawn_one(extension, &event).await {
296 Action::Continue => CommandOutcome::NoOp,
297 Action::Cancel { reason } => CommandOutcome::Cancelled {
298 extension_name: extension.entry.name.clone(),
299 reason,
300 },
301 Action::Reply { text } => CommandOutcome::Reply { text },
302 Action::Send { text, attachments } => CommandOutcome::Send { text, attachments },
303 Action::TransformText { .. } => {
305 tracing::warn!(
306 target: "extensions",
307 "[ext:{}] returned transform_text for command event; treating as no-op",
308 extension.entry.name
309 );
310 CommandOutcome::NoOp
311 }
312 }
313}