1use super::manager::{
22 DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
23};
24use crate::core::plugin::{DescribeV1, ResponseV1};
25use anyhow::{Result, anyhow};
26use std::io::Read;
27use std::process::{Child, Command, Output, Stdio};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31
32const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
33const ETXTBSY_RETRY_COUNT: usize = 5;
34const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);
35const ENV_OSP_COMMAND: &str = "OSP_COMMAND";
36
37enum CommandRunError {
38 Execute(std::io::Error),
39 TimedOut { timeout: Duration, stderr: Vec<u8> },
40}
41
42impl PluginManager {
43 pub fn dispatch(
45 &self,
46 command: &str,
47 args: &[String],
48 context: &PluginDispatchContext,
49 ) -> std::result::Result<ResponseV1, PluginDispatchError> {
50 let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
51
52 let raw = run_provider(&provider, command, args, context, self.process_timeout)?;
53 if raw.status_code != 0 {
54 tracing::warn!(
55 plugin_id = %provider.plugin_id,
56 command = %command,
57 status_code = raw.status_code,
58 stderr = %raw.stderr.trim(),
59 "plugin command exited with non-zero status"
60 );
61 return Err(PluginDispatchError::NonZeroExit {
62 plugin_id: provider.plugin_id.clone(),
63 status_code: raw.status_code,
64 stderr: raw.stderr,
65 });
66 }
67
68 let response: ResponseV1 = serde_json::from_str(&raw.stdout).map_err(|source| {
69 tracing::warn!(
70 plugin_id = %provider.plugin_id,
71 command = %command,
72 error = %source,
73 "plugin command returned invalid JSON"
74 );
75 PluginDispatchError::InvalidJsonResponse {
76 plugin_id: provider.plugin_id.clone(),
77 source,
78 }
79 })?;
80
81 response.validate_v1().map_err(|reason| {
82 tracing::warn!(
83 plugin_id = %provider.plugin_id,
84 command = %command,
85 reason = %reason,
86 "plugin command returned invalid payload"
87 );
88 PluginDispatchError::InvalidResponsePayload {
89 plugin_id: provider.plugin_id.clone(),
90 reason,
91 }
92 })?;
93
94 Ok(response)
95 }
96
97 pub fn dispatch_passthrough(
99 &self,
100 command: &str,
101 args: &[String],
102 context: &PluginDispatchContext,
103 ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
104 let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
105 run_provider(&provider, command, args, context, self.process_timeout)
106 }
107}
108
109pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
110 let mut command = Command::new(path);
111 command.arg("--describe");
112 let started_at = Instant::now();
113 tracing::debug!(
114 executable = %path.display(),
115 timeout_ms = timeout.as_millis(),
116 "running plugin describe"
117 );
118 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
119 CommandRunError::Execute(source) => {
120 tracing::warn!(
121 executable = %path.display(),
122 error = %source,
123 "plugin describe execution failed"
124 );
125 anyhow!(
126 "failed to execute --describe for {}: {source}",
127 path.display()
128 )
129 }
130 CommandRunError::TimedOut { timeout, stderr } => {
131 let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
132 tracing::warn!(
133 executable = %path.display(),
134 timeout_ms = timeout.as_millis(),
135 stderr = %stderr,
136 "plugin describe timed out"
137 );
138 if stderr.is_empty() {
139 anyhow!(
140 "--describe timed out after {} ms for {}",
141 timeout.as_millis(),
142 path.display()
143 )
144 } else {
145 anyhow!(
146 "--describe timed out after {} ms for {}: {}",
147 timeout.as_millis(),
148 path.display(),
149 stderr
150 )
151 }
152 }
153 })?;
154
155 tracing::debug!(
156 executable = %path.display(),
157 elapsed_ms = started_at.elapsed().as_millis(),
158 status = ?output.status.code(),
159 "plugin describe completed"
160 );
161
162 if !output.status.success() {
163 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
164 let message = if stderr.is_empty() {
165 format!("--describe failed with status {}", output.status)
166 } else {
167 format!(
168 "--describe failed with status {}: {}",
169 output.status, stderr
170 )
171 };
172 return Err(anyhow!(message));
173 }
174
175 let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
176 .map_err(anyhow::Error::from)
177 .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
178 describe
179 .validate_v1()
180 .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
181
182 Ok(describe)
183}
184
185pub(super) fn run_provider(
186 provider: &DiscoveredPlugin,
187 selected_command: &str,
188 args: &[String],
189 context: &PluginDispatchContext,
190 timeout: Duration,
191) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
192 let mut command = Command::new(&provider.executable);
193 let started_at = Instant::now();
194 tracing::debug!(
195 plugin_id = %provider.plugin_id,
196 executable = %provider.executable.display(),
197 command = %selected_command,
198 arg_count = args.len(),
199 timeout_ms = timeout.as_millis(),
200 "dispatching plugin command"
201 );
202 command.arg(selected_command);
203 command.args(args);
204 command.env(ENV_OSP_COMMAND, selected_command);
205 for (key, value) in context.runtime_hints.env_pairs() {
206 command.env(key, value);
207 }
208 for (key, value) in context.env_pairs_for(&provider.plugin_id) {
209 command.env(key, value);
210 }
211
212 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
213 CommandRunError::Execute(source) => {
214 tracing::warn!(
215 plugin_id = %provider.plugin_id,
216 executable = %provider.executable.display(),
217 command = %selected_command,
218 error = %source,
219 "plugin command execution failed"
220 );
221 PluginDispatchError::ExecuteFailed {
222 plugin_id: provider.plugin_id.clone(),
223 source,
224 }
225 }
226 CommandRunError::TimedOut { timeout, stderr } => {
227 let stderr_text = String::from_utf8_lossy(&stderr).to_string();
228 tracing::warn!(
229 plugin_id = %provider.plugin_id,
230 executable = %provider.executable.display(),
231 command = %selected_command,
232 timeout_ms = timeout.as_millis(),
233 stderr = %stderr_text.trim(),
234 "plugin command timed out"
235 );
236 PluginDispatchError::TimedOut {
237 plugin_id: provider.plugin_id.clone(),
238 timeout,
239 stderr: stderr_text,
240 }
241 }
242 })?;
243
244 tracing::debug!(
245 plugin_id = %provider.plugin_id,
246 executable = %provider.executable.display(),
247 command = %selected_command,
248 elapsed_ms = started_at.elapsed().as_millis(),
249 status = ?output.status.code(),
250 "plugin command completed"
251 );
252
253 Ok(RawPluginOutput {
254 status_code: output.status.code().unwrap_or(1),
255 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
256 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
257 })
258}
259
260fn run_command_with_timeout(
261 mut command: Command,
262 timeout: Duration,
263) -> Result<Output, CommandRunError> {
264 configure_command_process_group(&mut command);
265 command.stdin(Stdio::null());
266 command.stdout(Stdio::piped());
267 command.stderr(Stdio::piped());
268
269 let mut child = DrainedChild::spawn(command).map_err(CommandRunError::Execute)?;
270 let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
271
272 loop {
273 match child.try_wait() {
274 Ok(Some(status)) => return child.finish(status).map_err(CommandRunError::Execute),
275 Ok(None) if Instant::now() < deadline => {
276 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
277 }
278 Ok(None) => {
279 terminate_timed_out_child(child.child_mut());
280 let status = child.wait().map_err(CommandRunError::Execute)?;
281 let output = child.finish(status).map_err(CommandRunError::Execute)?;
282 return Err(CommandRunError::TimedOut {
283 timeout,
284 stderr: output.stderr,
285 });
286 }
287 Err(source) => return Err(CommandRunError::Execute(source)),
288 }
289 }
290}
291
292struct DrainedChild {
293 child: Child,
294 stdout: JoinHandle<std::io::Result<Vec<u8>>>,
295 stderr: JoinHandle<std::io::Result<Vec<u8>>>,
296}
297
298impl DrainedChild {
299 fn spawn(mut command: Command) -> std::io::Result<Self> {
300 let mut child = spawn_command_with_retry(&mut command)?;
301 let stdout = child
302 .stdout
303 .take()
304 .ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?;
305 let stderr = child
306 .stderr
307 .take()
308 .ok_or_else(|| std::io::Error::other("failed to capture child stderr"))?;
309 Ok(Self {
310 child,
311 stdout: spawn_capture_thread(stdout),
312 stderr: spawn_capture_thread(stderr),
313 })
314 }
315
316 fn child_mut(&mut self) -> &mut Child {
317 &mut self.child
318 }
319
320 fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
321 self.child.try_wait()
322 }
323
324 fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
325 self.child.wait()
326 }
327
328 fn finish(self, status: std::process::ExitStatus) -> std::io::Result<Output> {
329 Ok(Output {
330 status,
331 stdout: join_capture(self.stdout)?,
332 stderr: join_capture(self.stderr)?,
333 })
334 }
335}
336
337fn spawn_capture_thread<R>(mut reader: R) -> JoinHandle<std::io::Result<Vec<u8>>>
338where
339 R: Read + Send + 'static,
340{
341 thread::spawn(move || {
342 let mut buffer = Vec::new();
343 reader.read_to_end(&mut buffer)?;
344 Ok(buffer)
345 })
346}
347
348fn join_capture(handle: JoinHandle<std::io::Result<Vec<u8>>>) -> std::io::Result<Vec<u8>> {
349 handle
350 .join()
351 .map_err(|_| std::io::Error::other("plugin output capture thread panicked"))?
352}
353
354fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
355 for attempt in 0..=ETXTBSY_RETRY_COUNT {
356 match command.spawn() {
357 Ok(child) => return Ok(child),
358 Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
359 thread::sleep(ETXTBSY_RETRY_DELAY);
360 }
361 Err(err) => return Err(err),
362 }
363 }
364
365 unreachable!("retry loop should always return or error");
366}
367
368fn is_text_file_busy(err: &std::io::Error) -> bool {
369 err.raw_os_error() == Some(26)
370}
371
372#[cfg(unix)]
373fn configure_command_process_group(command: &mut Command) {
374 use std::os::unix::process::CommandExt;
375
376 command.process_group(0);
377}
378
379#[cfg(not(unix))]
380fn configure_command_process_group(_command: &mut Command) {}
381
382#[cfg(unix)]
383fn terminate_timed_out_child(child: &mut Child) {
384 const SIGTERM: i32 = 15;
385 const SIGKILL: i32 = 9;
386
387 let process_group = child.id() as i32;
388 let _ = signal_process_group(process_group, SIGTERM);
389 let grace_deadline = Instant::now() + Duration::from_millis(50);
390
391 loop {
392 match child.try_wait() {
393 Ok(Some(_)) => return,
394 Ok(None) if Instant::now() < grace_deadline => {
395 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
396 }
397 Ok(None) | Err(_) => break,
398 }
399 }
400
401 let _ = signal_process_group(process_group, SIGKILL);
402}
403
404#[cfg(not(unix))]
405fn terminate_timed_out_child(child: &mut Child) {
406 let _ = child.kill();
407}
408
409#[cfg(unix)]
410fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
411 unsafe extern "C" {
412 fn kill(pid: i32, sig: i32) -> i32;
413 }
414
415 let result = unsafe { kill(-process_group, signal) };
416 if result == 0 {
417 Ok(())
418 } else {
419 Err(std::io::Error::last_os_error())
420 }
421}