Skip to main content

osp_cli/plugin/
dispatch.rs

//! Plugin process dispatch and validation boundary.
//!
//! This module exists so the rest of the app can treat plugin commands as
//! structured responses instead of hand-managing child processes, timeouts, and
//! payload validation everywhere.
//!
//! High-level flow:
//!
//! - resolve the provider that should handle a command
//! - execute it with the correct environment and timeout policy
//! - capture stdout/stderr and exit status
//! - validate the returned JSON payload before handing it back to the host
//!   (passthrough dispatch skips this step and returns the raw output)
//!
//! Contract:
//!
//! - subprocess spawning and timeout policy live here
//! - higher layers should consume validated plugin results instead of shelling
//!   out directly
//! - plugin DTO validation should stay aligned with [`crate::core::plugin`]
20
21use super::manager::{
22    DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
23};
24use crate::core::plugin::{DescribeV1, ResponseV1};
25use anyhow::{Result, anyhow};
26use std::io::Read;
27use std::process::{Child, Command, Output, Stdio};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31
/// Sleep between `try_wait` polls while waiting for a plugin process to exit.
const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Number of extra spawn attempts made while the executable reports
/// "text file busy" (`ETXTBSY`).
const ETXTBSY_RETRY_COUNT: usize = 5;

/// Pause between those `ETXTBSY` spawn retries.
const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);

/// Environment variable carrying the selected command name into the plugin process.
const ENV_OSP_COMMAND: &str = "OSP_COMMAND";
36
/// Reasons a plugin subprocess run did not yield a normal `Output`.
#[derive(Debug)]
enum CommandRunError {
    /// Spawning, polling, waiting on, or collecting output from the child failed.
    Execute(std::io::Error),
    /// The child was still running when `timeout` elapsed and was terminated;
    /// `stderr` holds whatever it wrote before that.
    TimedOut { timeout: Duration, stderr: Vec<u8> },
}
41
42struct ExecutedPluginCommand {
43    provider: DiscoveredPlugin,
44    raw: RawPluginOutput,
45}
46
47impl ExecutedPluginCommand {
48    fn run(
49        manager: &PluginManager,
50        command: &str,
51        args: &[String],
52        context: &PluginDispatchContext,
53    ) -> std::result::Result<Self, PluginDispatchError> {
54        let provider = manager.resolve_provider(command, context.provider_override.as_deref())?;
55        let raw = run_provider(&provider, command, args, context, manager.process_timeout)?;
56        Ok(Self { provider, raw })
57    }
58
59    fn into_raw(self) -> RawPluginOutput {
60        self.raw
61    }
62
63    fn into_response(self, command: &str) -> std::result::Result<ResponseV1, PluginDispatchError> {
64        if self.raw.status_code != 0 {
65            tracing::warn!(
66                plugin_id = %self.provider.plugin_id,
67                command = %command,
68                status_code = self.raw.status_code,
69                stderr = %self.raw.stderr.trim(),
70                "plugin command exited with non-zero status"
71            );
72            return Err(PluginDispatchError::NonZeroExit {
73                plugin_id: self.provider.plugin_id,
74                status_code: self.raw.status_code,
75                stderr: self.raw.stderr,
76            });
77        }
78
79        let response: ResponseV1 = serde_json::from_str(&self.raw.stdout).map_err(|source| {
80            tracing::warn!(
81                plugin_id = %self.provider.plugin_id,
82                command = %command,
83                error = %source,
84                "plugin command returned invalid JSON"
85            );
86            PluginDispatchError::InvalidJsonResponse {
87                plugin_id: self.provider.plugin_id.clone(),
88                source,
89            }
90        })?;
91
92        response.validate_v1().map_err(|reason| {
93            tracing::warn!(
94                plugin_id = %self.provider.plugin_id,
95                command = %command,
96                reason = %reason,
97                "plugin command returned invalid payload"
98            );
99            PluginDispatchError::InvalidResponsePayload {
100                plugin_id: self.provider.plugin_id,
101                reason,
102            }
103        })?;
104
105        Ok(response)
106    }
107}
108
109impl PluginManager {
110    /// Runs a plugin command and returns its validated structured response.
111    ///
112    /// `command` is the full command path resolved against the active plugin
113    /// catalog. `args` are passed to the plugin after that command name.
114    /// `context` carries runtime hints, optional environment overrides, and an
115    /// optional one-shot provider override for this dispatch only.
116    ///
117    /// # Errors
118    ///
119    /// Returns [`PluginDispatchError`] when provider resolution fails, the
120    /// plugin subprocess cannot be executed, the subprocess times out, the
121    /// plugin exits non-zero, or the returned JSON is syntactically or
122    /// semantically invalid.
123    ///
124    /// # Examples
125    ///
126    /// ```
127    /// use osp_cli::plugin::{PluginDispatchContext, PluginDispatchError, PluginManager};
128    ///
129    /// let err = PluginManager::new(Vec::new())
130    ///     .dispatch("shared", &[], &PluginDispatchContext::default())
131    ///     .unwrap_err();
132    ///
133    /// assert!(matches!(err, PluginDispatchError::CommandNotFound { .. }));
134    /// ```
135    pub fn dispatch(
136        &self,
137        command: &str,
138        args: &[String],
139        context: &PluginDispatchContext,
140    ) -> std::result::Result<ResponseV1, PluginDispatchError> {
141        ExecutedPluginCommand::run(self, command, args, context)?.into_response(command)
142    }
143
144    /// Runs a plugin command and returns raw stdout, stderr, and exit status.
145    ///
146    /// Unlike [`PluginManager::dispatch`], this does not attempt to decode or
147    /// validate plugin JSON output. Non-zero exit codes are returned in
148    /// [`RawPluginOutput::status_code`] rather than surfaced as
149    /// [`PluginDispatchError::NonZeroExit`].
150    ///
151    /// # Errors
152    ///
153    /// Returns [`PluginDispatchError`] when provider resolution fails, the
154    /// plugin subprocess cannot be executed, or the subprocess times out.
155    ///
156    /// # Examples
157    ///
158    /// ```
159    /// use osp_cli::plugin::{PluginDispatchContext, PluginDispatchError, PluginManager};
160    ///
161    /// let err = PluginManager::new(Vec::new())
162    ///     .dispatch_passthrough("shared", &[], &PluginDispatchContext::default())
163    ///     .unwrap_err();
164    ///
165    /// assert!(matches!(err, PluginDispatchError::CommandNotFound { .. }));
166    /// ```
167    pub fn dispatch_passthrough(
168        &self,
169        command: &str,
170        args: &[String],
171        context: &PluginDispatchContext,
172    ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
173        Ok(ExecutedPluginCommand::run(self, command, args, context)?.into_raw())
174    }
175}
176
177pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
178    let mut command = Command::new(path);
179    command.arg("--describe");
180    let started_at = Instant::now();
181    tracing::debug!(
182        executable = %path.display(),
183        timeout_ms = timeout.as_millis(),
184        "running plugin describe"
185    );
186    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
187        CommandRunError::Execute(source) => {
188            tracing::warn!(
189                executable = %path.display(),
190                error = %source,
191                "plugin describe execution failed"
192            );
193            anyhow!(
194                "failed to execute --describe for {}: {source}",
195                path.display()
196            )
197        }
198        CommandRunError::TimedOut { timeout, stderr } => {
199            let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
200            tracing::warn!(
201                executable = %path.display(),
202                timeout_ms = timeout.as_millis(),
203                stderr = %stderr,
204                "plugin describe timed out"
205            );
206            if stderr.is_empty() {
207                anyhow!(
208                    "--describe timed out after {} ms for {}",
209                    timeout.as_millis(),
210                    path.display()
211                )
212            } else {
213                anyhow!(
214                    "--describe timed out after {} ms for {}: {}",
215                    timeout.as_millis(),
216                    path.display(),
217                    stderr
218                )
219            }
220        }
221    })?;
222
223    tracing::debug!(
224        executable = %path.display(),
225        elapsed_ms = started_at.elapsed().as_millis(),
226        status = ?output.status.code(),
227        "plugin describe completed"
228    );
229
230    if !output.status.success() {
231        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
232        let message = if stderr.is_empty() {
233            format!("--describe failed with status {}", output.status)
234        } else {
235            format!(
236                "--describe failed with status {}: {}",
237                output.status, stderr
238            )
239        };
240        return Err(anyhow!(message));
241    }
242
243    let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
244        .map_err(anyhow::Error::from)
245        .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
246    describe
247        .validate_v1()
248        .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
249
250    Ok(describe)
251}
252
253pub(super) fn run_provider(
254    provider: &DiscoveredPlugin,
255    selected_command: &str,
256    args: &[String],
257    context: &PluginDispatchContext,
258    timeout: Duration,
259) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
260    let mut command = Command::new(&provider.executable);
261    let started_at = Instant::now();
262    tracing::debug!(
263        plugin_id = %provider.plugin_id,
264        executable = %provider.executable.display(),
265        command = %selected_command,
266        arg_count = args.len(),
267        timeout_ms = timeout.as_millis(),
268        "dispatching plugin command"
269    );
270    command.arg(selected_command);
271    command.args(args);
272    command.env(ENV_OSP_COMMAND, selected_command);
273    for (key, value) in context.runtime_hints.env_pairs() {
274        command.env(key, value);
275    }
276    for (key, value) in context.env_pairs_for(&provider.plugin_id) {
277        command.env(key, value);
278    }
279
280    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
281        CommandRunError::Execute(source) => {
282            tracing::warn!(
283                plugin_id = %provider.plugin_id,
284                executable = %provider.executable.display(),
285                command = %selected_command,
286                error = %source,
287                "plugin command execution failed"
288            );
289            PluginDispatchError::ExecuteFailed {
290                plugin_id: provider.plugin_id.clone(),
291                source,
292            }
293        }
294        CommandRunError::TimedOut { timeout, stderr } => {
295            let stderr_text = String::from_utf8_lossy(&stderr).to_string();
296            tracing::warn!(
297                plugin_id = %provider.plugin_id,
298                executable = %provider.executable.display(),
299                command = %selected_command,
300                timeout_ms = timeout.as_millis(),
301                stderr = %stderr_text.trim(),
302                "plugin command timed out"
303            );
304            PluginDispatchError::TimedOut {
305                plugin_id: provider.plugin_id.clone(),
306                timeout,
307                stderr: stderr_text,
308            }
309        }
310    })?;
311
312    tracing::debug!(
313        plugin_id = %provider.plugin_id,
314        executable = %provider.executable.display(),
315        command = %selected_command,
316        elapsed_ms = started_at.elapsed().as_millis(),
317        status = ?output.status.code(),
318        "plugin command completed"
319    );
320
321    Ok(RawPluginOutput {
322        status_code: output.status.code().unwrap_or(1),
323        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
324        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
325    })
326}
327
328fn run_command_with_timeout(
329    mut command: Command,
330    timeout: Duration,
331) -> Result<Output, CommandRunError> {
332    configure_command_process_group(&mut command);
333    command.stdin(Stdio::null());
334    command.stdout(Stdio::piped());
335    command.stderr(Stdio::piped());
336
337    let mut child = DrainedChild::spawn(command).map_err(CommandRunError::Execute)?;
338    let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
339
340    loop {
341        match child.try_wait() {
342            Ok(Some(status)) => return child.finish(status).map_err(CommandRunError::Execute),
343            Ok(None) if Instant::now() < deadline => {
344                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
345            }
346            Ok(None) => {
347                terminate_timed_out_child(child.child_mut());
348                let status = child.wait().map_err(CommandRunError::Execute)?;
349                let output = child.finish(status).map_err(CommandRunError::Execute)?;
350                return Err(CommandRunError::TimedOut {
351                    timeout,
352                    stderr: output.stderr,
353                });
354            }
355            Err(source) => return Err(CommandRunError::Execute(source)),
356        }
357    }
358}
359
360struct DrainedChild {
361    child: Child,
362    stdout: JoinHandle<std::io::Result<Vec<u8>>>,
363    stderr: JoinHandle<std::io::Result<Vec<u8>>>,
364}
365
366impl DrainedChild {
367    fn spawn(mut command: Command) -> std::io::Result<Self> {
368        let mut child = spawn_command_with_retry(&mut command)?;
369        let stdout = child
370            .stdout
371            .take()
372            .ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?;
373        let stderr = child
374            .stderr
375            .take()
376            .ok_or_else(|| std::io::Error::other("failed to capture child stderr"))?;
377        Ok(Self {
378            child,
379            stdout: spawn_capture_thread(stdout),
380            stderr: spawn_capture_thread(stderr),
381        })
382    }
383
384    fn child_mut(&mut self) -> &mut Child {
385        &mut self.child
386    }
387
388    fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
389        self.child.try_wait()
390    }
391
392    fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
393        self.child.wait()
394    }
395
396    fn finish(self, status: std::process::ExitStatus) -> std::io::Result<Output> {
397        Ok(Output {
398            status,
399            stdout: join_capture(self.stdout)?,
400            stderr: join_capture(self.stderr)?,
401        })
402    }
403}
404
/// Starts a thread that reads `reader` to EOF and yields everything it read.
///
/// Any read error is returned through the join handle rather than panicking.
fn spawn_capture_thread<R>(reader: R) -> JoinHandle<std::io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    let drain = move || {
        let mut source = reader;
        let mut captured = Vec::new();
        source.read_to_end(&mut captured).map(|_| captured)
    };
    thread::spawn(drain)
}
415
/// Waits for a capture thread and returns its bytes.
///
/// A panicked thread is reported as an I/O error instead of propagating the panic.
fn join_capture(handle: JoinHandle<std::io::Result<Vec<u8>>>) -> std::io::Result<Vec<u8>> {
    match handle.join() {
        Ok(captured) => captured,
        Err(_panic_payload) => Err(std::io::Error::other(
            "plugin output capture thread panicked",
        )),
    }
}
421
422fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
423    for attempt in 0..=ETXTBSY_RETRY_COUNT {
424        match command.spawn() {
425            Ok(child) => return Ok(child),
426            Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
427                thread::sleep(ETXTBSY_RETRY_DELAY);
428            }
429            Err(err) => return Err(err),
430        }
431    }
432
433    Err(std::io::Error::other(
434        "plugin spawn retry loop exhausted unexpectedly",
435    ))
436}
437
/// Reports whether `err` is `ETXTBSY` ("text file busy").
///
/// `spawn` can return this transiently while the executable is still open for
/// writing. `ETXTBSY` is 26 on Linux, macOS and the BSDs. Raw OS error codes
/// are platform-specific, and on Windows 26 is an unrelated error. So outside
/// Unix this always returns `false` rather than retrying on a misread code.
fn is_text_file_busy(err: &std::io::Error) -> bool {
    const ETXTBSY: i32 = 26;
    cfg!(unix) && err.raw_os_error() == Some(ETXTBSY)
}
441
/// Makes the child the leader of a new process group whose id equals its pid.
///
/// Timeout handling can then signal the child and all of its descendants at once.
#[cfg(unix)]
fn configure_command_process_group(command: &mut Command) {
    std::os::unix::process::CommandExt::process_group(command, 0);
}

/// Process groups are a Unix concept; elsewhere the command is left unchanged.
#[cfg(not(unix))]
fn configure_command_process_group(_command: &mut Command) {}
451
452#[cfg(unix)]
453fn terminate_timed_out_child(child: &mut Child) {
454    const SIGTERM: i32 = 15;
455    const SIGKILL: i32 = 9;
456
457    let process_group = child.id() as i32;
458    let _ = signal_process_group(process_group, SIGTERM);
459    let grace_deadline = Instant::now() + Duration::from_millis(50);
460
461    loop {
462        match child.try_wait() {
463            Ok(Some(_)) => return,
464            Ok(None) if Instant::now() < grace_deadline => {
465                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
466            }
467            Ok(None) | Err(_) => break,
468        }
469    }
470
471    let _ = signal_process_group(process_group, SIGKILL);
472}
473
474#[cfg(not(unix))]
475fn terminate_timed_out_child(child: &mut Child) {
476    let _ = child.kill();
477}
478
/// Sends `signal` to every process in the process group `process_group`.
///
/// Only group ids greater than 1 are accepted. The id is negated before calling
/// `kill(2)`, so:
/// - 0 would signal the caller's own process group;
/// - 1 would become `kill(-1, ..)`, i.e. every process the caller may signal;
/// - a negative id would be negated into a single-process pid, or overflow for
///   `i32::MIN`.
///
/// None of those is ever what a caller means, so they are rejected with
/// `InvalidInput` instead of reaching `kill`.
///
/// # Errors
///
/// `InvalidInput` for the ids above. Otherwise, the OS error reported by
/// `kill` (e.g. no such group).
#[cfg(unix)]
fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    if process_group <= 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("refusing to signal process group {process_group}"),
        ));
    }

    // SAFETY: `kill` takes two plain integers and reports failure through its
    // return value and errno; it has no memory-safety preconditions.
    let result = unsafe { kill(-process_group, signal) };
    if result == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}