Skip to main content

algocline_app/pool/
dispatch.rs

//! Host-mode dispatch helpers.
//!
//! Provides [`run_via_pool`] and [`continue_via_pool`], which orchestrate
//! worker subprocess spawning, UDS connection, registry persistence, and
//! response forwarding.
//!
//! ## Crux invariants enforced here
//!
//! - (MCP thin proxy IPC boundary): all host_mode=true calls reach a worker
//!   subprocess via [`PoolClient`] over a Unix domain socket.  The MCP-internal
//!   [`SessionRegistry`] is never touched on this path.
//! - (Registry reconnect across restarts): every successful [`run_via_pool`]
//!   call attempts to persist the session entry to `registry.json` via
//!   [`with_registry_lock`] before returning; a failed write does not fail the
//!   call but is reported through the returned `pool_save_error` value.
//!   A restarted MCP process can rediscover live workers from that file.
16
17use std::path::{Path, PathBuf};
18
19use tokio::time::{sleep, timeout, Duration};
20
21use crate::pool::{
22    registry::with_registry_lock, PoolClient, PoolError, PoolRegistry, PoolRequest, PoolResponse,
23    PoolResponseData, PoolSessionEntry,
24};
25
26// ─── Worker spawn ─────────────────────────────────────────────────────────────
27
28/// Spawn a pool worker subprocess for the given session.
29///
30/// # Arguments
31///
32/// * `pool_dir` — directory for UDS sockets and the registry lock file.
33/// * `sid` — session ID string (UUID).
34///
35/// # Returns
36///
37/// `Ok((pid, sock_path))` on success. The returned `pid` is the OS-assigned
38/// process identifier captured immediately after `spawn()`; it is valid at
39/// the time of return but may be reused by the OS after the worker exits.
40///
41/// # Concurrency
42///
43/// **Reaping**: a `tokio::spawn` task is started to call `child.wait().await`
44/// (cancel safe per tokio docs). The spawned task runs independently; dropping
45/// its `JoinHandle` does not cancel it. The runtime reaps the OS process entry
46/// when `wait()` returns, preventing zombie accumulation.
47///
48/// **Cancel safety of the reap task**: `tokio::process::Child::wait` is cancel
49/// safe. If the MCP server runtime shuts down before the worker exits, the
50/// reap task is dropped; the worker process (setsid session leader) is
51/// reparented to init (PID 1) by the OS and reaped there — no zombie results.
52///
53/// **`child.id()` Option**: `tokio::process::Child::id()` returns `Option<u32>`.
54/// A `None` result (process already exited before id() was called) is
55/// propagated as `Err(PoolError::Spawn)`.
56///
57/// # Errors
58///
59/// - `PoolError::Spawn` — `current_exe()` failed, `Command::spawn()` failed,
60///   or `child.id()` returned `None` (process exited immediately after spawn).
61///
62/// # Panics
63///
64/// Does not panic. `tokio::spawn` panics if called outside a tokio runtime,
65/// but `spawn_worker` is only called from `run_via_pool` which runs on the
66/// MCP server's runtime.
67async fn spawn_worker(pool_dir: &Path, sid: &str) -> Result<(u32, PathBuf), PoolError> {
68    let sock = pool_dir.join(format!("{sid}.sock"));
69
70    let exe = std::env::current_exe()
71        .map_err(|e| PoolError::Spawn(format!("current_exe failed: {e}")))?;
72
73    let mut cmd = tokio::process::Command::new(&exe);
74    // "pool-worker" is a clap subcommand (kebab-case of PoolWorker enum variant).
75    // The worker binary must be invoked as `alc pool-worker --sid <sid> --sock <path>`.
76    cmd.args([
77        "pool-worker",
78        "--sid",
79        sid,
80        "--sock",
81        &sock.to_string_lossy(),
82    ]);
83
84    // SAFETY: pre_exec runs in the child process after fork, before exec.
85    // `libc::setsid()` is an async-signal-safe syscall.  No allocator,
86    // mutex, or tokio runtime is used inside the closure.
87    #[cfg(unix)]
88    {
89        unsafe {
90            cmd.pre_exec(|| {
91                // Detach the child from the parent's session so that
92                // parent (MCP server) death does not deliver SIGHUP to
93                // the worker.
94                libc::setsid();
95                Ok(())
96            });
97        }
98    }
99
100    let mut child = cmd
101        .spawn()
102        .map_err(|e| PoolError::Spawn(format!("worker spawn failed: {e}")))?;
103
104    // tokio::process::Child::id() returns Option<u32>: None if the process
105    // already exited before id() was called.
106    let pid = child.id().ok_or_else(|| {
107        PoolError::Spawn("child.id() returned None — process already exited".to_string())
108    })?;
109
110    // Fire-and-forget reap: start a background task that calls child.wait().await.
111    // Without this spawn, the Child handle drop would trigger a non-blocking
112    // orphan reap, but explicit wait() makes the reaping path observable and
113    // zombie-free even on slow exits.
114    // Dropping the JoinHandle does NOT cancel the task; the runtime continues
115    // running it until wait() completes.
116    let sid_owned = sid.to_string();
117    tokio::spawn(async move {
118        match child.wait().await {
119            Ok(status) => tracing::debug!(sid = %sid_owned, ?status, "pool worker reaped"),
120            Err(e) => tracing::warn!(sid = %sid_owned, error = %e, "pool worker wait error"),
121        }
122    });
123
124    Ok((pid, sock))
125}
126
127// ─── Public API ───────────────────────────────────────────────────────────────
128
129/// Generate a non-deterministic session ID for pool workers.
130///
131/// Uses timestamp + random bytes, matching the approach in
132/// `algocline-engine::session::gen_session_id`.
133fn gen_pool_sid() -> String {
134    use rand::RngExt;
135    use std::time::{SystemTime, UNIX_EPOCH};
136
137    // Timestamp prefix for rough ordering.  `unwrap_or_default` is
138    // justified: UNIX_EPOCH underflow only happens on misconfigured clocks
139    // where the timestamp suffix is 0.  The 16-hex-char random suffix alone
140    // guarantees uniqueness.
141    let ts = SystemTime::now()
142        .duration_since(UNIX_EPOCH)
143        .unwrap_or_default()
144        .as_nanos();
145
146    let random: u64 = rand::rng().random();
147    format!("p-{ts:x}-{random:016x}")
148}
149
150/// Spawn a worker subprocess, connect via UDS, proxy a `Run` request,
151/// and persist the session entry to `registry.json`.
152///
153/// On success, returns `(session_id, json_response, Option<pool_save_error>)`.
154/// The optional `pool_save_error` is `Some(msg)` when the registry write
155/// failed — callers must surface this as an additive field on the MCP wire
156/// response.
157///
158/// # Concurrency
159///
160/// **Cancel safety**: this function is **not** cancel safe. If dropped during
161/// `PoolClient::connect` or `send_request`, the `PoolClient` is dropped and
162/// the UDS connection is closed. The worker subprocess continues running; the
163/// session can be resumed via `continue_via_pool` using the registry entry.
164///
165/// **Locks**: acquires an advisory `fs4` flock (inside `spawn_blocking`) for
166/// the registry add/save. The lock is held only for the duration of the
167/// synchronous I/O in `persist_entry` and is released before this function
168/// returns. No `std::sync::Mutex` or `tokio::sync::Mutex` is held across
169/// `.await` points in this function.
170///
171/// **Zombie reaping**: `spawn_worker` starts a background `tokio::spawn` reap
172/// task. If this function returns `Err`, the spawned reap task continues in the
173/// background and the OS process entry is reaped when the worker exits.
174///
175/// **Socket wait timeout**: polls for socket file appearance with a 5 s
176/// `tokio::time::timeout`. If the socket does not appear in 5 s,
177/// `Err(PoolError::Handshake)` is returned.
178///
179/// # Errors
180///
181/// Returns `Err(PoolError)` if worker spawn, socket wait, or UDS run
182/// round-trip fails. Registry persistence failure is returned in the
183/// `pool_save_error` field, not as `Err`.
184pub async fn run_via_pool(
185    pool_dir: &Path,
186    reg_path: &Path,
187    lock_path: &Path,
188    extra_lib_paths: Vec<PathBuf>,
189    code: String,
190    ctx: serde_json::Value,
191) -> Result<(String, String, Option<String>), PoolError> {
192    // 1. Generate session ID and socket path.
193    let sid = gen_pool_sid();
194
195    // 2. Spawn worker subprocess (async — uses tokio::process::Command).
196    let (pid, sock) = spawn_worker(pool_dir, &sid).await?;
197
198    // 3. Poll for socket file appearance (timeout 5s).
199    {
200        let sock_clone = sock.clone();
201        timeout(Duration::from_secs(5), async {
202            loop {
203                if sock_clone.exists() {
204                    break;
205                }
206                sleep(Duration::from_millis(50)).await;
207            }
208        })
209        .await
210        .map_err(|_| {
211            PoolError::Handshake(format!(
212                "timeout waiting for worker socket at {}",
213                sock.display()
214            ))
215        })?;
216    }
217
218    // 4. Connect (includes handshake) and send Run request.
219    // Function-local PoolClient: connect → send → drop.
220    // Cancel safety: if this future is cancelled, the PoolClient is dropped
221    // and the connection is closed.  The next alc_continue call reconnects
222    // via registry.json.
223    let mut client = PoolClient::connect(&sock).await?;
224    let resp = client
225        .send_request(PoolRequest::Run {
226            code,
227            ctx: Some(ctx),
228            lib_paths: extra_lib_paths,
229        })
230        .await?;
231
232    let (worker_sid, feed_result_json) = extract_feed_response(resp)?;
233
234    // Convert the serde-format FeedResult to MCP wire JSON.
235    // The worker serializes FeedResult with standard serde ({"Paused": {...}})
236    // but callers expect the MCP format ({"status": "needs_response", ...}).
237    let mcp_json = feed_result_to_mcp_json(&worker_sid, &feed_result_json);
238
239    // 5. Persist the session entry to registry.json.
240    // Write failures are non-fatal: the session was already started.
241    // Callers surface the error as `pool_save_error` on the wire response.
242    let pool_save_error = persist_entry(
243        reg_path.to_path_buf(),
244        lock_path.to_path_buf(),
245        PoolSessionEntry::new(&worker_sid, pid, sock, env!("CARGO_PKG_VERSION")),
246    )
247    .await;
248
249    Ok((worker_sid, mcp_json.to_string(), pool_save_error))
250}
251
252/// Reconnect to an existing pool worker via its registry entry and forward a
253/// `Continue` request.
254///
255/// # Arguments
256///
257/// * `entry` — the registry entry for the target worker (sock path, etc.).
258/// * `sid` — session ID to resume.
259/// * `response` — LLM response text.
260/// * `query_id` — optional query ID being answered.
261/// * `usage` — optional token usage.
262///
263/// # Returns
264///
265/// `Ok(json_response)` — the stringified `feed_result` JSON from the worker.
266///
267/// # Errors
268///
269/// Returns `Err(PoolError)` on UDS connect failure or invalid response.
270///
271/// # Cancel safety
272///
273/// If this future is cancelled mid-await, the `PoolClient` is dropped and
274/// the socket is closed.  `read_line` is not cancel-safe; a partial line
275/// would corrupt the buffer.  Dropping the client is the correct recovery —
276/// the next `continue_via_pool` call creates a fresh connection.
277pub async fn continue_via_pool(
278    entry: &PoolSessionEntry,
279    sid: &str,
280    response: String,
281    query_id: Option<String>,
282    usage: Option<algocline_core::TokenUsage>,
283) -> Result<String, PoolError> {
284    // Function-local PoolClient: connect → send → drop.
285    let mut client = PoolClient::connect(&entry.sock).await?;
286    let resp = client
287        .send_request(PoolRequest::Continue {
288            sid: sid.to_string(),
289            response,
290            query_id,
291            usage,
292        })
293        .await?;
294
295    let (session_id, feed_result_json) = extract_feed_response(resp)?;
296    // Convert serde-format FeedResult to MCP wire JSON (same translation as run_via_pool).
297    let mcp_json = feed_result_to_mcp_json(&session_id, &feed_result_json);
298    Ok(mcp_json.to_string())
299}
300
301// ─── Internal helpers ─────────────────────────────────────────────────────────
302
303/// Convert a worker `FeedResult` serde-JSON value to the MCP wire format.
304///
305/// The worker serializes `FeedResult` with standard serde:
306/// - `FeedResult::Paused`   → `{"Paused": {"queries": [...]}}`
307/// - `FeedResult::Finished` → `{"Finished": {"state": ..., "metrics": ...}}`
308/// - `FeedResult::Accepted` → `{"Accepted": {"remaining": N}}`
309///
310/// This mirrors the logic of `algocline_engine::FeedResult::to_json` without
311/// requiring `FeedResult` to implement `Deserialize`.  The worker's serde
312/// output is the authoritative source.
313///
314/// # Arguments
315///
316/// * `session_id` — the pool worker's session ID (embedded in `needs_response`).
317/// * `feed_result` — the raw serde value from `PoolResponseData::Feed.feed_result`.
318///
319/// # Returns
320///
321/// MCP wire JSON.  On unrecognised shapes, falls back to
322/// `{"status": "error", "error": "..."}`.
323fn feed_result_to_mcp_json(session_id: &str, feed_result: &serde_json::Value) -> serde_json::Value {
324    use serde_json::json;
325
326    if let Some(paused) = feed_result.get("Paused") {
327        let queries = paused.get("queries").and_then(|q| q.as_array());
328        match queries {
329            Some(qs) if qs.len() == 1 => {
330                let q = &qs[0];
331                let mut obj = json!({
332                    "status": "needs_response",
333                    "session_id": session_id,
334                    "query_id": q.get("id").and_then(|v| v.as_str()).unwrap_or("q-0"),
335                    "prompt": q.get("prompt").cloned().unwrap_or(serde_json::Value::Null),
336                    "system": q.get("system").cloned().unwrap_or(serde_json::Value::Null),
337                    "max_tokens": q.get("max_tokens").cloned().unwrap_or(json!(1024)),
338                });
339                if q.get("grounded").and_then(|v| v.as_bool()).unwrap_or(false) {
340                    obj["grounded"] = json!(true);
341                }
342                if q.get("underspecified")
343                    .and_then(|v| v.as_bool())
344                    .unwrap_or(false)
345                {
346                    obj["underspecified"] = json!(true);
347                }
348                obj
349            }
350            Some(qs) => {
351                let mapped: Vec<serde_json::Value> = qs
352                    .iter()
353                    .map(|q| {
354                        let mut obj = json!({
355                            "id": q.get("id").cloned().unwrap_or(json!("q-0")),
356                            "prompt": q.get("prompt").cloned().unwrap_or(serde_json::Value::Null),
357                            "system": q.get("system").cloned().unwrap_or(serde_json::Value::Null),
358                            "max_tokens": q.get("max_tokens").cloned().unwrap_or(json!(1024)),
359                        });
360                        if q.get("grounded").and_then(|v| v.as_bool()).unwrap_or(false) {
361                            obj["grounded"] = json!(true);
362                        }
363                        if q.get("underspecified")
364                            .and_then(|v| v.as_bool())
365                            .unwrap_or(false)
366                        {
367                            obj["underspecified"] = json!(true);
368                        }
369                        obj
370                    })
371                    .collect();
372                json!({
373                    "status": "needs_response",
374                    "session_id": session_id,
375                    "queries": mapped,
376                })
377            }
378            None => json!({
379                "status": "needs_response",
380                "session_id": session_id,
381            }),
382        }
383    } else if let Some(finished) = feed_result.get("Finished") {
384        let state = finished.get("state");
385        let metrics = finished.get("metrics");
386        if let Some(completed) = state.and_then(|s| s.get("Completed")) {
387            json!({
388                "status": "completed",
389                "result": completed.get("result").cloned().unwrap_or(serde_json::Value::Null),
390                "stats": metrics.cloned().unwrap_or(serde_json::Value::Null),
391            })
392        } else if let Some(failed) = state.and_then(|s| s.get("Failed")) {
393            json!({
394                "status": "error",
395                "error": failed.get("error").and_then(|v| v.as_str()).unwrap_or("execution failed"),
396            })
397        } else {
398            json!({
399                "status": "cancelled",
400                "stats": metrics.cloned().unwrap_or(serde_json::Value::Null),
401            })
402        }
403    } else if let Some(accepted) = feed_result.get("Accepted") {
404        json!({
405            "status": "accepted",
406            "remaining": accepted.get("remaining").cloned().unwrap_or(json!(0)),
407        })
408    } else {
409        // Unrecognised shape — surface as error so callers can observe it.
410        json!({
411            "status": "error",
412            "error": format!("unrecognised FeedResult shape from worker: {feed_result}"),
413        })
414    }
415}
416
417/// Extract `(session_id, feed_result)` from a `PoolResponse`.
418///
419/// # Errors
420///
421/// Returns `PoolError::ResponseParse` if the response is not a `Feed` variant.
422fn extract_feed_response(resp: PoolResponse) -> Result<(String, serde_json::Value), PoolError> {
423    match resp.data {
424        Some(PoolResponseData::Feed {
425            session_id,
426            feed_result,
427        }) => Ok((session_id, feed_result)),
428        Some(other) => Err(PoolError::ResponseParse(format!(
429            "expected Feed response, got {other:?}"
430        ))),
431        None => {
432            let err = resp.error.unwrap_or_else(|| "unknown error".to_string());
433            Err(PoolError::ResponseParse(format!(
434                "worker returned error: {err}"
435            )))
436        }
437    }
438}
439
440/// Persist a session entry to `registry.json` under the advisory lock.
441///
442/// Returns `Some(error_message)` if the persist failed, `None` on success.
443/// This never returns `Err` — registry write failures are surfaced as
444/// additive `pool_save_error` fields on the MCP response, not as hard errors,
445/// because the session was already started successfully.
446///
447/// Registry I/O is synchronous and held under an advisory flock; the locked
448/// region is wrapped in `spawn_blocking` to avoid stalling a tokio runtime
449/// worker when another MCP process holds the lock.
450async fn persist_entry(
451    reg_path: PathBuf,
452    lock_path: PathBuf,
453    entry: PoolSessionEntry,
454) -> Option<String> {
455    match tokio::task::spawn_blocking(move || {
456        with_registry_lock(&lock_path, || {
457            let mut reg = PoolRegistry::load_or_default(&reg_path)?;
458            reg.add(entry);
459            reg.save(&reg_path)
460        })
461    })
462    .await
463    {
464        Ok(Ok(())) => None,
465        Ok(Err(e)) => Some(e.to_string()),
466        Err(e) => Some(format!("spawn_blocking join error: {e}")),
467    }
468}
469
470// ─── Tests ────────────────────────────────────────────────────────────────────
471
/// Unit tests for the dispatch helpers.
///
/// The pure helpers (`extract_feed_response`, `feed_result_to_mcp_json`,
/// `gen_pool_sid`) are tested directly. `persist_entry` is tested on its
/// error path only. The process tests do not call `spawn_worker` (which needs
/// a real worker binary); they reproduce its reap / `child.id()` patterns
/// with `true` and `false` and are therefore Unix-only.
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;
    use crate::pool::{protocol::PoolResponseData, PoolResponse};

    // ── T1: happy path ────────────────────────────────────────────────────────

    /// T1 — extract_feed_response returns Ok for a valid Feed response.
    #[test]
    fn extract_feed_response_ok_on_feed() {
        let resp = PoolResponse::success(PoolResponseData::Feed {
            session_id: "test-sid".to_string(),
            feed_result: serde_json::json!({"status": "needs_response"}),
        });
        let (sid, json) = extract_feed_response(resp).expect("should extract feed");
        assert_eq!(sid, "test-sid");
        assert_eq!(json["status"], "needs_response");
    }

    // ── T2: boundary / edge ───────────────────────────────────────────────────

    /// T2 — gen_pool_sid returns distinct IDs on successive calls.
    #[test]
    fn gen_pool_sid_is_unique() {
        let ids: Vec<_> = (0..20).map(|_| gen_pool_sid()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(
            unique.len(),
            ids.len(),
            "all generated session IDs must be distinct"
        );
    }

    /// T2b — gen_pool_sid has expected prefix.
    #[test]
    fn gen_pool_sid_has_prefix() {
        let sid = gen_pool_sid();
        assert!(sid.starts_with("p-"), "sid must start with 'p-', got {sid}");
    }

    // ── T3: error path ────────────────────────────────────────────────────────

    /// T3 — extract_feed_response returns ResponseParse error for a non-Feed variant.
    #[test]
    fn extract_feed_response_error_on_non_feed() {
        let resp = PoolResponse::success(PoolResponseData::Shutdown);
        let err = extract_feed_response(resp).expect_err("should fail on Shutdown response");
        assert!(
            matches!(err, PoolError::ResponseParse(_)),
            "expected ResponseParse, got {err:?}"
        );
    }

    /// T3b — extract_feed_response propagates the worker error message when ok=false.
    #[test]
    fn extract_feed_response_error_on_failure_response() {
        let resp = PoolResponse::failure("something went wrong");
        let err = extract_feed_response(resp).expect_err("should fail on error response");
        match err {
            PoolError::ResponseParse(msg) => {
                assert!(
                    msg.contains("something went wrong"),
                    "error must include worker message, got: {msg}"
                );
            }
            other => panic!("expected ResponseParse, got {other:?}"),
        }
    }

    /// T3c — persist_entry returns Some(error) when the registry and lock
    ///        paths cannot be used because their parent is a regular file,
    ///        not a directory.
    #[tokio::test]
    async fn persist_entry_returns_some_on_io_error() {
        let dir = tempfile::tempdir().expect("tempdir");
        // Place a regular file where the parent directory is expected.
        let blocker = dir.path().join("blocker");
        std::fs::write(&blocker, b"not a dir").expect("write blocker");
        let reg_path = blocker.join("registry.json"); // parent is a file
        let lock_path = blocker.join("registry.lock");

        let entry = PoolSessionEntry::new(
            "test-sid",
            std::process::id(),
            PathBuf::from("/tmp/test.sock"),
            "0.30.0",
        );

        let result = persist_entry(reg_path, lock_path, entry).await;
        assert!(
            result.is_some(),
            "persist_entry must return Some(error) on I/O failure"
        );
    }

    // ── T1: feed_result_to_mcp_json happy paths ───────────────────────────────

    /// T1 — Paused with one query maps to needs_response with session_id.
    #[test]
    fn feed_result_to_mcp_json_paused_single_query() {
        let feed = serde_json::json!({
            "Paused": {
                "queries": [{
                    "id": "q-0",
                    "prompt": "What is 1+1?",
                    "system": null,
                    "max_tokens": 1024,
                    "grounded": false,
                    "underspecified": false
                }]
            }
        });
        let mcp = feed_result_to_mcp_json("sid-abc", &feed);
        assert_eq!(mcp["status"], "needs_response");
        assert_eq!(mcp["session_id"], "sid-abc");
        assert_eq!(mcp["query_id"], "q-0");
        assert_eq!(mcp["prompt"], "What is 1+1?");
    }

    /// T1b — Finished(Completed) maps to completed with result.
    #[test]
    fn feed_result_to_mcp_json_finished_completed() {
        let feed = serde_json::json!({
            "Finished": {
                "state": {
                    "Completed": { "result": {"answer": 42} }
                },
                "metrics": {}
            }
        });
        let mcp = feed_result_to_mcp_json("sid-xyz", &feed);
        assert_eq!(mcp["status"], "completed");
        assert_eq!(mcp["result"]["answer"], 42);
    }

    /// T2 — Paused with multiple queries maps to queries array.
    #[test]
    fn feed_result_to_mcp_json_paused_multi_query() {
        let feed = serde_json::json!({
            "Paused": {
                "queries": [
                    {"id": "q-0", "prompt": "P1", "system": null, "max_tokens": 512},
                    {"id": "q-1", "prompt": "P2", "system": null, "max_tokens": 512}
                ]
            }
        });
        let mcp = feed_result_to_mcp_json("sid-multi", &feed);
        assert_eq!(mcp["status"], "needs_response");
        assert_eq!(mcp["session_id"], "sid-multi");
        let qs = mcp["queries"].as_array().expect("queries array");
        assert_eq!(qs.len(), 2);
        assert_eq!(qs[0]["id"], "q-0");
    }

    /// T3 — Unrecognised FeedResult shape maps to error status.
    #[test]
    fn feed_result_to_mcp_json_unknown_shape_is_error() {
        let feed = serde_json::json!({"Unknown": {}});
        let mcp = feed_result_to_mcp_json("sid-bad", &feed);
        assert_eq!(mcp["status"], "error");
        assert!(
            mcp["error"].as_str().unwrap_or("").contains("unrecognised"),
            "error message must mention 'unrecognised', got: {}",
            mcp["error"]
        );
    }

    // ── G1/G2: concurrency — zombie reaping ───────────────────────────────────

    /// G1 — the fire-and-forget reap pattern used inside spawn_worker leaves
    ///      no zombie behind.  Spawns `true` (instant exit), hands the child
    ///      to a detached task that awaits `wait()`, then polls until the OS
    ///      no longer knows the PID.
    ///
    ///      Polling against a deadline (instead of one fixed sleep) keeps the
    ///      test from failing spuriously when the machine is slow to schedule
    ///      the reap task.
    #[cfg(unix)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_spawn_worker_reaps_child_no_zombie() {
        // Directly exercise the tokio::process machinery (not spawn_worker itself,
        // which requires a valid worker binary path).  We reproduce the exact
        // reaping pattern used inside spawn_worker.
        let mut cmd = tokio::process::Command::new("true");
        let mut child = cmd.spawn().expect("spawn true");
        let pid = child.id().expect("child.id() must be Some before wait");
        // Start the fire-and-forget reap task — mirrors spawn_worker behaviour.
        tokio::spawn(async move {
            let _ = child.wait().await;
        });

        let pid_i32 = i32::try_from(pid).expect("pid fits i32");
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        // kill(pid, 0) returns -1 / ESRCH once the process is fully reaped.
        // errno is captured right after the call so nothing in between can
        // overwrite it.
        let (rc, errno) = loop {
            // SAFETY: kill(pid, 0) is a signal existence check; it does not deliver
            // a signal and is safe to call with an arbitrary PID.
            let rc = unsafe { libc::kill(pid_i32, 0) };
            let errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(0);
            if rc == -1 || std::time::Instant::now() >= deadline {
                break (rc, errno);
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        };

        assert_eq!(
            rc, -1,
            "process should be gone (kill(pid,0) must return -1)"
        );
        // ESRCH (3) = no such process — the expected errno when fully reaped.
        assert_eq!(
            errno,
            libc::ESRCH,
            "errno must be ESRCH (no such process), got {errno}"
        );
    }

    /// G2 — `tokio::process::Child::id()` returns `None` after `wait()` completes,
    ///      and `ok_or_else` correctly maps that to `PoolError::Spawn`.
    ///      This validates the exact error propagation path used inside spawn_worker.
    #[cfg(unix)]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_spawn_worker_child_id_none_returns_pool_error() {
        let mut child = tokio::process::Command::new("false")
            .spawn()
            .expect("spawn false");
        // After wait() completes the child has been reaped; id() returns None.
        let _status = child.wait().await.expect("wait");
        let id = child.id();
        assert!(
            id.is_none(),
            "child.id() must be None after wait(): got {:?}",
            id
        );
        // Reproduce the ok_or_else path from spawn_worker.
        let result: Result<u32, crate::pool::PoolError> = id.ok_or_else(|| {
            crate::pool::PoolError::Spawn(
                "child.id() returned None — process already exited".to_string(),
            )
        });
        assert!(
            matches!(result, Err(crate::pool::PoolError::Spawn(_))),
            "expected Err(PoolError::Spawn), got {:?}",
            result
        );
    }
}