Skip to main content

osp_cli/plugin/
dispatch.rs

1//! Plugin process dispatch and validation boundary.
2//!
3//! This module exists so the rest of the app can treat plugin commands as
4//! structured responses instead of hand-managing child processes, timeouts, and
5//! payload validation everywhere.
6//!
7//! High-level flow:
8//!
9//! - resolve the provider that should handle a command
10//! - execute it with the correct environment and timeout policy
11//! - capture stdout/stderr and exit status
12//! - validate the returned JSON payload before handing it back to the host
13//!
14//! Contract:
15//!
16//! - subprocess spawning and timeout policy live here
17//! - higher layers should consume validated plugin results instead of shelling
18//!   out directly
19//! - plugin DTO validation should stay aligned with [`crate::core::plugin`]
20
21use super::manager::{
22    DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
23};
24use crate::core::plugin::{DescribeV1, ResponseV1};
25use anyhow::{Result, anyhow};
26use std::io::Read;
27use std::process::{Child, Command, Output, Stdio};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31
/// Environment variable through which the selected command name is passed to
/// the plugin process.
const ENV_OSP_COMMAND: &str = "OSP_COMMAND";

/// Sleep between successive `try_wait` polls on a running child process.
const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Pause between spawn attempts that failed with "text file busy".
const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);
/// Number of additional spawn attempts made after a "text file busy" failure.
const ETXTBSY_RETRY_COUNT: usize = 5;
36
/// Ways a single child-process run can fail before producing an `Output`.
enum CommandRunError {
    /// Spawning the child, waiting on it, or collecting its output failed.
    Execute(std::io::Error),
    /// The child was still running at the deadline and had to be terminated.
    TimedOut {
        /// Timeout that was in effect for the run.
        timeout: Duration,
        /// Raw bytes captured from the child's stderr.
        stderr: Vec<u8>,
    },
}
41
42impl PluginManager {
43    /// Runs a plugin command and returns its validated structured response.
44    pub fn dispatch(
45        &self,
46        command: &str,
47        args: &[String],
48        context: &PluginDispatchContext,
49    ) -> std::result::Result<ResponseV1, PluginDispatchError> {
50        let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
51
52        let raw = run_provider(&provider, command, args, context, self.process_timeout)?;
53        if raw.status_code != 0 {
54            tracing::warn!(
55                plugin_id = %provider.plugin_id,
56                command = %command,
57                status_code = raw.status_code,
58                stderr = %raw.stderr.trim(),
59                "plugin command exited with non-zero status"
60            );
61            return Err(PluginDispatchError::NonZeroExit {
62                plugin_id: provider.plugin_id.clone(),
63                status_code: raw.status_code,
64                stderr: raw.stderr,
65            });
66        }
67
68        let response: ResponseV1 = serde_json::from_str(&raw.stdout).map_err(|source| {
69            tracing::warn!(
70                plugin_id = %provider.plugin_id,
71                command = %command,
72                error = %source,
73                "plugin command returned invalid JSON"
74            );
75            PluginDispatchError::InvalidJsonResponse {
76                plugin_id: provider.plugin_id.clone(),
77                source,
78            }
79        })?;
80
81        response.validate_v1().map_err(|reason| {
82            tracing::warn!(
83                plugin_id = %provider.plugin_id,
84                command = %command,
85                reason = %reason,
86                "plugin command returned invalid payload"
87            );
88            PluginDispatchError::InvalidResponsePayload {
89                plugin_id: provider.plugin_id.clone(),
90                reason,
91            }
92        })?;
93
94        Ok(response)
95    }
96
97    /// Runs a plugin command and returns raw stdout, stderr, and exit status.
98    pub fn dispatch_passthrough(
99        &self,
100        command: &str,
101        args: &[String],
102        context: &PluginDispatchContext,
103    ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
104        let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
105        run_provider(&provider, command, args, context, self.process_timeout)
106    }
107}
108
109pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
110    let mut command = Command::new(path);
111    command.arg("--describe");
112    let started_at = Instant::now();
113    tracing::debug!(
114        executable = %path.display(),
115        timeout_ms = timeout.as_millis(),
116        "running plugin describe"
117    );
118    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
119        CommandRunError::Execute(source) => {
120            tracing::warn!(
121                executable = %path.display(),
122                error = %source,
123                "plugin describe execution failed"
124            );
125            anyhow!(
126                "failed to execute --describe for {}: {source}",
127                path.display()
128            )
129        }
130        CommandRunError::TimedOut { timeout, stderr } => {
131            let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
132            tracing::warn!(
133                executable = %path.display(),
134                timeout_ms = timeout.as_millis(),
135                stderr = %stderr,
136                "plugin describe timed out"
137            );
138            if stderr.is_empty() {
139                anyhow!(
140                    "--describe timed out after {} ms for {}",
141                    timeout.as_millis(),
142                    path.display()
143                )
144            } else {
145                anyhow!(
146                    "--describe timed out after {} ms for {}: {}",
147                    timeout.as_millis(),
148                    path.display(),
149                    stderr
150                )
151            }
152        }
153    })?;
154
155    tracing::debug!(
156        executable = %path.display(),
157        elapsed_ms = started_at.elapsed().as_millis(),
158        status = ?output.status.code(),
159        "plugin describe completed"
160    );
161
162    if !output.status.success() {
163        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
164        let message = if stderr.is_empty() {
165            format!("--describe failed with status {}", output.status)
166        } else {
167            format!(
168                "--describe failed with status {}: {}",
169                output.status, stderr
170            )
171        };
172        return Err(anyhow!(message));
173    }
174
175    let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
176        .map_err(anyhow::Error::from)
177        .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
178    describe
179        .validate_v1()
180        .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
181
182    Ok(describe)
183}
184
185pub(super) fn run_provider(
186    provider: &DiscoveredPlugin,
187    selected_command: &str,
188    args: &[String],
189    context: &PluginDispatchContext,
190    timeout: Duration,
191) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
192    let mut command = Command::new(&provider.executable);
193    let started_at = Instant::now();
194    tracing::debug!(
195        plugin_id = %provider.plugin_id,
196        executable = %provider.executable.display(),
197        command = %selected_command,
198        arg_count = args.len(),
199        timeout_ms = timeout.as_millis(),
200        "dispatching plugin command"
201    );
202    command.arg(selected_command);
203    command.args(args);
204    command.env(ENV_OSP_COMMAND, selected_command);
205    for (key, value) in context.runtime_hints.env_pairs() {
206        command.env(key, value);
207    }
208    for (key, value) in context.env_pairs_for(&provider.plugin_id) {
209        command.env(key, value);
210    }
211
212    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
213        CommandRunError::Execute(source) => {
214            tracing::warn!(
215                plugin_id = %provider.plugin_id,
216                executable = %provider.executable.display(),
217                command = %selected_command,
218                error = %source,
219                "plugin command execution failed"
220            );
221            PluginDispatchError::ExecuteFailed {
222                plugin_id: provider.plugin_id.clone(),
223                source,
224            }
225        }
226        CommandRunError::TimedOut { timeout, stderr } => {
227            let stderr_text = String::from_utf8_lossy(&stderr).to_string();
228            tracing::warn!(
229                plugin_id = %provider.plugin_id,
230                executable = %provider.executable.display(),
231                command = %selected_command,
232                timeout_ms = timeout.as_millis(),
233                stderr = %stderr_text.trim(),
234                "plugin command timed out"
235            );
236            PluginDispatchError::TimedOut {
237                plugin_id: provider.plugin_id.clone(),
238                timeout,
239                stderr: stderr_text,
240            }
241        }
242    })?;
243
244    tracing::debug!(
245        plugin_id = %provider.plugin_id,
246        executable = %provider.executable.display(),
247        command = %selected_command,
248        elapsed_ms = started_at.elapsed().as_millis(),
249        status = ?output.status.code(),
250        "plugin command completed"
251    );
252
253    Ok(RawPluginOutput {
254        status_code: output.status.code().unwrap_or(1),
255        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
256        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
257    })
258}
259
260fn run_command_with_timeout(
261    mut command: Command,
262    timeout: Duration,
263) -> Result<Output, CommandRunError> {
264    configure_command_process_group(&mut command);
265    command.stdin(Stdio::null());
266    command.stdout(Stdio::piped());
267    command.stderr(Stdio::piped());
268
269    let mut child = DrainedChild::spawn(command).map_err(CommandRunError::Execute)?;
270    let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
271
272    loop {
273        match child.try_wait() {
274            Ok(Some(status)) => return child.finish(status).map_err(CommandRunError::Execute),
275            Ok(None) if Instant::now() < deadline => {
276                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
277            }
278            Ok(None) => {
279                terminate_timed_out_child(child.child_mut());
280                let status = child.wait().map_err(CommandRunError::Execute)?;
281                let output = child.finish(status).map_err(CommandRunError::Execute)?;
282                return Err(CommandRunError::TimedOut {
283                    timeout,
284                    stderr: output.stderr,
285                });
286            }
287            Err(source) => return Err(CommandRunError::Execute(source)),
288        }
289    }
290}
291
292struct DrainedChild {
293    child: Child,
294    stdout: JoinHandle<std::io::Result<Vec<u8>>>,
295    stderr: JoinHandle<std::io::Result<Vec<u8>>>,
296}
297
298impl DrainedChild {
299    fn spawn(mut command: Command) -> std::io::Result<Self> {
300        let mut child = spawn_command_with_retry(&mut command)?;
301        let stdout = child
302            .stdout
303            .take()
304            .ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?;
305        let stderr = child
306            .stderr
307            .take()
308            .ok_or_else(|| std::io::Error::other("failed to capture child stderr"))?;
309        Ok(Self {
310            child,
311            stdout: spawn_capture_thread(stdout),
312            stderr: spawn_capture_thread(stderr),
313        })
314    }
315
316    fn child_mut(&mut self) -> &mut Child {
317        &mut self.child
318    }
319
320    fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
321        self.child.try_wait()
322    }
323
324    fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
325        self.child.wait()
326    }
327
328    fn finish(self, status: std::process::ExitStatus) -> std::io::Result<Output> {
329        Ok(Output {
330            status,
331            stdout: join_capture(self.stdout)?,
332            stderr: join_capture(self.stderr)?,
333        })
334    }
335}
336
/// Starts a thread that reads `reader` until end of input and yields the bytes
/// it collected, or the read error that interrupted it.
fn spawn_capture_thread<R: Read + Send + 'static>(
    mut reader: R,
) -> JoinHandle<std::io::Result<Vec<u8>>> {
    thread::spawn(move || {
        let mut captured = Vec::new();
        reader.read_to_end(&mut captured).map(|_| captured)
    })
}
347
/// Waits for a capture thread and returns its result; a panic inside the
/// thread is reported as an I/O error.
fn join_capture(handle: JoinHandle<std::io::Result<Vec<u8>>>) -> std::io::Result<Vec<u8>> {
    match handle.join() {
        Ok(captured) => captured,
        Err(_panic_payload) => Err(std::io::Error::other(
            "plugin output capture thread panicked",
        )),
    }
}
353
354fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
355    for attempt in 0..=ETXTBSY_RETRY_COUNT {
356        match command.spawn() {
357            Ok(child) => return Ok(child),
358            Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
359                thread::sleep(ETXTBSY_RETRY_DELAY);
360            }
361            Err(err) => return Err(err),
362        }
363    }
364
365    unreachable!("retry loop should always return or error");
366}
367
/// Reports whether `err` is the Unix "text file busy" (`ETXTBSY`) failure.
///
/// The raw OS error number is only interpreted on Unix targets. On other
/// platforms the same number denotes an unrelated error, so this always
/// returns `false` there instead of triggering pointless spawn retries.
fn is_text_file_busy(err: &std::io::Error) -> bool {
    // errno value of ETXTBSY on Linux, macOS and the BSDs.
    const ETXTBSY: i32 = 26;

    cfg!(unix) && err.raw_os_error() == Some(ETXTBSY)
}
371
/// Requests process group 0 for the child, i.e. a new group led by the child
/// itself, so the whole group can later be signalled through the child's pid.
#[cfg(unix)]
fn configure_command_process_group(command: &mut Command) {
    std::os::unix::process::CommandExt::process_group(command, 0);
}

/// No process-group setup is performed on non-Unix targets.
#[cfg(not(unix))]
fn configure_command_process_group(_command: &mut Command) {}
381
382#[cfg(unix)]
383fn terminate_timed_out_child(child: &mut Child) {
384    const SIGTERM: i32 = 15;
385    const SIGKILL: i32 = 9;
386
387    let process_group = child.id() as i32;
388    let _ = signal_process_group(process_group, SIGTERM);
389    let grace_deadline = Instant::now() + Duration::from_millis(50);
390
391    loop {
392        match child.try_wait() {
393            Ok(Some(_)) => return,
394            Ok(None) if Instant::now() < grace_deadline => {
395                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
396            }
397            Ok(None) | Err(_) => break,
398        }
399    }
400
401    let _ = signal_process_group(process_group, SIGKILL);
402}
403
404#[cfg(not(unix))]
405fn terminate_timed_out_child(child: &mut Child) {
406    let _ = child.kill();
407}
408
/// Sends `signal` to every process in the group `process_group`.
///
/// # Errors
///
/// Returns `InvalidInput` without signalling anything when `process_group` is
/// 1 or less. Negated for `kill(2)`, those values would address the caller's
/// own group (0), every process the caller may signal (1), or a single pid
/// (negative ids). Otherwise returns the OS error reported by `kill(2)`.
#[cfg(unix)]
fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    if process_group <= 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "process group id must be greater than 1",
        ));
    }

    // SAFETY: `kill` takes two plain integers and touches no memory owned by
    // this process. The guard above ensures the negated id is below -1, so
    // the call addresses exactly one process group and cannot overflow.
    let result = unsafe { kill(-process_group, signal) };
    if result == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}
421}