Skip to main content

osp_cli/plugin/
dispatch.rs

1use super::manager::{
2    DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
3};
4use crate::core::plugin::{DescribeV1, ResponseV1};
5use anyhow::{Result, anyhow};
6use std::process::{Child, Command, Output, Stdio};
7use std::thread;
8use std::time::{Duration, Instant};
9
10const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
11const ETXTBSY_RETRY_COUNT: usize = 5;
12const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);
13const ENV_OSP_COMMAND: &str = "OSP_COMMAND";
14
/// Failure modes of running a plugin executable under a timeout.
///
/// `Debug` is derived so the error can be logged with `{:?}` and used with
/// `unwrap`/`expect` in tests.
#[derive(Debug)]
enum CommandRunError {
    /// Spawning the process, polling it, or collecting its output failed.
    Execute(std::io::Error),
    /// The process was still running when the timeout elapsed and was terminated.
    TimedOut {
        /// The timeout the process was run with.
        timeout: Duration,
        /// Raw bytes collected from the child's stderr.
        stderr: Vec<u8>,
    },
}
19
20impl PluginManager {
21    pub fn dispatch(
22        &self,
23        command: &str,
24        args: &[String],
25        context: &PluginDispatchContext,
26    ) -> std::result::Result<ResponseV1, PluginDispatchError> {
27        let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
28
29        let raw = run_provider(&provider, command, args, context, self.process_timeout)?;
30        if raw.status_code != 0 {
31            tracing::warn!(
32                plugin_id = %provider.plugin_id,
33                command = %command,
34                status_code = raw.status_code,
35                stderr = %raw.stderr.trim(),
36                "plugin command exited with non-zero status"
37            );
38            return Err(PluginDispatchError::NonZeroExit {
39                plugin_id: provider.plugin_id.clone(),
40                status_code: raw.status_code,
41                stderr: raw.stderr,
42            });
43        }
44
45        let response: ResponseV1 = serde_json::from_str(&raw.stdout).map_err(|source| {
46            tracing::warn!(
47                plugin_id = %provider.plugin_id,
48                command = %command,
49                error = %source,
50                "plugin command returned invalid JSON"
51            );
52            PluginDispatchError::InvalidJsonResponse {
53                plugin_id: provider.plugin_id.clone(),
54                source,
55            }
56        })?;
57
58        response.validate_v1().map_err(|reason| {
59            tracing::warn!(
60                plugin_id = %provider.plugin_id,
61                command = %command,
62                reason = %reason,
63                "plugin command returned invalid payload"
64            );
65            PluginDispatchError::InvalidResponsePayload {
66                plugin_id: provider.plugin_id.clone(),
67                reason,
68            }
69        })?;
70
71        Ok(response)
72    }
73
74    pub fn dispatch_passthrough(
75        &self,
76        command: &str,
77        args: &[String],
78        context: &PluginDispatchContext,
79    ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
80        let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
81        run_provider(&provider, command, args, context, self.process_timeout)
82    }
83}
84
85pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
86    let mut command = Command::new(path);
87    command.arg("--describe");
88    let started_at = Instant::now();
89    tracing::debug!(
90        executable = %path.display(),
91        timeout_ms = timeout.as_millis(),
92        "running plugin describe"
93    );
94    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
95        CommandRunError::Execute(source) => {
96            tracing::warn!(
97                executable = %path.display(),
98                error = %source,
99                "plugin describe execution failed"
100            );
101            anyhow!(
102                "failed to execute --describe for {}: {source}",
103                path.display()
104            )
105        }
106        CommandRunError::TimedOut { timeout, stderr } => {
107            let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
108            tracing::warn!(
109                executable = %path.display(),
110                timeout_ms = timeout.as_millis(),
111                stderr = %stderr,
112                "plugin describe timed out"
113            );
114            if stderr.is_empty() {
115                anyhow!(
116                    "--describe timed out after {} ms for {}",
117                    timeout.as_millis(),
118                    path.display()
119                )
120            } else {
121                anyhow!(
122                    "--describe timed out after {} ms for {}: {}",
123                    timeout.as_millis(),
124                    path.display(),
125                    stderr
126                )
127            }
128        }
129    })?;
130
131    tracing::debug!(
132        executable = %path.display(),
133        elapsed_ms = started_at.elapsed().as_millis(),
134        status = ?output.status.code(),
135        "plugin describe completed"
136    );
137
138    if !output.status.success() {
139        let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
140        let message = if stderr.is_empty() {
141            format!("--describe failed with status {}", output.status)
142        } else {
143            format!(
144                "--describe failed with status {}: {}",
145                output.status, stderr
146            )
147        };
148        return Err(anyhow!(message));
149    }
150
151    let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
152        .map_err(anyhow::Error::from)
153        .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
154    describe
155        .validate_v1()
156        .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
157
158    Ok(describe)
159}
160
161pub(super) fn run_provider(
162    provider: &DiscoveredPlugin,
163    selected_command: &str,
164    args: &[String],
165    context: &PluginDispatchContext,
166    timeout: Duration,
167) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
168    let mut command = Command::new(&provider.executable);
169    let started_at = Instant::now();
170    tracing::debug!(
171        plugin_id = %provider.plugin_id,
172        executable = %provider.executable.display(),
173        command = %selected_command,
174        arg_count = args.len(),
175        timeout_ms = timeout.as_millis(),
176        "dispatching plugin command"
177    );
178    command.arg(selected_command);
179    command.args(args);
180    command.env(ENV_OSP_COMMAND, selected_command);
181    for (key, value) in context.runtime_hints.env_pairs() {
182        command.env(key, value);
183    }
184    for (key, value) in context.env_pairs_for(&provider.plugin_id) {
185        command.env(key, value);
186    }
187
188    let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
189        CommandRunError::Execute(source) => {
190            tracing::warn!(
191                plugin_id = %provider.plugin_id,
192                executable = %provider.executable.display(),
193                command = %selected_command,
194                error = %source,
195                "plugin command execution failed"
196            );
197            PluginDispatchError::ExecuteFailed {
198                plugin_id: provider.plugin_id.clone(),
199                source,
200            }
201        }
202        CommandRunError::TimedOut { timeout, stderr } => {
203            let stderr_text = String::from_utf8_lossy(&stderr).to_string();
204            tracing::warn!(
205                plugin_id = %provider.plugin_id,
206                executable = %provider.executable.display(),
207                command = %selected_command,
208                timeout_ms = timeout.as_millis(),
209                stderr = %stderr_text.trim(),
210                "plugin command timed out"
211            );
212            PluginDispatchError::TimedOut {
213                plugin_id: provider.plugin_id.clone(),
214                timeout,
215                stderr: stderr_text,
216            }
217        }
218    })?;
219
220    tracing::debug!(
221        plugin_id = %provider.plugin_id,
222        executable = %provider.executable.display(),
223        command = %selected_command,
224        elapsed_ms = started_at.elapsed().as_millis(),
225        status = ?output.status.code(),
226        "plugin command completed"
227    );
228
229    Ok(RawPluginOutput {
230        status_code: output.status.code().unwrap_or(1),
231        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
232        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
233    })
234}
235
236fn run_command_with_timeout(
237    mut command: Command,
238    timeout: Duration,
239) -> Result<Output, CommandRunError> {
240    configure_command_process_group(&mut command);
241    command.stdin(Stdio::null());
242    command.stdout(Stdio::piped());
243    command.stderr(Stdio::piped());
244
245    let mut child = spawn_command_with_retry(&mut command).map_err(CommandRunError::Execute)?;
246    let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
247
248    loop {
249        match child.try_wait() {
250            Ok(Some(_)) => return child.wait_with_output().map_err(CommandRunError::Execute),
251            Ok(None) if Instant::now() < deadline => {
252                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
253            }
254            Ok(None) => {
255                terminate_timed_out_child(&mut child);
256                let output = child.wait_with_output().map_err(CommandRunError::Execute)?;
257                return Err(CommandRunError::TimedOut {
258                    timeout,
259                    stderr: output.stderr,
260                });
261            }
262            Err(source) => return Err(CommandRunError::Execute(source)),
263        }
264    }
265}
266
267fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
268    for attempt in 0..=ETXTBSY_RETRY_COUNT {
269        match command.spawn() {
270            Ok(child) => return Ok(child),
271            Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
272                thread::sleep(ETXTBSY_RETRY_DELAY);
273            }
274            Err(err) => return Err(err),
275        }
276    }
277
278    unreachable!("retry loop should always return or error");
279}
280
/// Reports whether `err` is the Unix `ETXTBSY` ("text file busy") error.
///
/// The check is limited to Unix targets: raw OS error numbers are platform-specific, so the
/// same number on another platform is an unrelated error that must not trigger a retry.
fn is_text_file_busy(err: &std::io::Error) -> bool {
    /// errno value of `ETXTBSY` on Linux, macOS and the BSDs.
    const ETXTBSY: i32 = 26;

    cfg!(unix) && err.raw_os_error() == Some(ETXTBSY)
}
284
/// Requests that the child be started in a process group of its own.
///
/// Passing `0` to `CommandExt::process_group` makes the child's own process id the group
/// id, which lets the whole group be signalled later.
#[cfg(unix)]
fn configure_command_process_group(command: &mut Command) {
    std::os::unix::process::CommandExt::process_group(command, 0);
}

/// No-op on non-Unix targets; `command` is left untouched.
#[cfg(not(unix))]
fn configure_command_process_group(_command: &mut Command) {}
294
295#[cfg(unix)]
296fn terminate_timed_out_child(child: &mut Child) {
297    const SIGTERM: i32 = 15;
298    const SIGKILL: i32 = 9;
299
300    let process_group = child.id() as i32;
301    let _ = signal_process_group(process_group, SIGTERM);
302    let grace_deadline = Instant::now() + Duration::from_millis(50);
303
304    loop {
305        match child.try_wait() {
306            Ok(Some(_)) => return,
307            Ok(None) if Instant::now() < grace_deadline => {
308                thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
309            }
310            Ok(None) | Err(_) => break,
311        }
312    }
313
314    let _ = signal_process_group(process_group, SIGKILL);
315}
316
317#[cfg(not(unix))]
318fn terminate_timed_out_child(child: &mut Child) {
319    let _ = child.kill();
320}
321
/// Sends `signal` to every process in the process group `process_group`.
///
/// This wraps `kill(2)` with a negated pid, which addresses a process group rather than a
/// single process.
///
/// # Errors
///
/// Returns `InvalidInput` when `process_group` is not greater than 1. Those values must
/// never reach `kill`: negating `0` targets the caller's own process group, negating `1`
/// targets every process the caller may signal, negating a negative id targets a single
/// unrelated process, and negating `i32::MIN` overflows. Otherwise returns the OS error
/// reported by `kill` when it fails.
#[cfg(unix)]
fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    if process_group <= 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "process group id must be greater than 1",
        ));
    }

    // SAFETY: `kill` takes two plain integers and touches no memory owned by this program.
    // `process_group > 1` was checked above, so the negation cannot overflow and the call
    // addresses exactly one process group.
    let result = unsafe { kill(-process_group, signal) };
    if result == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}