Skip to main content

algocline_app/service/
run.rs

1use std::sync::Arc;
2
3use algocline_core::QueryId;
4use algocline_engine::{FeedResult, VariantPkg};
5
6use super::eval_store::{splice_response_string, splice_response_warnings};
7use super::resolve::{is_package_installed, make_require_code, resolve_code, QueryResponse};
8use super::transcript::write_transcript_log;
9use super::AppService;
10use crate::pool::dispatch::{continue_via_pool, run_via_pool};
11
12/// Recover from MCP clients that JSON-stringify an object-typed field
13/// before sending. The MCP `inputSchema` for `ctx` / `opts` /
14/// `strategy_opts` declares `type: object`, but some clients send a
15/// JSON-encoded string instead. Without this normalisation the value
16/// reaches Lua as a string and breaks pkgs that require a table
17/// (issue 1778656404-63015).
18///
19/// Only `Value::String` payloads that parse into a JSON object or
20/// array are replaced; ordinary strings and primitive scalars pass
21/// through untouched.
22pub(crate) fn normalize_stringified_json_object(v: serde_json::Value) -> serde_json::Value {
23    match v {
24        serde_json::Value::String(ref s) => match serde_json::from_str::<serde_json::Value>(s) {
25            Ok(parsed @ serde_json::Value::Object(_)) => parsed,
26            Ok(parsed @ serde_json::Value::Array(_)) => parsed,
27            _ => v,
28        },
29        other => other,
30    }
31}
32
33/// Splice `save_warning` into the JSON `result` when the optional
34/// warning is `Some(_)`. Returns the original string unchanged when
35/// there is no warning.
36fn splice_save_warning(result_json: &str, warning: Option<String>) -> String {
37    match warning {
38        Some(msg) => splice_response_string(result_json, "save_warning", &msg),
39        None => result_json.to_string(),
40    }
41}
42
43/// Splice `transcript_warning` into the JSON `result` when the optional
44/// warning is `Some(_)`. Returns the original string unchanged when
45/// there is no warning.
46fn splice_transcript_warning(result_json: &str, warning: Option<String>) -> String {
47    match warning {
48        Some(msg) => splice_response_string(result_json, "transcript_warning", &msg),
49        None => result_json.to_string(),
50    }
51}
52
53impl AppService {
54    /// Execute Lua code with optional JSON context.
55    ///
56    /// When `host_mode: Some(true)` is passed, the call is proxied via
57    /// `PoolClient` to a long-lived worker subprocess over a Unix domain socket.
58    /// When `host_mode` is `None` or `Some(false)` the existing in-process
59    /// `Executor::start_session` path is used unchanged.
60    ///
61    /// # Concurrency
62    ///
63    /// **host_mode=false (default)**: No additional locking beyond `SessionRegistry`
64    /// lock C. `AppService` itself holds no long-lived lock during this call.
65    ///
66    /// **host_mode=true**: Acquires `RwLock<PoolRegistry>` (write) and advisory
67    /// `fs4::FileExt::lock_exclusive` to update `registry.json`. These locks are
68    /// **not** held across the UDS round-trip await.
69    ///
70    /// **Cancel safety**: cancelling this `.await` mid-UDS-request leaves the
71    /// worker subprocess running. The registry entry persists; callers can
72    /// reconnect via `alc_continue` after MCP restart.
73    pub async fn run(
74        &self,
75        code: Option<String>,
76        code_file: Option<String>,
77        ctx: Option<serde_json::Value>,
78        project_root: Option<String>,
79        host_mode: Option<bool>,
80    ) -> Result<String, String> {
81        let code = resolve_code(code, code_file)?;
82        let ctx = normalize_stringified_json_object(ctx.unwrap_or(serde_json::Value::Null));
83        let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
84        let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
85        let mut warnings: Vec<String> = extra_warnings;
86        warnings.extend(variant_warnings);
87
88        if host_mode == Some(true) {
89            // ── Pool path (Crux: MCP thin proxy IPC boundary) ─────────────────
90            // Worker subprocess is spawned and communicated via UDS.
91            // SessionRegistry (in-memory) is NOT touched on this path.
92            let (session_id, json, pool_save_error) = run_via_pool(
93                &self.pool_dir,
94                &self.pool_reg_path,
95                &self.pool_lock_path,
96                extra,
97                code,
98                ctx,
99            )
100            .await
101            .map_err(|e| e.to_string())?;
102
103            // session_id is stored in the JSON by the worker; update the
104            // in-memory registry so this MCP instance can route continues
105            // without another disk read.
106            // Load the just-persisted entry from disk to keep in-memory
107            // registry in sync.  This is a best-effort convenience cache;
108            // the disk state is authoritative.  Failure is surfaced to the
109            // MCP wire response as `pool_cache_reload_warning` so the caller
110            // can observe stale-cache conditions; tracing::warn! is kept for
111            // operator visibility in logs.
112            let cache_reload_warning: Option<String> =
113                match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
114                    Ok(reg) => {
115                        let mut guard = self.pool_registry.write().await;
116                        *guard = reg;
117                        None
118                    }
119                    Err(e) => {
120                        tracing::warn!(
121                            error = %e,
122                            "failed to reload pool registry after run; in-memory cache may be stale"
123                        );
124                        Some(e.to_string())
125                    }
126                };
127
128            let json = splice_response_warnings(&json, "lib_path_warnings", &warnings);
129            let json = match pool_save_error {
130                Some(msg) => splice_response_string(&json, "pool_save_error", &msg),
131                None => json,
132            };
133            let json = match cache_reload_warning {
134                Some(msg) => splice_response_string(&json, "pool_cache_reload_warning", &msg),
135                None => json,
136            };
137            let _ = session_id; // session_id is embedded in the JSON response
138            return Ok(json);
139        }
140
141        // ── In-process path (default) ──────────────────────────────────────────
142        let json = self
143            .start_and_tick(code, ctx, None, extra, variants)
144            .await?;
145        Ok(splice_response_warnings(
146            &json,
147            "lib_path_warnings",
148            &warnings,
149        ))
150    }
151
152    /// Apply a built-in strategy to a task.
153    ///
154    /// If the requested package is not installed, automatically installs the
155    /// bundled package collection from GitHub before executing.
156    ///
157    /// `project_root` — optional absolute path to the project root containing
158    /// `alc.lock`. Falls back to `ALC_PROJECT_ROOT` env or ancestor walk.
159    pub async fn advice(
160        &self,
161        strategy: &str,
162        task: Option<String>,
163        opts: Option<serde_json::Value>,
164        project_root: Option<String>,
165    ) -> Result<String, String> {
166        // Auto-install bundled packages if the requested strategy is missing
167        let app_dir = self.log_config.app_dir();
168        if !is_package_installed(&app_dir, strategy) {
169            self.auto_install_bundled_packages().await?;
170            if !is_package_installed(&app_dir, strategy) {
171                return Err(format!(
172                    "Package '{strategy}' not found after installing bundled collection. \
173                     Use alc_pkg_install to install it manually."
174                ));
175            }
176        }
177
178        let code = make_require_code(strategy);
179
180        let opts = opts.map(normalize_stringified_json_object);
181        let mut ctx_map = match opts {
182            Some(serde_json::Value::Object(m)) => m,
183            _ => serde_json::Map::new(),
184        };
185        if let Some(task) = task {
186            ctx_map.insert("task".into(), serde_json::Value::String(task));
187        }
188        let ctx = serde_json::Value::Object(ctx_map);
189
190        let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
191        let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
192        let mut warnings: Vec<String> = extra_warnings;
193        warnings.extend(variant_warnings);
194        let json = self
195            .start_and_tick(code, ctx, Some(strategy), extra, variants)
196            .await?;
197        Ok(splice_response_warnings(
198            &json,
199            "lib_path_warnings",
200            &warnings,
201        ))
202    }
203
204    /// Continue a paused execution — batch feed.
205    ///
206    /// For pool sessions (`session_id` found in registry.json), each response
207    /// in the batch is forwarded to the worker via `PoolClient::send_request`.
208    /// For in-MCP sessions, the existing `SessionRegistry::feed_response` path
209    /// is used unchanged.
210    pub async fn continue_batch(
211        &self,
212        session_id: &str,
213        responses: Vec<QueryResponse>,
214    ) -> Result<String, String> {
215        // ── Pool path check (same registry lookup as continue_single) ─────────
216        let pool_entry = {
217            let reg = self.pool_registry.read().await;
218            reg.find(session_id).cloned()
219        };
220
221        let pool_entry = if pool_entry.is_some() {
222            pool_entry
223        } else {
224            match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
225                Ok(reg) => {
226                    let entry = reg.find(session_id).cloned();
227                    if entry.is_some() {
228                        let mut guard = self.pool_registry.write().await;
229                        *guard = reg;
230                    }
231                    entry
232                }
233                Err(e) => {
234                    return Err(format!("Continue failed: {e}"));
235                }
236            }
237        };
238
239        if let Some(entry) = pool_entry {
240            // ── Pool routing ────────────────────────────────────────────────────
241            let mut last_json = None;
242            for qr in responses {
243                let json =
244                    continue_via_pool(&entry, session_id, qr.response, Some(qr.query_id), qr.usage)
245                        .await
246                        .map_err(|e| format!("Continue failed: {e}"))?;
247                last_json = Some(json);
248            }
249            return last_json.ok_or_else(|| "Empty responses array".to_string());
250        }
251
252        // ── In-MCP path ────────────────────────────────────────────────────────
253        let mut last_result = None;
254        for qr in responses {
255            let qid = QueryId::parse(&qr.query_id);
256            let result = self
257                .registry
258                .feed_response(session_id, &qid, qr.response, qr.usage.as_ref())
259                .await
260                .map_err(|e| format!("Continue failed: {e}"))?;
261            last_result = Some(result);
262        }
263        let result = last_result.ok_or("Empty responses array")?;
264        let transcript_warning = self.maybe_log_transcript(&result, session_id);
265        let json = result.to_json(session_id).to_string();
266        let json = splice_transcript_warning(&json, transcript_warning);
267        let save_warning = self.maybe_save_eval(&result, session_id, &json);
268        Ok(splice_save_warning(&json, save_warning))
269    }
270
271    /// Continue a paused execution — single response (with optional query_id).
272    ///
273    /// Routing is automatic: if `session_id` is found in `registry.json`
274    /// (pool path), the call is proxied via `PoolClient` over UDS. If not
275    /// found (in-MCP path), the existing `SessionRegistry::feed_response`
276    /// is used. Both paths never coexist for the same `session_id`.
277    ///
278    /// # Concurrency
279    ///
280    /// **Pool path**: acquires `RwLock<PoolRegistry>` (read) to look up the
281    /// session entry, then acquires `tokio::sync::Mutex` inside `PoolClient`
282    /// to serialize the UDS write. Neither lock is held across the UDS await.
283    ///
284    /// **In-MCP path**: acquires lock C in the two-phase pattern documented on
285    /// `SessionRegistry::feed_response`.
286    ///
287    /// **Cancel safety**: cancelling mid-await on the pool path leaves the
288    /// worker subprocess running (UDS send may have been partially written;
289    /// `read_line` is not cancel-safe — a partial line in the buffer renders
290    /// the connection unusable and `PoolClient` must reconnect).
291    pub async fn continue_single(
292        &self,
293        session_id: &str,
294        response: String,
295        query_id: Option<&str>,
296        usage: Option<algocline_core::TokenUsage>,
297    ) -> Result<String, String> {
298        // ── Pool path: check in-memory registry, then disk registry ───────────
299        // K-4: acquire read lock, clone the entry, release lock BEFORE await.
300        let pool_entry = {
301            let reg = self.pool_registry.read().await;
302            reg.find(session_id).cloned()
303        }; // read lock released here
304
305        // If in-memory cache missed, check disk (e.g. after MCP restart).
306        let pool_entry = if pool_entry.is_some() {
307            pool_entry
308        } else {
309            match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
310                Ok(reg) => {
311                    let entry = reg.find(session_id).cloned();
312                    if entry.is_some() {
313                        // Warm the in-memory cache.
314                        let mut guard = self.pool_registry.write().await;
315                        *guard = reg;
316                    }
317                    entry
318                }
319                Err(e) => {
320                    // Corrupt registry: propagate to MCP wire per §Error 伝播規律.
321                    return Err(format!("Continue failed: {e}"));
322                }
323            }
324        };
325
326        if let Some(entry) = pool_entry {
327            // ── Pool routing (Crux: MCP thin proxy IPC boundary) ──────────────
328            let json = continue_via_pool(
329                &entry,
330                session_id,
331                response,
332                query_id.map(str::to_string),
333                usage,
334            )
335            .await
336            .map_err(|e| format!("Continue failed: {e}"))?;
337            return Ok(json);
338        }
339
340        // ── In-MCP path ────────────────────────────────────────────────────────
341        let query_id = match query_id {
342            Some(qid) => QueryId::parse(qid),
343            None => self
344                .registry
345                .resolve_sole_pending_id(session_id)
346                .await
347                .map_err(|e| format!("Continue failed: {e}"))?,
348        };
349
350        let result = self
351            .registry
352            .feed_response(session_id, &query_id, response, usage.as_ref())
353            .await
354            .map_err(|e| format!("Continue failed: {e}"))?;
355
356        let transcript_warning = self.maybe_log_transcript(&result, session_id);
357        let json = result.to_json(session_id).to_string();
358        let json = splice_transcript_warning(&json, transcript_warning);
359        let save_warning = self.maybe_save_eval(&result, session_id, &json);
360        Ok(splice_save_warning(&json, save_warning))
361    }
362
363    // ─── Internal ───────────────────────────────────────────────
364
365    pub(super) fn maybe_log_transcript(
366        &self,
367        result: &FeedResult,
368        session_id: &str,
369    ) -> Option<String> {
370        if let FeedResult::Finished(exec_result) = result {
371            // Mutex poison means a previous thread panicked while holding the lock.
372            // Strategy name is non-critical for correctness, but the failure must be
373            // surfaced to MCP callers so it is observable, not silently dropped.
374            // See CLAUDE.md §Service 層の Error 伝播規律.
375            let strategy = match self.session_strategies.lock() {
376                Ok(mut map) => map.remove(session_id),
377                Err(e) => {
378                    tracing::warn!(
379                        "session_strategies mutex poisoned for '{}': {}",
380                        session_id,
381                        e
382                    );
383                    // Return warning immediately; transcript cannot be written
384                    // without strategy context being reliably recoverable.
385                    return Some(format!(
386                        "session_strategies mutex poisoned for '{session_id}': {e}"
387                    ));
388                }
389            };
390            // write_transcript_log returns Ok(Some(warning)) when meta write
391            // failed but the main log succeeded, so both Err and meta warning
392            // are surfaced as transcript_warning on the wire response.
393            match write_transcript_log(
394                &self.log_config,
395                session_id,
396                &exec_result.metrics,
397                strategy.as_deref(),
398            ) {
399                Err(e) => Some(e.to_string()),
400                Ok(meta_warning) => meta_warning,
401            }
402        } else {
403            None
404        }
405    }
406
407    /// Persist eval result for a finished session, returning any storage
408    /// failure as `Some(msg)` so the caller can surface it on the wire
409    /// response. `None` covers both "not an eval session" and
410    /// "successfully saved" — they are indistinguishable to the caller
411    /// because both produce the same wire shape.
412    pub(super) fn maybe_save_eval(
413        &self,
414        result: &FeedResult,
415        session_id: &str,
416        result_json: &str,
417    ) -> Option<String> {
418        if !matches!(result, FeedResult::Finished(_)) {
419            return None;
420        }
421        let strategy = {
422            let mut map = self.eval_sessions.lock().unwrap_or_else(|e| e.into_inner());
423            map.remove(session_id)
424        };
425        strategy.and_then(|s| {
426            super::eval_store::save_eval_result(&self.log_config.app_dir(), &s, result_json).err()
427        })
428    }
429
430    pub(super) async fn start_and_tick(
431        &self,
432        code: String,
433        ctx: serde_json::Value,
434        strategy: Option<&str>,
435        extra_lib_paths: Vec<std::path::PathBuf>,
436        variant_pkgs: Vec<VariantPkg>,
437    ) -> Result<String, String> {
438        let scenarios_dir = self.log_config.app_dir().scenarios_dir();
439        let session = self
440            .executor
441            .start_session(
442                code,
443                ctx,
444                extra_lib_paths,
445                variant_pkgs,
446                Arc::clone(&self.state_store),
447                Arc::clone(&self.card_store),
448                scenarios_dir,
449            )
450            .await?;
451        let (session_id, result) = self
452            .registry
453            .start_execution(session)
454            .await
455            .map_err(|e| format!("Execution failed: {e}"))?;
456        if let Some(s) = strategy {
457            if let Ok(mut map) = self.session_strategies.lock() {
458                map.insert(session_id.clone(), s.to_string());
459            }
460        }
461        let transcript_warning = self.maybe_log_transcript(&result, &session_id);
462        let json = result.to_json(&session_id).to_string();
463        Ok(splice_transcript_warning(&json, transcript_warning))
464    }
465}
466
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;

    use algocline_core::{
        AppDir, ExecutionMetrics, ExecutionObserver, LlmQuery, QueryId, TerminalState,
    };
    use algocline_engine::{ExecutionResult, FeedResult};

    use super::super::config::{AppConfig, LogDirSource};
    use super::super::eval_store::splice_response_string;
    use super::{splice_transcript_warning, AppService};
    use crate::pool::PoolSessionEntry;

    // ── Fixtures ─────────────────────────────────────────────────────────────

    /// Metrics whose observer has seen a single paused query.
    fn make_metrics_with_transcript() -> ExecutionMetrics {
        let query = LlmQuery {
            id: QueryId::single(),
            prompt: "test prompt".into(),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        };
        let metrics = ExecutionMetrics::new();
        metrics.create_observer().on_paused(&[query]);
        metrics
    }

    /// Wrap `metrics` in a `Finished` result that completed with `{"ok": true}`.
    fn make_finished_result(metrics: ExecutionMetrics) -> FeedResult {
        let state = TerminalState::Completed {
            result: serde_json::json!({"ok": true}),
        };
        FeedResult::Finished(ExecutionResult { state, metrics })
    }

    /// Build a minimal `AppService` with logging enabled, writing to `log_dir`
    /// and using a fresh temporary directory as its app dir.
    async fn make_app_service_with_log_dir(log_dir: PathBuf) -> AppService {
        let engine = algocline_engine::Executor::new(vec![])
            .await
            .expect("executor");
        let executor = Arc::new(engine);
        let app_tmp = tempfile::tempdir().expect("test tempdir");
        let app_root = app_tmp.path().to_path_buf();
        let log_config = AppConfig {
            log_dir: Some(log_dir),
            log_dir_source: LogDirSource::EnvVar,
            log_enabled: true,
            prompt_preview_chars: 200,
            app_dir: Arc::new(AppDir::new(app_root)),
        };
        // Forgetting the handle skips its destructor, so the temporary app
        // dir is not removed when this function returns.
        std::mem::forget(app_tmp);
        AppService::new(executor, log_config, vec![])
    }

    // ── (b) maybe_log_transcript returns Some when write fails ──────────

    #[tokio::test]
    async fn maybe_log_transcript_returns_some_on_write_failure() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let log_dir = tmp.path().to_path_buf();
        // A directory sitting at the session file path blocks the write.
        let blocker = log_dir.join("fail-session.json");
        std::fs::create_dir_all(blocker).expect("pre-create dir to block write");

        let svc = make_app_service_with_log_dir(log_dir).await;
        let finished = make_finished_result(make_metrics_with_transcript());

        let msg = svc
            .maybe_log_transcript(&finished, "fail-session")
            .expect("expected Some warning on write failure");
        assert!(
            msg.contains("transcript"),
            "warning should mention 'transcript', got: {msg}"
        );
    }

    #[tokio::test]
    async fn maybe_log_transcript_returns_none_on_non_finished() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        let accepted = FeedResult::Accepted { remaining: 1 };
        let warning = svc.maybe_log_transcript(&accepted, "any-session");
        assert!(warning.is_none(), "Accepted result should return None");
    }

    // ── (c) splice_transcript_warning inserts field into JSON ───────────

    #[test]
    fn splice_transcript_warning_injects_field_when_some() {
        let spliced = splice_transcript_warning(
            r#"{"status":"finished","result":{}}"#,
            Some("write failed".to_string()),
        );
        let parsed: serde_json::Value = serde_json::from_str(&spliced).expect("valid JSON");
        assert_eq!(
            parsed["transcript_warning"].as_str(),
            Some("write failed"),
            "transcript_warning field should be present"
        );
        // Fields that were already there must survive the splice.
        assert_eq!(parsed["status"].as_str(), Some("finished"));
    }

    #[test]
    fn splice_transcript_warning_passthrough_when_none() {
        let spliced = splice_transcript_warning(r#"{"status":"finished"}"#, None);
        let parsed: serde_json::Value = serde_json::from_str(&spliced).expect("valid JSON");
        assert!(
            parsed.get("transcript_warning").is_none(),
            "transcript_warning must be absent when warning is None"
        );
    }

    // ── ST6: pool registry routing tests ────────────────────────────────────

    /// T1: an unknown session id is absent from the pool registry, so
    /// `continue_single` takes the in-MCP path, which fails because the
    /// in-memory session registry has no such session either.
    #[tokio::test]
    async fn continue_single_in_mcp_path_on_registry_miss() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        // No workers are registered, on disk or in memory.
        let outcome = svc
            .continue_single(
                "nonexistent-session-id",
                "some response".to_string(),
                None,
                None,
            )
            .await;

        let msg = outcome.expect_err("unknown session must return Err on in-MCP path");
        assert!(
            msg.contains("not found") || msg.contains("Continue failed"),
            "error must indicate session not found, got: {msg}"
        );
    }

    /// T2: with no registry.json present (normal first run), a freshly built
    /// `AppService` holds an empty pool registry rather than failing.
    #[tokio::test]
    async fn app_service_new_initialises_empty_pool_registry() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        let registry = svc.pool_registry.read().await;
        assert!(
            registry.sessions.is_empty(),
            "pool registry must be empty on first-run (no registry.json)"
        );
    }

    /// T2b: `pool_dir`, `pool_reg_path` and `pool_lock_path` end in the
    /// expected `pool/*` components.
    #[tokio::test]
    async fn app_service_pool_paths_correctly_derived() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        let pool_dir = &svc.pool_dir;
        assert!(
            pool_dir.ends_with("pool"),
            "pool_dir must end in 'pool', got: {}",
            pool_dir.display()
        );

        let reg_path = &svc.pool_reg_path;
        assert!(
            reg_path.ends_with("pool/registry.json"),
            "pool_reg_path must end in 'pool/registry.json', got: {}",
            reg_path.display()
        );

        let lock_path = &svc.pool_lock_path;
        assert!(
            lock_path.ends_with("pool/registry.lock"),
            "pool_lock_path must end in 'pool/registry.lock', got: {}",
            lock_path.display()
        );
    }

    /// T3: a corrupt registry.json combined with a cache miss makes
    /// `continue_single` return `Err` instead of silently continuing with an
    /// empty registry (CLAUDE.md § "Error propagation discipline": no
    /// `unwrap_or_default()` swallowing).
    #[tokio::test]
    async fn continue_single_propagates_corrupted_registry_error() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        // Plant an unparsable registry.json in the pool directory. The
        // service was built before this file existed, so its in-memory
        // registry has no entry and the lookup falls back to disk.
        std::fs::create_dir_all(&svc.pool_dir).expect("create pool dir");
        let registry_file = svc.pool_dir.join("registry.json");
        std::fs::write(registry_file, b"{ not valid json !!!").expect("write corrupt registry");

        let outcome = svc
            .continue_single("any-session-id", "response".to_string(), None, None)
            .await;

        let msg = outcome.expect_err("corrupted registry must cause Err, not silent empty fallback");
        assert!(
            msg.contains("corrupted") || msg.contains("parse") || msg.contains("Continue failed"),
            "error must mention registry problem, got: {msg}"
        );
    }

    // ── pool_cache_reload_warning splice tests ───────────────────────────────

    /// T1 (happy path): `splice_response_string` adds
    /// `pool_cache_reload_warning` to a JSON object response without
    /// disturbing existing fields.
    #[test]
    fn splice_response_string_injects_cache_reload_warning() {
        let msg = "failed to reload pool registry: No such file or directory";
        let spliced = splice_response_string(
            r#"{"status":"finished","result":{"ok":true}}"#,
            "pool_cache_reload_warning",
            msg,
        );
        let parsed: serde_json::Value = serde_json::from_str(&spliced).expect("valid JSON");
        assert_eq!(
            parsed["pool_cache_reload_warning"].as_str(),
            Some(msg),
            "pool_cache_reload_warning must be present in response"
        );
        // Additive, not destructive: the original field is still there.
        assert_eq!(parsed["status"].as_str(), Some("finished"));
    }

    /// T2 (edge case): input that is valid JSON but not an object (here a
    /// bare string) comes back unchanged.
    #[test]
    fn splice_response_string_passthrough_on_non_object_json() {
        let bare_string = r#""just a string""#;
        let spliced = splice_response_string(bare_string, "pool_cache_reload_warning", "err");
        assert_eq!(spliced, bare_string);
    }

    /// T3 (None branch): without a cache-reload warning the response JSON
    /// carries no `pool_cache_reload_warning` field.
    ///
    /// NOTE(review): this test only parses a literal and never calls any
    /// production code, so it cannot fail if the None branch regresses.
    #[test]
    fn splice_response_string_not_called_when_none() {
        // Simulate the None branch: splice_response_string is simply not called.
        let untouched: serde_json::Value =
            serde_json::from_str(r#"{"status":"finished"}"#).expect("valid JSON");
        assert!(
            untouched.get("pool_cache_reload_warning").is_none(),
            "pool_cache_reload_warning must be absent when no cache-reload error occurred"
        );
    }

    /// T1b: an entry present in the in-memory pool registry sends
    /// `continue_single` down the pool path, which then fails because the
    /// entry's socket does not exist.
    #[tokio::test]
    async fn continue_single_routes_to_pool_on_registry_hit() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        // Register a fake pool session whose socket path has no file behind
        // it; the PID is this test process, i.e. a live one.
        let entry = PoolSessionEntry::new(
            "test-pool-session",
            std::process::id(),
            tmp.path().join("nonexistent.sock"),
            env!("CARGO_PKG_VERSION"),
        );
        svc.pool_registry.write().await.add(entry);

        let outcome = svc
            .continue_single("test-pool-session", "response".to_string(), None, None)
            .await;

        let msg = outcome.expect_err("pool path must fail with connect error (no real worker)");
        // "session not found" would indicate the in-MCP path was taken.
        //
        // NOTE(review): both routes in `continue_single` prefix their errors
        // with "Continue failed", so the second disjunct lets this assertion
        // pass whichever route ran. Tightening it needs the exact pool
        // connect error text.
        assert!(
            !msg.contains("session not found") || msg.contains("Continue failed"),
            "error must be from pool path (UDS connect), got: {msg}"
        );
    }
}