Skip to main content

algocline_app/service/
run.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use algocline_core::QueryId;
5use algocline_engine::{FeedResult, VariantPkg};
6
7use super::alc_toml::load_alc_toml;
8use super::eval_store::{splice_response_string, splice_response_warnings};
9use super::resolve::{is_package_installed, make_require_code, resolve_code, QueryResponse};
10use super::transcript::write_transcript_log;
11use super::AppService;
12use crate::pool::dispatch::{continue_via_pool, run_via_pool};
13
14/// Recover from MCP clients that JSON-stringify an object-typed field
15/// before sending. The MCP `inputSchema` for `ctx` / `opts` /
16/// `strategy_opts` declares `type: object`, but some clients send a
17/// JSON-encoded string instead. Without this normalisation the value
18/// reaches Lua as a string and breaks pkgs that require a table
19/// (issue 1778656404-63015).
20///
21/// Only `Value::String` payloads that parse into a JSON object or
22/// array are replaced; ordinary strings and primitive scalars pass
23/// through untouched.
24pub(crate) fn normalize_stringified_json_object(v: serde_json::Value) -> serde_json::Value {
25    match v {
26        serde_json::Value::String(ref s) => match serde_json::from_str::<serde_json::Value>(s) {
27            Ok(parsed @ serde_json::Value::Object(_)) => parsed,
28            Ok(parsed @ serde_json::Value::Array(_)) => parsed,
29            _ => v,
30        },
31        other => other,
32    }
33}
34
35/// Splice `save_warning` into the JSON `result` when the optional
36/// warning is `Some(_)`. Returns the original string unchanged when
37/// there is no warning.
38fn splice_save_warning(result_json: &str, warning: Option<String>) -> String {
39    match warning {
40        Some(msg) => splice_response_string(result_json, "save_warning", &msg),
41        None => result_json.to_string(),
42    }
43}
44
45/// Splice `transcript_warning` into the JSON `result` when the optional
46/// warning is `Some(_)`. Returns the original string unchanged when
47/// there is no warning.
48fn splice_transcript_warning(result_json: &str, warning: Option<String>) -> String {
49    match warning {
50        Some(msg) => splice_response_string(result_json, "transcript_warning", &msg),
51        None => result_json.to_string(),
52    }
53}
54
55/// Build a frozen env snapshot from up to three sources and an optional allowlist.
56///
57/// # Sources (applied in priority order, lower overwritten by higher)
58///
59/// 1. **OS environment** (`ctx.env.allow_os = true` only) — `std::env::vars()` snapshot.
60/// 2. **dotenv file** (`ctx.env.dotenv`) — parsed via `dotenvy`; keys overwrite OS layer.
61/// 3. **inject** (`ctx.env.inject`) — explicit key/value map; highest priority, overwrites all.
62///
63/// # Arguments
64///
65/// - `ctx` — the full `alc_run` context value; `ctx["env"]` is extracted here.
66/// - `project_root` — required when `ctx.env.dotenv` is a relative path.
67///   `None` with a relative dotenv path is an error (avoids CWD ambiguity).
68/// - `alc_toml_allow` — optional allowlist from `alc.toml [env].allow`.
69///   When `Some`, the merged map is filtered to only keys present in the list.
70///   `None` means no filtering (all resolved keys are kept).
71///
72/// # Errors
73///
74/// Returns `Err(String)` for:
75/// - `ctx.env.inject` values that are not JSON strings
76/// - `ctx.env.dotenv` is a relative path but `project_root` is `None`
77/// - `dotenvy` I/O error opening the dotenv file
78/// - `dotenvy` parse error for any entry in the dotenv file
79pub(super) fn resolve_env(
80    ctx: &serde_json::Value,
81    project_root: Option<&std::path::Path>,
82    alc_toml_allow: Option<&[String]>,
83) -> Result<Arc<HashMap<String, String>>, String> {
84    let env_obj = ctx.get("env").and_then(|v| v.as_object());
85
86    // ── Layer 3 (highest): inject ──────────────────────────────────────────────
87    let inject: HashMap<String, String> = if let Some(obj) = env_obj {
88        if let Some(inject_val) = obj.get("inject") {
89            match inject_val.as_object() {
90                Some(m) => {
91                    let mut map = HashMap::new();
92                    for (k, v) in m {
93                        match v.as_str() {
94                            Some(s) => {
95                                map.insert(k.clone(), s.to_string());
96                            }
97                            None => {
98                                return Err(format!(
99                                    "ctx.env.inject: value for key '{k}' must be a string, got {v}"
100                                ));
101                            }
102                        }
103                    }
104                    map
105                }
106                None => {
107                    return Err(format!(
108                        "ctx.env.inject must be an object, got {}",
109                        inject_val
110                    ));
111                }
112            }
113        } else {
114            HashMap::new()
115        }
116    } else {
117        HashMap::new()
118    };
119
120    // Resolved dotenv path (if any)
121    let dotenv_path: Option<std::path::PathBuf> = if let Some(p) = env_obj
122        .and_then(|o| o.get("dotenv"))
123        .and_then(|v| v.as_str())
124    {
125        let path = std::path::Path::new(p);
126        if path.is_absolute() {
127            Some(path.to_path_buf())
128        } else {
129            match project_root {
130                Some(root) => Some(root.join(p)),
131                None => {
132                    return Err(format!(
133                        "ctx.env.dotenv: relative path '{p}' requires project_root to be set"
134                    ));
135                }
136            }
137        }
138    } else {
139        None
140    };
141
142    let allow_os = env_obj
143        .and_then(|o| o.get("allow_os"))
144        .and_then(|v| v.as_bool())
145        .unwrap_or(false);
146
147    let mut merged: HashMap<String, String> = HashMap::new();
148
149    // ── Layer 1 (lowest): OS environment ──────────────────────────────────────
150    if allow_os {
151        for (k, v) in std::env::vars() {
152            merged.insert(k, v);
153        }
154    }
155
156    // ── Layer 2: dotenv file ───────────────────────────────────────────────────
157    if let Some(ref full) = dotenv_path {
158        let iter = dotenvy::from_path_iter(full)
159            .map_err(|e| format!("ctx.env.dotenv: failed to open '{}': {e}", full.display()))?;
160        for item in iter {
161            let (k, v) = item
162                .map_err(|e| format!("ctx.env.dotenv: parse error in '{}': {e}", full.display()))?;
163            merged.insert(k, v);
164        }
165    }
166
167    // ── Layer 3: inject (overwrite, highest priority) ──────────────────────────
168    for (k, v) in inject {
169        merged.insert(k, v);
170    }
171
172    // ── Optional allowlist filter (alc.toml [env].allow) ──────────────────────
173    if let Some(allow) = alc_toml_allow {
174        if !allow.is_empty() {
175            let allowset: std::collections::HashSet<&String> = allow.iter().collect();
176            merged.retain(|k, _| allowset.contains(k));
177        }
178    }
179
180    Ok(Arc::new(merged))
181}
182
183impl AppService {
184    /// Execute Lua code with optional JSON context.
185    ///
186    /// When `host_mode: Some(true)` is passed, the call is proxied via
187    /// `PoolClient` to a long-lived worker subprocess over a Unix domain socket.
188    /// When `host_mode` is `None` or `Some(false)` the existing in-process
189    /// `Executor::start_session` path is used unchanged.
190    ///
191    /// # Concurrency
192    ///
193    /// **host_mode=false (default)**: No additional locking beyond `SessionRegistry`
194    /// lock C. `AppService` itself holds no long-lived lock during this call.
195    ///
196    /// **host_mode=true**: Acquires `RwLock<PoolRegistry>` (write) and advisory
197    /// `fs4::FileExt::lock_exclusive` to update `registry.json`. These locks are
198    /// **not** held across the UDS round-trip await.
199    ///
200    /// **Cancel safety**: cancelling this `.await` mid-UDS-request leaves the
201    /// worker subprocess running. The registry entry persists; callers can
202    /// reconnect via `alc_continue` after MCP restart.
203    pub async fn run(
204        &self,
205        code: Option<String>,
206        code_file: Option<String>,
207        ctx: Option<serde_json::Value>,
208        project_root: Option<String>,
209        host_mode: Option<bool>,
210    ) -> Result<String, String> {
211        let code = resolve_code(code, code_file)?;
212        let ctx = normalize_stringified_json_object(ctx.unwrap_or(serde_json::Value::Null));
213        let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
214        let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
215        let mut warnings: Vec<String> = extra_warnings;
216        warnings.extend(variant_warnings);
217
218        if host_mode == Some(true) {
219            // ── Pool path (Crux: MCP thin proxy IPC boundary) ─────────────────
220            // Worker subprocess is spawned and communicated via UDS.
221            // SessionRegistry (in-memory) is NOT touched on this path.
222            let (session_id, json, pool_save_error) = run_via_pool(
223                &self.pool_dir,
224                &self.pool_reg_path,
225                &self.pool_lock_path,
226                extra,
227                code,
228                ctx,
229            )
230            .await
231            .map_err(|e| e.to_string())?;
232
233            // session_id is stored in the JSON by the worker; update the
234            // in-memory registry so this MCP instance can route continues
235            // without another disk read.
236            // Load the just-persisted entry from disk to keep in-memory
237            // registry in sync.  This is a best-effort convenience cache;
238            // the disk state is authoritative.  Failure is surfaced to the
239            // MCP wire response as `pool_cache_reload_warning` so the caller
240            // can observe stale-cache conditions; tracing::warn! is kept for
241            // operator visibility in logs.
242            let cache_reload_warning: Option<String> =
243                match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
244                    Ok(reg) => {
245                        let mut guard = self.pool_registry.write().await;
246                        *guard = reg;
247                        None
248                    }
249                    Err(e) => {
250                        tracing::warn!(
251                            error = %e,
252                            "failed to reload pool registry after run; in-memory cache may be stale"
253                        );
254                        Some(e.to_string())
255                    }
256                };
257
258            let json = splice_response_warnings(&json, "lib_path_warnings", &warnings);
259            let json = match pool_save_error {
260                Some(msg) => splice_response_string(&json, "pool_save_error", &msg),
261                None => json,
262            };
263            let json = match cache_reload_warning {
264                Some(msg) => splice_response_string(&json, "pool_cache_reload_warning", &msg),
265                None => json,
266            };
267            let _ = session_id; // session_id is embedded in the JSON response
268            return Ok(json);
269        }
270
271        // ── In-process path (default) ──────────────────────────────────────────
272
273        // Build the frozen env snapshot at alc_run invocation time (TIME boundary).
274        // Load alc.toml to get the optional env.allow allowlist.
275        let alc_toml_allow_list: Vec<String> = if let Some(root) = project_root.as_deref() {
276            let root_path = std::path::Path::new(root);
277            match load_alc_toml(root_path) {
278                Ok(Some(t)) => t.env.map(|e| e.allow).unwrap_or_default(),
279                Ok(None) => Vec::new(),
280                Err(e) => return Err(format!("alc.toml load error: {e}")),
281            }
282        } else {
283            Vec::new()
284        };
285        let alc_toml_allow = if alc_toml_allow_list.is_empty() {
286            None
287        } else {
288            Some(alc_toml_allow_list.as_slice())
289        };
290
291        let project_root_path = project_root.as_deref().map(std::path::Path::new);
292        let env_map = resolve_env(&ctx, project_root_path, alc_toml_allow)?;
293
294        let json = self
295            .start_and_tick(env_map, code, ctx, None, extra, variants)
296            .await?;
297        Ok(splice_response_warnings(
298            &json,
299            "lib_path_warnings",
300            &warnings,
301        ))
302    }
303
304    /// Apply a built-in strategy to a task.
305    ///
306    /// If the requested package is not installed, automatically installs the
307    /// bundled package collection from GitHub before executing.
308    ///
309    /// `project_root` — optional absolute path to the project root containing
310    /// `alc.lock`. Falls back to `ALC_PROJECT_ROOT` env or ancestor walk.
311    pub async fn advice(
312        &self,
313        strategy: &str,
314        task: Option<String>,
315        opts: Option<serde_json::Value>,
316        project_root: Option<String>,
317    ) -> Result<String, String> {
318        // Auto-install bundled packages if the requested strategy is missing
319        let app_dir = self.log_config.app_dir();
320        if !is_package_installed(&app_dir, strategy) {
321            self.auto_install_bundled_packages().await?;
322            if !is_package_installed(&app_dir, strategy) {
323                return Err(format!(
324                    "Package '{strategy}' not found after installing bundled collection. \
325                     Use alc_pkg_install to install it manually."
326                ));
327            }
328        }
329
330        let code = make_require_code(strategy);
331
332        let opts = opts.map(normalize_stringified_json_object);
333        let mut ctx_map = match opts {
334            Some(serde_json::Value::Object(m)) => m,
335            _ => serde_json::Map::new(),
336        };
337        if let Some(task) = task {
338            ctx_map.insert("task".into(), serde_json::Value::String(task));
339        }
340        let ctx = serde_json::Value::Object(ctx_map);
341
342        let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
343        let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
344        let mut warnings: Vec<String> = extra_warnings;
345        warnings.extend(variant_warnings);
346        // advice() does not accept ctx.env; pass an empty map so AlcEnv is
347        // present but empty (no env vars visible to advice strategies).
348        let env_map = Arc::new(HashMap::new());
349        let json = self
350            .start_and_tick(env_map, code, ctx, Some(strategy), extra, variants)
351            .await?;
352        Ok(splice_response_warnings(
353            &json,
354            "lib_path_warnings",
355            &warnings,
356        ))
357    }
358
359    /// Continue a paused execution — batch feed.
360    ///
361    /// For pool sessions (`session_id` found in registry.json), each response
362    /// in the batch is forwarded to the worker via `PoolClient::send_request`.
363    /// For in-MCP sessions, the existing `SessionRegistry::feed_response` path
364    /// is used unchanged.
365    pub async fn continue_batch(
366        &self,
367        session_id: &str,
368        responses: Vec<QueryResponse>,
369    ) -> Result<String, String> {
370        // ── Pool path check (same registry lookup as continue_single) ─────────
371        let pool_entry = {
372            let reg = self.pool_registry.read().await;
373            reg.find(session_id).cloned()
374        };
375
376        let pool_entry = if pool_entry.is_some() {
377            pool_entry
378        } else {
379            match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
380                Ok(reg) => {
381                    let entry = reg.find(session_id).cloned();
382                    if entry.is_some() {
383                        let mut guard = self.pool_registry.write().await;
384                        *guard = reg;
385                    }
386                    entry
387                }
388                Err(e) => {
389                    return Err(format!("Continue failed: {e}"));
390                }
391            }
392        };
393
394        if let Some(entry) = pool_entry {
395            // ── Pool routing ────────────────────────────────────────────────────
396            let mut last_json = None;
397            for qr in responses {
398                let json =
399                    continue_via_pool(&entry, session_id, qr.response, Some(qr.query_id), qr.usage)
400                        .await
401                        .map_err(|e| format!("Continue failed: {e}"))?;
402                last_json = Some(json);
403            }
404            return last_json.ok_or_else(|| "Empty responses array".to_string());
405        }
406
407        // ── In-MCP path ────────────────────────────────────────────────────────
408        let mut last_result = None;
409        for qr in responses {
410            let qid = QueryId::parse(&qr.query_id);
411            let result = self
412                .registry
413                .feed_response(session_id, &qid, qr.response, qr.usage.as_ref())
414                .await
415                .map_err(|e| format!("Continue failed: {e}"))?;
416            last_result = Some(result);
417        }
418        let result = last_result.ok_or("Empty responses array")?;
419        let transcript_warning = self.maybe_log_transcript(&result, session_id);
420        let json = result.to_json(session_id).to_string();
421        let json = splice_transcript_warning(&json, transcript_warning);
422        let save_warning = self.maybe_save_eval(&result, session_id, &json);
423        Ok(splice_save_warning(&json, save_warning))
424    }
425
426    /// Continue a paused execution — single response (with optional query_id).
427    ///
428    /// Routing is automatic: if `session_id` is found in `registry.json`
429    /// (pool path), the call is proxied via `PoolClient` over UDS. If not
430    /// found (in-MCP path), the existing `SessionRegistry::feed_response`
431    /// is used. Both paths never coexist for the same `session_id`.
432    ///
433    /// # Concurrency
434    ///
435    /// **Pool path**: acquires `RwLock<PoolRegistry>` (read) to look up the
436    /// session entry, then acquires `tokio::sync::Mutex` inside `PoolClient`
437    /// to serialize the UDS write. Neither lock is held across the UDS await.
438    ///
439    /// **In-MCP path**: acquires lock C in the two-phase pattern documented on
440    /// `SessionRegistry::feed_response`.
441    ///
442    /// **Cancel safety**: cancelling mid-await on the pool path leaves the
443    /// worker subprocess running (UDS send may have been partially written;
444    /// `read_line` is not cancel-safe — a partial line in the buffer renders
445    /// the connection unusable and `PoolClient` must reconnect).
446    pub async fn continue_single(
447        &self,
448        session_id: &str,
449        response: String,
450        query_id: Option<&str>,
451        usage: Option<algocline_core::TokenUsage>,
452    ) -> Result<String, String> {
453        // ── Pool path: check in-memory registry, then disk registry ───────────
454        // K-4: acquire read lock, clone the entry, release lock BEFORE await.
455        let pool_entry = {
456            let reg = self.pool_registry.read().await;
457            reg.find(session_id).cloned()
458        }; // read lock released here
459
460        // If in-memory cache missed, check disk (e.g. after MCP restart).
461        let pool_entry = if pool_entry.is_some() {
462            pool_entry
463        } else {
464            match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
465                Ok(reg) => {
466                    let entry = reg.find(session_id).cloned();
467                    if entry.is_some() {
468                        // Warm the in-memory cache.
469                        let mut guard = self.pool_registry.write().await;
470                        *guard = reg;
471                    }
472                    entry
473                }
474                Err(e) => {
475                    // Corrupt registry: propagate to MCP wire per §Error 伝播規律.
476                    return Err(format!("Continue failed: {e}"));
477                }
478            }
479        };
480
481        if let Some(entry) = pool_entry {
482            // ── Pool routing (Crux: MCP thin proxy IPC boundary) ──────────────
483            let json = continue_via_pool(
484                &entry,
485                session_id,
486                response,
487                query_id.map(str::to_string),
488                usage,
489            )
490            .await
491            .map_err(|e| format!("Continue failed: {e}"))?;
492            return Ok(json);
493        }
494
495        // ── In-MCP path ────────────────────────────────────────────────────────
496        let query_id = match query_id {
497            Some(qid) => QueryId::parse(qid),
498            None => self
499                .registry
500                .resolve_sole_pending_id(session_id)
501                .await
502                .map_err(|e| format!("Continue failed: {e}"))?,
503        };
504
505        let result = self
506            .registry
507            .feed_response(session_id, &query_id, response, usage.as_ref())
508            .await
509            .map_err(|e| format!("Continue failed: {e}"))?;
510
511        let transcript_warning = self.maybe_log_transcript(&result, session_id);
512        let json = result.to_json(session_id).to_string();
513        let json = splice_transcript_warning(&json, transcript_warning);
514        let save_warning = self.maybe_save_eval(&result, session_id, &json);
515        Ok(splice_save_warning(&json, save_warning))
516    }
517
518    // ─── Internal ───────────────────────────────────────────────
519
520    pub(super) fn maybe_log_transcript(
521        &self,
522        result: &FeedResult,
523        session_id: &str,
524    ) -> Option<String> {
525        if let FeedResult::Finished(exec_result) = result {
526            // Mutex poison means a previous thread panicked while holding the lock.
527            // Strategy name is non-critical for correctness, but the failure must be
528            // surfaced to MCP callers so it is observable, not silently dropped.
529            // See CLAUDE.md §Service 層の Error 伝播規律.
530            let strategy = match self.session_strategies.lock() {
531                Ok(mut map) => map.remove(session_id),
532                Err(e) => {
533                    tracing::warn!(
534                        "session_strategies mutex poisoned for '{}': {}",
535                        session_id,
536                        e
537                    );
538                    // Return warning immediately; transcript cannot be written
539                    // without strategy context being reliably recoverable.
540                    return Some(format!(
541                        "session_strategies mutex poisoned for '{session_id}': {e}"
542                    ));
543                }
544            };
545            // write_transcript_log returns Ok(Some(warning)) when meta write
546            // failed but the main log succeeded, so both Err and meta warning
547            // are surfaced as transcript_warning on the wire response.
548            match write_transcript_log(
549                &self.log_config,
550                session_id,
551                &exec_result.metrics,
552                strategy.as_deref(),
553            ) {
554                Err(e) => Some(e.to_string()),
555                Ok(meta_warning) => meta_warning,
556            }
557        } else {
558            None
559        }
560    }
561
562    /// Persist eval result for a finished session, returning any storage
563    /// failure as `Some(msg)` so the caller can surface it on the wire
564    /// response. `None` covers both "not an eval session" and
565    /// "successfully saved" — they are indistinguishable to the caller
566    /// because both produce the same wire shape.
567    pub(super) fn maybe_save_eval(
568        &self,
569        result: &FeedResult,
570        session_id: &str,
571        result_json: &str,
572    ) -> Option<String> {
573        if !matches!(result, FeedResult::Finished(_)) {
574            return None;
575        }
576        let strategy = {
577            let mut map = self.eval_sessions.lock().unwrap_or_else(|e| e.into_inner());
578            map.remove(session_id)
579        };
580        strategy.and_then(|s| {
581            super::eval_store::save_eval_result(&self.log_config.app_dir(), &s, result_json).err()
582        })
583    }
584
585    /// Start a Lua session with the given env snapshot and tick until the first
586    /// pause or completion.
587    ///
588    /// # Arguments
589    ///
590    /// - `env_map` — frozen env snapshot built by `resolve_env`; passed to
591    ///   `executor.start_session_with_env` so `alc.env` is populated before any
592    ///   Lua code runs (TIME boundary: snapshot is taken before this call).
593    /// - `code` — Lua source to execute.
594    /// - `ctx` — JSON context accessible as `ctx` global in the Lua VM.
595    /// - `strategy` — optional strategy name (used to correlate eval sessions).
596    /// - `extra_lib_paths` — additional `require` search paths.
597    /// - `variant_pkgs` — variant package overrides.
598    ///
599    /// # Errors
600    ///
601    /// Returns `Err(String)` if session spawn or initial execution fails.
602    pub(super) async fn start_and_tick(
603        &self,
604        env_map: Arc<HashMap<String, String>>,
605        code: String,
606        ctx: serde_json::Value,
607        strategy: Option<&str>,
608        extra_lib_paths: Vec<std::path::PathBuf>,
609        variant_pkgs: Vec<VariantPkg>,
610    ) -> Result<String, String> {
611        let scenarios_dir = self.log_config.app_dir().scenarios_dir();
612        let session = self
613            .executor
614            .start_session_with_env(
615                env_map,
616                code,
617                ctx,
618                extra_lib_paths,
619                variant_pkgs,
620                Arc::clone(&self.state_store),
621                Arc::clone(&self.card_store),
622                scenarios_dir,
623            )
624            .await?;
625        let (session_id, result) = self
626            .registry
627            .start_execution(session)
628            .await
629            .map_err(|e| format!("Execution failed: {e}"))?;
630        if let Some(s) = strategy {
631            if let Ok(mut map) = self.session_strategies.lock() {
632                map.insert(session_id.clone(), s.to_string());
633            }
634        }
635        let transcript_warning = self.maybe_log_transcript(&result, &session_id);
636        let json = result.to_json(&session_id).to_string();
637        Ok(splice_transcript_warning(&json, transcript_warning))
638    }
639}
640
641#[cfg(test)]
642mod tests {
643    use std::path::PathBuf;
644    use std::sync::Arc;
645
646    use algocline_core::{
647        AppDir, ExecutionMetrics, ExecutionObserver, LlmQuery, QueryId, TerminalState,
648    };
649    use algocline_engine::{ExecutionResult, FeedResult};
650
651    use super::super::config::{AppConfig, LogDirSource};
652    use super::{splice_transcript_warning, AppService};
653
654    fn make_metrics_with_transcript() -> ExecutionMetrics {
655        let metrics = ExecutionMetrics::new();
656        let observer = metrics.create_observer();
657        observer.on_paused(&[LlmQuery {
658            id: QueryId::single(),
659            prompt: "test prompt".into(),
660            system: None,
661            max_tokens: 100,
662            grounded: false,
663            underspecified: false,
664        }]);
665        metrics
666    }
667
668    fn make_finished_result(metrics: ExecutionMetrics) -> FeedResult {
669        FeedResult::Finished(ExecutionResult {
670            state: TerminalState::Completed {
671                result: serde_json::json!({"ok": true}),
672            },
673            metrics,
674        })
675    }
676
677    /// Build a minimal AppService with log_enabled and a custom log_dir.
678    async fn make_app_service_with_log_dir(log_dir: PathBuf) -> AppService {
679        let executor = Arc::new(
680            algocline_engine::Executor::new(vec![])
681                .await
682                .expect("executor"),
683        );
684        let tmp_app = tempfile::tempdir().expect("test tempdir");
685        let log_config = AppConfig {
686            log_dir: Some(log_dir),
687            log_dir_source: LogDirSource::EnvVar,
688            log_enabled: true,
689            prompt_preview_chars: 200,
690            app_dir: Arc::new(AppDir::new(tmp_app.path().to_path_buf())),
691        };
692        std::mem::forget(tmp_app);
693        AppService::new(executor, log_config, vec![])
694    }
695
696    // ── (b) maybe_log_transcript returns Some when write fails ──────────
697
698    #[tokio::test]
699    async fn maybe_log_transcript_returns_some_on_write_failure() {
700        let tmp = tempfile::tempdir().expect("test tempdir");
701        let log_dir = tmp.path().to_path_buf();
702        // Block write by creating a directory at the session file path.
703        std::fs::create_dir_all(log_dir.join("fail-session.json"))
704            .expect("pre-create dir to block write");
705        let svc = make_app_service_with_log_dir(log_dir).await;
706        let metrics = make_metrics_with_transcript();
707        let result = make_finished_result(metrics);
708        let warning = svc.maybe_log_transcript(&result, "fail-session");
709        assert!(warning.is_some(), "expected Some warning on write failure");
710        let msg = warning.unwrap();
711        assert!(
712            msg.contains("transcript"),
713            "warning should mention 'transcript', got: {msg}"
714        );
715    }
716
717    #[tokio::test]
718    async fn maybe_log_transcript_returns_none_on_non_finished() {
719        let tmp = tempfile::tempdir().expect("test tempdir");
720        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
721        let result = FeedResult::Accepted { remaining: 1 };
722        let warning = svc.maybe_log_transcript(&result, "any-session");
723        assert!(warning.is_none(), "Accepted result should return None");
724    }
725
726    // ── (c) splice_transcript_warning inserts field into JSON ───────────
727
728    #[test]
729    fn splice_transcript_warning_injects_field_when_some() {
730        let json = r#"{"status":"finished","result":{}}"#;
731        let out = splice_transcript_warning(json, Some("write failed".to_string()));
732        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
733        assert_eq!(
734            v["transcript_warning"].as_str(),
735            Some("write failed"),
736            "transcript_warning field should be present"
737        );
738        // Original fields are preserved.
739        assert_eq!(v["status"].as_str(), Some("finished"));
740    }
741
742    #[test]
743    fn splice_transcript_warning_passthrough_when_none() {
744        let json = r#"{"status":"finished"}"#;
745        let out = splice_transcript_warning(json, None);
746        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
747        assert!(
748            v.get("transcript_warning").is_none(),
749            "transcript_warning must be absent when warning is None"
750        );
751    }
752
753    // ── ST6: pool registry routing tests ────────────────────────────────────
754
755    use crate::pool::PoolSessionEntry;
756
757    /// T1: continue_single falls through to in-MCP path when session is not in pool registry.
758    ///
759    /// An unknown session ID should not be found in the pool registry and
760    /// should reach the `SessionRegistry::feed_response` path, which returns
761    /// an error because no session exists in the in-memory registry either.
762    #[tokio::test]
763    async fn continue_single_in_mcp_path_on_registry_miss() {
764        let tmp = tempfile::tempdir().expect("test tempdir");
765        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
766
767        // The pool registry on disk and in-memory is empty (no workers registered).
768        // continue_single should fall through to the in-MCP path and return
769        // "not found" because the in-memory session registry also has nothing.
770        let result = svc
771            .continue_single(
772                "nonexistent-session-id",
773                "some response".to_string(),
774                None,
775                None,
776            )
777            .await;
778        assert!(
779            result.is_err(),
780            "unknown session must return Err on in-MCP path"
781        );
782        let msg = result.unwrap_err();
783        assert!(
784            msg.contains("not found") || msg.contains("Continue failed"),
785            "error must indicate session not found, got: {msg}"
786        );
787    }
788
789    /// T2: AppService::new initialises pool_registry as empty when pool dir absent.
790    ///
791    /// Verifies that startup GC with a missing registry.json (normal first-run)
792    /// produces an empty PoolRegistry (not an error).
793    #[tokio::test]
794    async fn app_service_new_initialises_empty_pool_registry() {
795        let tmp = tempfile::tempdir().expect("test tempdir");
796        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
797
798        let reg = svc.pool_registry.read().await;
799        assert!(
800            reg.sessions.is_empty(),
801            "pool registry must be empty on first-run (no registry.json)"
802        );
803    }
804
805    /// T2b: AppService correctly stores pool registry paths derived from app_dir.
806    ///
807    /// Verifies that pool_dir / pool_reg_path / pool_lock_path are
808    /// non-empty paths derived from state_dir/pool/*.
809    #[tokio::test]
810    async fn app_service_pool_paths_correctly_derived() {
811        let tmp = tempfile::tempdir().expect("test tempdir");
812        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
813
814        assert!(
815            svc.pool_dir.ends_with("pool"),
816            "pool_dir must end in 'pool', got: {}",
817            svc.pool_dir.display()
818        );
819        assert!(
820            svc.pool_reg_path.ends_with("pool/registry.json"),
821            "pool_reg_path must end in 'pool/registry.json', got: {}",
822            svc.pool_reg_path.display()
823        );
824        assert!(
825            svc.pool_lock_path.ends_with("pool/registry.lock"),
826            "pool_lock_path must end in 'pool/registry.lock', got: {}",
827            svc.pool_lock_path.display()
828        );
829    }
830
831    /// T3: continue_single propagates PoolError::RegistryCorrupted to MCP wire.
832    ///
833    /// When registry.json is corrupt and there is a cache miss, continue_single
834    /// must return Err (not silently proceed with empty registry). This verifies
835    /// the CLAUDE.md §Error 伝播規律 invariant — no unwrap_or_default() swallowing.
836    #[tokio::test]
837    async fn continue_single_propagates_corrupted_registry_error() {
838        let tmp = tempfile::tempdir().expect("test tempdir");
839        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
840
841        // Write a corrupt registry.json to the pool directory.
842        let pool_dir = svc.pool_dir.clone();
843        std::fs::create_dir_all(&pool_dir).expect("create pool dir");
844        std::fs::write(pool_dir.join("registry.json"), b"{ not valid json !!!")
845            .expect("write corrupt registry");
846
847        // The in-memory cache is empty (startup GC failed on the corrupt file,
848        // so pool_registry is empty default).  The disk read in continue_single
849        // will hit the corrupt file and must propagate the error.
850        let result = svc
851            .continue_single("any-session-id", "response".to_string(), None, None)
852            .await;
853        assert!(
854            result.is_err(),
855            "corrupted registry must cause Err, not silent empty fallback"
856        );
857        let msg = result.unwrap_err();
858        assert!(
859            msg.contains("corrupted") || msg.contains("parse") || msg.contains("Continue failed"),
860            "error must mention registry problem, got: {msg}"
861        );
862    }
863
864    // ── pool_cache_reload_warning splice tests ───────────────────────────────
865
866    use super::super::eval_store::splice_response_string;
867
868    /// T1 (happy path): splice_response_string inserts pool_cache_reload_warning
869    /// into a valid JSON object response.
870    ///
871    /// Verifies the crux-card constraint: cache-reload failure must surface on
872    /// the MCP wire as an additive field, not remain warn!-only.
873    #[test]
874    fn splice_response_string_injects_cache_reload_warning() {
875        let json = r#"{"status":"finished","result":{"ok":true}}"#;
876        let msg = "failed to reload pool registry: No such file or directory";
877        let out = splice_response_string(json, "pool_cache_reload_warning", msg);
878        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
879        assert_eq!(
880            v["pool_cache_reload_warning"].as_str(),
881            Some(msg),
882            "pool_cache_reload_warning must be present in response"
883        );
884        // Original fields preserved (additive, not destructive).
885        assert_eq!(v["status"].as_str(), Some("finished"));
886    }
887
888    /// T2 (edge case): splice_response_string is a no-op when input is not a
889    /// JSON object (e.g. bare string or array).
890    ///
891    /// Guards against panics when strategy output is malformed.
892    #[test]
893    fn splice_response_string_passthrough_on_non_object_json() {
894        let non_object = r#""just a string""#;
895        let out = splice_response_string(non_object, "pool_cache_reload_warning", "err");
896        // Must return original unchanged.
897        assert_eq!(out, non_object);
898    }
899
900    /// T3 (error path / None branch): when cache_reload_warning is None, the
901    /// pool_cache_reload_warning field must NOT appear in the response JSON.
902    ///
903    /// Verifies the None arm of the new match block in run.rs leaves the JSON
904    /// untouched, consistent with the pool_save_error pattern.
905    #[test]
906    fn splice_response_string_not_called_when_none() {
907        let json = r#"{"status":"finished"}"#;
908        // Simulate the None branch: we simply do not call splice_response_string.
909        let v: serde_json::Value = serde_json::from_str(json).expect("valid JSON");
910        assert!(
911            v.get("pool_cache_reload_warning").is_none(),
912            "pool_cache_reload_warning must be absent when no cache-reload error occurred"
913        );
914    }
915
916    /// T1b: in-memory pool registry lookup finds an entry and routes to pool path.
917    ///
918    /// Inserts a live entry (current process PID) into the in-memory registry
919    /// and verifies that continue_single attempts the pool path (fails with
920    /// connection error because no real worker socket exists, not "not found").
921    #[tokio::test]
922    async fn continue_single_routes_to_pool_on_registry_hit() {
923        let tmp = tempfile::tempdir().expect("test tempdir");
924        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
925
926        // Insert a fake entry pointing to a non-existent socket.
927        // This simulates the case where a pool session was started.
928        let fake_sock = tmp.path().join("nonexistent.sock");
929        let entry = PoolSessionEntry::new(
930            "test-pool-session",
931            std::process::id(), // live PID — survives GC
932            fake_sock.clone(),
933            env!("CARGO_PKG_VERSION"),
934        );
935        {
936            let mut reg = svc.pool_registry.write().await;
937            reg.add(entry);
938        }
939
940        // continue_single should find the entry and attempt pool path.
941        // The UDS connect will fail (no socket file) → PoolError::Connect → Err.
942        // Importantly, the error is a connection error, NOT a "session not found" error.
943        let result = svc
944            .continue_single("test-pool-session", "response".to_string(), None, None)
945            .await;
946        assert!(
947            result.is_err(),
948            "pool path must fail with connect error (no real worker)"
949        );
950        let msg = result.unwrap_err();
951        // The error must come from pool path (UDS connect), not from SessionRegistry.
952        // "not found" would indicate the in-MCP path was taken instead.
953        assert!(
954            !msg.contains("session not found") || msg.contains("Continue failed"),
955            "error must be from pool path (UDS connect), got: {msg}"
956        );
957    }
958
959    // ── resolve_env unit tests ──────────────────────────────────────────────────
960
961    use super::resolve_env;
962
963    /// T1 (happy path): inject keys are accessible in the resolved map.
964    ///
965    /// Verifies the SPACE boundary inject path: keys supplied via
966    /// `ctx.env.inject` appear in the frozen snapshot.
967    #[test]
968    fn resolve_env_inject_keys_readable() {
969        let ctx = serde_json::json!({
970            "env": {
971                "inject": { "FOO": "bar", "BAZ": "qux" }
972            }
973        });
974        let map = resolve_env(&ctx, None, None).expect("resolve_env should succeed");
975        assert_eq!(map.get("FOO").map(String::as_str), Some("bar"));
976        assert_eq!(map.get("BAZ").map(String::as_str), Some("qux"));
977    }
978
979    /// T1b (happy path): empty ctx produces an empty env map.
980    ///
981    /// Verifies that `alc.env` is always present (empty map is valid) even
982    /// when no env configuration is supplied in the invocation context.
983    #[test]
984    fn resolve_env_empty_ctx_produces_empty_map() {
985        let ctx = serde_json::Value::Null;
986        let map = resolve_env(&ctx, None, None).expect("resolve_env with null ctx should succeed");
987        assert!(map.is_empty(), "empty ctx must produce an empty env map");
988    }
989
990    /// T1c (happy path): inject priority over dotenv file for same key.
991    ///
992    /// Verifies the SOURCE boundary: inject (Layer 3) overwrites dotenv (Layer 2).
993    #[test]
994    fn resolve_env_inject_overwrites_dotenv() {
995        let tmp = tempfile::tempdir().expect("test tempdir");
996        let env_file = tmp.path().join(".env");
997        // The .env file declares PRIORITY=from_dotenv.
998        std::fs::write(&env_file, b"PRIORITY=from_dotenv\n").expect("write .env");
999
1000        let ctx = serde_json::json!({
1001            "env": {
1002                "dotenv": env_file.to_str().expect("valid path"),
1003                "inject": { "PRIORITY": "from_inject" }
1004            }
1005        });
1006        let map = resolve_env(&ctx, None, None).expect("resolve_env should succeed");
1007        // inject (higher priority) must win over dotenv.
1008        assert_eq!(
1009            map.get("PRIORITY").map(String::as_str),
1010            Some("from_inject"),
1011            "inject must shadow dotenv for the same key"
1012        );
1013    }
1014
1015    /// T1d (happy path): dotenv file keys are loaded when path is absolute.
1016    ///
1017    /// Verifies Layer 2 (dotenv) of the SOURCE boundary merge chain.
1018    #[test]
1019    fn resolve_env_dotenv_absolute_path_loaded() {
1020        let tmp = tempfile::tempdir().expect("test tempdir");
1021        let env_file = tmp.path().join(".env");
1022        std::fs::write(&env_file, b"DOTENV_KEY=dotenv_val\n").expect("write .env");
1023
1024        let ctx = serde_json::json!({
1025            "env": {
1026                "dotenv": env_file.to_str().expect("valid path")
1027            }
1028        });
1029        let map = resolve_env(&ctx, None, None).expect("resolve_env should succeed");
1030        assert_eq!(
1031            map.get("DOTENV_KEY").map(String::as_str),
1032            Some("dotenv_val"),
1033            "key from dotenv file must be accessible"
1034        );
1035    }
1036
1037    /// T1e (happy path): allowlist filter retains only listed keys.
1038    ///
1039    /// Verifies alc.toml [env].allow filtering is applied after 3-source merge.
1040    #[test]
1041    fn resolve_env_allowlist_filters_inject_keys() {
1042        let ctx = serde_json::json!({
1043            "env": {
1044                "inject": { "ALLOWED": "yes", "BLOCKED": "no" }
1045            }
1046        });
1047        let allow = vec!["ALLOWED".to_string()];
1048        let map =
1049            resolve_env(&ctx, None, Some(allow.as_slice())).expect("resolve_env should succeed");
1050        assert_eq!(map.get("ALLOWED").map(String::as_str), Some("yes"));
1051        assert!(
1052            map.get("BLOCKED").is_none(),
1053            "BLOCKED key must be excluded by allowlist"
1054        );
1055    }
1056
1057    /// T2 (boundary): allow_os=false (default) must not include any OS env vars.
1058    ///
1059    /// Verifies the SOURCE boundary: OS env is excluded unless explicitly opted in.
1060    #[test]
1061    fn resolve_env_allow_os_false_excludes_os_vars() {
1062        // PATH is nearly always set in the test environment.
1063        let ctx = serde_json::json!({ "env": { "allow_os": false } });
1064        let map = resolve_env(&ctx, None, None).expect("resolve_env should succeed");
1065        // Even if PATH is set in the OS, it must not appear in the snapshot.
1066        assert!(
1067            map.get("PATH").is_none(),
1068            "OS env must not leak when allow_os is false"
1069        );
1070    }
1071
1072    /// T2b (boundary): relative dotenv path without project_root returns Err.
1073    ///
1074    /// Verifies the plan Risks #3 decision: relative path + None project_root = Err.
1075    #[test]
1076    fn resolve_env_relative_dotenv_without_project_root_errors() {
1077        let ctx = serde_json::json!({
1078            "env": { "dotenv": ".env" }
1079        });
1080        let result = resolve_env(&ctx, None, None);
1081        assert!(
1082            result.is_err(),
1083            "relative dotenv path without project_root must return Err"
1084        );
1085        let msg = result.unwrap_err();
1086        assert!(
1087            msg.contains("project_root"),
1088            "error must mention project_root, got: {msg}"
1089        );
1090    }
1091
1092    /// T2c (boundary): relative dotenv path with project_root is resolved correctly.
1093    ///
1094    /// Verifies that a relative path is joined against project_root.
1095    #[test]
1096    fn resolve_env_relative_dotenv_with_project_root_resolved() {
1097        let tmp = tempfile::tempdir().expect("test tempdir");
1098        std::fs::write(tmp.path().join(".env"), b"REL_KEY=rel_val\n").expect("write .env");
1099
1100        let ctx = serde_json::json!({ "env": { "dotenv": ".env" } });
1101        let map = resolve_env(&ctx, Some(tmp.path()), None).expect("resolve_env should succeed");
1102        assert_eq!(
1103            map.get("REL_KEY").map(String::as_str),
1104            Some("rel_val"),
1105            "relative dotenv path must be resolved against project_root"
1106        );
1107    }
1108
1109    /// T2d (boundary): empty allowlist (None) means no filtering — all keys pass through.
1110    #[test]
1111    fn resolve_env_none_allowlist_keeps_all_inject_keys() {
1112        let ctx = serde_json::json!({
1113            "env": { "inject": { "A": "1", "B": "2" } }
1114        });
1115        let map = resolve_env(&ctx, None, None).expect("resolve_env should succeed");
1116        assert_eq!(
1117            map.len(),
1118            2,
1119            "all inject keys must be retained when allowlist is None"
1120        );
1121    }
1122
1123    /// T3 (error path): inject value that is not a string returns Err.
1124    ///
1125    /// Verifies that non-string inject values propagate as Result::Err (no silent drop).
1126    #[test]
1127    fn resolve_env_inject_non_string_value_errors() {
1128        let ctx = serde_json::json!({
1129            "env": { "inject": { "BAD": 42 } }
1130        });
1131        let result = resolve_env(&ctx, None, None);
1132        assert!(result.is_err(), "non-string inject value must return Err");
1133        let msg = result.unwrap_err();
1134        assert!(
1135            msg.contains("BAD"),
1136            "error must mention the offending key, got: {msg}"
1137        );
1138    }
1139
1140    /// T3b (error path): inject object that is not an object returns Err.
1141    #[test]
1142    fn resolve_env_inject_not_an_object_errors() {
1143        let ctx = serde_json::json!({
1144            "env": { "inject": ["not", "an", "object"] }
1145        });
1146        let result = resolve_env(&ctx, None, None);
1147        assert!(result.is_err(), "non-object inject value must return Err");
1148    }
1149
1150    /// T3c (error path): missing dotenv file propagates as Err (no silent skip).
1151    ///
1152    /// Verifies SOURCE boundary: dotenv I/O errors are surfaced, not swallowed.
1153    #[test]
1154    fn resolve_env_missing_dotenv_file_errors() {
1155        let ctx = serde_json::json!({
1156            "env": { "dotenv": "/nonexistent/path/to/.env" }
1157        });
1158        let result = resolve_env(&ctx, None, None);
1159        assert!(
1160            result.is_err(),
1161            "missing dotenv file must return Err, not empty map"
1162        );
1163        let msg = result.unwrap_err();
1164        assert!(
1165            msg.contains("dotenv"),
1166            "error must mention dotenv, got: {msg}"
1167        );
1168    }
1169}