Skip to main content

osp_cli/plugin/
dispatch.rs

//! Plugin process dispatch and validation boundary.
//!
//! This module exists so the rest of the app can treat plugin commands as
//! structured responses instead of hand-managing child processes, timeouts, and
//! payload validation everywhere.
//!
//! High-level flow:
//!
//! - resolve the provider that should handle a command
//! - execute it with the correct environment and timeout policy
//! - capture stdout/stderr and the exit status
//! - validate the returned JSON payload before handing it back to the host
//!   (`PluginManager::dispatch_passthrough` skips this step and returns the
//!   raw output instead)
//!
//! Contract:
//!
//! - subprocess spawning and timeout policy live here, including for the
//!   `--describe` probe
//! - higher layers should consume validated plugin results instead of shelling
//!   out directly
//! - plugin DTO validation should stay aligned with [`crate::core::plugin`]

21use super::manager::{
22    DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
23};
24use crate::core::plugin::{DescribeV1, ResponseV1};
25use anyhow::{Result, anyhow};
26use std::io::Read;
27use std::process::{Child, Command, Output, Stdio};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31
/// Environment variable that carries the selected command name to the plugin.
const ENV_OSP_COMMAND: &str = "OSP_COMMAND";

/// Interval between exit polls while waiting on a plugin process (used by the
/// main timeout loop and by the grace period after `SIGTERM`).
const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// How many times a spawn that failed with `ETXTBSY` is retried before the
/// error is returned.
const ETXTBSY_RETRY_COUNT: usize = 5;

/// Pause before each of those spawn retries.
const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);
36
/// Why `run_command_with_timeout` did not produce an `Output`.
enum CommandRunError {
    /// The deadline passed and the child was terminated. `stderr` holds
    /// whatever the child had written to stderr by the time it was reaped.
    TimedOut { stderr: Vec<u8>, timeout: Duration },
    /// An I/O failure while spawning, polling, reaping, or collecting output.
    Execute(std::io::Error),
}
41
42impl PluginManager {
43    /// Runs a plugin command and returns its validated structured response.
44    ///
45    /// `command` is the full command path resolved against the active plugin
46    /// catalog. `args` are passed to the plugin after that command name.
47    /// `context` carries runtime hints, optional environment overrides, and an
48    /// optional one-shot provider override for this dispatch only.
49    ///
50    /// # Errors
51    ///
52    /// Returns [`PluginDispatchError`] when provider resolution fails, the
53    /// plugin subprocess cannot be executed, the subprocess times out, the
54    /// plugin exits non-zero, or the returned JSON is syntactically or
55    /// semantically invalid.
56    ///
57    /// # Examples
58    ///
59    /// ```
60    /// use osp_cli::plugin::{PluginDispatchContext, PluginDispatchError, PluginManager};
61    ///
62    /// let err = PluginManager::new(Vec::new())
63    ///     .dispatch("shared", &[], &PluginDispatchContext::default())
64    ///     .unwrap_err();
65    ///
66    /// assert!(matches!(err, PluginDispatchError::CommandNotFound { .. }));
67    /// ```
68    pub fn dispatch(
69        &self,
70        command: &str,
71        args: &[String],
72        context: &PluginDispatchContext,
73    ) -> std::result::Result<ResponseV1, PluginDispatchError> {
74        let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
75
76        let raw = run_provider(&provider, command, args, context, self.process_timeout)?;
77        if raw.status_code != 0 {
78            tracing::warn!(
79                plugin_id = %provider.plugin_id,
80                command = %command,
81                status_code = raw.status_code,
82                stderr = %raw.stderr.trim(),
83                "plugin command exited with non-zero status"
84            );
85            return Err(PluginDispatchError::NonZeroExit {
86                plugin_id: provider.plugin_id.clone(),
87                status_code: raw.status_code,
88                stderr: raw.stderr,
89            });
90        }
91
92        let response: ResponseV1 = serde_json::from_str(&raw.stdout).map_err(|source| {
93            tracing::warn!(
94                plugin_id = %provider.plugin_id,
95                command = %command,
96                error = %source,
97                "plugin command returned invalid JSON"
98            );
99            PluginDispatchError::InvalidJsonResponse {
100                plugin_id: provider.plugin_id.clone(),
101                source,
102            }
103        })?;
104
105        response.validate_v1().map_err(|reason| {
106            tracing::warn!(
107                plugin_id = %provider.plugin_id,
108                command = %command,
109                reason = %reason,
110                "plugin command returned invalid payload"
111            );
112            PluginDispatchError::InvalidResponsePayload {
113                plugin_id: provider.plugin_id.clone(),
114                reason,
115            }
116        })?;
117
118        Ok(response)
119    }
120
121    /// Runs a plugin command and returns raw stdout, stderr, and exit status.
122    ///
123    /// Unlike [`PluginManager::dispatch`], this does not attempt to decode or
124    /// validate plugin JSON output. Non-zero exit codes are returned in
125    /// [`RawPluginOutput::status_code`] rather than surfaced as
126    /// [`PluginDispatchError::NonZeroExit`].
127    ///
128    /// # Errors
129    ///
130    /// Returns [`PluginDispatchError`] when provider resolution fails, the
131    /// plugin subprocess cannot be executed, or the subprocess times out.
132    ///
133    /// # Examples
134    ///
135    /// ```
136    /// use osp_cli::plugin::{PluginDispatchContext, PluginDispatchError, PluginManager};
137    ///
138    /// let err = PluginManager::new(Vec::new())
139    ///     .dispatch_passthrough("shared", &[], &PluginDispatchContext::default())
140    ///     .unwrap_err();
141    ///
142    /// assert!(matches!(err, PluginDispatchError::CommandNotFound { .. }));
143    /// ```
144    pub fn dispatch_passthrough(
145        &self,
146        command: &str,
147        args: &[String],
148        context: &PluginDispatchContext,
149    ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
150        let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
151        run_provider(&provider, command, args, context, self.process_timeout)
152    }
153}
154
155pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
156    let mut command = Command::new(path);
157    command.arg("--describe");
158    let started_at = Instant::now();
159    tracing::debug!(
160        executable = %path.display(),
161        timeout_ms = timeout.as_millis(),
162        "running plugin describe"
163    );
164    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
165        CommandRunError::Execute(source) => {
166            tracing::warn!(
167                executable = %path.display(),
168                error = %source,
169                "plugin describe execution failed"
170            );
171            anyhow!(
172                "failed to execute --describe for {}: {source}",
173                path.display()
174            )
175        }
176        CommandRunError::TimedOut { timeout, stderr } => {
177            let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
178            tracing::warn!(
179                executable = %path.display(),
180                timeout_ms = timeout.as_millis(),
181                stderr = %stderr,
182                "plugin describe timed out"
183            );
184            if stderr.is_empty() {
185                anyhow!(
186                    "--describe timed out after {} ms for {}",
187                    timeout.as_millis(),
188                    path.display()
189                )
190            } else {
191                anyhow!(
192                    "--describe timed out after {} ms for {}: {}",
193                    timeout.as_millis(),
194                    path.display(),
195                    stderr
196                )
197            }
198        }
199    })?;
200
201    tracing::debug!(
202        executable = %path.display(),
203        elapsed_ms = started_at.elapsed().as_millis(),
204        status = ?output.status.code(),
205        "plugin describe completed"
206    );
207
208    if !output.status.success() {
209        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
210        let message = if stderr.is_empty() {
211            format!("--describe failed with status {}", output.status)
212        } else {
213            format!(
214                "--describe failed with status {}: {}",
215                output.status, stderr
216            )
217        };
218        return Err(anyhow!(message));
219    }
220
221    let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
222        .map_err(anyhow::Error::from)
223        .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
224    describe
225        .validate_v1()
226        .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
227
228    Ok(describe)
229}
230
231pub(super) fn run_provider(
232    provider: &DiscoveredPlugin,
233    selected_command: &str,
234    args: &[String],
235    context: &PluginDispatchContext,
236    timeout: Duration,
237) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
238    let mut command = Command::new(&provider.executable);
239    let started_at = Instant::now();
240    tracing::debug!(
241        plugin_id = %provider.plugin_id,
242        executable = %provider.executable.display(),
243        command = %selected_command,
244        arg_count = args.len(),
245        timeout_ms = timeout.as_millis(),
246        "dispatching plugin command"
247    );
248    command.arg(selected_command);
249    command.args(args);
250    command.env(ENV_OSP_COMMAND, selected_command);
251    for (key, value) in context.runtime_hints.env_pairs() {
252        command.env(key, value);
253    }
254    for (key, value) in context.env_pairs_for(&provider.plugin_id) {
255        command.env(key, value);
256    }
257
258    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
259        CommandRunError::Execute(source) => {
260            tracing::warn!(
261                plugin_id = %provider.plugin_id,
262                executable = %provider.executable.display(),
263                command = %selected_command,
264                error = %source,
265                "plugin command execution failed"
266            );
267            PluginDispatchError::ExecuteFailed {
268                plugin_id: provider.plugin_id.clone(),
269                source,
270            }
271        }
272        CommandRunError::TimedOut { timeout, stderr } => {
273            let stderr_text = String::from_utf8_lossy(&stderr).to_string();
274            tracing::warn!(
275                plugin_id = %provider.plugin_id,
276                executable = %provider.executable.display(),
277                command = %selected_command,
278                timeout_ms = timeout.as_millis(),
279                stderr = %stderr_text.trim(),
280                "plugin command timed out"
281            );
282            PluginDispatchError::TimedOut {
283                plugin_id: provider.plugin_id.clone(),
284                timeout,
285                stderr: stderr_text,
286            }
287        }
288    })?;
289
290    tracing::debug!(
291        plugin_id = %provider.plugin_id,
292        executable = %provider.executable.display(),
293        command = %selected_command,
294        elapsed_ms = started_at.elapsed().as_millis(),
295        status = ?output.status.code(),
296        "plugin command completed"
297    );
298
299    Ok(RawPluginOutput {
300        status_code: output.status.code().unwrap_or(1),
301        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
302        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
303    })
304}
305
306fn run_command_with_timeout(
307    mut command: Command,
308    timeout: Duration,
309) -> Result<Output, CommandRunError> {
310    configure_command_process_group(&mut command);
311    command.stdin(Stdio::null());
312    command.stdout(Stdio::piped());
313    command.stderr(Stdio::piped());
314
315    let mut child = DrainedChild::spawn(command).map_err(CommandRunError::Execute)?;
316    let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
317
318    loop {
319        match child.try_wait() {
320            Ok(Some(status)) => return child.finish(status).map_err(CommandRunError::Execute),
321            Ok(None) if Instant::now() < deadline => {
322                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
323            }
324            Ok(None) => {
325                terminate_timed_out_child(child.child_mut());
326                let status = child.wait().map_err(CommandRunError::Execute)?;
327                let output = child.finish(status).map_err(CommandRunError::Execute)?;
328                return Err(CommandRunError::TimedOut {
329                    timeout,
330                    stderr: output.stderr,
331                });
332            }
333            Err(source) => return Err(CommandRunError::Execute(source)),
334        }
335    }
336}
337
338struct DrainedChild {
339    child: Child,
340    stdout: JoinHandle<std::io::Result<Vec<u8>>>,
341    stderr: JoinHandle<std::io::Result<Vec<u8>>>,
342}
343
344impl DrainedChild {
345    fn spawn(mut command: Command) -> std::io::Result<Self> {
346        let mut child = spawn_command_with_retry(&mut command)?;
347        let stdout = child
348            .stdout
349            .take()
350            .ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?;
351        let stderr = child
352            .stderr
353            .take()
354            .ok_or_else(|| std::io::Error::other("failed to capture child stderr"))?;
355        Ok(Self {
356            child,
357            stdout: spawn_capture_thread(stdout),
358            stderr: spawn_capture_thread(stderr),
359        })
360    }
361
362    fn child_mut(&mut self) -> &mut Child {
363        &mut self.child
364    }
365
366    fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
367        self.child.try_wait()
368    }
369
370    fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
371        self.child.wait()
372    }
373
374    fn finish(self, status: std::process::ExitStatus) -> std::io::Result<Output> {
375        Ok(Output {
376            status,
377            stdout: join_capture(self.stdout)?,
378            stderr: join_capture(self.stderr)?,
379        })
380    }
381}
382
/// Moves `reader` onto a new thread that reads it to EOF.
///
/// Joining the returned handle yields every byte read, or the first read error.
fn spawn_capture_thread<R>(reader: R) -> JoinHandle<std::io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || {
        let mut source = reader;
        let mut captured = Vec::new();
        source.read_to_end(&mut captured).map(|_| captured)
    })
}
393
/// Waits for a capture thread and returns what it read.
///
/// A panic in the thread is reported as an I/O error instead of being
/// propagated to the caller.
fn join_capture(handle: JoinHandle<std::io::Result<Vec<u8>>>) -> std::io::Result<Vec<u8>> {
    match handle.join() {
        Ok(captured) => captured,
        Err(_) => Err(std::io::Error::other(
            "plugin output capture thread panicked",
        )),
    }
}
399
400fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
401    for attempt in 0..=ETXTBSY_RETRY_COUNT {
402        match command.spawn() {
403            Ok(child) => return Ok(child),
404            Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
405                thread::sleep(ETXTBSY_RETRY_DELAY);
406            }
407            Err(err) => return Err(err),
408        }
409    }
410
411    Err(std::io::Error::other(
412        "plugin spawn retry loop exhausted unexpectedly",
413    ))
414}
415
/// Reports whether `err` is `ETXTBSY` ("text file busy").
///
/// The raw code 26 is `ETXTBSY` on Linux, macOS and the BSDs. On other
/// platforms it means something unrelated (on Windows it is
/// `ERROR_NOT_DOS_DISK`), so it is only interpreted on Unix. Otherwise an
/// unrelated spawn failure would be retried as if the executable were busy.
fn is_text_file_busy(err: &std::io::Error) -> bool {
    const ETXTBSY: i32 = 26;
    cfg!(unix) && err.raw_os_error() == Some(ETXTBSY)
}
419
/// Makes the child the leader of a new process group (its group id becomes its
/// own pid), so a timeout can signal it together with any processes it spawns.
#[cfg(unix)]
fn configure_command_process_group(command: &mut Command) {
    std::os::unix::process::CommandExt::process_group(command, 0);
}

/// Process groups are Unix-only: elsewhere the command is left unchanged, and a
/// timeout can only stop the direct child.
#[cfg(not(unix))]
fn configure_command_process_group(_command: &mut Command) {}
429
430#[cfg(unix)]
431fn terminate_timed_out_child(child: &mut Child) {
432    const SIGTERM: i32 = 15;
433    const SIGKILL: i32 = 9;
434
435    let process_group = child.id() as i32;
436    let _ = signal_process_group(process_group, SIGTERM);
437    let grace_deadline = Instant::now() + Duration::from_millis(50);
438
439    loop {
440        match child.try_wait() {
441            Ok(Some(_)) => return,
442            Ok(None) if Instant::now() < grace_deadline => {
443                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
444            }
445            Ok(None) | Err(_) => break,
446        }
447    }
448
449    let _ = signal_process_group(process_group, SIGKILL);
450}
451
/// Stops a plugin that exceeded its timeout.
///
/// Without Unix process groups only the direct child is killed; processes it
/// started are not signalled. A kill error (e.g. the child already exited) is
/// ignored because the caller reaps the child next.
#[cfg(not(unix))]
fn terminate_timed_out_child(child: &mut Child) {
    child.kill().unwrap_or_default();
}
456
/// Sends `signal` to every process in the process group `process_group`.
///
/// This wraps `kill(2)` with a negated pid, which POSIX defines as "signal the
/// process group whose id is the absolute value of pid".
///
/// # Errors
///
/// - `InvalidInput` when `process_group` is not greater than 1. For such
///   values, `kill(-0, ..)` would signal the caller's own process group and
///   `kill(-1, ..)` every process the caller is allowed to signal. Negative
///   values would address a single process instead of a group, and `i32::MIN`
///   cannot be negated at all.
/// - The OS error from `kill` otherwise (e.g. `ESRCH` if the group no longer
///   exists).
#[cfg(unix)]
fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    if process_group <= 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("refusing to signal process group {process_group}"),
        ));
    }

    // SAFETY: `kill` takes two plain integers and accesses no memory owned by
    // this process. The declaration assumes `pid_t` is a 32-bit signed integer,
    // as on Linux, macOS and the BSDs. `-process_group` cannot overflow because
    // `process_group > 1`.
    let result = unsafe { kill(-process_group, signal) };
    if result == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}