Skip to main content

algocline_app/service/
run.rs

1use std::sync::Arc;
2
3use algocline_core::QueryId;
4use algocline_engine::{FeedResult, VariantPkg};
5
6use super::eval_store::{splice_response_string, splice_response_warnings};
7use super::resolve::{is_package_installed, make_require_code, resolve_code, QueryResponse};
8use super::transcript::write_transcript_log;
9use super::AppService;
10use crate::pool::dispatch::{continue_via_pool, run_via_pool};
11
12/// Splice `save_warning` into the JSON `result` when the optional
13/// warning is `Some(_)`. Returns the original string unchanged when
14/// there is no warning.
15fn splice_save_warning(result_json: &str, warning: Option<String>) -> String {
16    match warning {
17        Some(msg) => splice_response_string(result_json, "save_warning", &msg),
18        None => result_json.to_string(),
19    }
20}
21
22/// Splice `transcript_warning` into the JSON `result` when the optional
23/// warning is `Some(_)`. Returns the original string unchanged when
24/// there is no warning.
25fn splice_transcript_warning(result_json: &str, warning: Option<String>) -> String {
26    match warning {
27        Some(msg) => splice_response_string(result_json, "transcript_warning", &msg),
28        None => result_json.to_string(),
29    }
30}
31
32impl AppService {
33    /// Execute Lua code with optional JSON context.
34    ///
35    /// When `host_mode: Some(true)` is passed, the call is proxied via
36    /// `PoolClient` to a long-lived worker subprocess over a Unix domain socket.
37    /// When `host_mode` is `None` or `Some(false)` the existing in-process
38    /// `Executor::start_session` path is used unchanged.
39    ///
40    /// # Concurrency
41    ///
42    /// **host_mode=false (default)**: No additional locking beyond `SessionRegistry`
43    /// lock C. `AppService` itself holds no long-lived lock during this call.
44    ///
45    /// **host_mode=true**: Acquires `RwLock<PoolRegistry>` (write) and advisory
46    /// `fs4::FileExt::lock_exclusive` to update `registry.json`. These locks are
47    /// **not** held across the UDS round-trip await.
48    ///
49    /// **Cancel safety**: cancelling this `.await` mid-UDS-request leaves the
50    /// worker subprocess running. The registry entry persists; callers can
51    /// reconnect via `alc_continue` after MCP restart.
52    pub async fn run(
53        &self,
54        code: Option<String>,
55        code_file: Option<String>,
56        ctx: Option<serde_json::Value>,
57        project_root: Option<String>,
58        host_mode: Option<bool>,
59    ) -> Result<String, String> {
60        let code = resolve_code(code, code_file)?;
61        let ctx = ctx.unwrap_or(serde_json::Value::Null);
62        let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
63        let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
64        let mut warnings: Vec<String> = extra_warnings;
65        warnings.extend(variant_warnings);
66
67        if host_mode == Some(true) {
68            // ── Pool path (Crux: MCP thin proxy IPC boundary) ─────────────────
69            // Worker subprocess is spawned and communicated via UDS.
70            // SessionRegistry (in-memory) is NOT touched on this path.
71            let (session_id, json, pool_save_error) = run_via_pool(
72                &self.pool_dir,
73                &self.pool_reg_path,
74                &self.pool_lock_path,
75                extra,
76                code,
77                ctx,
78            )
79            .await
80            .map_err(|e| e.to_string())?;
81
82            // session_id is stored in the JSON by the worker; update the
83            // in-memory registry so this MCP instance can route continues
84            // without another disk read.
85            // Load the just-persisted entry from disk to keep in-memory
86            // registry in sync.  This is a best-effort convenience cache;
87            // the disk state is authoritative.  Failure is surfaced to the
88            // MCP wire response as `pool_cache_reload_warning` so the caller
89            // can observe stale-cache conditions; tracing::warn! is kept for
90            // operator visibility in logs.
91            let cache_reload_warning: Option<String> =
92                match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
93                    Ok(reg) => {
94                        let mut guard = self.pool_registry.write().await;
95                        *guard = reg;
96                        None
97                    }
98                    Err(e) => {
99                        tracing::warn!(
100                            error = %e,
101                            "failed to reload pool registry after run; in-memory cache may be stale"
102                        );
103                        Some(e.to_string())
104                    }
105                };
106
107            let json = splice_response_warnings(&json, "lib_path_warnings", &warnings);
108            let json = match pool_save_error {
109                Some(msg) => splice_response_string(&json, "pool_save_error", &msg),
110                None => json,
111            };
112            let json = match cache_reload_warning {
113                Some(msg) => splice_response_string(&json, "pool_cache_reload_warning", &msg),
114                None => json,
115            };
116            let _ = session_id; // session_id is embedded in the JSON response
117            return Ok(json);
118        }
119
120        // ── In-process path (default) ──────────────────────────────────────────
121        let json = self
122            .start_and_tick(code, ctx, None, extra, variants)
123            .await?;
124        Ok(splice_response_warnings(
125            &json,
126            "lib_path_warnings",
127            &warnings,
128        ))
129    }
130
131    /// Apply a built-in strategy to a task.
132    ///
133    /// If the requested package is not installed, automatically installs the
134    /// bundled package collection from GitHub before executing.
135    ///
136    /// `project_root` — optional absolute path to the project root containing
137    /// `alc.lock`. Falls back to `ALC_PROJECT_ROOT` env or ancestor walk.
138    pub async fn advice(
139        &self,
140        strategy: &str,
141        task: Option<String>,
142        opts: Option<serde_json::Value>,
143        project_root: Option<String>,
144    ) -> Result<String, String> {
145        // Auto-install bundled packages if the requested strategy is missing
146        let app_dir = self.log_config.app_dir();
147        if !is_package_installed(&app_dir, strategy) {
148            self.auto_install_bundled_packages().await?;
149            if !is_package_installed(&app_dir, strategy) {
150                return Err(format!(
151                    "Package '{strategy}' not found after installing bundled collection. \
152                     Use alc_pkg_install to install it manually."
153                ));
154            }
155        }
156
157        let code = make_require_code(strategy);
158
159        let mut ctx_map = match opts {
160            Some(serde_json::Value::Object(m)) => m,
161            _ => serde_json::Map::new(),
162        };
163        if let Some(task) = task {
164            ctx_map.insert("task".into(), serde_json::Value::String(task));
165        }
166        let ctx = serde_json::Value::Object(ctx_map);
167
168        let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
169        let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
170        let mut warnings: Vec<String> = extra_warnings;
171        warnings.extend(variant_warnings);
172        let json = self
173            .start_and_tick(code, ctx, Some(strategy), extra, variants)
174            .await?;
175        Ok(splice_response_warnings(
176            &json,
177            "lib_path_warnings",
178            &warnings,
179        ))
180    }
181
182    /// Continue a paused execution — batch feed.
183    ///
184    /// For pool sessions (`session_id` found in registry.json), each response
185    /// in the batch is forwarded to the worker via `PoolClient::send_request`.
186    /// For in-MCP sessions, the existing `SessionRegistry::feed_response` path
187    /// is used unchanged.
188    pub async fn continue_batch(
189        &self,
190        session_id: &str,
191        responses: Vec<QueryResponse>,
192    ) -> Result<String, String> {
193        // ── Pool path check (same registry lookup as continue_single) ─────────
194        let pool_entry = {
195            let reg = self.pool_registry.read().await;
196            reg.find(session_id).cloned()
197        };
198
199        let pool_entry = if pool_entry.is_some() {
200            pool_entry
201        } else {
202            match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
203                Ok(reg) => {
204                    let entry = reg.find(session_id).cloned();
205                    if entry.is_some() {
206                        let mut guard = self.pool_registry.write().await;
207                        *guard = reg;
208                    }
209                    entry
210                }
211                Err(e) => {
212                    return Err(format!("Continue failed: {e}"));
213                }
214            }
215        };
216
217        if let Some(entry) = pool_entry {
218            // ── Pool routing ────────────────────────────────────────────────────
219            let mut last_json = None;
220            for qr in responses {
221                let json =
222                    continue_via_pool(&entry, session_id, qr.response, Some(qr.query_id), qr.usage)
223                        .await
224                        .map_err(|e| format!("Continue failed: {e}"))?;
225                last_json = Some(json);
226            }
227            return last_json.ok_or_else(|| "Empty responses array".to_string());
228        }
229
230        // ── In-MCP path ────────────────────────────────────────────────────────
231        let mut last_result = None;
232        for qr in responses {
233            let qid = QueryId::parse(&qr.query_id);
234            let result = self
235                .registry
236                .feed_response(session_id, &qid, qr.response, qr.usage.as_ref())
237                .await
238                .map_err(|e| format!("Continue failed: {e}"))?;
239            last_result = Some(result);
240        }
241        let result = last_result.ok_or("Empty responses array")?;
242        let transcript_warning = self.maybe_log_transcript(&result, session_id);
243        let json = result.to_json(session_id).to_string();
244        let json = splice_transcript_warning(&json, transcript_warning);
245        let save_warning = self.maybe_save_eval(&result, session_id, &json);
246        Ok(splice_save_warning(&json, save_warning))
247    }
248
249    /// Continue a paused execution — single response (with optional query_id).
250    ///
251    /// Routing is automatic: if `session_id` is found in `registry.json`
252    /// (pool path), the call is proxied via `PoolClient` over UDS. If not
253    /// found (in-MCP path), the existing `SessionRegistry::feed_response`
254    /// is used. Both paths never coexist for the same `session_id`.
255    ///
256    /// # Concurrency
257    ///
258    /// **Pool path**: acquires `RwLock<PoolRegistry>` (read) to look up the
259    /// session entry, then acquires `tokio::sync::Mutex` inside `PoolClient`
260    /// to serialize the UDS write. Neither lock is held across the UDS await.
261    ///
262    /// **In-MCP path**: acquires lock C in the two-phase pattern documented on
263    /// `SessionRegistry::feed_response`.
264    ///
265    /// **Cancel safety**: cancelling mid-await on the pool path leaves the
266    /// worker subprocess running (UDS send may have been partially written;
267    /// `read_line` is not cancel-safe — a partial line in the buffer renders
268    /// the connection unusable and `PoolClient` must reconnect).
269    pub async fn continue_single(
270        &self,
271        session_id: &str,
272        response: String,
273        query_id: Option<&str>,
274        usage: Option<algocline_core::TokenUsage>,
275    ) -> Result<String, String> {
276        // ── Pool path: check in-memory registry, then disk registry ───────────
277        // K-4: acquire read lock, clone the entry, release lock BEFORE await.
278        let pool_entry = {
279            let reg = self.pool_registry.read().await;
280            reg.find(session_id).cloned()
281        }; // read lock released here
282
283        // If in-memory cache missed, check disk (e.g. after MCP restart).
284        let pool_entry = if pool_entry.is_some() {
285            pool_entry
286        } else {
287            match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
288                Ok(reg) => {
289                    let entry = reg.find(session_id).cloned();
290                    if entry.is_some() {
291                        // Warm the in-memory cache.
292                        let mut guard = self.pool_registry.write().await;
293                        *guard = reg;
294                    }
295                    entry
296                }
297                Err(e) => {
298                    // Corrupt registry: propagate to MCP wire per §Error 伝播規律.
299                    return Err(format!("Continue failed: {e}"));
300                }
301            }
302        };
303
304        if let Some(entry) = pool_entry {
305            // ── Pool routing (Crux: MCP thin proxy IPC boundary) ──────────────
306            let json = continue_via_pool(
307                &entry,
308                session_id,
309                response,
310                query_id.map(str::to_string),
311                usage,
312            )
313            .await
314            .map_err(|e| format!("Continue failed: {e}"))?;
315            return Ok(json);
316        }
317
318        // ── In-MCP path ────────────────────────────────────────────────────────
319        let query_id = match query_id {
320            Some(qid) => QueryId::parse(qid),
321            None => self
322                .registry
323                .resolve_sole_pending_id(session_id)
324                .await
325                .map_err(|e| format!("Continue failed: {e}"))?,
326        };
327
328        let result = self
329            .registry
330            .feed_response(session_id, &query_id, response, usage.as_ref())
331            .await
332            .map_err(|e| format!("Continue failed: {e}"))?;
333
334        let transcript_warning = self.maybe_log_transcript(&result, session_id);
335        let json = result.to_json(session_id).to_string();
336        let json = splice_transcript_warning(&json, transcript_warning);
337        let save_warning = self.maybe_save_eval(&result, session_id, &json);
338        Ok(splice_save_warning(&json, save_warning))
339    }
340
341    // ─── Internal ───────────────────────────────────────────────
342
343    pub(super) fn maybe_log_transcript(
344        &self,
345        result: &FeedResult,
346        session_id: &str,
347    ) -> Option<String> {
348        if let FeedResult::Finished(exec_result) = result {
349            // Mutex poison means a previous thread panicked while holding the lock.
350            // Strategy name is non-critical for correctness, but the failure must be
351            // surfaced to MCP callers so it is observable, not silently dropped.
352            // See CLAUDE.md §Service 層の Error 伝播規律.
353            let strategy = match self.session_strategies.lock() {
354                Ok(mut map) => map.remove(session_id),
355                Err(e) => {
356                    tracing::warn!(
357                        "session_strategies mutex poisoned for '{}': {}",
358                        session_id,
359                        e
360                    );
361                    // Return warning immediately; transcript cannot be written
362                    // without strategy context being reliably recoverable.
363                    return Some(format!(
364                        "session_strategies mutex poisoned for '{session_id}': {e}"
365                    ));
366                }
367            };
368            // write_transcript_log returns Ok(Some(warning)) when meta write
369            // failed but the main log succeeded, so both Err and meta warning
370            // are surfaced as transcript_warning on the wire response.
371            match write_transcript_log(
372                &self.log_config,
373                session_id,
374                &exec_result.metrics,
375                strategy.as_deref(),
376            ) {
377                Err(e) => Some(e.to_string()),
378                Ok(meta_warning) => meta_warning,
379            }
380        } else {
381            None
382        }
383    }
384
385    /// Persist eval result for a finished session, returning any storage
386    /// failure as `Some(msg)` so the caller can surface it on the wire
387    /// response. `None` covers both "not an eval session" and
388    /// "successfully saved" — they are indistinguishable to the caller
389    /// because both produce the same wire shape.
390    pub(super) fn maybe_save_eval(
391        &self,
392        result: &FeedResult,
393        session_id: &str,
394        result_json: &str,
395    ) -> Option<String> {
396        if !matches!(result, FeedResult::Finished(_)) {
397            return None;
398        }
399        let strategy = {
400            let mut map = self.eval_sessions.lock().unwrap_or_else(|e| e.into_inner());
401            map.remove(session_id)
402        };
403        strategy.and_then(|s| {
404            super::eval_store::save_eval_result(&self.log_config.app_dir(), &s, result_json).err()
405        })
406    }
407
408    pub(super) async fn start_and_tick(
409        &self,
410        code: String,
411        ctx: serde_json::Value,
412        strategy: Option<&str>,
413        extra_lib_paths: Vec<std::path::PathBuf>,
414        variant_pkgs: Vec<VariantPkg>,
415    ) -> Result<String, String> {
416        let scenarios_dir = self.log_config.app_dir().scenarios_dir();
417        let session = self
418            .executor
419            .start_session(
420                code,
421                ctx,
422                extra_lib_paths,
423                variant_pkgs,
424                Arc::clone(&self.state_store),
425                Arc::clone(&self.card_store),
426                scenarios_dir,
427            )
428            .await?;
429        let (session_id, result) = self
430            .registry
431            .start_execution(session)
432            .await
433            .map_err(|e| format!("Execution failed: {e}"))?;
434        if let Some(s) = strategy {
435            if let Ok(mut map) = self.session_strategies.lock() {
436                map.insert(session_id.clone(), s.to_string());
437            }
438        }
439        let transcript_warning = self.maybe_log_transcript(&result, &session_id);
440        let json = result.to_json(&session_id).to_string();
441        Ok(splice_transcript_warning(&json, transcript_warning))
442    }
443}
444
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;

    use algocline_core::{
        AppDir, ExecutionMetrics, ExecutionObserver, LlmQuery, QueryId, TerminalState,
    };
    use algocline_engine::{ExecutionResult, FeedResult};

    use super::super::config::{AppConfig, LogDirSource};
    use super::super::eval_store::splice_response_string;
    use super::{splice_transcript_warning, AppService};
    use crate::pool::PoolSessionEntry;

    fn make_metrics_with_transcript() -> ExecutionMetrics {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();
        observer.on_paused(&[LlmQuery {
            id: QueryId::single(),
            prompt: "test prompt".into(),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        }]);
        metrics
    }

    fn make_finished_result(metrics: ExecutionMetrics) -> FeedResult {
        FeedResult::Finished(ExecutionResult {
            state: TerminalState::Completed {
                result: serde_json::json!({"ok": true}),
            },
            metrics,
        })
    }

    /// Build a minimal AppService with log_enabled and a custom log_dir.
    async fn make_app_service_with_log_dir(log_dir: PathBuf) -> AppService {
        let executor = Arc::new(
            algocline_engine::Executor::new(vec![])
                .await
                .expect("executor"),
        );
        let tmp_app = tempfile::tempdir().expect("test tempdir");
        let log_config = AppConfig {
            log_dir: Some(log_dir),
            log_dir_source: LogDirSource::EnvVar,
            log_enabled: true,
            prompt_preview_chars: 200,
            app_dir: Arc::new(AppDir::new(tmp_app.path().to_path_buf())),
        };
        // Leak the TempDir guard so the app dir is not deleted while the
        // returned service still points at it.
        std::mem::forget(tmp_app);
        AppService::new(executor, log_config, vec![])
    }

    /// The exact error `continue_single` returns when it takes the in-MCP
    /// path with `query_id: None` for an unknown `session_id`.
    ///
    /// This mirrors its `map_err(|e| format!("Continue failed: {e}"))` around
    /// `resolve_sole_pending_id`. Comparing against this string tells the two
    /// routing paths apart, since both prefix their errors with
    /// "Continue failed".
    async fn in_mcp_resolve_error(svc: &AppService, session_id: &str) -> String {
        match svc.registry.resolve_sole_pending_id(session_id).await {
            Ok(_) => panic!("session '{session_id}' unexpectedly has a sole pending query"),
            Err(e) => format!("Continue failed: {e}"),
        }
    }

    // ── (b) maybe_log_transcript returns Some when write fails ──────────

    #[tokio::test]
    async fn maybe_log_transcript_returns_some_on_write_failure() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let log_dir = tmp.path().to_path_buf();
        // Block write by creating a directory at the session file path.
        std::fs::create_dir_all(log_dir.join("fail-session.json"))
            .expect("pre-create dir to block write");
        let svc = make_app_service_with_log_dir(log_dir).await;
        let metrics = make_metrics_with_transcript();
        let result = make_finished_result(metrics);
        let warning = svc.maybe_log_transcript(&result, "fail-session");
        assert!(warning.is_some(), "expected Some warning on write failure");
        let msg = warning.unwrap();
        assert!(
            msg.contains("transcript"),
            "warning should mention 'transcript', got: {msg}"
        );
    }

    #[tokio::test]
    async fn maybe_log_transcript_returns_none_on_non_finished() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
        let result = FeedResult::Accepted { remaining: 1 };
        let warning = svc.maybe_log_transcript(&result, "any-session");
        assert!(warning.is_none(), "Accepted result should return None");
    }

    // ── (c) splice_transcript_warning inserts field into JSON ───────────

    #[test]
    fn splice_transcript_warning_injects_field_when_some() {
        let json = r#"{"status":"finished","result":{}}"#;
        let out = splice_transcript_warning(json, Some("write failed".to_string()));
        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
        assert_eq!(
            v["transcript_warning"].as_str(),
            Some("write failed"),
            "transcript_warning field should be present"
        );
        // Original fields are preserved.
        assert_eq!(v["status"].as_str(), Some("finished"));
    }

    #[test]
    fn splice_transcript_warning_passthrough_when_none() {
        let json = r#"{"status":"finished"}"#;
        let out = splice_transcript_warning(json, None);
        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
        assert!(
            v.get("transcript_warning").is_none(),
            "transcript_warning must be absent when warning is None"
        );
    }

    // ── ST6: pool registry routing tests ────────────────────────────────────

    /// T1: continue_single falls through to in-MCP path when session is not in pool registry.
    ///
    /// An unknown session ID should not be found in the pool registry and
    /// should reach the in-MCP path. That path fails because no session
    /// exists in the in-memory `SessionRegistry` either. The error must
    /// match the in-MCP error exactly.
    #[tokio::test]
    async fn continue_single_in_mcp_path_on_registry_miss() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        // The pool registry on disk and in-memory is empty (no workers registered).
        let result = svc
            .continue_single(
                "nonexistent-session-id",
                "some response".to_string(),
                None,
                None,
            )
            .await;
        assert!(
            result.is_err(),
            "unknown session must return Err on in-MCP path"
        );
        let msg = result.unwrap_err();
        let expected = in_mcp_resolve_error(&svc, "nonexistent-session-id").await;
        assert_eq!(
            msg, expected,
            "registry miss must fall through to the in-MCP path"
        );
    }

    /// T2: AppService::new initialises pool_registry as empty when pool dir absent.
    ///
    /// Verifies that startup GC with a missing registry.json (normal first-run)
    /// produces an empty PoolRegistry (not an error).
    #[tokio::test]
    async fn app_service_new_initialises_empty_pool_registry() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        let reg = svc.pool_registry.read().await;
        assert!(
            reg.sessions.is_empty(),
            "pool registry must be empty on first-run (no registry.json)"
        );
    }

    /// T2b: AppService correctly stores pool registry paths derived from app_dir.
    ///
    /// Verifies that pool_dir / pool_reg_path / pool_lock_path end in the
    /// expected `pool/*` components.
    #[tokio::test]
    async fn app_service_pool_paths_correctly_derived() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        assert!(
            svc.pool_dir.ends_with("pool"),
            "pool_dir must end in 'pool', got: {}",
            svc.pool_dir.display()
        );
        assert!(
            svc.pool_reg_path.ends_with("pool/registry.json"),
            "pool_reg_path must end in 'pool/registry.json', got: {}",
            svc.pool_reg_path.display()
        );
        assert!(
            svc.pool_lock_path.ends_with("pool/registry.lock"),
            "pool_lock_path must end in 'pool/registry.lock', got: {}",
            svc.pool_lock_path.display()
        );
    }

    /// T3: continue_single propagates PoolError::RegistryCorrupted to MCP wire.
    ///
    /// When registry.json is corrupt and there is a cache miss, continue_single
    /// must return Err (not silently proceed with empty registry). This verifies
    /// the CLAUDE.md §Error propagation rules invariant — no unwrap_or_default()
    /// swallowing. The error must differ from what the in-MCP fall-through
    /// would have produced.
    #[tokio::test]
    async fn continue_single_propagates_corrupted_registry_error() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        // Write a corrupt registry.json to the pool directory.
        let pool_dir = svc.pool_dir.clone();
        std::fs::create_dir_all(&pool_dir).expect("create pool dir");
        std::fs::write(pool_dir.join("registry.json"), b"{ not valid json !!!")
            .expect("write corrupt registry");

        // The in-memory cache is empty (the file was written after startup),
        // so continue_single reads the corrupt file and must propagate the error.
        let result = svc
            .continue_single("any-session-id", "response".to_string(), None, None)
            .await;
        assert!(
            result.is_err(),
            "corrupted registry must cause Err, not silent empty fallback"
        );
        let msg = result.unwrap_err();
        assert!(
            msg.starts_with("Continue failed"),
            "error must be wrapped as a continue failure, got: {msg}"
        );
        let fall_through = in_mcp_resolve_error(&svc, "any-session-id").await;
        assert_ne!(
            msg, fall_through,
            "corrupted registry must not fall through to the in-MCP path, got: {msg}"
        );
    }

    // ── pool_cache_reload_warning splice tests ───────────────────────────────

    /// T1 (happy path): splice_response_string inserts pool_cache_reload_warning
    /// into a valid JSON object response.
    ///
    /// Verifies the crux-card constraint: cache-reload failure must surface on
    /// the MCP wire as an additive field, not remain warn!-only.
    #[test]
    fn splice_response_string_injects_cache_reload_warning() {
        let json = r#"{"status":"finished","result":{"ok":true}}"#;
        let msg = "failed to reload pool registry: No such file or directory";
        let out = splice_response_string(json, "pool_cache_reload_warning", msg);
        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
        assert_eq!(
            v["pool_cache_reload_warning"].as_str(),
            Some(msg),
            "pool_cache_reload_warning must be present in response"
        );
        // Original fields preserved (additive, not destructive).
        assert_eq!(v["status"].as_str(), Some("finished"));
    }

    /// T2 (edge case): splice_response_string is a no-op when input is not a
    /// JSON object (e.g. bare string or array).
    ///
    /// Guards against panics when strategy output is malformed.
    #[test]
    fn splice_response_string_passthrough_on_non_object_json() {
        let non_object = r#""just a string""#;
        let out = splice_response_string(non_object, "pool_cache_reload_warning", "err");
        // Must return original unchanged.
        assert_eq!(out, non_object);
    }

    /// T3 (None branch): when cache_reload_warning is None, the
    /// pool_cache_reload_warning field must NOT appear in the response JSON.
    ///
    /// Runs the same match shape used for `cache_reload_warning` in
    /// `AppService::run` and checks that the None arm leaves the JSON
    /// byte-identical, consistent with the pool_save_error pattern.
    #[test]
    fn splice_response_string_not_called_when_none() {
        let json = r#"{"status":"finished"}"#.to_string();
        let cache_reload_warning: Option<String> = None;
        let out = match cache_reload_warning {
            Some(msg) => splice_response_string(&json, "pool_cache_reload_warning", &msg),
            None => json.clone(),
        };
        assert_eq!(out, json, "None branch must leave the response untouched");
        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
        assert!(
            v.get("pool_cache_reload_warning").is_none(),
            "pool_cache_reload_warning must be absent when no cache-reload error occurred"
        );
    }

    /// T1b: in-memory pool registry lookup finds an entry and routes to pool path.
    ///
    /// Inserts a live entry (current process PID) into the in-memory registry.
    /// Verifies that continue_single attempts the pool path: it fails with a
    /// connection error because no real worker socket exists. That error must
    /// differ from the in-MCP path's error for the same session id.
    #[tokio::test]
    async fn continue_single_routes_to_pool_on_registry_hit() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        // Insert a fake entry pointing to a non-existent socket.
        // This simulates the case where a pool session was started.
        let fake_sock = tmp.path().join("nonexistent.sock");
        let entry = PoolSessionEntry::new(
            "test-pool-session",
            std::process::id(), // live PID — survives GC
            fake_sock.clone(),
            env!("CARGO_PKG_VERSION"),
        );
        {
            let mut reg = svc.pool_registry.write().await;
            reg.add(entry);
        }

        // continue_single should find the entry and attempt pool path.
        // The UDS connect will fail (no socket file) → PoolError::Connect → Err.
        let result = svc
            .continue_single("test-pool-session", "response".to_string(), None, None)
            .await;
        assert!(
            result.is_err(),
            "pool path must fail with connect error (no real worker)"
        );
        let msg = result.unwrap_err();
        // Both paths prefix "Continue failed", so compare against the exact
        // in-MCP error instead of a substring check (which was always true).
        let in_mcp = in_mcp_resolve_error(&svc, "test-pool-session").await;
        assert_ne!(
            msg, in_mcp,
            "error must be from pool path (UDS connect), not SessionRegistry, got: {msg}"
        );
    }
}