1use super::manager::{
22 DiscoveredPlugin, PluginDispatchContext, PluginDispatchError, PluginManager, RawPluginOutput,
23};
24use crate::core::plugin::{DescribeV1, ResponseV1};
25use anyhow::{Result, anyhow};
26use std::io::Read;
27use std::process::{Child, Command, Output, Stdio};
28use std::thread;
29use std::thread::JoinHandle;
30use std::time::{Duration, Instant};
31
/// Pause between successive exit-status polls while waiting on a child process.
const PROCESS_WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Environment variable through which the selected command name is handed to the plugin.
const ENV_OSP_COMMAND: &str = "OSP_COMMAND";

/// Number of additional spawn attempts made after a "text file busy" failure.
const ETXTBSY_RETRY_COUNT: usize = 5;
/// Pause inserted before each "text file busy" spawn retry.
const ETXTBSY_RETRY_DELAY: Duration = Duration::from_millis(10);
36
/// Ways in which running a child process under a timeout can fail.
enum CommandRunError {
    /// The child was still running at the deadline and had to be terminated.
    TimedOut {
        /// The timeout that was in effect for the run.
        timeout: Duration,
        /// Raw bytes the child wrote to stderr before it was terminated.
        stderr: Vec<u8>,
    },
    /// Spawning, waiting on, or capturing output from the child failed with an I/O error.
    Execute(std::io::Error),
}
41
42struct ExecutedPluginCommand {
43 provider: DiscoveredPlugin,
44 raw: RawPluginOutput,
45}
46
47impl ExecutedPluginCommand {
48 fn run(
49 manager: &PluginManager,
50 command: &str,
51 args: &[String],
52 context: &PluginDispatchContext,
53 ) -> std::result::Result<Self, PluginDispatchError> {
54 let provider = manager.resolve_provider(command, context.provider_override.as_deref())?;
55 let raw = run_provider(&provider, command, args, context, manager.process_timeout)?;
56 Ok(Self { provider, raw })
57 }
58
59 fn into_raw(self) -> RawPluginOutput {
60 self.raw
61 }
62
63 fn into_response(self, command: &str) -> std::result::Result<ResponseV1, PluginDispatchError> {
64 if self.raw.status_code != 0 {
65 tracing::warn!(
66 plugin_id = %self.provider.plugin_id,
67 command = %command,
68 status_code = self.raw.status_code,
69 stderr = %self.raw.stderr.trim(),
70 "plugin command exited with non-zero status"
71 );
72 return Err(PluginDispatchError::NonZeroExit {
73 plugin_id: self.provider.plugin_id,
74 status_code: self.raw.status_code,
75 stderr: self.raw.stderr,
76 });
77 }
78
79 let response: ResponseV1 = serde_json::from_str(&self.raw.stdout).map_err(|source| {
80 tracing::warn!(
81 plugin_id = %self.provider.plugin_id,
82 command = %command,
83 error = %source,
84 "plugin command returned invalid JSON"
85 );
86 PluginDispatchError::InvalidJsonResponse {
87 plugin_id: self.provider.plugin_id.clone(),
88 source,
89 }
90 })?;
91
92 response.validate_v1().map_err(|reason| {
93 tracing::warn!(
94 plugin_id = %self.provider.plugin_id,
95 command = %command,
96 reason = %reason,
97 "plugin command returned invalid payload"
98 );
99 PluginDispatchError::InvalidResponsePayload {
100 plugin_id: self.provider.plugin_id,
101 reason,
102 }
103 })?;
104
105 Ok(response)
106 }
107}
108
109impl PluginManager {
110 pub fn dispatch(
136 &self,
137 command: &str,
138 args: &[String],
139 context: &PluginDispatchContext,
140 ) -> std::result::Result<ResponseV1, PluginDispatchError> {
141 ExecutedPluginCommand::run(self, command, args, context)?.into_response(command)
142 }
143
144 pub fn dispatch_passthrough(
168 &self,
169 command: &str,
170 args: &[String],
171 context: &PluginDispatchContext,
172 ) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
173 Ok(ExecutedPluginCommand::run(self, command, args, context)?.into_raw())
174 }
175}
176
177pub(super) fn describe_plugin(path: &std::path::Path, timeout: Duration) -> Result<DescribeV1> {
178 let mut command = Command::new(path);
179 command.arg("--describe");
180 let started_at = Instant::now();
181 tracing::debug!(
182 executable = %path.display(),
183 timeout_ms = timeout.as_millis(),
184 "running plugin describe"
185 );
186 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
187 CommandRunError::Execute(source) => {
188 tracing::warn!(
189 executable = %path.display(),
190 error = %source,
191 "plugin describe execution failed"
192 );
193 anyhow!(
194 "failed to execute --describe for {}: {source}",
195 path.display()
196 )
197 }
198 CommandRunError::TimedOut { timeout, stderr } => {
199 let stderr = String::from_utf8_lossy(&stderr).trim().to_string();
200 tracing::warn!(
201 executable = %path.display(),
202 timeout_ms = timeout.as_millis(),
203 stderr = %stderr,
204 "plugin describe timed out"
205 );
206 if stderr.is_empty() {
207 anyhow!(
208 "--describe timed out after {} ms for {}",
209 timeout.as_millis(),
210 path.display()
211 )
212 } else {
213 anyhow!(
214 "--describe timed out after {} ms for {}: {}",
215 timeout.as_millis(),
216 path.display(),
217 stderr
218 )
219 }
220 }
221 })?;
222
223 tracing::debug!(
224 executable = %path.display(),
225 elapsed_ms = started_at.elapsed().as_millis(),
226 status = ?output.status.code(),
227 "plugin describe completed"
228 );
229
230 if !output.status.success() {
231 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
232 let message = if stderr.is_empty() {
233 format!("--describe failed with status {}", output.status)
234 } else {
235 format!(
236 "--describe failed with status {}: {}",
237 output.status, stderr
238 )
239 };
240 return Err(anyhow!(message));
241 }
242
243 let describe: DescribeV1 = serde_json::from_slice(&output.stdout)
244 .map_err(anyhow::Error::from)
245 .map_err(|err| err.context(format!("invalid describe JSON from {}", path.display())))?;
246 describe
247 .validate_v1()
248 .map_err(|err| anyhow!("invalid describe payload from {}: {err}", path.display()))?;
249
250 Ok(describe)
251}
252
253pub(super) fn run_provider(
254 provider: &DiscoveredPlugin,
255 selected_command: &str,
256 args: &[String],
257 context: &PluginDispatchContext,
258 timeout: Duration,
259) -> std::result::Result<RawPluginOutput, PluginDispatchError> {
260 let mut command = Command::new(&provider.executable);
261 let started_at = Instant::now();
262 tracing::debug!(
263 plugin_id = %provider.plugin_id,
264 executable = %provider.executable.display(),
265 command = %selected_command,
266 arg_count = args.len(),
267 timeout_ms = timeout.as_millis(),
268 "dispatching plugin command"
269 );
270 command.arg(selected_command);
271 command.args(args);
272 command.env(ENV_OSP_COMMAND, selected_command);
273 for (key, value) in context.runtime_hints.env_pairs() {
274 command.env(key, value);
275 }
276 for (key, value) in context.env_pairs_for(&provider.plugin_id) {
277 command.env(key, value);
278 }
279
280 let output = run_command_with_timeout(command, timeout).map_err(|err| match err {
281 CommandRunError::Execute(source) => {
282 tracing::warn!(
283 plugin_id = %provider.plugin_id,
284 executable = %provider.executable.display(),
285 command = %selected_command,
286 error = %source,
287 "plugin command execution failed"
288 );
289 PluginDispatchError::ExecuteFailed {
290 plugin_id: provider.plugin_id.clone(),
291 source,
292 }
293 }
294 CommandRunError::TimedOut { timeout, stderr } => {
295 let stderr_text = String::from_utf8_lossy(&stderr).to_string();
296 tracing::warn!(
297 plugin_id = %provider.plugin_id,
298 executable = %provider.executable.display(),
299 command = %selected_command,
300 timeout_ms = timeout.as_millis(),
301 stderr = %stderr_text.trim(),
302 "plugin command timed out"
303 );
304 PluginDispatchError::TimedOut {
305 plugin_id: provider.plugin_id.clone(),
306 timeout,
307 stderr: stderr_text,
308 }
309 }
310 })?;
311
312 tracing::debug!(
313 plugin_id = %provider.plugin_id,
314 executable = %provider.executable.display(),
315 command = %selected_command,
316 elapsed_ms = started_at.elapsed().as_millis(),
317 status = ?output.status.code(),
318 "plugin command completed"
319 );
320
321 Ok(RawPluginOutput {
322 status_code: output.status.code().unwrap_or(1),
323 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
324 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
325 })
326}
327
328fn run_command_with_timeout(
329 mut command: Command,
330 timeout: Duration,
331) -> Result<Output, CommandRunError> {
332 configure_command_process_group(&mut command);
333 command.stdin(Stdio::null());
334 command.stdout(Stdio::piped());
335 command.stderr(Stdio::piped());
336
337 let mut child = DrainedChild::spawn(command).map_err(CommandRunError::Execute)?;
338 let deadline = Instant::now() + timeout.max(Duration::from_millis(1));
339
340 loop {
341 match child.try_wait() {
342 Ok(Some(status)) => return child.finish(status).map_err(CommandRunError::Execute),
343 Ok(None) if Instant::now() < deadline => {
344 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
345 }
346 Ok(None) => {
347 terminate_timed_out_child(child.child_mut());
348 let status = child.wait().map_err(CommandRunError::Execute)?;
349 let output = child.finish(status).map_err(CommandRunError::Execute)?;
350 return Err(CommandRunError::TimedOut {
351 timeout,
352 stderr: output.stderr,
353 });
354 }
355 Err(source) => return Err(CommandRunError::Execute(source)),
356 }
357 }
358}
359
360struct DrainedChild {
361 child: Child,
362 stdout: JoinHandle<std::io::Result<Vec<u8>>>,
363 stderr: JoinHandle<std::io::Result<Vec<u8>>>,
364}
365
366impl DrainedChild {
367 fn spawn(mut command: Command) -> std::io::Result<Self> {
368 let mut child = spawn_command_with_retry(&mut command)?;
369 let stdout = child
370 .stdout
371 .take()
372 .ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?;
373 let stderr = child
374 .stderr
375 .take()
376 .ok_or_else(|| std::io::Error::other("failed to capture child stderr"))?;
377 Ok(Self {
378 child,
379 stdout: spawn_capture_thread(stdout),
380 stderr: spawn_capture_thread(stderr),
381 })
382 }
383
384 fn child_mut(&mut self) -> &mut Child {
385 &mut self.child
386 }
387
388 fn try_wait(&mut self) -> std::io::Result<Option<std::process::ExitStatus>> {
389 self.child.try_wait()
390 }
391
392 fn wait(&mut self) -> std::io::Result<std::process::ExitStatus> {
393 self.child.wait()
394 }
395
396 fn finish(self, status: std::process::ExitStatus) -> std::io::Result<Output> {
397 Ok(Output {
398 status,
399 stdout: join_capture(self.stdout)?,
400 stderr: join_capture(self.stderr)?,
401 })
402 }
403}
404
/// Starts a thread that reads `reader` to its end and yields the collected bytes.
///
/// The thread's result is the read error, if any; otherwise every byte that was read.
fn spawn_capture_thread<R>(mut reader: R) -> JoinHandle<std::io::Result<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || {
        let mut captured = Vec::new();
        reader.read_to_end(&mut captured).map(|_| captured)
    })
}
415
/// Waits for a capture thread and returns what it collected.
///
/// # Errors
///
/// Passes through the thread's own I/O error, and reports a panicked thread as an
/// I/O error of its own.
fn join_capture(handle: JoinHandle<std::io::Result<Vec<u8>>>) -> std::io::Result<Vec<u8>> {
    match handle.join() {
        Ok(captured) => captured,
        Err(_) => Err(std::io::Error::other(
            "plugin output capture thread panicked",
        )),
    }
}
421
422fn spawn_command_with_retry(command: &mut Command) -> std::io::Result<Child> {
423 for attempt in 0..=ETXTBSY_RETRY_COUNT {
424 match command.spawn() {
425 Ok(child) => return Ok(child),
426 Err(err) if is_text_file_busy(&err) && attempt < ETXTBSY_RETRY_COUNT => {
427 thread::sleep(ETXTBSY_RETRY_DELAY);
428 }
429 Err(err) => return Err(err),
430 }
431 }
432
433 Err(std::io::Error::other(
434 "plugin spawn retry loop exhausted unexpectedly",
435 ))
436}
437
/// Reports whether `err` is the Unix "text file busy" (`ETXTBSY`) failure.
///
/// Always returns `false` on non-Unix targets, where OS error code 26 does not mean
/// "text file busy" and must not trigger spawn retries.
fn is_text_file_busy(err: &std::io::Error) -> bool {
    // Numeric value of ETXTBSY on Linux, macOS and the BSDs.
    const ETXTBSY: i32 = 26;

    cfg!(unix) && err.raw_os_error() == Some(ETXTBSY)
}
441
/// Makes the spawned child the leader of a fresh process group (process group id 0
/// requests a group whose id equals the child's pid).
#[cfg(unix)]
fn configure_command_process_group(command: &mut Command) {
    std::os::unix::process::CommandExt::process_group(command, 0);
}

/// No process-group setup is performed on non-Unix targets.
#[cfg(not(unix))]
fn configure_command_process_group(_command: &mut Command) {}
451
452#[cfg(unix)]
453fn terminate_timed_out_child(child: &mut Child) {
454 const SIGTERM: i32 = 15;
455 const SIGKILL: i32 = 9;
456
457 let process_group = child.id() as i32;
458 let _ = signal_process_group(process_group, SIGTERM);
459 let grace_deadline = Instant::now() + Duration::from_millis(50);
460
461 loop {
462 match child.try_wait() {
463 Ok(Some(_)) => return,
464 Ok(None) if Instant::now() < grace_deadline => {
465 thread::sleep(PROCESS_WAIT_POLL_INTERVAL);
466 }
467 Ok(None) | Err(_) => break,
468 }
469 }
470
471 let _ = signal_process_group(process_group, SIGKILL);
472}
473
474#[cfg(not(unix))]
475fn terminate_timed_out_child(child: &mut Child) {
476 let _ = child.kill();
477}
478
/// Sends `signal` to every process in the process group `process_group`.
///
/// # Errors
///
/// - `InvalidInput` when `process_group` is `<= 1`. Such ids are refused because the
///   negated value would address the caller's own group (`0`) or every process the
///   caller may signal (`-1`), and `i32::MIN` cannot be negated at all.
/// - The OS error reported by `kill` when the signal could not be delivered.
#[cfg(unix)]
fn signal_process_group(process_group: i32, signal: i32) -> std::io::Result<()> {
    unsafe extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }

    if process_group <= 1 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "refusing to signal a process group id that is not greater than 1",
        ));
    }

    // SAFETY: `kill` takes two plain integers and touches no memory owned by this
    // program. The declaration assumes `pid_t` and `int` are 32-bit, as on the Unix
    // targets this is compiled for. A negative pid addresses the process group.
    let result = unsafe { kill(-process_group, signal) };
    if result == 0 {
        Ok(())
    } else {
        Err(std::io::Error::last_os_error())
    }
}