Skip to main content

devboy_storage/
plugin_client.rs

1//! Lifetime-managing client for subprocess `SecretSource`
2//! plugins per [ADR-021] §10 (subprocess plugin lifetime
3//! contract).
4//!
5//! Builds on the wire-protocol types from [`plugin_protocol`]
6//! and the manifest discovery from [`plugin_manifest`]: this
7//! module owns the *process*. The host calls `request(...)`
8//! whenever it needs to talk to the plugin; the client takes
9//! care of:
10//!
11//! - **Lazy spawn** — the binary doesn't run until the first
12//!   request reaches the client.
13//! - **Idle timeout** — a spawn that hasn't been used for
14//!   `idle_timeout` is shut down on the next access (kept the
15//!   simple way: lazy reaping, no background sweeper).
//! - **Graceful shutdown** — `shutdown()` waits up to
//!   `shutdown_grace` for the child to exit and force-kills it
//!   otherwise. `Drop` relies on `kill_on_drop(true)` (set at
//!   spawn) so a leaked client doesn't leave the child running.
20//! - **Restart cap** — a sliding-window counter caps automatic
21//!   re-spawn after a crash. Beyond the cap the plugin is
22//!   marked **disabled**; `doctor` reports the failure count
23//!   and the user has to clear it.
24//! - **Env restriction** — the child inherits exactly the env
25//!   vars listed in [`crate::plugin_manifest::PluginManifest::allowed_env_vars`]
26//!   and nothing else. `Command::env_clear()` is the gate; the
27//!   test crate's env-leak fixture proves it.
28//!
29//! ## What this module does **not** do
30//!
31//! Implement the [`SecretSource`] trait. The client returns
32//! typed wire payloads; a thin adapter (added in P15.3 or by
33//! the router) maps them to `SecretSource::get/list/validate`
34//! results. Keeping the trait impl out of this module makes
35//! the lifetime semantics testable without dragging in the
36//! whole router.
37//!
38//! [`plugin_protocol`]: crate::plugin_protocol
39//! [`plugin_manifest`]: crate::plugin_manifest
40//! [`SecretSource`]: crate::source::SecretSource
41//! [ADR-021]: https://github.com/meteora-pro/devboy-tools/blob/main/docs/architecture/adr/ADR-021-secret-source-router.md
42
43use std::collections::VecDeque;
44use std::path::PathBuf;
45use std::process::Stdio;
46use std::sync::Arc;
47use std::time::{Duration, Instant};
48
49use thiserror::Error;
50use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
51use tokio::process::{Child, ChildStdin, ChildStdout, Command};
52use tokio::sync::Mutex;
53use tracing::{debug, warn};
54
55use crate::plugin_manifest::PluginManifest;
56use crate::plugin_protocol::{
57    InitParams, JsonRpcVersion, PROTOCOL_VERSION, PluginRequest, PluginResponse, PluginRpcRequest,
58    PluginRpcResponse, RpcOutcome,
59};
60
61// =============================================================================
62// Policy
63// =============================================================================
64
/// Lifetime knobs. All durations have ADR-021 §10 defaults.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LifetimePolicy {
    /// How long a spawned process may sit unused before the next
    /// `request` reaps it and spawns a fresh one.
    pub idle_timeout: Duration,
    /// Upper bound on how long shutdown waits for the child to
    /// exit before force-killing it.
    pub shutdown_grace: Duration,
    /// Width of the sliding window over which crashes are counted.
    pub restart_window: Duration,
    /// Number of crashes inside `restart_window` at which the
    /// plugin is marked disabled.
    pub restart_cap: usize,
}
73
74impl Default for LifetimePolicy {
75    fn default() -> Self {
76        Self {
77            idle_timeout: Duration::from_secs(60),
78            shutdown_grace: Duration::from_secs(10),
79            restart_window: Duration::from_secs(60),
80            restart_cap: 3,
81        }
82    }
83}
84
85// =============================================================================
86// Health snapshot (consumed by `doctor`)
87// =============================================================================
88
/// Lifetime view exposed to `doctor`. Captures whether the
/// plugin is alive, how many times it crashed in the rolling
/// window, and whether the restart cap has tripped.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PluginHealth {
    /// Name of the plugin, copied from its manifest.
    pub plugin_name: String,
    /// Lifecycle state at the moment the snapshot was taken.
    pub state: PluginState,
    /// Crash count reported by the client for the restart window.
    pub crashes_in_window: usize,
    /// When the last successful exchange completed; `None` if no
    /// request has succeeded yet.
    pub last_used: Option<Instant>,
    /// When the most recent crash was recorded, if any.
    pub last_crash: Option<Instant>,
}
100
/// Lifecycle state of a subprocess plugin as tracked by the client.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PluginState {
    /// Never spawned or shut down cleanly. Next request
    /// triggers a fresh spawn.
    Idle,
    /// Subprocess is alive and ready.
    Running,
    /// Subprocess died and we're inside the restart window.
    Recovering,
    /// Restart cap hit. The plugin is dormant until the user
    /// clears the failure count via `doctor` (or restarts the
    /// host).
    Disabled {
        /// Human-readable explanation (crash count and window).
        reason: String,
    },
}
115
116// =============================================================================
117// Errors
118// =============================================================================
119
/// Everything that can go wrong while spawning or talking to a
/// subprocess plugin. Every variant carries the plugin name so
/// messages are self-describing.
#[derive(Debug, Error)]
pub enum PluginClientError {
    /// The executable could not be started.
    #[error("failed to spawn plugin `{plugin}` at {path}: {source}")]
    Spawn {
        plugin: String,
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// The process started but the init handshake did not succeed.
    #[error("plugin `{plugin}` failed to initialise: {detail}")]
    InitFailed { plugin: String, detail: String },
    /// A spawn was refused because the crash count inside the
    /// restart window reached the cap.
    #[error(
        "plugin `{plugin}` exceeded restart cap ({cap} restarts in {window:?}); marking disabled"
    )]
    RestartCapExceeded {
        plugin: String,
        cap: usize,
        window: Duration,
    },
    /// The plugin is already disabled; the request was refused
    /// before any I/O.
    #[error("plugin `{plugin}` is disabled: {reason}")]
    Disabled { plugin: String, reason: String },
    /// Writing to or reading from the child's pipes failed
    /// (including an unexpected EOF on stdout).
    #[error("I/O error talking to plugin `{plugin}`: {source}")]
    Io {
        plugin: String,
        #[source]
        source: std::io::Error,
    },
    /// A JSON (de)serialisation step failed. Used for unparseable
    /// reply lines; `request` also uses it when the outgoing
    /// request cannot be serialised.
    #[error("plugin `{plugin}` returned a malformed response: {source}")]
    MalformedResponse {
        plugin: String,
        #[source]
        source: serde_json::Error,
    },
    /// The reply parsed but carried the wrong payload for the
    /// method. NOTE(review): not constructed in this module —
    /// presumably raised by the adapter layer; confirm.
    #[error("plugin `{plugin}` returned unexpected payload for method `{method}`")]
    UnexpectedPayload { plugin: String, method: String },
    /// The plugin answered with a JSON-RPC error object.
    #[error("plugin `{plugin}` reported error: {detail}")]
    PluginError { plugin: String, detail: String },
    /// The reply's `id` does not match the request that was sent.
    #[error("plugin `{plugin}` reply id mismatch (expected {expected}, got {got})")]
    IdMismatch {
        plugin: String,
        expected: u64,
        got: u64,
    },
}
164
165// =============================================================================
166// Client
167// =============================================================================
168
/// Thread-safe handle to a subprocess plugin. Cheap to clone —
/// the actual state lives behind an `Arc<Mutex<…>>`.
#[derive(Clone)]
pub struct PluginClient {
    /// Mutable lifetime state, shared by all clones of the handle.
    inner: Arc<Mutex<ClientState>>,
    /// Manifest describing the plugin (name, allowed env vars, …).
    manifest: Arc<PluginManifest>,
    /// Path of the binary that `spawn_locked` executes.
    executable: PathBuf,
    /// Lifetime knobs applied to this plugin.
    policy: LifetimePolicy,
}
178
/// Mutable state guarded by the client's mutex.
struct ClientState {
    /// Handles of the live child, or `None` when nothing is spawned.
    process: Option<RunningProcess>,
    /// Lifecycle state reported through `health()`.
    state: PluginState,
    /// Timestamps of recorded crashes, oldest first.
    crashes: VecDeque<Instant>,
    /// Time of the last successful exchange.
    last_used: Option<Instant>,
    /// Time of the most recently recorded crash.
    last_crash: Option<Instant>,
    /// Last JSON-RPC id handed out; incremented (wrapping) per request.
    next_id: u64,
}
187
/// A spawned child together with the pipe ends used to talk to it.
struct RunningProcess {
    /// Process handle, used to wait for and kill the child.
    child: Child,
    /// Write end: one JSON-RPC request per line.
    stdin: ChildStdin,
    /// Buffered read end: one JSON-RPC reply per line.
    stdout: BufReader<ChildStdout>,
}
193
194impl PluginClient {
195    /// Build a fresh client. Does **not** spawn — the first
196    /// `request` call performs the lazy spawn.
197    pub fn new(manifest: PluginManifest, executable: PathBuf, policy: LifetimePolicy) -> Self {
198        Self {
199            inner: Arc::new(Mutex::new(ClientState {
200                process: None,
201                state: PluginState::Idle,
202                crashes: VecDeque::new(),
203                last_used: None,
204                last_crash: None,
205                next_id: 0,
206            })),
207            manifest: Arc::new(manifest),
208            executable,
209            policy,
210        }
211    }
212
    /// The manifest this client was built with.
    pub fn manifest(&self) -> &PluginManifest {
        &self.manifest
    }
216
    /// The lifetime policy this client was built with (returned by
    /// value; `LifetimePolicy` is `Copy`).
    pub fn policy(&self) -> LifetimePolicy {
        self.policy
    }
220
221    /// Snapshot of the current health for `doctor`.
222    pub async fn health(&self) -> PluginHealth {
223        let state = self.inner.lock().await;
224        PluginHealth {
225            plugin_name: self.manifest.name.clone(),
226            state: state.state.clone(),
227            crashes_in_window: state.crashes.len(),
228            last_used: state.last_used,
229            last_crash: state.last_crash,
230        }
231    }
232
233    /// Manually clear the crash counter — used by `doctor`'s
234    /// "reset disabled plugin" affordance after the operator
235    /// has fixed whatever was crashing it.
236    pub async fn clear_disabled(&self) {
237        let mut state = self.inner.lock().await;
238        state.crashes.clear();
239        if matches!(state.state, PluginState::Disabled { .. }) {
240            state.state = PluginState::Idle;
241        }
242    }
243
244    /// Issue a single request. Spawns lazily, re-spawns on
245    /// crash within the cap, and reaps an idle process before
246    /// the next live call.
247    pub async fn request(&self, call: PluginRequest) -> Result<PluginResponse, PluginClientError> {
248        let mut state = self.inner.lock().await;
249
250        // Disabled plugins refuse before doing any I/O.
251        if let PluginState::Disabled { reason } = &state.state {
252            return Err(PluginClientError::Disabled {
253                plugin: self.manifest.name.clone(),
254                reason: reason.clone(),
255            });
256        }
257
258        // Lazy reap — drop the child if we've been idle past
259        // `idle_timeout`. We do this *before* checking for a
260        // running process so the next branch handles the
261        // re-spawn uniformly.
262        if let Some(last) = state.last_used
263            && state.process.is_some()
264            && last.elapsed() >= self.policy.idle_timeout
265        {
266            debug!(
267                plugin = self.manifest.name.as_str(),
268                idle_for = ?last.elapsed(),
269                "reaping idle plugin process"
270            );
271            self.shutdown_locked(&mut state).await;
272        }
273
274        // Spawn if needed.
275        if state.process.is_none() {
276            self.spawn_locked(&mut state).await?;
277        }
278
279        let id = state.next_id.wrapping_add(1);
280        state.next_id = id;
281        let req = PluginRpcRequest {
282            jsonrpc: JsonRpcVersion::current(),
283            id,
284            call,
285        };
286        let line = match serde_json::to_string(&req) {
287            Ok(s) => s,
288            Err(source) => {
289                return Err(PluginClientError::MalformedResponse {
290                    plugin: self.manifest.name.clone(),
291                    source,
292                });
293            }
294        };
295
296        // Send + read response.
297        let outcome = match self.exchange_locked(&mut state, &line).await {
298            Ok(resp) => resp,
299            Err(e) => {
300                // Treat any I/O error as a crash.
301                self.record_crash_locked(&mut state, e.to_string());
302                self.shutdown_locked(&mut state).await;
303                return Err(e);
304            }
305        };
306
307        if outcome.id != id {
308            return Err(PluginClientError::IdMismatch {
309                plugin: self.manifest.name.clone(),
310                expected: id,
311                got: outcome.id,
312            });
313        }
314
315        state.last_used = Some(Instant::now());
316
317        match outcome.outcome {
318            RpcOutcome::Result(r) => Ok(r),
319            RpcOutcome::Error(e) => Err(PluginClientError::PluginError {
320                plugin: self.manifest.name.clone(),
321                detail: e.to_string(),
322            }),
323        }
324    }
325
    /// Stop the child process if one is running: wait up to
    /// `shutdown_grace` for it to exit, then force-kill it.
    /// Idempotent — returns immediately when nothing is running,
    /// so it is safe to call multiple times.
    pub async fn shutdown(&self) {
        let mut state = self.inner.lock().await;
        self.shutdown_locked(&mut state).await;
    }
333
334    async fn shutdown_locked(&self, state: &mut ClientState) {
335        let Some(mut proc) = state.process.take() else {
336            return;
337        };
338        // Try a graceful kill first. tokio::process::Child::start_kill
339        // sends SIGTERM (on Unix; TerminateProcess on Windows).
340        if let Err(e) = proc.child.start_kill() {
341            warn!(
342                plugin = self.manifest.name.as_str(),
343                error = %e,
344                "start_kill failed; child may already be dead"
345            );
346        }
347        match tokio::time::timeout(self.policy.shutdown_grace, proc.child.wait()).await {
348            Ok(Ok(_)) => {
349                debug!(plugin = self.manifest.name.as_str(), "exited within grace");
350            }
351            Ok(Err(e)) => {
352                warn!(
353                    plugin = self.manifest.name.as_str(),
354                    error = %e,
355                    "wait returned error post-kill"
356                );
357            }
358            Err(_) => {
359                // Grace period elapsed. Force-kill.
360                warn!(
361                    plugin = self.manifest.name.as_str(),
362                    grace_ms = self.policy.shutdown_grace.as_millis(),
363                    "plugin did not exit in grace; force-killing"
364                );
365                let _ = proc.child.kill().await;
366            }
367        }
368        if matches!(state.state, PluginState::Running) {
369            state.state = PluginState::Idle;
370        }
371    }
372
373    async fn spawn_locked(&self, state: &mut ClientState) -> Result<(), PluginClientError> {
374        // Restart-cap check: drop crashes outside the window
375        // first.
376        let now = Instant::now();
377        let window = self.policy.restart_window;
378        while let Some(front) = state.crashes.front() {
379            if now.duration_since(*front) >= window {
380                state.crashes.pop_front();
381            } else {
382                break;
383            }
384        }
385        if state.crashes.len() >= self.policy.restart_cap {
386            let reason = format!(
387                "{} crashes in last {:?}",
388                state.crashes.len(),
389                self.policy.restart_window
390            );
391            state.state = PluginState::Disabled {
392                reason: reason.clone(),
393            };
394            return Err(PluginClientError::RestartCapExceeded {
395                plugin: self.manifest.name.clone(),
396                cap: self.policy.restart_cap,
397                window: self.policy.restart_window,
398            });
399        }
400
401        let mut cmd = Command::new(&self.executable);
402        cmd.stdin(Stdio::piped());
403        cmd.stdout(Stdio::piped());
404        cmd.stderr(Stdio::piped());
405        cmd.kill_on_drop(true);
406        // Strip the host's env wholesale, then add only the
407        // allowed vars. This is the env-restriction enforcement
408        // the manifest declares.
409        cmd.env_clear();
410        for var in &self.manifest.allowed_env_vars {
411            if let Ok(value) = std::env::var(var) {
412                cmd.env(var, value);
413            }
414        }
415
416        let mut child = cmd.spawn().map_err(|source| PluginClientError::Spawn {
417            plugin: self.manifest.name.clone(),
418            path: self.executable.clone(),
419            source,
420        })?;
421
422        let stdin = child
423            .stdin
424            .take()
425            .expect("Stdio::piped on stdin should yield a handle");
426        let stdout = child
427            .stdout
428            .take()
429            .expect("Stdio::piped on stdout should yield a handle");
430        let stdout = BufReader::new(stdout);
431
432        state.process = Some(RunningProcess {
433            child,
434            stdin,
435            stdout,
436        });
437        state.state = PluginState::Running;
438
439        // Init handshake.
440        let init = PluginRequest::Init(InitParams {
441            source_name: self.manifest.name.clone(),
442            config: Default::default(),
443            protocol_version: PROTOCOL_VERSION.into(),
444        });
445        let id = state.next_id.wrapping_add(1);
446        state.next_id = id;
447        let req = PluginRpcRequest {
448            jsonrpc: JsonRpcVersion::current(),
449            id,
450            call: init,
451        };
452        let line = serde_json::to_string(&req).map_err(|e| PluginClientError::InitFailed {
453            plugin: self.manifest.name.clone(),
454            detail: e.to_string(),
455        })?;
456        let resp = self.exchange_locked(state, &line).await.map_err(|e| {
457            self.record_crash_locked_msg(state, "init exchange failed");
458            PluginClientError::InitFailed {
459                plugin: self.manifest.name.clone(),
460                detail: e.to_string(),
461            }
462        })?;
463        if resp.id != id {
464            return Err(PluginClientError::InitFailed {
465                plugin: self.manifest.name.clone(),
466                detail: format!("init reply id mismatch: expected {id}, got {}", resp.id),
467            });
468        }
469        match resp.outcome {
470            RpcOutcome::Result(_) => {}
471            RpcOutcome::Error(e) => {
472                self.record_crash_locked_msg(state, &format!("init returned error: {e}"));
473                return Err(PluginClientError::InitFailed {
474                    plugin: self.manifest.name.clone(),
475                    detail: e.to_string(),
476                });
477            }
478        }
479
480        Ok(())
481    }
482
483    async fn exchange_locked(
484        &self,
485        state: &mut ClientState,
486        line: &str,
487    ) -> Result<PluginRpcResponse, PluginClientError> {
488        let proc = state
489            .process
490            .as_mut()
491            .expect("exchange called without a running process");
492        proc.stdin
493            .write_all(line.as_bytes())
494            .await
495            .map_err(|source| PluginClientError::Io {
496                plugin: self.manifest.name.clone(),
497                source,
498            })?;
499        proc.stdin
500            .write_all(b"\n")
501            .await
502            .map_err(|source| PluginClientError::Io {
503                plugin: self.manifest.name.clone(),
504                source,
505            })?;
506        proc.stdin
507            .flush()
508            .await
509            .map_err(|source| PluginClientError::Io {
510                plugin: self.manifest.name.clone(),
511                source,
512            })?;
513
514        let mut reply = String::new();
515        let n =
516            proc.stdout
517                .read_line(&mut reply)
518                .await
519                .map_err(|source| PluginClientError::Io {
520                    plugin: self.manifest.name.clone(),
521                    source,
522                })?;
523        if n == 0 {
524            return Err(PluginClientError::Io {
525                plugin: self.manifest.name.clone(),
526                source: std::io::Error::new(
527                    std::io::ErrorKind::UnexpectedEof,
528                    "plugin closed stdout",
529                ),
530            });
531        }
532        serde_json::from_str(reply.trim_end()).map_err(|source| {
533            PluginClientError::MalformedResponse {
534                plugin: self.manifest.name.clone(),
535                source,
536            }
537        })
538    }
539
540    fn record_crash_locked(&self, state: &mut ClientState, _detail: String) {
541        let now = Instant::now();
542        state.crashes.push_back(now);
543        state.last_crash = Some(now);
544        if state.crashes.len() >= self.policy.restart_cap {
545            state.state = PluginState::Disabled {
546                reason: format!(
547                    "{} crashes in last {:?}",
548                    state.crashes.len(),
549                    self.policy.restart_window
550                ),
551            };
552        } else {
553            state.state = PluginState::Recovering;
554        }
555    }
556
557    fn record_crash_locked_msg(&self, state: &mut ClientState, _msg: &str) {
558        self.record_crash_locked(state, String::new());
559    }
560}
561
impl Drop for PluginClient {
    /// Intentionally a no-op.
    fn drop(&mut self) {
        // The async shutdown can't run from Drop, so nothing is done
        // here. Cleanup relies on the `kill_on_drop(true)` flag set
        // at spawn: the child is killed when its `Child` handle is
        // dropped — presumably when the last clone of this client
        // releases the shared state.
        // NOTE(review): tokio documents reaping after `kill_on_drop`
        // as best-effort — confirm the "no zombie" expectation.
    }
}
571
572// =============================================================================
573// Tests
574// =============================================================================
575
// Tests run a real subprocess plugin (a shell script). Shell
// scripts and `chmod +x` (`std::os::unix::fs::PermissionsExt`)
// are UNIX-only, which already rules out non-UNIX targets.
// On top of that, the tests exec a freshly-written script via
// tokio, and Linux runners (especially cargo-llvm-cov +
// ubuntu-arm) sporadically surface ETXTBSY ("Text file busy")
// even after sync_all + a sync warmup exec, due to the kernel
// deferring text-segment release on those filesystems. macOS runs
// the same exec without the quirk, so the whole module is gated
// to macOS — mirrors the same fix already in
// `crates/plugins/secrets/1password/src/lib.rs`.
586#[cfg(all(test, target_os = "macos"))]
587mod tests {
588    use super::*;
589    use crate::plugin_manifest::PluginManifest;
590    use std::fs;
591    use std::os::unix::fs::PermissionsExt;
592    use std::path::Path;
593    use tempfile::TempDir;
594
    /// Build a fake plugin shell-script that:
    /// - reads JSON-RPC lines from stdin
    /// - emits canned responses indexed by the request `id`
    /// - exits cleanly on EOF (i.e. after stdin closes)
    ///
    /// `behaviour` is a small DSL: `"echo"` echoes Init back as
    /// success; `"crash"` exits non-zero immediately; `"hang"`
    /// reads but never writes (for grace-timeout tests);
    /// `"env-dump"` writes the env to a sidecar file then echoes
    /// Init.
    ///
    /// The script is written to `dir/devboy-source-{name}` and made
    /// executable. Returns a manifest for it together with the full
    /// path of the script. Panics on an unknown `behaviour`.
    fn write_fake_plugin(dir: &Path, name: &str, behaviour: &str) -> (PluginManifest, PathBuf) {
        let exec_path = dir.join(format!("devboy-source-{name}"));
        // Braces are doubled inside `format!` so the emitted JSON
        // contains literal `{` / `}`.
        let script = match behaviour {
            "echo" => format!(
                r#"#!/bin/sh
while IFS= read -r line; do
  id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
  printf '{{"jsonrpc":"2.0","id":%s,"result":{{"source_name":"{name}","capabilities_bits":1,"plugin_version":"0.0.1"}}}}\n' "$id"
done
"#
            ),
            "crash" => "#!/bin/sh\nexit 7\n".to_string(),
            "hang" => "#!/bin/sh\nwhile read line; do :; done\nsleep 30\n".to_string(),
            "env-dump" => format!(
                r#"#!/bin/sh
env > "{}/env-dump.txt"
while IFS= read -r line; do
  id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
  printf '{{"jsonrpc":"2.0","id":%s,"result":{{"source_name":"{name}","capabilities_bits":1,"plugin_version":"0.0.1"}}}}\n' "$id"
done
"#,
                dir.display()
            ),
            other => panic!("unknown behaviour: {other}"),
        };
        fs::write(&exec_path, script).unwrap();
        // rwxr-xr-x so the script can be exec'd directly.
        let mut perms = fs::metadata(&exec_path).unwrap().permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&exec_path, perms).unwrap();

        // The manifest carries the hex SHA-256 of the bytes on disk.
        let bytes = fs::read(&exec_path).unwrap();
        use sha2::Digest;
        let mut hasher = sha2::Sha256::new();
        hasher.update(&bytes);
        let checksum = hex::encode(hasher.finalize());

        let manifest = PluginManifest {
            name: name.into(),
            version: "0.0.1".into(),
            executable: PathBuf::from(format!("devboy-source-{name}")),
            // The only variable the env-restriction test expects to
            // reach the child.
            allowed_env_vars: vec!["DEVBOY_TEST_LET_THROUGH".into()],
            checksum_sha256: checksum,
        };
        (manifest, exec_path)
    }
650
651    fn fast_policy() -> LifetimePolicy {
652        LifetimePolicy {
653            idle_timeout: Duration::from_millis(80),
654            shutdown_grace: Duration::from_millis(200),
655            restart_window: Duration::from_secs(10),
656            restart_cap: 3,
657        }
658    }
659
660    // -- Spawn + init handshake --------------------------------
661
662    #[tokio::test]
663    async fn lazy_spawn_and_init_handshake_succeeds() {
664        let dir = TempDir::new().unwrap();
665        let (manifest, exec) = write_fake_plugin(dir.path(), "echo", "echo");
666        let client = PluginClient::new(manifest, exec, fast_policy());
667
668        let initial = client.health().await;
669        assert_eq!(initial.state, PluginState::Idle);
670
671        // First request triggers spawn + init + the request.
672        let resp = client.request(PluginRequest::IsAvailable).await;
673        // The fake "echo" script always returns the Init result
674        // shape, which serde will accept as PluginResponse::Init
675        // because we use serde(untagged). That's fine — the
676        // important assertion is that the exchange completed.
677        assert!(resp.is_ok(), "request failed: {resp:?}");
678        let after = client.health().await;
679        assert_eq!(after.state, PluginState::Running);
680        assert!(after.last_used.is_some());
681
682        client.shutdown().await;
683    }
684
685    // -- Idle timeout reaps process ----------------------------
686
687    #[tokio::test]
688    async fn idle_timeout_reaps_subprocess_before_next_request() {
689        let dir = TempDir::new().unwrap();
690        let (manifest, exec) = write_fake_plugin(dir.path(), "echoi", "echo");
691        let client = PluginClient::new(manifest, exec, fast_policy());
692
693        let _ = client.request(PluginRequest::IsAvailable).await.unwrap();
694        // Sleep past the idle timeout.
695        tokio::time::sleep(Duration::from_millis(150)).await;
696        // Next request must succeed — that means the client
697        // re-spawned cleanly.
698        let _ = client.request(PluginRequest::IsAvailable).await.unwrap();
699
700        client.shutdown().await;
701    }
702
703    // -- Restart cap -------------------------------------------
704
705    #[tokio::test]
706    async fn restart_cap_disables_after_repeated_spawn_failures() {
707        let dir = TempDir::new().unwrap();
708        let (manifest, exec) = write_fake_plugin(dir.path(), "crashc", "crash");
709        let client = PluginClient::new(manifest, exec, fast_policy());
710
711        // Each call: child exits before init handshake → recorded
712        // as a crash. Cap = 3.
713        for _ in 0..3 {
714            let _ = client.request(PluginRequest::IsAvailable).await;
715        }
716        let h = client.health().await;
717        assert!(
718            matches!(h.state, PluginState::Disabled { .. }),
719            "expected Disabled, got {:?}",
720            h.state
721        );
722        // Fourth call refuses without spawning.
723        let err = client
724            .request(PluginRequest::IsAvailable)
725            .await
726            .unwrap_err();
727        assert!(
728            matches!(err, PluginClientError::Disabled { .. }),
729            "expected Disabled error, got {err:?}"
730        );
731
732        // Operator clears the failure counter.
733        client.clear_disabled().await;
734        assert_eq!(client.health().await.state, PluginState::Idle);
735    }
736
737    // -- Env restriction ---------------------------------------
738
739    #[tokio::test]
740    async fn env_restriction_only_passes_allowed_vars() {
741        let dir = TempDir::new().unwrap();
742        let (manifest, exec) = write_fake_plugin(dir.path(), "envd", "env-dump");
743        let dir_path = dir.path().to_path_buf();
744        let client = PluginClient::new(manifest, exec, fast_policy());
745
746        // `temp_env` scopes the env mutation to the future
747        // body — no `unsafe` needed and other tests don't see
748        // the leakage.
749        temp_env::async_with_vars(
750            [
751                ("DEVBOY_TEST_SHOULD_NOT_LEAK", Some("leak-me")),
752                ("DEVBOY_TEST_LET_THROUGH", Some("passed-through")),
753            ],
754            async move {
755                let _ = client.request(PluginRequest::IsAvailable).await.unwrap();
756                client.shutdown().await;
757            },
758        )
759        .await;
760
761        // The fake plugin wrote its env to a sidecar file.
762        let dump = fs::read_to_string(dir_path.join("env-dump.txt")).unwrap();
763        assert!(
764            dump.contains("DEVBOY_TEST_LET_THROUGH=passed-through"),
765            "allowed var did not pass through: {dump}"
766        );
767        assert!(
768            !dump.contains("DEVBOY_TEST_SHOULD_NOT_LEAK"),
769            "non-allowed var leaked into plugin env: {dump}"
770        );
771    }
772
773    // -- Graceful shutdown -------------------------------------
774
775    #[tokio::test]
776    async fn shutdown_sends_sigterm_then_sigkill_on_grace_timeout() {
777        let dir = TempDir::new().unwrap();
778        let (manifest, exec) = write_fake_plugin(dir.path(), "hang", "hang");
779        let client = PluginClient::new(
780            manifest,
781            exec,
782            LifetimePolicy {
783                idle_timeout: Duration::from_secs(60),
784                shutdown_grace: Duration::from_millis(150),
785                restart_window: Duration::from_secs(10),
786                restart_cap: 3,
787            },
788        );
789
790        // Spawn the process by attempting a request. The "hang"
791        // plugin reads but never writes — request will hang too,
792        // so we time it out. Then call shutdown which must force
793        // -kill within grace + a small slack.
794        let req_fut = client.request(PluginRequest::IsAvailable);
795        let _ = tokio::time::timeout(Duration::from_millis(50), req_fut).await;
796        // Even though the request future was dropped, the
797        // process is still alive in the registry.
798
799        let start = Instant::now();
800        client.shutdown().await;
801        let elapsed = start.elapsed();
802        // Should land between grace_period and grace_period + 1s
803        // slack.
804        assert!(
805            elapsed < Duration::from_secs(2),
806            "shutdown took too long: {elapsed:?}"
807        );
808    }
809
810    // -- Default policy values --------------------------------
811
812    #[test]
813    fn default_policy_matches_adr_021_section_10() {
814        let p = LifetimePolicy::default();
815        assert_eq!(p.idle_timeout, Duration::from_secs(60));
816        assert_eq!(p.shutdown_grace, Duration::from_secs(10));
817        assert_eq!(p.restart_window, Duration::from_secs(60));
818        assert_eq!(p.restart_cap, 3);
819    }
820}