Skip to main content

algocline_app/service/
engine_api_impl.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use algocline_core::{EngineApi, QueryResponse};
5use algocline_engine::state::{ResetReport, StateError};
6use async_trait::async_trait;
7
8use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
9
10use super::list_opts::ListOpts;
11use super::AppService;
12
13/// Delegates each [`EngineApi`] method to the corresponding `AppService`
14/// inherent method via fully-qualified syntax (`AppService::method(self, …)`).
15///
16/// This avoids ambiguity between the trait method and the inherent method
17/// of the same name, preventing accidental infinite recursion if the
18/// inherent method is ever removed or renamed.
19#[async_trait]
20impl EngineApi for AppService {
21    // ─── Core execution ──────────────────────────────────────
22
23    async fn run(
24        &self,
25        code: Option<String>,
26        code_file: Option<String>,
27        ctx: Option<serde_json::Value>,
28        project_root: Option<String>,
29        host_mode: Option<bool>,
30    ) -> Result<String, String> {
31        AppService::run(self, code, code_file, ctx, project_root, host_mode).await
32    }
33
34    async fn advice(
35        &self,
36        strategy: &str,
37        task: Option<String>,
38        opts: Option<serde_json::Value>,
39        project_root: Option<String>,
40    ) -> Result<String, String> {
41        AppService::advice(self, strategy, task, opts, project_root).await
42    }
43
44    async fn continue_single(
45        &self,
46        session_id: &str,
47        response: String,
48        query_id: Option<&str>,
49        usage: Option<algocline_core::TokenUsage>,
50    ) -> Result<String, String> {
51        AppService::continue_single(self, session_id, response, query_id, usage).await
52    }
53
54    async fn continue_batch(
55        &self,
56        session_id: &str,
57        responses: Vec<QueryResponse>,
58    ) -> Result<String, String> {
59        AppService::continue_batch(self, session_id, responses).await
60    }
61
62    // ─── Session status ──────────────────────────────────────
63
64    async fn status(
65        &self,
66        session_id: Option<&str>,
67        pending_filter: Option<serde_json::Value>,
68        include_history: bool,
69    ) -> Result<String, String> {
70        AppService::status(self, session_id, pending_filter, include_history).await
71    }
72
73    // ─── Evaluation ──────────────────────────────────────────
74
75    async fn eval(
76        &self,
77        scenario: Option<String>,
78        scenario_file: Option<String>,
79        scenario_name: Option<String>,
80        strategy: &str,
81        strategy_opts: Option<serde_json::Value>,
82        auto_card: bool,
83    ) -> Result<String, String> {
84        AppService::eval(
85            self,
86            scenario,
87            scenario_file,
88            scenario_name,
89            strategy,
90            strategy_opts,
91            auto_card,
92        )
93        .await
94    }
95
96    async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
97        AppService::eval_history(self, strategy, limit)
98    }
99
100    async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
101        AppService::eval_detail(self, eval_id)
102    }
103
104    async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
105        AppService::eval_compare(self, eval_id_a, eval_id_b).await
106    }
107
108    // ─── Scenarios ───────────────────────────────────────────
109
110    async fn scenario_list(&self) -> Result<String, String> {
111        AppService::scenario_list(self)
112    }
113
114    async fn scenario_show(&self, name: &str) -> Result<String, String> {
115        AppService::scenario_show(self, name)
116    }
117
118    async fn scenario_install(&self, url: String) -> Result<String, String> {
119        AppService::scenario_install(self, url).await
120    }
121
122    // ─── Packages ────────────────────────────────────────────
123
124    async fn pkg_link(
125        &self,
126        path: String,
127        name: Option<String>,
128        force: Option<bool>,
129        scope: Option<String>,
130        project_root: Option<String>,
131    ) -> Result<String, String> {
132        AppService::pkg_link(self, path, name, force, scope, project_root).await
133    }
134
135    async fn pkg_unlink(&self, name: String) -> Result<String, String> {
136        AppService::pkg_unlink(self, name).await
137    }
138
139    #[allow(clippy::too_many_arguments)]
140    async fn pkg_list(
141        &self,
142        project_root: Option<String>,
143        limit: Option<i32>,
144        sort: Option<String>,
145        filter: Option<serde_json::Value>,
146        fields: Option<Vec<String>>,
147        verbose: Option<String>,
148    ) -> Result<String, String> {
149        // `filter` is a free-form JSON Value at the MCP boundary (so the
150        // trait stays core-crate-pure). If the caller sends something
151        // that is not a JSON object we treat it as "no filter" and log
152        // the drop so operators can diagnose unexpected filter shapes
153        // in production.
154        let filter_map = match filter {
155            None => None,
156            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
157                Ok(map) => Some(map),
158                Err(e) => {
159                    tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
160                    None
161                }
162            },
163        };
164
165        // Negative limit values from MCP callers are clamped to 0 rather
166        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
167        // Downstream semantics: `Some(0)` means "no limit" (return all) —
168        // the truncate path in `AppService::pkg_list` short-circuits on 0.
169        let opts = ListOpts {
170            limit: limit.map(|n| n.max(0) as usize),
171            sort,
172            filter: filter_map,
173            fields,
174            verbose,
175        };
176
177        AppService::pkg_list(self, project_root, opts)
178            .await
179            .map_err(|e| e.to_string())
180    }
181
182    async fn pkg_install(
183        &self,
184        url: String,
185        name: Option<String>,
186        force: Option<bool>,
187    ) -> Result<String, String> {
188        AppService::pkg_install(self, url, name, force).await
189    }
190
191    async fn pkg_remove(
192        &self,
193        name: &str,
194        project_root: Option<String>,
195        version: Option<String>,
196        scope: Option<String>,
197    ) -> Result<String, String> {
198        AppService::pkg_remove(self, name, project_root, version, scope).await
199    }
200
201    async fn pkg_repair(
202        &self,
203        name: Option<String>,
204        project_root: Option<String>,
205    ) -> Result<String, String> {
206        AppService::pkg_repair(self, name, project_root).await
207    }
208
209    async fn pkg_doctor(
210        &self,
211        name: Option<String>,
212        project_root: Option<String>,
213    ) -> Result<String, String> {
214        AppService::pkg_doctor(self, name, project_root).await
215    }
216
217    /// Run mlua-lspec tests for a package, a single file, or inline code.
218    ///
219    /// Forwards to [`AppService::pkg_test`]. See trait doc for full contract.
220    #[allow(clippy::too_many_arguments)]
221    async fn pkg_test(
222        &self,
223        pkg: Option<String>,
224        code_file: Option<String>,
225        code: Option<String>,
226        spec_dir: Option<String>,
227        filter: Option<String>,
228        search_paths: Option<Vec<String>>,
229        project_root: Option<String>,
230        auto_search_paths: Option<bool>,
231    ) -> Result<String, String> {
232        AppService::pkg_test(
233            self,
234            pkg,
235            code_file,
236            code,
237            spec_dir,
238            filter,
239            search_paths,
240            project_root,
241            auto_search_paths,
242        )
243        .await
244    }
245
246    // ─── Logging ─────────────────────────────────────────────
247
248    async fn add_note(
249        &self,
250        session_id: &str,
251        content: &str,
252        title: Option<&str>,
253    ) -> Result<String, String> {
254        AppService::add_note(self, session_id, content, title).await
255    }
256
257    async fn log_view(
258        &self,
259        session_id: Option<&str>,
260        limit: Option<usize>,
261        max_chars: Option<usize>,
262    ) -> Result<String, String> {
263        AppService::log_view(self, session_id, limit, max_chars).await
264    }
265
266    async fn stats(
267        &self,
268        strategy_filter: Option<&str>,
269        days: Option<u64>,
270    ) -> Result<String, String> {
271        AppService::stats(self, strategy_filter, days)
272    }
273
274    // ─── Project lifecycle ────────────────────────────────────
275
276    async fn init(&self, project_root: Option<String>) -> Result<String, String> {
277        AppService::init(self, project_root).await
278    }
279
280    async fn update(&self, project_root: Option<String>) -> Result<String, String> {
281        AppService::update(self, project_root).await
282    }
283
284    async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
285        AppService::migrate(self, project_root).await
286    }
287
288    // ─── Session activation (issue #1776627475) ──────────────
289
290    async fn session_new(
291        &self,
292        project_root: Option<String>,
293        mode: Option<String>,
294    ) -> Result<String, String> {
295        let session = self.activate_session(project_root.as_deref(), mode.as_deref())?;
296        let result = serde_json::json!({
297            "session_id": session.session_id,
298            "project_root": session
299                .project_root
300                .as_ref()
301                .map(|p| p.to_string_lossy().to_string()),
302            "mode": session.mode.as_str(),
303        });
304        serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
305    }
306
307    // ─── Cards ───────────────────────────────────────────────
308
309    async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
310        AppService::card_list(self, pkg.as_deref())
311    }
312
313    async fn card_get(&self, card_id: &str) -> Result<String, String> {
314        AppService::card_get(self, card_id)
315    }
316
317    async fn card_find(
318        &self,
319        pkg: Option<String>,
320        where_: Option<serde_json::Value>,
321        order_by: Option<serde_json::Value>,
322        limit: Option<usize>,
323        offset: Option<usize>,
324    ) -> Result<String, String> {
325        AppService::card_find(self, pkg, where_, order_by, limit, offset)
326    }
327
328    async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
329        AppService::card_alias_list(self, pkg.as_deref())
330    }
331
332    async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
333        AppService::card_get_by_alias(self, name)
334    }
335
336    async fn card_alias_set(
337        &self,
338        name: &str,
339        card_id: &str,
340        pkg: Option<String>,
341        note: Option<String>,
342    ) -> Result<String, String> {
343        AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
344    }
345
346    async fn card_append(
347        &self,
348        card_id: &str,
349        fields: serde_json::Value,
350    ) -> Result<String, String> {
351        AppService::card_append(self, card_id, fields)
352    }
353
354    async fn card_install(&self, url: String) -> Result<String, String> {
355        AppService::card_install(self, url).await
356    }
357
358    async fn card_samples(
359        &self,
360        card_id: &str,
361        offset: Option<usize>,
362        limit: Option<usize>,
363        where_: Option<serde_json::Value>,
364    ) -> Result<String, String> {
365        AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
366    }
367
368    async fn card_lineage(
369        &self,
370        card_id: &str,
371        direction: Option<String>,
372        depth: Option<usize>,
373        include_stats: Option<bool>,
374        relation_filter: Option<Vec<String>>,
375    ) -> Result<String, String> {
376        AppService::card_lineage(
377            self,
378            card_id,
379            direction.as_deref(),
380            depth,
381            include_stats,
382            relation_filter,
383        )
384    }
385
386    async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
387        AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
388    }
389
390    async fn card_analyze(&self, card_id: &str, pkg: Option<String>) -> Result<String, String> {
391        AppService::card_analyze(self, card_id, pkg).await
392    }
393
394    async fn card_publish(
395        &self,
396        card_id: &str,
397        target_repo: &str,
398        commit_message: Option<&str>,
399    ) -> Result<String, String> {
400        AppService::card_publish(self, card_id, target_repo, commit_message).await
401    }
402
403    // ─── Hub ─────────────────────────────────────────────────
404
405    async fn hub_reindex(
406        &self,
407        output_path: Option<String>,
408        source_dir: Option<String>,
409    ) -> Result<String, String> {
410        self.hub_reindex(output_path.as_deref(), source_dir.as_deref())
411            .await
412    }
413
414    async fn hub_gendoc(
415        &self,
416        source_dir: String,
417        out_dir: Option<String>,
418        projections: Option<Vec<String>>,
419        config_path: Option<String>,
420        lint_strict: Option<bool>,
421    ) -> Result<String, String> {
422        let svc = self.clone();
423        tokio::task::spawn_blocking(move || {
424            crate::AppService::hub_gendoc(
425                &svc,
426                &source_dir,
427                out_dir.as_deref(),
428                projections.as_deref(),
429                config_path.as_deref(),
430                lint_strict,
431            )
432        })
433        .await
434        .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
435    }
436
437    async fn hub_dist(
438        &self,
439        source_dir: String,
440        output_path: Option<String>,
441        out_dir: Option<String>,
442        preset: Option<String>,
443        project_root: Option<String>,
444        projections: Option<Vec<String>>,
445        config_path: Option<String>,
446        lint_strict: Option<bool>,
447    ) -> Result<String, String> {
448        self.hub_dist(
449            &source_dir,
450            output_path.as_deref(),
451            out_dir.as_deref(),
452            preset.as_deref(),
453            project_root.as_deref(),
454            projections.as_deref(),
455            config_path.as_deref(),
456            lint_strict,
457        )
458        .await
459    }
460
461    async fn hub_info(&self, pkg: String) -> Result<String, String> {
462        let svc = self.clone();
463        tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
464            .await
465            .map_err(|e| format!("hub_info task panicked: {e}"))?
466    }
467
468    #[allow(clippy::too_many_arguments)]
469    async fn hub_search(
470        &self,
471        query: Option<String>,
472        category: Option<String>,
473        installed_only: Option<bool>,
474        limit: Option<i32>,
475        sort: Option<String>,
476        filter: Option<serde_json::Value>,
477        fields: Option<Vec<String>>,
478        verbose: Option<String>,
479        local_indices: Option<Vec<String>>,
480    ) -> Result<String, String> {
481        let svc = self.clone();
482
483        // `filter` is a free-form JSON Value at the MCP boundary (so the
484        // trait stays core-crate-pure). If the caller sends something
485        // that is not a JSON object we treat it as "no filter" — the
486        // explicit category/installed_only params still cover the common
487        // cases. The MCP `JsonSchema` layer will have already flagged
488        // hard type errors. We log the drop so operators can diagnose
489        // unexpected filter shapes in production.
490        let filter_map = match filter {
491            None => None,
492            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
493                Ok(map) => Some(map),
494                Err(e) => {
495                    tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
496                    None
497                }
498            },
499        };
500
501        // Negative limit values from MCP callers are clamped to 0 rather
502        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
503        // Downstream semantics: `Some(0)` means "no limit" (return all) —
504        // the truncate path in `AppService::hub_search` short-circuits on 0.
505        let opts = ListOpts {
506            limit: limit.map(|n| n.max(0) as usize),
507            sort,
508            filter: filter_map,
509            fields,
510            verbose,
511        };
512
513        tokio::task::spawn_blocking(move || {
514            AppService::hub_search(
515                &svc,
516                query.as_deref(),
517                category.as_deref(),
518                installed_only,
519                opts,
520                local_indices,
521            )
522        })
523        .await
524        .map_err(|e| format!("hub_search task panicked: {e}"))?
525    }
526
527    // ─── Package read ─────────────────────────────────────────
528
529    async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
530        AppService::pkg_read_init_lua(self, name, None)
531    }
532
533    async fn pkg_get_narrative_md(&self, name: &str) -> Result<Option<String>, String> {
534        AppService::pkg_get_narrative_md(self, name).await
535    }
536
537    async fn pkg_meta(&self, name: &str) -> Result<String, String> {
538        let filter = serde_json::json!({ "name": name });
539        let json_str = EngineApi::pkg_list(
540            self,
541            None,
542            None,
543            None,
544            Some(filter),
545            None,
546            Some("full".to_string()),
547        )
548        .await?;
549        let val: serde_json::Value = serde_json::from_str(&json_str)
550            .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
551        let pkgs = val
552            .get("packages")
553            .and_then(|p| p.as_array())
554            .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
555        if pkgs.is_empty() {
556            return Err(format!("pkg not found: {name}"));
557        }
558        serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
559    }
560
561    // ─── Package scaffold ─────────────────────────────────────
562
563    async fn pkg_scaffold(
564        &self,
565        name: String,
566        target_dir: Option<String>,
567        category: Option<String>,
568        description: Option<String>,
569    ) -> Result<String, String> {
570        let svc = self.clone();
571        tokio::task::spawn_blocking(move || {
572            AppService::pkg_scaffold(
573                &svc,
574                &name,
575                target_dir.as_deref(),
576                category.as_deref(),
577                description.as_deref(),
578            )
579        })
580        .await
581        .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
582    }
583
584    // ─── Hub resources ───────────────────────────────────────
585
586    /// Aggregate hub index across all registered cache sources.
587    ///
588    /// Delegates to `AppService::aggregate_index`, then serializes the
589    /// result to a JSON string. Individual source failures and registry-load
590    /// failures are embedded in the response JSON under a `"warnings"` field
591    /// so the MCP caller can observe partial failures without losing the
592    /// aggregate result.
593    async fn hub_index_aggregate(&self) -> Result<String, String> {
594        let svc = self.clone();
595        let (index, warnings) = tokio::task::spawn_blocking(move || {
596            AppService::aggregate_index(&svc).map_err(|e| e.to_string())
597        })
598        .await
599        .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
600
601        let mut json = serde_json::to_value(&index)
602            .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
603        if !warnings.is_empty() {
604            if let Some(obj) = json.as_object_mut() {
605                obj.insert("warnings".to_string(), serde_json::json!(warnings));
606            }
607        }
608        serde_json::to_string(&json)
609            .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
610    }
611
612    // ─── Settings ────────────────────────────────────────────
613
614    async fn setting_resolve(&self, target: Option<String>) -> Result<String, String> {
615        let app_dir = self.log_config.app_dir();
616        let project_root = self.resolve_root(None);
617        tokio::task::spawn_blocking(move || {
618            crate::service::setting::resolve_setting(
619                &app_dir,
620                project_root.as_deref(),
621                target.as_deref(),
622            )
623            .map_err(|e| e.to_string())
624            .and_then(|r| {
625                serde_json::to_string(&r).map_err(|e| format!("setting_resolve: serialize: {e}"))
626            })
627        })
628        .await
629        .map_err(|e| format!("setting_resolve: task panicked: {e}"))?
630    }
631
632    // ─── State management ────────────────────────────────────
633
634    async fn state_list(&self, namespace: String) -> Result<String, String> {
635        let store = Arc::clone(&self.state_store);
636        tokio::task::spawn_blocking(move || {
637            store
638                .list_dispatched(&namespace)
639                .map_err(AppService::state_err_to_wire)
640                .and_then(|keys| {
641                    // Wire shape: { "keys": [string] } per docs/state-management.md L92.
642                    // TODO(ST2): verify this shape via real rmcp stdio in tests/e2e.rs
643                    // (test_alc_state_list_* happy path should assert parsed["keys"].is_array()).
644                    serde_json::to_string(&serde_json::json!({"keys": keys}))
645                        .map_err(|e| format!("state_list: serialize: {e}"))
646                })
647        })
648        .await
649        .map_err(|e| format!("state_list: task panicked: {e}"))?
650    }
651
652    async fn state_show(&self, namespace: String, key: String) -> Result<String, String> {
653        let store = Arc::clone(&self.state_store);
654        tokio::task::spawn_blocking(move || {
655            store
656                .show_dispatched(&namespace, &key)
657                .map_err(AppService::state_err_to_wire)
658                .and_then(|value| {
659                    serde_json::to_string(&value).map_err(|e| format!("state_show: serialize: {e}"))
660                })
661        })
662        .await
663        .map_err(|e| format!("state_show: task panicked: {e}"))?
664    }
665
666    async fn state_reset(
667        &self,
668        namespace: String,
669        key: String,
670        steps: Option<Vec<String>>,
671        fields: Option<Vec<String>>,
672    ) -> Result<String, String> {
673        let store = Arc::clone(&self.state_store);
674        // Clone input slices before moving into closure so we can echo them in the response.
675        let steps_input: Vec<String> = steps.clone().unwrap_or_default();
676        let fields_input: Vec<String> = fields.clone().unwrap_or_default();
677        tokio::task::spawn_blocking(move || {
678            let steps_slice: Vec<String> = steps.unwrap_or_default();
679            let fields_slice: Vec<String> = fields.unwrap_or_default();
680            store
681                .reset_dispatched_with_backup(&namespace, &key, &steps_slice, &fields_slice)
682                .map_err(AppService::state_err_to_wire)
683                .and_then(|report: ResetReport| {
684                    let v = serde_json::json!({
685                        "ok": true,
686                        "backup_path": report.backup_path.to_string_lossy(),
687                        "steps_removed": report.steps_removed,
688                        "steps_input": steps_input,
689                        "fields_removed": report.fields_removed,
690                        "fields_input": fields_input,
691                    });
692                    serde_json::to_string(&v).map_err(|e| format!("state_reset: serialize: {e}"))
693                })
694        })
695        .await
696        .map_err(|e| format!("state_reset: task panicked: {e}"))?
697    }
698
699    async fn state_set(
700        &self,
701        namespace: String,
702        key: String,
703        value: serde_json::Value,
704    ) -> Result<String, String> {
705        let store = Arc::clone(&self.state_store);
706        tokio::task::spawn_blocking(move || {
707            store
708                .set_dispatched(&namespace, &key, &value)
709                .map_err(AppService::state_err_to_wire)
710                .map(|_| r#"{"ok":true}"#.to_string())
711        })
712        .await
713        .map_err(|e| format!("state_set: task panicked: {e}"))?
714    }
715
716    async fn state_delete(&self, namespace: String, key: String) -> Result<String, String> {
717        let store = Arc::clone(&self.state_store);
718        tokio::task::spawn_blocking(move || {
719            store
720                .delete_dispatched(&namespace, &key)
721                .map_err(AppService::state_err_to_wire)
722                .and_then(|existed| {
723                    serde_json::to_string(&serde_json::json!({"ok": true, "existed": existed}))
724                        .map_err(|e| format!("state_delete: serialize: {e}"))
725                })
726        })
727        .await
728        .map_err(|e| format!("state_delete: task panicked: {e}"))?
729    }
730
731    // ─── Diagnostics ─────────────────────────────────────────
732
733    async fn info(&self) -> String {
734        let svc = self.clone();
735        tokio::task::spawn_blocking(move || AppService::info(&svc))
736            .await
737            .unwrap_or_else(|e| format!("{{\"error\": \"info: task panicked: {e}\"}}"))
738    }
739
740    // ─── Pool management ─────────────────────────────────────
741
742    async fn pool_ensure(&self) -> Result<String, String> {
743        AppService::pool_ensure_impl(self).await
744    }
745
746    async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
747        AppService::pool_status_impl(self, sid).await
748    }
749
750    async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
751        AppService::pool_stop_impl(self, sid).await
752    }
753}
754
755// ─── State management inherent helpers ───────────────────────────────────────
756
757impl AppService {
758    /// Convert a [`StateError`] into a typed wire error JSON string.
759    ///
760    /// Each variant maps to a distinct `"error"` code so callers can distinguish
761    /// `NOT_FOUND` from generic I/O errors at the wire level.
762    ///
763    /// # Arguments
764    /// - `e` — the engine-layer error to convert.
765    ///
766    /// # Returns
767    /// A JSON string `{"error":"<CODE>",...}`. Falls back to an INTERNAL error JSON
768    /// string if serialization itself fails (should never occur for string-only values).
769    fn state_err_to_wire(e: StateError) -> String {
770        let v = match e {
771            StateError::KeyNotFound { namespace, key } => {
772                serde_json::json!({"error": "NOT_FOUND", "namespace": namespace, "key": key})
773            }
774            StateError::UnsafeSegment { which, value } => {
775                serde_json::json!({"error": "UNSAFE_SEGMENT", "which": which, "value": value})
776            }
777            StateError::IoBackup(io_err) => {
778                serde_json::json!({"error": "IO_BACKUP", "message": io_err.to_string()})
779            }
780            StateError::IoRead(io_err) => {
781                serde_json::json!({"error": "IO_READ", "message": io_err.to_string()})
782            }
783            StateError::IoWrite(io_err) => {
784                serde_json::json!({"error": "IO_WRITE", "message": io_err.to_string()})
785            }
786            StateError::Serde(serde_err) => {
787                serde_json::json!({"error": "SERDE", "message": serde_err.to_string()})
788            }
789            StateError::ShapeInvalid { reason } => {
790                serde_json::json!({"error": "SHAPE_INVALID", "reason": reason})
791            }
792        };
793        serde_json::to_string(&v).unwrap_or_else(|e| {
794            // justification: the json! macro above only contains string values, so
795            // to_string() cannot fail under normal conditions. The unwrap_or_else
796            // is a purely defensive fallback.
797            format!("{{\"error\":\"INTERNAL\",\"message\":\"serialize failed: {e}\"}}")
798        })
799    }
800}
801
802// ─── Pool inherent helpers ────────────────────────────────────────────────────
803
804impl AppService {
805    /// Scan registry.json, GC dead workers, and return live sessions.
806    ///
807    /// Idempotent: calling twice produces the same result when no workers change
808    /// state between calls.  Does NOT spawn new workers.
809    pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
810        let reg_path = self.pool_reg_path.clone();
811        let lock_path = self.pool_lock_path.clone();
812
813        let sessions =
814            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
815                with_registry_lock(&lock_path, || {
816                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
817                    let survivors = reg.scan_and_gc()?;
818                    // Persist GC result back to disk.
819                    reg.save(&reg_path)?;
820                    let entries = survivors
821                        .iter()
822                        .map(|e| {
823                            serde_json::json!({
824                                "sid": e.sid,
825                                "pid": e.pid,
826                                "sock": e.sock.to_string_lossy(),
827                                "version": e.version,
828                            })
829                        })
830                        .collect::<Vec<_>>();
831                    Ok(entries)
832                })
833            })
834            .await
835            .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
836            .map_err(|e| e.to_string())?;
837
838        let pool_version = env!("CARGO_PKG_VERSION");
839        serde_json::to_string(&serde_json::json!({
840            "sessions": sessions,
841            "pool_version": pool_version,
842        }))
843        .map_err(|e| format!("pool_ensure: serialize: {e}"))
844    }
845
846    /// Return pool worker status from registry.json.
847    ///
848    /// When `sid` is `Some`, restricts output to that single worker.
849    /// Uses kill -0 liveness check for each returned entry.
850    pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
851        let reg_path = self.pool_reg_path.clone();
852        let lock_path = self.pool_lock_path.clone();
853
854        let sessions =
855            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
856                with_registry_lock(&lock_path, || {
857                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
858                    // GC dead entries in-place so status reflects reality.
859                    let _ = reg.scan_and_gc()?;
860                    reg.save(&reg_path)?;
861
862                    let entries: Vec<serde_json::Value> = reg
863                        .sessions
864                        .iter()
865                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
866                        .map(|e| {
867                            serde_json::json!({
868                                "sid": e.sid,
869                                "pid": e.pid,
870                                "sock": e.sock.to_string_lossy(),
871                                "version": e.version,
872                                "created_at": e.created_at,
873                                // Status is "running" for all live entries (UDS ping not required in POC).
874                                "status": "running",
875                            })
876                        })
877                        .collect();
878                    Ok(entries)
879                })
880            })
881            .await
882            .map_err(|e| format!("pool_status: task panicked: {e}"))?
883            .map_err(|e| e.to_string())?;
884
885        let pool_version = env!("CARGO_PKG_VERSION");
886        serde_json::to_string(&serde_json::json!({
887            "sessions": sessions,
888            "pool_version": pool_version,
889        }))
890        .map_err(|e| format!("pool_status: serialize: {e}"))
891    }
892
893    /// Send SIGTERM to all workers or a single worker identified by `sid`.
894    ///
895    /// After SIGTERM, removes the entry from registry.json.
896    /// Returns `{"stopped": [...], "errors": [...]}`.
897    /// SIGTERM send failures are surfaced in the `errors` array (not dropped silently).
898    pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
899        let reg_path = self.pool_reg_path.clone();
900        let lock_path = self.pool_lock_path.clone();
901
902        let result = tokio::task::spawn_blocking(
903            move || -> Result<(Vec<String>, Vec<String>), PoolError> {
904                with_registry_lock(&lock_path, || {
905                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
906
907                    // Determine targets.
908                    let targets: Vec<_> = reg
909                        .sessions
910                        .iter()
911                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
912                        .cloned()
913                        .collect();
914
915                    let mut stopped: Vec<String> = Vec::new();
916                    let mut errors: Vec<String> = Vec::new();
917
918                    for entry in &targets {
919                        #[cfg(unix)]
920                        {
921                            // K-52: guard u32 → i32 (pid_t) overflow; also reject pid == 0
922                            // (POSIX kill(2): pid=0 signals every process in the calling process
923                            // group, pid<0 signals a process group — both are unsafe here).
924                            let pid_t = match i32::try_from(entry.pid) {
925                                Ok(p) if p > 0 => p,
926                                Ok(_) => {
927                                    errors.push(format!(
928                                        "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
929                                        entry.sid, entry.pid
930                                    ));
931                                    reg.remove(&entry.sid);
932                                    continue;
933                                }
934                                Err(_) => {
935                                    errors.push(format!(
936                                        "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
937                                        entry.sid, entry.pid
938                                    ));
939                                    // Remove the entry anyway (PID is invalid, worker is unreachable).
940                                    reg.remove(&entry.sid);
941                                    continue;
942                                }
943                            };
944
945                            // SAFETY: libc::kill(pid, SIGTERM) is a thin syscall wrapper.
946                            // pid_t > 0, verified by the match arm above.
947                            // pid fits in i32 (verified above).
948                            let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
949                            if ret == 0 {
950                                stopped.push(entry.sid.clone());
951                            } else {
952                                let os_err = std::io::Error::last_os_error();
953                                if os_err.raw_os_error() == Some(libc::ESRCH) {
954                                    // Process already dead — treat as stopped (idempotent).
955                                    stopped.push(entry.sid.clone());
956                                } else {
957                                    errors.push(format!(
958                                        "sid={}: SIGTERM failed: {}",
959                                        entry.sid, os_err
960                                    ));
961                                }
962                            }
963                        }
964                        #[cfg(not(unix))]
965                        {
966                            // Non-Unix: cannot send SIGTERM; report as unsupported.
967                            errors.push(format!(
968                                "sid={}: SIGTERM not supported on this platform",
969                                entry.sid
970                            ));
971                        }
972                        // Remove from registry regardless of SIGTERM result
973                        // (dead or dying, we no longer track it).
974                        reg.remove(&entry.sid);
975                    }
976
977                    // Persist updated registry (entries removed).
978                    reg.save(&reg_path)?;
979
980                    Ok((stopped, errors))
981                })
982            },
983        )
984        .await
985        .map_err(|e| format!("pool_stop: task panicked: {e}"))?
986        .map_err(|e| e.to_string())?;
987
988        let (stopped, errors) = result;
989        serde_json::to_string(&serde_json::json!({
990            "stopped": stopped,
991            "errors": errors,
992        }))
993        .map_err(|e| format!("pool_stop: serialize: {e}"))
994    }
995}
996
// ─── Tests ────────────────────────────────────────────────────────────────────
998
#[cfg(test)]
mod tests {
    use super::super::test_support::make_app_service_at;

    /// A registry entry with `pid = 0` must never be signalled.
    ///
    /// With a seeded registry.json whose only entry has `"pid": 0`,
    /// `pool_stop_impl(None)` is expected to:
    /// 1. report the entry in `errors` as an invalid POSIX target pid,
    /// 2. leave `stopped` empty,
    /// 3. remove the entry from the on-disk registry,
    /// 4. leave this test process alive (a `kill(0, SIGTERM)` would have
    ///    signalled the test's own process group).
    #[tokio::test]
    #[cfg(unix)]
    async fn pool_stop_pid_zero_is_rejected() {
        // ── Arrange ─────────────────────────────────────────────────────
        // The service is rooted in a tempdir so the real $HOME is untouched.
        let scratch = tempfile::tempdir().expect("tempdir");
        let app_root = scratch.path().to_path_buf();
        let service = make_app_service_at(app_root.clone()).await;

        // Registry location: {app_dir}/state/pool/registry.json, where
        // AppDir::state_dir() resolves to {root}/state.
        let pool_dir = app_root.join("state").join("pool");
        std::fs::create_dir_all(&pool_dir).expect("create pool dir");
        let registry_file = pool_dir.join("registry.json");

        let seed = serde_json::json!({
            "sessions": [{
                "sid": "zero-pid-session",
                "pid": 0u32,
                "sock": "/tmp/alc-pool/zero.sock",
                "version": "0.30.0",
                "created_at": "2026-01-01T00:00:00Z"
            }]
        });
        std::fs::write(&registry_file, seed.to_string()).expect("seed registry.json");

        // ── Act ─────────────────────────────────────────────────────────
        // `None` targets every registered session.
        let raw_reply = service.pool_stop_impl(None).await.expect("pool_stop_impl");
        let reply: serde_json::Value =
            serde_json::from_str(&raw_reply).expect("response is valid JSON");

        // ── Assert (1): the pid=0 entry is reported as an invalid target ─
        let errors = reply["errors"].as_array().expect("errors array");
        assert!(
            !errors.is_empty(),
            "expected at least one error for pid=0 entry"
        );
        let err_msg = errors[0].as_str().unwrap_or("");
        assert!(
            err_msg.contains("not a valid POSIX target pid"),
            "unexpected error message: {err_msg}"
        );

        // ── Assert (2): nothing was reported as stopped ─────────────────
        let stopped = reply["stopped"].as_array().expect("stopped array");
        assert!(
            stopped.is_empty(),
            "pid=0 entry must not appear in stopped list"
        );

        // ── Assert (3): the entry is gone from the on-disk registry ─────
        let registry_text = std::fs::read_to_string(&registry_file).expect("read registry");
        let on_disk: serde_json::Value =
            serde_json::from_str(&registry_text).expect("parse registry");
        let remaining = on_disk["sessions"].as_array().expect("sessions array");
        assert!(
            remaining.is_empty(),
            "pid=0 entry must be removed from on-disk registry"
        );

        // ── Assert (4): implicit — reaching this point means the test
        // process was not killed by a group-wide SIGTERM.
    }
}