1use super::manager::{
22 DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
23};
24use crate::core::plugin::{DescribeV1, ResponseV1};
25use anyhow::{Result, anyhow};
26use std::io::Read;
27use std::process::{Child, Command, Output, Stdio};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31
/// Name of the environment variable that carries the selected command name to the plugin.
const ENV_OSP_COMMAND: &str = "OSP_COMMAND";

/// Sleep between two consecutive `try_wait` polls while waiting for a child to exit.
const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Pause between spawn attempts that failed because the executable was busy.
const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);
/// Number of additional spawn attempts made after a "text file busy" failure.
const ETXTBSY_RETRY_COUNT: usize = 5;
36
/// Failure modes of running a plugin executable under a timeout.
///
/// `Debug` is derived so results carrying this error can be used with
/// `unwrap`/`expect` and `{:?}` formatting in diagnostics and tests.
#[derive(Debug)]
enum CommandRunError {
    /// Spawning, waiting on, or capturing the output of the process failed.
    Execute(std::io::Error),
    /// The process was still running at the deadline and was terminated.
    ///
    /// `timeout` is the limit that was applied and `stderr` holds the bytes
    /// captured from the process's standard error stream.
    TimedOut { timeout: Duration, stderr: Vec<u8> },
}
41
42impl PluginManager {
43 pub fn dispatch(
69 &self,
70 command: &str,
71 args: &[String],
72 context: &PluginDispatchContext,
73 ) -> std::result::Result<ResponseV1, PluginDispatchError> {
74 let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
75
76 let raw = run_provider(&provider, command, args, context, self.process_timeout)?;
77 if raw.status_code != 0 {
78 tracing::warn!(
79 plugin_id = %provider.plugin_id,
80 command = %command,
81 status_code = raw.status_code,
82 stderr = %raw.stderr.trim(),
83 "plugin command exited with non-zero status"
84 );
85 return Err(PluginDispatchError::NonZeroExit {
86 plugin_id: provider.plugin_id.clone(),
87 status_code: raw.status_code,
88 stderr: raw.stderr,
89 });
90 }
91
92 let response: ResponseV1 = serde_json::from_str(&raw.stdout).map_err(|source| {
93 tracing::warn!(
94 plugin_id = %provider.plugin_id,
95 command = %command,
96 error = %source,
97 "plugin command returned invalid JSON"
98 );
99 PluginDispatchError::InvalidJsonResponse {
100 plugin_id: provider.plugin_id.clone(),
101 source,
102 }
103 })?;
104
105 response.validate_v1().map_err(|reason| {
106 tracing::warn!(
107 plugin_id = %provider.plugin_id,
108 command = %command,
109 reason = %reason,
110 "plugin command returned invalid payload"
111 );
112 PluginDispatchError::InvalidResponsePayload {
113 plugin_id: provider.plugin_id.clone(),
114 reason,
115 }
116 })?;
117
118 Ok(response)
119 }
120
121 pub fn dispatch_passthrough(
145 &self,
146 command: &str,
147 args: &[String],
148 context: &PluginDispatchContext,
149 ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
150 let provider = self.resolve_provider(command, context.provider_override.as_deref())?;
151 run_provider(&provider, command, args, context, self.process_timeout)
152 }
153}
154
155pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
156 let mut command = Command::new(path);
157 command.arg("--describe");
158 let started_at = Instant::now();
159 tracing::debug!(
160 executable = %path.display(),
161 timeout_ms = timeout.as_millis(),
162 "running plugin describe"
163 );
164 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
165 CommandRunError::Execute(source) => {
166 tracing::warn!(
167 executable = %path.display(),
168 error = %source,
169 "plugin describe execution failed"
170 );
171 anyhow!(
172 "failed to execute --describe for {}: {source}",
173 path.display()
174 )
175 }
176 CommandRunError::TimedOut { timeout, stderr } => {
177 let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
178 tracing::warn!(
179 executable = %path.display(),
180 timeout_ms = timeout.as_millis(),
181 stderr = %stderr,
182 "plugin describe timed out"
183 );
184 if stderr.is_empty() {
185 anyhow!(
186 "--describe timed out after {} ms for {}",
187 timeout.as_millis(),
188 path.display()
189 )
190 } else {
191 anyhow!(
192 "--describe timed out after {} ms for {}: {}",
193 timeout.as_millis(),
194 path.display(),
195 stderr
196 )
197 }
198 }
199 })?;
200
201 tracing::debug!(
202 executable = %path.display(),
203 elapsed_ms = started_at.elapsed().as_millis(),
204 status = ?output.status.code(),
205 "plugin describe completed"
206 );
207
208 if !output.status.success() {
209 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
210 let message = if stderr.is_empty() {
211 format!("--describe failed with status {}", output.status)
212 } else {
213 format!(
214 "--describe failed with status {}: {}",
215 output.status, stderr
216 )
217 };
218 return Err(anyhow!(message));
219 }
220
221 let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
222 .map_err(anyhow::Error::from)
223 .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
224 describe
225 .validate_v1()
226 .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
227
228 Ok(describe)
229}
230
231pub(super) fn run_provider(
232 provider: &DiscoveredPlugin,
233 selected_command: &str,
234 args: &[String],
235 context: &PluginDispatchContext,
236 timeout: Duration,
237) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
238 let mut command = Command::new(&provider.executable);
239 let started_at = Instant::now();
240 tracing::debug!(
241 plugin_id = %provider.plugin_id,
242 executable = %provider.executable.display(),
243 command = %selected_command,
244 arg_count = args.len(),
245 timeout_ms = timeout.as_millis(),
246 "dispatching plugin command"
247 );
248 command.arg(selected_command);
249 command.args(args);
250 command.env(ENV_OSP_COMMAND, selected_command);
251 for (key, value) in context.runtime_hints.env_pairs() {
252 command.env(key, value);
253 }
254 for (key, value) in context.env_pairs_for(&provider.plugin_id) {
255 command.env(key, value);
256 }
257
258 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
259 CommandRunError::Execute(source) => {
260 tracing::warn!(
261 plugin_id = %provider.plugin_id,
262 executable = %provider.executable.display(),
263 command = %selected_command,
264 error = %source,
265 "plugin command execution failed"
266 );
267 PluginDispatchError::ExecuteFailed {
268 plugin_id: provider.plugin_id.clone(),
269 source,
270 }
271 }
272 CommandRunError::TimedOut { timeout, stderr } => {
273 let stderr_text = String::from_utf8_lossy(&stderr).to_string();
274 tracing::warn!(
275 plugin_id = %provider.plugin_id,
276 executable = %provider.executable.display(),
277 command = %selected_command,
278 timeout_ms = timeout.as_millis(),
279 stderr = %stderr_text.trim(),
280 "plugin command timed out"
281 );
282 PluginDispatchError::TimedOut {
283 plugin_id: provider.plugin_id.clone(),
284 timeout,
285 stderr: stderr_text,
286 }
287 }
288 })?;
289
290 tracing::debug!(
291 plugin_id = %provider.plugin_id,
292 executable = %provider.executable.display(),
293 command = %selected_command,
294 elapsed_ms = started_at.elapsed().as_millis(),
295 status = ?output.status.code(),
296 "plugin command completed"
297 );
298
299 Ok(RawPluginOutput {
300 status_code: output.status.code().unwrap_or(1),
301 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
302 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
303 })
304}
305
306fn run_command_with_timeout(
307 mut command: Command,
308 timeout: Duration,
309) -> Result<Output, CommandRunError> {
310 configure_command_process_group(&mut command);
311 command.stdin(Stdio::null());
312 command.stdout(Stdio::piped());
313 command.stderr(Stdio::piped());
314
315 let mut child = DrainedChild::spawn(command).map_err(CommandRunError::Execute)?;
316 let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
317
318 loop {
319 match child.try_wait() {
320 Ok(Some(status)) => return child.finish(status).map_err(CommandRunError::Execute),
321 Ok(None) if Instant::now() < deadline => {
322 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
323 }
324 Ok(None) => {
325 terminate_timed_out_child(child.child_mut());
326 let status = child.wait().map_err(CommandRunError::Execute)?;
327 let output = child.finish(status).map_err(CommandRunError::Execute)?;
328 return Err(CommandRunError::TimedOut {
329 timeout,
330 stderr: output.stderr,
331 });
332 }
333 Err(source) => return Err(CommandRunError::Execute(source)),
334 }
335 }
336}
337
338struct DrainedChild {
339 child: Child,
340 stdout: JoinHandle<std::io::Result<Vec<u8>>>,
341 stderr: JoinHandle<std::io::Result<Vec<u8>>>,
342}
343
344impl DrainedChild {
345 fn spawn(mut command: Command) -> std::io::Result<Self> {
346 let mut child = spawn_command_with_retry(&mut command)?;
347 let stdout = child
348 .stdout
349 .take()
350 .ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?;
351 let stderr = child
352 .stderr
353 .take()
354 .ok_or_else(|| std::io::Error::other("failed to capture child stderr"))?;
355 Ok(Self {
356 child,
357 stdout: spawn_capture_thread(stdout),
358 stderr: spawn_capture_thread(stderr),
359 })
360 }
361
362 fn child_mut(&mut self) -> &mut Child {
363 &mut self.child
364 }
365
366 fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
367 self.child.try_wait()
368 }
369
370 fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
371 self.child.wait()
372 }
373
374 fn finish(self, status: std::process::ExitStatus) -> std::io::Result<Output> {
375 Ok(Output {
376 status,
377 stdout: join_capture(self.stdout)?,
378 stderr: join_capture(self.stderr)?,
379 })
380 }
381}
382
/// Starts a thread that reads `reader` to end-of-file.
///
/// Joining the returned handle yields every byte that was read, or the I/O
/// error that interrupted the read.
fn spawn_capture_thread<R>(mut reader: R) -> JoinHandle<std::io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || {
        let mut captured = Vec::new();
        reader.read_to_end(&mut captured).map(|_| captured)
    })
}
393
/// Waits for a capture thread and returns the bytes it collected.
///
/// The thread's own I/O result is passed through unchanged; a panic inside
/// the thread is reported as an `std::io::Error`.
fn join_capture(handle: JoinHandle<std::io::Result<Vec<u8>>>) -> std::io::Result<Vec<u8>> {
    match handle.join() {
        Ok(captured) => captured,
        Err(_) => Err(std::io::Error::other(
            "plugin output capture thread panicked",
        )),
    }
}
399
400fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
401 for attempt in 0..=ETXTBSY_RETRY_COUNT {
402 match command.spawn() {
403 Ok(child) => return Ok(child),
404 Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
405 thread::sleep(ETXTBSY_RETRY_DELAY);
406 }
407 Err(err) => return Err(err),
408 }
409 }
410
411 Err(std::io::Error::other(
412 "plugin spawn retry loop exhausted unexpectedly",
413 ))
414}
415
/// Reports whether `err` is the Unix `ETXTBSY` ("text file busy") error.
///
/// The raw code is only meaningful on Unix targets. On other platforms the
/// same number denotes an unrelated error, so the check is compiled to
/// `false` there rather than triggering spurious spawn retries.
fn is_text_file_busy(err: &std::io::Error) -> bool {
    /// `ETXTBSY` as defined on Unix platforms.
    const ETXTBSY: i32 = 26;

    cfg!(unix) && err.raw_os_error() == Some(ETXTBSY)
}
419
/// Makes the spawned child the leader of a new process group.
///
/// Passing `0` as the group id asks for a group whose id equals the child's
/// own pid.
#[cfg(unix)]
fn configure_command_process_group(command: &mut Command) {
    std::os::unix::process::CommandExt::process_group(command, 0);
}
426
/// Non-Unix counterpart of the process-group setup: leaves the command unchanged.
#[cfg(not(unix))]
fn configure_command_process_group(_command: &mut Command) {
    // Intentionally empty: no process-group configuration is applied here.
}
429
430#[cfg(unix)]
431fn terminate_timed_out_child(child: &mut Child) {
432 const SIGTERM: i32 = 15;
433 const SIGKILL: i32 = 9;
434
435 let process_group = child.id() as i32;
436 let _ = signal_process_group(process_group, SIGTERM);
437 let grace_deadline = Instant::now() + Duration::from_millis(50);
438
439 loop {
440 match child.try_wait() {
441 Ok(Some(_)) => return,
442 Ok(None) if Instant::now() < grace_deadline => {
443 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
444 }
445 Ok(None) | Err(_) => break,
446 }
447 }
448
449 let _ = signal_process_group(process_group, SIGKILL);
450}
451
/// Terminates a timed-out child on non-Unix targets by killing the process.
#[cfg(not(unix))]
fn terminate_timed_out_child(child: &mut Child) {
    // Best effort: the result of the kill request is deliberately discarded.
    drop(child.kill());
}
456
/// Sends `signal` to every process in the process group `process_group`.
///
/// # Errors
///
/// Returns `InvalidInput` when `process_group` is not greater than 1. Such a
/// value cannot name a spawned child's group, and negating it would make
/// `kill` target the caller's own group (`0`), every process the caller may
/// signal (`1`), or a single process instead of a group (negative ids).
/// Otherwise returns the OS error reported by `kill` on failure.
#[cfg(unix)]
fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    if process_group <= 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "process group id must be greater than 1",
        ));
    }

    // SAFETY: `kill` takes two plain integers and touches no memory owned by
    // this program. `process_group > 1`, so the negation cannot overflow and
    // the call addresses exactly that process group.
    let result = unsafe { kill(-process_group, signal) };
    if result == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}