1use std::collections::VecDeque;
44use std::path::PathBuf;
45use std::process::Stdio;
46use std::sync::Arc;
47use std::time::{Duration, Instant};
48
49use thiserror::Error;
50use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
51use tokio::process::{Child, ChildStdin, ChildStdout, Command};
52use tokio::sync::Mutex;
53use tracing::{debug, warn};
54
55use crate::plugin_manifest::PluginManifest;
56use crate::plugin_protocol::{
57 InitParams, JsonRpcVersion, PROTOCOL_VERSION, PluginRequest, PluginResponse, PluginRpcRequest,
58 PluginRpcResponse, RpcOutcome,
59};
60
/// Tunables that govern how long a plugin subprocess is kept alive and how
/// many crashes are tolerated before the plugin is disabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LifetimePolicy {
    /// A running process whose last use is at least this old is reaped
    /// before the next request is served.
    pub idle_timeout: Duration,
    /// How long shutdown waits for the child to exit before force-killing it.
    pub shutdown_grace: Duration,
    /// Sliding window over which crashes are counted.
    pub restart_window: Duration,
    /// Number of crashes inside `restart_window` at which the plugin is
    /// marked disabled.
    pub restart_cap: usize,
}

impl Default for LifetimePolicy {
    /// Returns the stock policy: 60 s idle timeout, 10 s shutdown grace and
    /// at most 3 crashes per 60 s window.
    fn default() -> Self {
        let idle_timeout = Duration::from_secs(60);
        let shutdown_grace = Duration::from_secs(10);
        let restart_window = Duration::from_secs(60);
        let restart_cap = 3;
        Self {
            idle_timeout,
            shutdown_grace,
            restart_window,
            restart_cap,
        }
    }
}
84
/// Point-in-time snapshot of a plugin client's status, as returned by
/// `PluginClient::health`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginHealth {
    /// Name of the plugin, copied from its manifest.
    pub plugin_name: String,
    /// Lifecycle state at the moment the snapshot was taken.
    pub state: PluginState,
    /// Number of recent crashes currently tracked against the restart cap.
    pub crashes_in_window: usize,
    /// When a request last completed an exchange with the plugin, if ever.
    pub last_used: Option<Instant>,
    /// When the most recent crash was recorded, if any.
    pub last_crash: Option<Instant>,
}
100
/// Lifecycle state of a plugin as tracked by the client.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PluginState {
    /// No subprocess is expected to be running; this is the initial state and
    /// the state a running plugin returns to after shutdown.
    Idle,
    /// A subprocess has been spawned for this plugin.
    Running,
    /// A crash was recorded but the restart cap has not been reached.
    Recovering,
    /// The restart cap was reached; requests are rejected until the disabled
    /// state is cleared. `reason` is a human-readable explanation.
    Disabled { reason: String },
}
115
/// Errors produced while spawning, initialising or talking to a plugin.
///
/// Every variant carries the plugin name so the message is self-describing.
#[derive(Debug, Error)]
pub enum PluginClientError {
    /// The plugin executable could not be started.
    #[error("failed to spawn plugin `{plugin}` at {path}: {source}")]
    Spawn {
        plugin: String,
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// The process started but the init handshake did not succeed.
    #[error("plugin `{plugin}` failed to initialise: {detail}")]
    InitFailed { plugin: String, detail: String },
    /// A spawn was refused because too many crashes were recorded inside the
    /// restart window; the plugin is marked disabled as a side effect.
    #[error(
        "plugin `{plugin}` exceeded restart cap ({cap} restarts in {window:?}); marking disabled"
    )]
    RestartCapExceeded {
        plugin: String,
        cap: usize,
        window: Duration,
    },
    /// The request was rejected because the plugin is currently disabled.
    #[error("plugin `{plugin}` is disabled: {reason}")]
    Disabled { plugin: String, reason: String },
    /// Writing to the plugin's stdin or reading from its stdout failed
    /// (including the plugin closing stdout).
    #[error("I/O error talking to plugin `{plugin}`: {source}")]
    Io {
        plugin: String,
        #[source]
        source: std::io::Error,
    },
    /// A JSON (de)serialisation step failed while exchanging a message.
    #[error("plugin `{plugin}` returned a malformed response: {source}")]
    MalformedResponse {
        plugin: String,
        #[source]
        source: serde_json::Error,
    },
    /// The reply was well-formed but its payload does not fit `method`.
    // NOTE(review): not constructed anywhere in this file; presumably used by callers.
    #[error("plugin `{plugin}` returned unexpected payload for method `{method}`")]
    UnexpectedPayload { plugin: String, method: String },
    /// The plugin answered the request with an RPC-level error.
    #[error("plugin `{plugin}` reported error: {detail}")]
    PluginError { plugin: String, detail: String },
    /// The reply's id does not match the id of the request that was sent.
    #[error("plugin `{plugin}` reply id mismatch (expected {expected}, got {got})")]
    IdMismatch {
        plugin: String,
        expected: u64,
        got: u64,
    },
}
164
/// Handle to a single plugin subprocess.
///
/// Cloning is cheap: clones share the same mutex-guarded state (and therefore
/// the same subprocess) through `inner`.
#[derive(Clone)]
pub struct PluginClient {
    /// Mutable runtime state shared between all clones of this client.
    inner: Arc<Mutex<ClientState>>,
    /// Manifest describing the plugin (name, allowed environment variables, ...).
    manifest: Arc<PluginManifest>,
    /// Path of the executable that is spawned for this plugin.
    executable: PathBuf,
    /// Idle / shutdown / restart tunables applied to the subprocess.
    policy: LifetimePolicy,
}
178
/// Runtime state of a plugin, guarded by the mutex in `PluginClient::inner`.
struct ClientState {
    /// The live subprocess and its pipes, if one is currently held.
    process: Option<RunningProcess>,
    /// Current lifecycle state.
    state: PluginState,
    /// Timestamps of recorded crashes, oldest first.
    crashes: VecDeque<Instant>,
    /// When a request last completed an exchange with the plugin.
    last_used: Option<Instant>,
    /// When the most recent crash was recorded.
    last_crash: Option<Instant>,
    /// Id of the most recently issued RPC request; incremented (wrapping)
    /// before each send, so the first request uses id 1.
    next_id: u64,
}
187
/// A spawned plugin subprocess together with the pipes used to talk to it.
struct RunningProcess {
    /// Handle used to kill and reap the subprocess.
    child: Child,
    /// Pipe on which requests are written, one line per request.
    stdin: ChildStdin,
    /// Buffered pipe from which replies are read, one line per reply.
    stdout: BufReader<ChildStdout>,
}
193
194impl PluginClient {
195 pub fn new(manifest: PluginManifest, executable: PathBuf, policy: LifetimePolicy) -> Self {
198 Self {
199 inner: Arc::new(Mutex::new(ClientState {
200 process: None,
201 state: PluginState::Idle,
202 crashes: VecDeque::new(),
203 last_used: None,
204 last_crash: None,
205 next_id: 0,
206 })),
207 manifest: Arc::new(manifest),
208 executable,
209 policy,
210 }
211 }
212
213 pub fn manifest(&self) -> &PluginManifest {
214 &self.manifest
215 }
216
217 pub fn policy(&self) -> LifetimePolicy {
218 self.policy
219 }
220
221 pub async fn health(&self) -> PluginHealth {
223 let state = self.inner.lock().await;
224 PluginHealth {
225 plugin_name: self.manifest.name.clone(),
226 state: state.state.clone(),
227 crashes_in_window: state.crashes.len(),
228 last_used: state.last_used,
229 last_crash: state.last_crash,
230 }
231 }
232
233 pub async fn clear_disabled(&self) {
237 let mut state = self.inner.lock().await;
238 state.crashes.clear();
239 if matches!(state.state, PluginState::Disabled { .. }) {
240 state.state = PluginState::Idle;
241 }
242 }
243
244 pub async fn request(&self, call: PluginRequest) -> Result<PluginResponse, PluginClientError> {
248 let mut state = self.inner.lock().await;
249
250 if let PluginState::Disabled { reason } = &state.state {
252 return Err(PluginClientError::Disabled {
253 plugin: self.manifest.name.clone(),
254 reason: reason.clone(),
255 });
256 }
257
258 if let Some(last) = state.last_used
263 && state.process.is_some()
264 && last.elapsed() >= self.policy.idle_timeout
265 {
266 debug!(
267 plugin = self.manifest.name.as_str(),
268 idle_for = ?last.elapsed(),
269 "reaping idle plugin process"
270 );
271 self.shutdown_locked(&mut state).await;
272 }
273
274 if state.process.is_none() {
276 self.spawn_locked(&mut state).await?;
277 }
278
279 let id = state.next_id.wrapping_add(1);
280 state.next_id = id;
281 let req = PluginRpcRequest {
282 jsonrpc: JsonRpcVersion::current(),
283 id,
284 call,
285 };
286 let line = match serde_json::to_string(&req) {
287 Ok(s) => s,
288 Err(source) => {
289 return Err(PluginClientError::MalformedResponse {
290 plugin: self.manifest.name.clone(),
291 source,
292 });
293 }
294 };
295
296 let outcome = match self.exchange_locked(&mut state, &line).await {
298 Ok(resp) => resp,
299 Err(e) => {
300 self.record_crash_locked(&mut state, e.to_string());
302 self.shutdown_locked(&mut state).await;
303 return Err(e);
304 }
305 };
306
307 if outcome.id != id {
308 return Err(PluginClientError::IdMismatch {
309 plugin: self.manifest.name.clone(),
310 expected: id,
311 got: outcome.id,
312 });
313 }
314
315 state.last_used = Some(Instant::now());
316
317 match outcome.outcome {
318 RpcOutcome::Result(r) => Ok(r),
319 RpcOutcome::Error(e) => Err(PluginClientError::PluginError {
320 plugin: self.manifest.name.clone(),
321 detail: e.to_string(),
322 }),
323 }
324 }
325
326 pub async fn shutdown(&self) {
330 let mut state = self.inner.lock().await;
331 self.shutdown_locked(&mut state).await;
332 }
333
334 async fn shutdown_locked(&self, state: &mut ClientState) {
335 let Some(mut proc) = state.process.take() else {
336 return;
337 };
338 if let Err(e) = proc.child.start_kill() {
341 warn!(
342 plugin = self.manifest.name.as_str(),
343 error = %e,
344 "start_kill failed; child may already be dead"
345 );
346 }
347 match tokio::time::timeout(self.policy.shutdown_grace, proc.child.wait()).await {
348 Ok(Ok(_)) => {
349 debug!(plugin = self.manifest.name.as_str(), "exited within grace");
350 }
351 Ok(Err(e)) => {
352 warn!(
353 plugin = self.manifest.name.as_str(),
354 error = %e,
355 "wait returned error post-kill"
356 );
357 }
358 Err(_) => {
359 warn!(
361 plugin = self.manifest.name.as_str(),
362 grace_ms = self.policy.shutdown_grace.as_millis(),
363 "plugin did not exit in grace; force-killing"
364 );
365 let _ = proc.child.kill().await;
366 }
367 }
368 if matches!(state.state, PluginState::Running) {
369 state.state = PluginState::Idle;
370 }
371 }
372
373 async fn spawn_locked(&self, state: &mut ClientState) -> Result<(), PluginClientError> {
374 let now = Instant::now();
377 let window = self.policy.restart_window;
378 while let Some(front) = state.crashes.front() {
379 if now.duration_since(*front) >= window {
380 state.crashes.pop_front();
381 } else {
382 break;
383 }
384 }
385 if state.crashes.len() >= self.policy.restart_cap {
386 let reason = format!(
387 "{} crashes in last {:?}",
388 state.crashes.len(),
389 self.policy.restart_window
390 );
391 state.state = PluginState::Disabled {
392 reason: reason.clone(),
393 };
394 return Err(PluginClientError::RestartCapExceeded {
395 plugin: self.manifest.name.clone(),
396 cap: self.policy.restart_cap,
397 window: self.policy.restart_window,
398 });
399 }
400
401 let mut cmd = Command::new(&self.executable);
402 cmd.stdin(Stdio::piped());
403 cmd.stdout(Stdio::piped());
404 cmd.stderr(Stdio::piped());
405 cmd.kill_on_drop(true);
406 cmd.env_clear();
410 for var in &self.manifest.allowed_env_vars {
411 if let Ok(value) = std::env::var(var) {
412 cmd.env(var, value);
413 }
414 }
415
416 let mut child = cmd.spawn().map_err(|source| PluginClientError::Spawn {
417 plugin: self.manifest.name.clone(),
418 path: self.executable.clone(),
419 source,
420 })?;
421
422 let stdin = child
423 .stdin
424 .take()
425 .expect("Stdio::piped on stdin should yield a handle");
426 let stdout = child
427 .stdout
428 .take()
429 .expect("Stdio::piped on stdout should yield a handle");
430 let stdout = BufReader::new(stdout);
431
432 state.process = Some(RunningProcess {
433 child,
434 stdin,
435 stdout,
436 });
437 state.state = PluginState::Running;
438
439 let init = PluginRequest::Init(InitParams {
441 source_name: self.manifest.name.clone(),
442 config: Default::default(),
443 protocol_version: PROTOCOL_VERSION.into(),
444 });
445 let id = state.next_id.wrapping_add(1);
446 state.next_id = id;
447 let req = PluginRpcRequest {
448 jsonrpc: JsonRpcVersion::current(),
449 id,
450 call: init,
451 };
452 let line = serde_json::to_string(&req).map_err(|e| PluginClientError::InitFailed {
453 plugin: self.manifest.name.clone(),
454 detail: e.to_string(),
455 })?;
456 let resp = self.exchange_locked(state, &line).await.map_err(|e| {
457 self.record_crash_locked_msg(state, "init exchange failed");
458 PluginClientError::InitFailed {
459 plugin: self.manifest.name.clone(),
460 detail: e.to_string(),
461 }
462 })?;
463 if resp.id != id {
464 return Err(PluginClientError::InitFailed {
465 plugin: self.manifest.name.clone(),
466 detail: format!("init reply id mismatch: expected {id}, got {}", resp.id),
467 });
468 }
469 match resp.outcome {
470 RpcOutcome::Result(_) => {}
471 RpcOutcome::Error(e) => {
472 self.record_crash_locked_msg(state, &format!("init returned error: {e}"));
473 return Err(PluginClientError::InitFailed {
474 plugin: self.manifest.name.clone(),
475 detail: e.to_string(),
476 });
477 }
478 }
479
480 Ok(())
481 }
482
483 async fn exchange_locked(
484 &self,
485 state: &mut ClientState,
486 line: &str,
487 ) -> Result<PluginRpcResponse, PluginClientError> {
488 let proc = state
489 .process
490 .as_mut()
491 .expect("exchange called without a running process");
492 proc.stdin
493 .write_all(line.as_bytes())
494 .await
495 .map_err(|source| PluginClientError::Io {
496 plugin: self.manifest.name.clone(),
497 source,
498 })?;
499 proc.stdin
500 .write_all(b"\n")
501 .await
502 .map_err(|source| PluginClientError::Io {
503 plugin: self.manifest.name.clone(),
504 source,
505 })?;
506 proc.stdin
507 .flush()
508 .await
509 .map_err(|source| PluginClientError::Io {
510 plugin: self.manifest.name.clone(),
511 source,
512 })?;
513
514 let mut reply = String::new();
515 let n =
516 proc.stdout
517 .read_line(&mut reply)
518 .await
519 .map_err(|source| PluginClientError::Io {
520 plugin: self.manifest.name.clone(),
521 source,
522 })?;
523 if n == 0 {
524 return Err(PluginClientError::Io {
525 plugin: self.manifest.name.clone(),
526 source: std::io::Error::new(
527 std::io::ErrorKind::UnexpectedEof,
528 "plugin closed stdout",
529 ),
530 });
531 }
532 serde_json::from_str(reply.trim_end()).map_err(|source| {
533 PluginClientError::MalformedResponse {
534 plugin: self.manifest.name.clone(),
535 source,
536 }
537 })
538 }
539
540 fn record_crash_locked(&self, state: &mut ClientState, _detail: String) {
541 let now = Instant::now();
542 state.crashes.push_back(now);
543 state.last_crash = Some(now);
544 if state.crashes.len() >= self.policy.restart_cap {
545 state.state = PluginState::Disabled {
546 reason: format!(
547 "{} crashes in last {:?}",
548 state.crashes.len(),
549 self.policy.restart_window
550 ),
551 };
552 } else {
553 state.state = PluginState::Recovering;
554 }
555 }
556
557 fn record_crash_locked_msg(&self, state: &mut ClientState, _msg: &str) {
558 self.record_crash_locked(state, String::new());
559 }
560}
561
impl Drop for PluginClient {
    /// Does nothing.
    fn drop(&mut self) {
        // No explicit cleanup is performed here. `PluginClient` is `Clone`, so
        // this runs once per clone, while the subprocess lives in the shared
        // state behind `inner`. The child is spawned with `kill_on_drop(true)`,
        // so it is presumably expected to be killed when that shared state is
        // finally dropped — NOTE(review): confirm this is the intended design.
    }
}
571
// NOTE(review): these tests are compiled on macOS test builds only. The fake
// plugins are `/bin/sh` scripts made executable through Unix permissions —
// confirm whether excluding other Unix targets is intentional.
#[cfg(all(test, target_os = "macos"))]
mod tests {
    use super::*;
    use crate::plugin_manifest::PluginManifest;
    use std::fs;
    use std::os::unix::fs::PermissionsExt;
    use std::path::Path;
    use tempfile::TempDir;

    /// Writes an executable shell script named `devboy-source-{name}` into
    /// `dir` and returns a matching manifest plus the script's path.
    ///
    /// `behaviour` selects the script:
    /// * `"echo"` answers every input line with a fixed result carrying the
    ///   request's id;
    /// * `"crash"` exits immediately with status 7;
    /// * `"hang"` consumes stdin without ever replying, then sleeps;
    /// * `"env-dump"` writes its environment to `env-dump.txt` in `dir` and
    ///   then behaves like `"echo"`.
    ///
    /// Panics on any other `behaviour` or on filesystem errors.
    fn write_fake_plugin(dir: &Path, name: &str, behaviour: &str) -> (PluginManifest, PathBuf) {
        let exec_path = dir.join(format!("devboy-source-{name}"));
        let script = match behaviour {
            "echo" => format!(
                r#"#!/bin/sh
while IFS= read -r line; do
 id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
 printf '{{"jsonrpc":"2.0","id":%s,"result":{{"source_name":"{name}","capabilities_bits":1,"plugin_version":"0.0.1"}}}}\n' "$id"
done
"#
            ),
            "crash" => "#!/bin/sh\nexit 7\n".to_string(),
            "hang" => "#!/bin/sh\nwhile read line; do :; done\nsleep 30\n".to_string(),
            "env-dump" => format!(
                r#"#!/bin/sh
env > "{}/env-dump.txt"
while IFS= read -r line; do
 id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
 printf '{{"jsonrpc":"2.0","id":%s,"result":{{"source_name":"{name}","capabilities_bits":1,"plugin_version":"0.0.1"}}}}\n' "$id"
done
"#,
                dir.display()
            ),
            other => panic!("unknown behaviour: {other}"),
        };
        fs::write(&exec_path, script).unwrap();
        // Make the script executable (rwxr-xr-x).
        let mut perms = fs::metadata(&exec_path).unwrap().permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&exec_path, perms).unwrap();

        // The manifest carries the SHA-256 of the script that was just written.
        let bytes = fs::read(&exec_path).unwrap();
        use sha2::Digest;
        let mut hasher = sha2::Sha256::new();
        hasher.update(&bytes);
        let checksum = hex::encode(hasher.finalize());

        let manifest = PluginManifest {
            name: name.into(),
            version: "0.0.1".into(),
            executable: PathBuf::from(format!("devboy-source-{name}")),
            // Only this variable may reach the plugin's environment.
            allowed_env_vars: vec!["DEVBOY_TEST_LET_THROUGH".into()],
            checksum_sha256: checksum,
        };
        (manifest, exec_path)
    }

    /// Policy with short timeouts so the tests finish quickly.
    fn fast_policy() -> LifetimePolicy {
        LifetimePolicy {
            idle_timeout: Duration::from_millis(80),
            shutdown_grace: Duration::from_millis(200),
            restart_window: Duration::from_secs(10),
            restart_cap: 3,
        }
    }

    /// A fresh client is `Idle`; the first request spawns the plugin and
    /// leaves it `Running` with `last_used` set.
    #[tokio::test]
    async fn lazy_spawn_and_init_handshake_succeeds() {
        let dir = TempDir::new().unwrap();
        let (manifest, exec) = write_fake_plugin(dir.path(), "echo", "echo");
        let client = PluginClient::new(manifest, exec, fast_policy());

        let initial = client.health().await;
        assert_eq!(initial.state, PluginState::Idle);

        let resp = client.request(PluginRequest::IsAvailable).await;
        assert!(resp.is_ok(), "request failed: {resp:?}");
        let after = client.health().await;
        assert_eq!(after.state, PluginState::Running);
        assert!(after.last_used.is_some());

        client.shutdown().await;
    }

    /// After sleeping past the 80 ms idle timeout, a second request must still
    /// succeed (the client is expected to replace the idle process).
    #[tokio::test]
    async fn idle_timeout_reaps_subprocess_before_next_request() {
        let dir = TempDir::new().unwrap();
        let (manifest, exec) = write_fake_plugin(dir.path(), "echoi", "echo");
        let client = PluginClient::new(manifest, exec, fast_policy());

        let _ = client.request(PluginRequest::IsAvailable).await.unwrap();
        tokio::time::sleep(Duration::from_millis(150)).await;
        let _ = client.request(PluginRequest::IsAvailable).await.unwrap();

        client.shutdown().await;
    }

    /// Three failing requests against a plugin that exits immediately must
    /// leave the client `Disabled`; `clear_disabled` resets it to `Idle`.
    #[tokio::test]
    async fn restart_cap_disables_after_repeated_spawn_failures() {
        let dir = TempDir::new().unwrap();
        let (manifest, exec) = write_fake_plugin(dir.path(), "crashc", "crash");
        let client = PluginClient::new(manifest, exec, fast_policy());

        // Errors are expected here; only the resulting state matters.
        for _ in 0..3 {
            let _ = client.request(PluginRequest::IsAvailable).await;
        }
        let h = client.health().await;
        assert!(
            matches!(h.state, PluginState::Disabled { .. }),
            "expected Disabled, got {:?}",
            h.state
        );
        let err = client
            .request(PluginRequest::IsAvailable)
            .await
            .unwrap_err();
        assert!(
            matches!(err, PluginClientError::Disabled { .. }),
            "expected Disabled error, got {err:?}"
        );

        client.clear_disabled().await;
        assert_eq!(client.health().await.state, PluginState::Idle);
    }

    /// Only variables on the manifest's allow-list may show up in the
    /// plugin's environment dump.
    #[tokio::test]
    async fn env_restriction_only_passes_allowed_vars() {
        let dir = TempDir::new().unwrap();
        let (manifest, exec) = write_fake_plugin(dir.path(), "envd", "env-dump");
        let dir_path = dir.path().to_path_buf();
        let client = PluginClient::new(manifest, exec, fast_policy());

        // Both variables are set in the host environment for the duration of
        // the request; only the allow-listed one should be forwarded.
        temp_env::async_with_vars(
            [
                ("DEVBOY_TEST_SHOULD_NOT_LEAK", Some("leak-me")),
                ("DEVBOY_TEST_LET_THROUGH", Some("passed-through")),
            ],
            async move {
                let _ = client.request(PluginRequest::IsAvailable).await.unwrap();
                client.shutdown().await;
            },
        )
        .await;

        let dump = fs::read_to_string(dir_path.join("env-dump.txt")).unwrap();
        assert!(
            dump.contains("DEVBOY_TEST_LET_THROUGH=passed-through"),
            "allowed var did not pass through: {dump}"
        );
        assert!(
            !dump.contains("DEVBOY_TEST_SHOULD_NOT_LEAK"),
            "non-allowed var leaked into plugin env: {dump}"
        );
    }

    /// Shutting down a plugin that never answers must complete promptly.
    // NOTE(review): the name mentions SIGTERM/SIGKILL, but the only assertion
    // is an upper bound on how long `shutdown()` takes.
    #[tokio::test]
    async fn shutdown_sends_sigterm_then_sigkill_on_grace_timeout() {
        let dir = TempDir::new().unwrap();
        let (manifest, exec) = write_fake_plugin(dir.path(), "hang", "hang");
        let client = PluginClient::new(
            manifest,
            exec,
            LifetimePolicy {
                idle_timeout: Duration::from_secs(60),
                shutdown_grace: Duration::from_millis(150),
                restart_window: Duration::from_secs(10),
                restart_cap: 3,
            },
        );

        // The plugin never replies, so the request is abandoned after 50 ms;
        // its result (a timeout) is deliberately ignored.
        let req_fut = client.request(PluginRequest::IsAvailable);
        let _ = tokio::time::timeout(Duration::from_millis(50), req_fut).await;
        let start = Instant::now();
        client.shutdown().await;
        let elapsed = start.elapsed();
        assert!(
            elapsed < Duration::from_secs(2),
            "shutdown took too long: {elapsed:?}"
        );
    }

    /// Pins the default policy values.
    #[test]
    fn default_policy_matches_adr_021_section_10() {
        let p = LifetimePolicy::default();
        assert_eq!(p.idle_timeout, Duration::from_secs(60));
        assert_eq!(p.shutdown_grace, Duration::from_secs(10));
        assert_eq!(p.restart_window, Duration::from_secs(60));
        assert_eq!(p.restart_cap, 3);
    }
}