1use super::manager::{
2 DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
3};
4use crate::core::plugin::{DescribeV1, ResponseV1};
5use anyhow::{Result, anyhow};
6use std::process::{Child, Command, Output, Stdio};
7use std::thread;
8use std::time::{Duration, Instant};
9
/// Environment variable through which the selected command name is handed to the plugin process.
const ENV_OSP_COMMAND: &str = "OSP_COMMAND";

/// Pause between two consecutive `try_wait` polls of a running child process.
const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Number of additional spawn attempts made after a "text file busy" spawn failure.
const ETXTBSY_RETRY_COUNT: usize = 5;
/// Pause taken before each of those additional spawn attempts.
const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);
14
/// Ways in which running a child process under a deadline can fail.
///
/// `Debug` is derived so the error can be logged with `{:?}` and used with
/// `unwrap`/`expect` style helpers.
#[derive(Debug)]
enum CommandRunError {
    /// Spawning the process, polling it, or collecting its output failed with an I/O error.
    Execute(std::io::Error),
    /// The process was still running when its deadline passed.
    TimedOut {
        /// The timeout the caller asked for.
        timeout: Duration,
        /// Raw bytes captured from the child's stderr.
        stderr: Vec<u8>,
    },
}
19
20impl PluginManager {
21 pub fn dispatch(
22 &self,
23 command: &str,
24 args: &[String],
25 context: &PluginDispatchContext,
26 ) -> std::result::Result<ResponseV1, PluginDispatchError> {
27 let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
28
29 let raw = run_provider(&provider, command, args, context, self.process_timeout)?;
30 if raw.status_code != 0 {
31 tracing::warn!(
32 plugin_id = %provider.plugin_id,
33 command = %command,
34 status_code = raw.status_code,
35 stderr = %raw.stderr.trim(),
36 "plugin command exited with non-zero status"
37 );
38 return Err(PluginDispatchError::NonZeroExit {
39 plugin_id: provider.plugin_id.clone(),
40 status_code: raw.status_code,
41 stderr: raw.stderr,
42 });
43 }
44
45 let response: ResponseV1 = serde_json::from_str(&raw.stdout).map_err(|source| {
46 tracing::warn!(
47 plugin_id = %provider.plugin_id,
48 command = %command,
49 error = %source,
50 "plugin command returned invalid JSON"
51 );
52 PluginDispatchError::InvalidJsonResponse {
53 plugin_id: provider.plugin_id.clone(),
54 source,
55 }
56 })?;
57
58 response.validate_v1().map_err(|reason| {
59 tracing::warn!(
60 plugin_id = %provider.plugin_id,
61 command = %command,
62 reason = %reason,
63 "plugin command returned invalid payload"
64 );
65 PluginDispatchError::InvalidResponsePayload {
66 plugin_id: provider.plugin_id.clone(),
67 reason,
68 }
69 })?;
70
71 Ok(response)
72 }
73
74 pub fn dispatch_passthrough(
75 &self,
76 command: &str,
77 args: &[String],
78 context: &PluginDispatchContext,
79 ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
80 let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
81 run_provider(&provider, command, args, context, self.process_timeout)
82 }
83}
84
85pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
86 let mut command = Command::new(path);
87 command.arg("--describe");
88 let started_at = Instant::now();
89 tracing::debug!(
90 executable = %path.display(),
91 timeout_ms = timeout.as_millis(),
92 "running plugin describe"
93 );
94 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
95 CommandRunError::Execute(source) => {
96 tracing::warn!(
97 executable = %path.display(),
98 error = %source,
99 "plugin describe execution failed"
100 );
101 anyhow!(
102 "failed to execute --describe for {}: {source}",
103 path.display()
104 )
105 }
106 CommandRunError::TimedOut { timeout, stderr } => {
107 let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
108 tracing::warn!(
109 executable = %path.display(),
110 timeout_ms = timeout.as_millis(),
111 stderr = %stderr,
112 "plugin describe timed out"
113 );
114 if stderr.is_empty() {
115 anyhow!(
116 "--describe timed out after {} ms for {}",
117 timeout.as_millis(),
118 path.display()
119 )
120 } else {
121 anyhow!(
122 "--describe timed out after {} ms for {}: {}",
123 timeout.as_millis(),
124 path.display(),
125 stderr
126 )
127 }
128 }
129 })?;
130
131 tracing::debug!(
132 executable = %path.display(),
133 elapsed_ms = started_at.elapsed().as_millis(),
134 status = ?output.status.code(),
135 "plugin describe completed"
136 );
137
138 if !output.status.success() {
139 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
140 let message = if stderr.is_empty() {
141 format!("--describe failed with status {}", output.status)
142 } else {
143 format!(
144 "--describe failed with status {}: {}",
145 output.status, stderr
146 )
147 };
148 return Err(anyhow!(message));
149 }
150
151 let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
152 .map_err(anyhow::Error::from)
153 .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
154 describe
155 .validate_v1()
156 .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
157
158 Ok(describe)
159}
160
161pub(super) fn run_provider(
162 provider: &DiscoveredPlugin,
163 selected_command: &str,
164 args: &[String],
165 context: &PluginDispatchContext,
166 timeout: Duration,
167) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
168 let mut command = Command::new(&provider.executable);
169 let started_at = Instant::now();
170 tracing::debug!(
171 plugin_id = %provider.plugin_id,
172 executable = %provider.executable.display(),
173 command = %selected_command,
174 arg_count = args.len(),
175 timeout_ms = timeout.as_millis(),
176 "dispatching plugin command"
177 );
178 command.arg(selected_command);
179 command.args(args);
180 command.env(ENV_OSP_COMMAND, selected_command);
181 for (key, value) in context.runtime_hints.env_pairs() {
182 command.env(key, value);
183 }
184 for (key, value) in context.env_pairs_for(&provider.plugin_id) {
185 command.env(key, value);
186 }
187
188 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
189 CommandRunError::Execute(source) => {
190 tracing::warn!(
191 plugin_id = %provider.plugin_id,
192 executable = %provider.executable.display(),
193 command = %selected_command,
194 error = %source,
195 "plugin command execution failed"
196 );
197 PluginDispatchError::ExecuteFailed {
198 plugin_id: provider.plugin_id.clone(),
199 source,
200 }
201 }
202 CommandRunError::TimedOut { timeout, stderr } => {
203 let stderr_text = String::from_utf8_lossy(&stderr).to_string();
204 tracing::warn!(
205 plugin_id = %provider.plugin_id,
206 executable = %provider.executable.display(),
207 command = %selected_command,
208 timeout_ms = timeout.as_millis(),
209 stderr = %stderr_text.trim(),
210 "plugin command timed out"
211 );
212 PluginDispatchError::TimedOut {
213 plugin_id: provider.plugin_id.clone(),
214 timeout,
215 stderr: stderr_text,
216 }
217 }
218 })?;
219
220 tracing::debug!(
221 plugin_id = %provider.plugin_id,
222 executable = %provider.executable.display(),
223 command = %selected_command,
224 elapsed_ms = started_at.elapsed().as_millis(),
225 status = ?output.status.code(),
226 "plugin command completed"
227 );
228
229 Ok(RawPluginOutput {
230 status_code: output.status.code().unwrap_or(1),
231 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
232 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
233 })
234}
235
236fn run_command_with_timeout(
237 mut command: Command,
238 timeout: Duration,
239) -> Result<Output, CommandRunError> {
240 configure_command_process_group(&mut command);
241 command.stdin(Stdio::null());
242 command.stdout(Stdio::piped());
243 command.stderr(Stdio::piped());
244
245 let mut child = spawn_command_with_retry(&mut command).map_err(CommandRunError::Execute)?;
246 let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
247
248 loop {
249 match child.try_wait() {
250 Ok(Some(_)) => return child.wait_with_output().map_err(CommandRunError::Execute),
251 Ok(None) if Instant::now() < deadline => {
252 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
253 }
254 Ok(None) => {
255 terminate_timed_out_child(&mut child);
256 let output = child.wait_with_output().map_err(CommandRunError::Execute)?;
257 return Err(CommandRunError::TimedOut {
258 timeout,
259 stderr: output.stderr,
260 });
261 }
262 Err(source) => return Err(CommandRunError::Execute(source)),
263 }
264 }
265}
266
267fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
268 for attempt in 0..=ETXTBSY_RETRY_COUNT {
269 match command.spawn() {
270 Ok(child) => return Ok(child),
271 Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
272 thread::sleep(ETXTBSY_RETRY_DELAY);
273 }
274 Err(err) => return Err(err),
275 }
276 }
277
278 unreachable!("retry loop should always return or error");
279}
280
/// Reports whether `err` is the Unix `ETXTBSY` ("text file busy") failure.
///
/// The raw OS code is only interpreted on Unix targets. On other platforms the
/// same number is an unrelated error code, so this always returns `false` there.
fn is_text_file_busy(err: &std::io::Error) -> bool {
    // Value of `ETXTBSY` on Linux, macOS and the BSDs.
    const ETXTBSY: i32 = 26;

    cfg!(unix) && err.raw_os_error() == Some(ETXTBSY)
}
284
/// Arranges for the spawned child to lead its own process group on Unix.
///
/// Passing `0` to `process_group` makes the child's pid the group id. On other
/// platforms `command` is left untouched.
fn configure_command_process_group(command: &mut Command) {
    #[cfg(unix)]
    {
        use std::os::unix::process::CommandExt;

        command.process_group(0);
    }

    // Nothing to configure elsewhere; consume the binding to avoid an unused warning.
    #[cfg(not(unix))]
    let _ = command;
}
294
295#[cfg(unix)]
296fn terminate_timed_out_child(child: &mut Child) {
297 const SIGTERM: i32 = 15;
298 const SIGKILL: i32 = 9;
299
300 let process_group = child.id() as i32;
301 let _ = signal_process_group(process_group, SIGTERM);
302 let grace_deadline = Instant::now() + Duration::from_millis(50);
303
304 loop {
305 match child.try_wait() {
306 Ok(Some(_)) => return,
307 Ok(None) if Instant::now() < grace_deadline => {
308 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
309 }
310 Ok(None) | Err(_) => break,
311 }
312 }
313
314 let _ = signal_process_group(process_group, SIGKILL);
315}
316
317#[cfg(not(unix))]
318fn terminate_timed_out_child(child: &mut Child) {
319 let _ = child.kill();
320}
321
/// Sends `signal` to every process in the group `process_group` using `kill(2)`.
///
/// # Errors
///
/// Returns `InvalidInput` without signalling anything when `process_group` is `1`
/// or less, and otherwise the OS error reported by `kill`.
#[cfg(unix)]
fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    // The id is negated below to address a group. An id of 0 would target the
    // caller's own group, 1 would become `kill(-1, ..)` (every process the caller may
    // signal), a negative id would target a single process, and `i32::MIN` cannot
    // be negated at all. None of these is a spawned child's group.
    if process_group <= 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "process group id must be greater than 1",
        ));
    }

    // SAFETY: `kill` takes two plain integers and dereferences no memory; the
    // declaration matches the C prototype on targets where `pid_t` is a 32-bit int.
    let result = unsafe { kill(-process_group, signal) };
    match result {
        0 => Ok(()),
        _ => Err(std::io::Error::last_os_error()),
    }
}