Skip to main content

algocline_app/service/
engine_api_impl.rs

1use std::collections::HashMap;
2
3use algocline_core::{EngineApi, QueryResponse};
4use async_trait::async_trait;
5
6use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
7
8use super::list_opts::ListOpts;
9use super::AppService;
10
11/// Delegates each [`EngineApi`] method to the corresponding `AppService`
12/// inherent method via fully-qualified syntax (`AppService::method(self, …)`).
13///
14/// This avoids ambiguity between the trait method and the inherent method
15/// of the same name, preventing accidental infinite recursion if the
16/// inherent method is ever removed or renamed.
17#[async_trait]
18impl EngineApi for AppService {
19    // ─── Core execution ──────────────────────────────────────
20
21    async fn run(
22        &self,
23        code: Option<String>,
24        code_file: Option<String>,
25        ctx: Option<serde_json::Value>,
26        project_root: Option<String>,
27        host_mode: Option<bool>,
28    ) -> Result<String, String> {
29        AppService::run(self, code, code_file, ctx, project_root, host_mode).await
30    }
31
32    async fn advice(
33        &self,
34        strategy: &str,
35        task: Option<String>,
36        opts: Option<serde_json::Value>,
37        project_root: Option<String>,
38    ) -> Result<String, String> {
39        AppService::advice(self, strategy, task, opts, project_root).await
40    }
41
42    async fn continue_single(
43        &self,
44        session_id: &str,
45        response: String,
46        query_id: Option<&str>,
47        usage: Option<algocline_core::TokenUsage>,
48    ) -> Result<String, String> {
49        AppService::continue_single(self, session_id, response, query_id, usage).await
50    }
51
52    async fn continue_batch(
53        &self,
54        session_id: &str,
55        responses: Vec<QueryResponse>,
56    ) -> Result<String, String> {
57        AppService::continue_batch(self, session_id, responses).await
58    }
59
60    // ─── Session status ──────────────────────────────────────
61
62    async fn status(
63        &self,
64        session_id: Option<&str>,
65        pending_filter: Option<serde_json::Value>,
66        include_history: bool,
67    ) -> Result<String, String> {
68        AppService::status(self, session_id, pending_filter, include_history).await
69    }
70
71    // ─── Evaluation ──────────────────────────────────────────
72
73    async fn eval(
74        &self,
75        scenario: Option<String>,
76        scenario_file: Option<String>,
77        scenario_name: Option<String>,
78        strategy: &str,
79        strategy_opts: Option<serde_json::Value>,
80        auto_card: bool,
81    ) -> Result<String, String> {
82        AppService::eval(
83            self,
84            scenario,
85            scenario_file,
86            scenario_name,
87            strategy,
88            strategy_opts,
89            auto_card,
90        )
91        .await
92    }
93
94    async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
95        AppService::eval_history(self, strategy, limit)
96    }
97
98    async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
99        AppService::eval_detail(self, eval_id)
100    }
101
102    async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
103        AppService::eval_compare(self, eval_id_a, eval_id_b).await
104    }
105
106    // ─── Scenarios ───────────────────────────────────────────
107
108    async fn scenario_list(&self) -> Result<String, String> {
109        AppService::scenario_list(self)
110    }
111
112    async fn scenario_show(&self, name: &str) -> Result<String, String> {
113        AppService::scenario_show(self, name)
114    }
115
116    async fn scenario_install(&self, url: String) -> Result<String, String> {
117        AppService::scenario_install(self, url).await
118    }
119
120    // ─── Packages ────────────────────────────────────────────
121
122    async fn pkg_link(
123        &self,
124        path: String,
125        name: Option<String>,
126        force: Option<bool>,
127        scope: Option<String>,
128        project_root: Option<String>,
129    ) -> Result<String, String> {
130        AppService::pkg_link(self, path, name, force, scope, project_root).await
131    }
132
133    async fn pkg_unlink(&self, name: String) -> Result<String, String> {
134        AppService::pkg_unlink(self, name).await
135    }
136
137    #[allow(clippy::too_many_arguments)]
138    async fn pkg_list(
139        &self,
140        project_root: Option<String>,
141        limit: Option<i32>,
142        sort: Option<String>,
143        filter: Option<serde_json::Value>,
144        fields: Option<Vec<String>>,
145        verbose: Option<String>,
146    ) -> Result<String, String> {
147        // `filter` is a free-form JSON Value at the MCP boundary (so the
148        // trait stays core-crate-pure). If the caller sends something
149        // that is not a JSON object we treat it as "no filter" and log
150        // the drop so operators can diagnose unexpected filter shapes
151        // in production.
152        let filter_map = match filter {
153            None => None,
154            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
155                Ok(map) => Some(map),
156                Err(e) => {
157                    tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
158                    None
159                }
160            },
161        };
162
163        // Negative limit values from MCP callers are clamped to 0 rather
164        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
165        // Downstream semantics: `Some(0)` means "no limit" (return all) —
166        // the truncate path in `AppService::pkg_list` short-circuits on 0.
167        let opts = ListOpts {
168            limit: limit.map(|n| n.max(0) as usize),
169            sort,
170            filter: filter_map,
171            fields,
172            verbose,
173        };
174
175        AppService::pkg_list(self, project_root, opts)
176            .await
177            .map_err(|e| e.to_string())
178    }
179
180    async fn pkg_install(
181        &self,
182        url: String,
183        name: Option<String>,
184        force: Option<bool>,
185    ) -> Result<String, String> {
186        AppService::pkg_install(self, url, name, force).await
187    }
188
189    async fn pkg_remove(
190        &self,
191        name: &str,
192        project_root: Option<String>,
193        version: Option<String>,
194        scope: Option<String>,
195    ) -> Result<String, String> {
196        AppService::pkg_remove(self, name, project_root, version, scope).await
197    }
198
199    async fn pkg_repair(
200        &self,
201        name: Option<String>,
202        project_root: Option<String>,
203    ) -> Result<String, String> {
204        AppService::pkg_repair(self, name, project_root).await
205    }
206
207    async fn pkg_doctor(
208        &self,
209        name: Option<String>,
210        project_root: Option<String>,
211    ) -> Result<String, String> {
212        AppService::pkg_doctor(self, name, project_root).await
213    }
214
215    /// Run mlua-lspec tests for a package, a single file, or inline code.
216    ///
217    /// Forwards to [`AppService::pkg_test`]. See trait doc for full contract.
218    #[allow(clippy::too_many_arguments)]
219    async fn pkg_test(
220        &self,
221        pkg: Option<String>,
222        code_file: Option<String>,
223        code: Option<String>,
224        spec_dir: Option<String>,
225        filter: Option<String>,
226        search_paths: Option<Vec<String>>,
227        project_root: Option<String>,
228        auto_search_paths: Option<bool>,
229    ) -> Result<String, String> {
230        AppService::pkg_test(
231            self,
232            pkg,
233            code_file,
234            code,
235            spec_dir,
236            filter,
237            search_paths,
238            project_root,
239            auto_search_paths,
240        )
241        .await
242    }
243
244    // ─── Logging ─────────────────────────────────────────────
245
246    async fn add_note(
247        &self,
248        session_id: &str,
249        content: &str,
250        title: Option<&str>,
251    ) -> Result<String, String> {
252        AppService::add_note(self, session_id, content, title).await
253    }
254
255    async fn log_view(
256        &self,
257        session_id: Option<&str>,
258        limit: Option<usize>,
259        max_chars: Option<usize>,
260    ) -> Result<String, String> {
261        AppService::log_view(self, session_id, limit, max_chars).await
262    }
263
264    async fn stats(
265        &self,
266        strategy_filter: Option<&str>,
267        days: Option<u64>,
268    ) -> Result<String, String> {
269        AppService::stats(self, strategy_filter, days)
270    }
271
272    // ─── Project lifecycle ────────────────────────────────────
273
274    async fn init(&self, project_root: Option<String>) -> Result<String, String> {
275        AppService::init(self, project_root).await
276    }
277
278    async fn update(&self, project_root: Option<String>) -> Result<String, String> {
279        AppService::update(self, project_root).await
280    }
281
282    async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
283        AppService::migrate(self, project_root).await
284    }
285
286    // ─── Session activation (issue #1776627475) ──────────────
287
288    async fn session_new(
289        &self,
290        project_root: Option<String>,
291        mode: Option<String>,
292    ) -> Result<String, String> {
293        let session = self.activate_session(project_root.as_deref(), mode.as_deref())?;
294        let result = serde_json::json!({
295            "session_id": session.session_id,
296            "project_root": session
297                .project_root
298                .as_ref()
299                .map(|p| p.to_string_lossy().to_string()),
300            "mode": session.mode.as_str(),
301        });
302        serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
303    }
304
305    // ─── Cards ───────────────────────────────────────────────
306
307    async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
308        AppService::card_list(self, pkg.as_deref())
309    }
310
311    async fn card_get(&self, card_id: &str) -> Result<String, String> {
312        AppService::card_get(self, card_id)
313    }
314
315    async fn card_find(
316        &self,
317        pkg: Option<String>,
318        where_: Option<serde_json::Value>,
319        order_by: Option<serde_json::Value>,
320        limit: Option<usize>,
321        offset: Option<usize>,
322    ) -> Result<String, String> {
323        AppService::card_find(self, pkg, where_, order_by, limit, offset)
324    }
325
326    async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
327        AppService::card_alias_list(self, pkg.as_deref())
328    }
329
330    async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
331        AppService::card_get_by_alias(self, name)
332    }
333
334    async fn card_alias_set(
335        &self,
336        name: &str,
337        card_id: &str,
338        pkg: Option<String>,
339        note: Option<String>,
340    ) -> Result<String, String> {
341        AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
342    }
343
344    async fn card_append(
345        &self,
346        card_id: &str,
347        fields: serde_json::Value,
348    ) -> Result<String, String> {
349        AppService::card_append(self, card_id, fields)
350    }
351
352    async fn card_install(&self, url: String) -> Result<String, String> {
353        AppService::card_install(self, url).await
354    }
355
356    async fn card_samples(
357        &self,
358        card_id: &str,
359        offset: Option<usize>,
360        limit: Option<usize>,
361        where_: Option<serde_json::Value>,
362    ) -> Result<String, String> {
363        AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
364    }
365
366    async fn card_lineage(
367        &self,
368        card_id: &str,
369        direction: Option<String>,
370        depth: Option<usize>,
371        include_stats: Option<bool>,
372        relation_filter: Option<Vec<String>>,
373    ) -> Result<String, String> {
374        AppService::card_lineage(
375            self,
376            card_id,
377            direction.as_deref(),
378            depth,
379            include_stats,
380            relation_filter,
381        )
382    }
383
384    async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
385        AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
386    }
387
388    async fn card_analyze(&self, card_id: &str, pkg: Option<String>) -> Result<String, String> {
389        AppService::card_analyze(self, card_id, pkg).await
390    }
391
392    // ─── Hub ─────────────────────────────────────────────────
393
394    async fn hub_reindex(
395        &self,
396        output_path: Option<String>,
397        source_dir: Option<String>,
398    ) -> Result<String, String> {
399        self.hub_reindex(output_path.as_deref(), source_dir.as_deref())
400            .await
401    }
402
403    async fn hub_gendoc(
404        &self,
405        source_dir: String,
406        out_dir: Option<String>,
407        projections: Option<Vec<String>>,
408        config_path: Option<String>,
409        lint_strict: Option<bool>,
410    ) -> Result<String, String> {
411        let svc = self.clone();
412        tokio::task::spawn_blocking(move || {
413            crate::AppService::hub_gendoc(
414                &svc,
415                &source_dir,
416                out_dir.as_deref(),
417                projections.as_deref(),
418                config_path.as_deref(),
419                lint_strict,
420            )
421        })
422        .await
423        .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
424    }
425
426    async fn hub_dist(
427        &self,
428        source_dir: String,
429        output_path: Option<String>,
430        out_dir: Option<String>,
431        preset: Option<String>,
432        project_root: Option<String>,
433        projections: Option<Vec<String>>,
434        config_path: Option<String>,
435        lint_strict: Option<bool>,
436    ) -> Result<String, String> {
437        self.hub_dist(
438            &source_dir,
439            output_path.as_deref(),
440            out_dir.as_deref(),
441            preset.as_deref(),
442            project_root.as_deref(),
443            projections.as_deref(),
444            config_path.as_deref(),
445            lint_strict,
446        )
447        .await
448    }
449
450    async fn hub_info(&self, pkg: String) -> Result<String, String> {
451        let svc = self.clone();
452        tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
453            .await
454            .map_err(|e| format!("hub_info task panicked: {e}"))?
455    }
456
457    #[allow(clippy::too_many_arguments)]
458    async fn hub_search(
459        &self,
460        query: Option<String>,
461        category: Option<String>,
462        installed_only: Option<bool>,
463        limit: Option<i32>,
464        sort: Option<String>,
465        filter: Option<serde_json::Value>,
466        fields: Option<Vec<String>>,
467        verbose: Option<String>,
468        local_indices: Option<Vec<String>>,
469    ) -> Result<String, String> {
470        let svc = self.clone();
471
472        // `filter` is a free-form JSON Value at the MCP boundary (so the
473        // trait stays core-crate-pure). If the caller sends something
474        // that is not a JSON object we treat it as "no filter" — the
475        // explicit category/installed_only params still cover the common
476        // cases. The MCP `JsonSchema` layer will have already flagged
477        // hard type errors. We log the drop so operators can diagnose
478        // unexpected filter shapes in production.
479        let filter_map = match filter {
480            None => None,
481            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
482                Ok(map) => Some(map),
483                Err(e) => {
484                    tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
485                    None
486                }
487            },
488        };
489
490        // Negative limit values from MCP callers are clamped to 0 rather
491        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
492        // Downstream semantics: `Some(0)` means "no limit" (return all) —
493        // the truncate path in `AppService::hub_search` short-circuits on 0.
494        let opts = ListOpts {
495            limit: limit.map(|n| n.max(0) as usize),
496            sort,
497            filter: filter_map,
498            fields,
499            verbose,
500        };
501
502        tokio::task::spawn_blocking(move || {
503            AppService::hub_search(
504                &svc,
505                query.as_deref(),
506                category.as_deref(),
507                installed_only,
508                opts,
509                local_indices,
510            )
511        })
512        .await
513        .map_err(|e| format!("hub_search task panicked: {e}"))?
514    }
515
516    // ─── Package read ─────────────────────────────────────────
517
518    async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
519        AppService::pkg_read_init_lua(self, name, None)
520    }
521
522    async fn pkg_get_narrative_md(&self, name: &str) -> Result<Option<String>, String> {
523        AppService::pkg_get_narrative_md(self, name).await
524    }
525
526    async fn pkg_meta(&self, name: &str) -> Result<String, String> {
527        let filter = serde_json::json!({ "name": name });
528        let json_str = EngineApi::pkg_list(
529            self,
530            None,
531            None,
532            None,
533            Some(filter),
534            None,
535            Some("full".to_string()),
536        )
537        .await?;
538        let val: serde_json::Value = serde_json::from_str(&json_str)
539            .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
540        let pkgs = val
541            .get("packages")
542            .and_then(|p| p.as_array())
543            .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
544        if pkgs.is_empty() {
545            return Err(format!("pkg not found: {name}"));
546        }
547        serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
548    }
549
550    // ─── Package scaffold ─────────────────────────────────────
551
552    async fn pkg_scaffold(
553        &self,
554        name: String,
555        target_dir: Option<String>,
556        category: Option<String>,
557        description: Option<String>,
558    ) -> Result<String, String> {
559        let svc = self.clone();
560        tokio::task::spawn_blocking(move || {
561            AppService::pkg_scaffold(
562                &svc,
563                &name,
564                target_dir.as_deref(),
565                category.as_deref(),
566                description.as_deref(),
567            )
568        })
569        .await
570        .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
571    }
572
573    // ─── Hub resources ───────────────────────────────────────
574
575    /// Aggregate hub index across all registered cache sources.
576    ///
577    /// Delegates to `AppService::aggregate_index`, then serializes the
578    /// result to a JSON string. Individual source failures and registry-load
579    /// failures are embedded in the response JSON under a `"warnings"` field
580    /// so the MCP caller can observe partial failures without losing the
581    /// aggregate result.
582    async fn hub_index_aggregate(&self) -> Result<String, String> {
583        let svc = self.clone();
584        let (index, warnings) = tokio::task::spawn_blocking(move || {
585            AppService::aggregate_index(&svc).map_err(|e| e.to_string())
586        })
587        .await
588        .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
589
590        let mut json = serde_json::to_value(&index)
591            .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
592        if !warnings.is_empty() {
593            if let Some(obj) = json.as_object_mut() {
594                obj.insert("warnings".to_string(), serde_json::json!(warnings));
595            }
596        }
597        serde_json::to_string(&json)
598            .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
599    }
600
601    // ─── Settings ────────────────────────────────────────────
602
603    async fn setting_resolve(&self, target: Option<String>) -> Result<String, String> {
604        let app_dir = self.log_config.app_dir();
605        let project_root = self.resolve_root(None);
606        tokio::task::spawn_blocking(move || {
607            crate::service::setting::resolve_setting(
608                &app_dir,
609                project_root.as_deref(),
610                target.as_deref(),
611            )
612            .map_err(|e| e.to_string())
613            .and_then(|r| {
614                serde_json::to_string(&r).map_err(|e| format!("setting_resolve: serialize: {e}"))
615            })
616        })
617        .await
618        .map_err(|e| format!("setting_resolve: task panicked: {e}"))?
619    }
620
621    // ─── Diagnostics ─────────────────────────────────────────
622
623    async fn info(&self) -> String {
624        AppService::info(self)
625    }
626
627    // ─── Pool management ─────────────────────────────────────
628
629    async fn pool_ensure(&self) -> Result<String, String> {
630        AppService::pool_ensure_impl(self).await
631    }
632
633    async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
634        AppService::pool_status_impl(self, sid).await
635    }
636
637    async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
638        AppService::pool_stop_impl(self, sid).await
639    }
640}
641
642// ─── Pool inherent helpers ────────────────────────────────────────────────────
643
644impl AppService {
645    /// Scan registry.json, GC dead workers, and return live sessions.
646    ///
647    /// Idempotent: calling twice produces the same result when no workers change
648    /// state between calls.  Does NOT spawn new workers.
649    pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
650        let reg_path = self.pool_reg_path.clone();
651        let lock_path = self.pool_lock_path.clone();
652
653        let sessions =
654            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
655                with_registry_lock(&lock_path, || {
656                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
657                    let survivors = reg.scan_and_gc()?;
658                    // Persist GC result back to disk.
659                    reg.save(&reg_path)?;
660                    let entries = survivors
661                        .iter()
662                        .map(|e| {
663                            serde_json::json!({
664                                "sid": e.sid,
665                                "pid": e.pid,
666                                "sock": e.sock.to_string_lossy(),
667                                "version": e.version,
668                            })
669                        })
670                        .collect::<Vec<_>>();
671                    Ok(entries)
672                })
673            })
674            .await
675            .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
676            .map_err(|e| e.to_string())?;
677
678        let pool_version = env!("CARGO_PKG_VERSION");
679        serde_json::to_string(&serde_json::json!({
680            "sessions": sessions,
681            "pool_version": pool_version,
682        }))
683        .map_err(|e| format!("pool_ensure: serialize: {e}"))
684    }
685
686    /// Return pool worker status from registry.json.
687    ///
688    /// When `sid` is `Some`, restricts output to that single worker.
689    /// Uses kill -0 liveness check for each returned entry.
690    pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
691        let reg_path = self.pool_reg_path.clone();
692        let lock_path = self.pool_lock_path.clone();
693
694        let sessions =
695            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
696                with_registry_lock(&lock_path, || {
697                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
698                    // GC dead entries in-place so status reflects reality.
699                    let _ = reg.scan_and_gc()?;
700                    reg.save(&reg_path)?;
701
702                    let entries: Vec<serde_json::Value> = reg
703                        .sessions
704                        .iter()
705                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
706                        .map(|e| {
707                            serde_json::json!({
708                                "sid": e.sid,
709                                "pid": e.pid,
710                                "sock": e.sock.to_string_lossy(),
711                                "version": e.version,
712                                "created_at": e.created_at,
713                                // Status is "running" for all live entries (UDS ping not required in POC).
714                                "status": "running",
715                            })
716                        })
717                        .collect();
718                    Ok(entries)
719                })
720            })
721            .await
722            .map_err(|e| format!("pool_status: task panicked: {e}"))?
723            .map_err(|e| e.to_string())?;
724
725        let pool_version = env!("CARGO_PKG_VERSION");
726        serde_json::to_string(&serde_json::json!({
727            "sessions": sessions,
728            "pool_version": pool_version,
729        }))
730        .map_err(|e| format!("pool_status: serialize: {e}"))
731    }
732
733    /// Send SIGTERM to all workers or a single worker identified by `sid`.
734    ///
735    /// After SIGTERM, removes the entry from registry.json.
736    /// Returns `{"stopped": [...], "errors": [...]}`.
737    /// SIGTERM send failures are surfaced in the `errors` array (not dropped silently).
738    pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
739        let reg_path = self.pool_reg_path.clone();
740        let lock_path = self.pool_lock_path.clone();
741
742        let result = tokio::task::spawn_blocking(
743            move || -> Result<(Vec<String>, Vec<String>), PoolError> {
744                with_registry_lock(&lock_path, || {
745                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
746
747                    // Determine targets.
748                    let targets: Vec<_> = reg
749                        .sessions
750                        .iter()
751                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
752                        .cloned()
753                        .collect();
754
755                    let mut stopped: Vec<String> = Vec::new();
756                    let mut errors: Vec<String> = Vec::new();
757
758                    for entry in &targets {
759                        #[cfg(unix)]
760                        {
761                            // K-52: guard u32 → i32 (pid_t) overflow; also reject pid == 0
762                            // (POSIX kill(2): pid=0 signals every process in the calling process
763                            // group, pid<0 signals a process group — both are unsafe here).
764                            let pid_t = match i32::try_from(entry.pid) {
765                                Ok(p) if p > 0 => p,
766                                Ok(_) => {
767                                    errors.push(format!(
768                                        "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
769                                        entry.sid, entry.pid
770                                    ));
771                                    reg.remove(&entry.sid);
772                                    continue;
773                                }
774                                Err(_) => {
775                                    errors.push(format!(
776                                        "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
777                                        entry.sid, entry.pid
778                                    ));
779                                    // Remove the entry anyway (PID is invalid, worker is unreachable).
780                                    reg.remove(&entry.sid);
781                                    continue;
782                                }
783                            };
784
785                            // SAFETY: libc::kill(pid, SIGTERM) is a thin syscall wrapper.
786                            // pid_t > 0, verified by the match arm above.
787                            // pid fits in i32 (verified above).
788                            let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
789                            if ret == 0 {
790                                stopped.push(entry.sid.clone());
791                            } else {
792                                let os_err = std::io::Error::last_os_error();
793                                if os_err.raw_os_error() == Some(libc::ESRCH) {
794                                    // Process already dead — treat as stopped (idempotent).
795                                    stopped.push(entry.sid.clone());
796                                } else {
797                                    errors.push(format!(
798                                        "sid={}: SIGTERM failed: {}",
799                                        entry.sid, os_err
800                                    ));
801                                }
802                            }
803                        }
804                        #[cfg(not(unix))]
805                        {
806                            // Non-Unix: cannot send SIGTERM; report as unsupported.
807                            errors.push(format!(
808                                "sid={}: SIGTERM not supported on this platform",
809                                entry.sid
810                            ));
811                        }
812                        // Remove from registry regardless of SIGTERM result
813                        // (dead or dying, we no longer track it).
814                        reg.remove(&entry.sid);
815                    }
816
817                    // Persist updated registry (entries removed).
818                    reg.save(&reg_path)?;
819
820                    Ok((stopped, errors))
821                })
822            },
823        )
824        .await
825        .map_err(|e| format!("pool_stop: task panicked: {e}"))?
826        .map_err(|e| e.to_string())?;
827
828        let (stopped, errors) = result;
829        serde_json::to_string(&serde_json::json!({
830            "stopped": stopped,
831            "errors": errors,
832        }))
833        .map_err(|e| format!("pool_stop: serialize: {e}"))
834    }
835}
836
837// ─── Tests ────────────────────────────────────────────────────────────────────
838
839#[cfg(test)]
840mod tests {
841    use super::super::test_support::make_app_service_at;
842
843    /// pool_stop_impl rejects pid=0 without delivering SIGTERM.
844    ///
845    /// A registry.json containing `"pid": 0` must be handled as an invalid
846    /// POSIX target: the error is surfaced in the `errors` array, the entry is
847    /// removed from the on-disk registry, and the test process itself survives
848    /// (proving no SIGTERM was sent to the process group).
849    #[tokio::test]
850    #[cfg(unix)]
851    async fn pool_stop_pid_zero_is_rejected() {
852        // Arrange: build an AppService rooted at a tempdir so no real $HOME is
853        // touched, then seed registry.json with a single pid=0 entry.
854        let tmp = tempfile::tempdir().expect("tempdir");
855        let root = tmp.path().to_path_buf();
856        let svc = make_app_service_at(root.clone()).await;
857
858        // The pool registry lives at {app_dir}/state/pool/registry.json.
859        // AppDir::state_dir() resolves to {root}/state.
860        let pool_reg_path = root.join("state").join("pool").join("registry.json");
861        std::fs::create_dir_all(pool_reg_path.parent().unwrap()).expect("create pool dir");
862
863        let seeded = serde_json::json!({
864            "sessions": [{
865                "sid": "zero-pid-session",
866                "pid": 0u32,
867                "sock": "/tmp/alc-pool/zero.sock",
868                "version": "0.30.0",
869                "created_at": "2026-01-01T00:00:00Z"
870            }]
871        });
872        std::fs::write(&pool_reg_path, seeded.to_string()).expect("seed registry.json");
873
874        // Act: stop all sessions.
875        let json_str = svc.pool_stop_impl(None).await.expect("pool_stop_impl");
876        let result: serde_json::Value =
877            serde_json::from_str(&json_str).expect("response is valid JSON");
878
879        // Assert (1): the error message contains "not a valid POSIX target pid".
880        let errors = result["errors"].as_array().expect("errors array");
881        assert!(
882            !errors.is_empty(),
883            "expected at least one error for pid=0 entry"
884        );
885        let err_msg = errors[0].as_str().unwrap_or("");
886        assert!(
887            err_msg.contains("not a valid POSIX target pid"),
888            "unexpected error message: {err_msg}"
889        );
890
891        // Assert (2): stopped array is empty (no process was stopped).
892        let stopped = result["stopped"].as_array().expect("stopped array");
893        assert!(
894            stopped.is_empty(),
895            "pid=0 entry must not appear in stopped list"
896        );
897
898        // Assert (3): the entry is removed from the on-disk registry.
899        let on_disk: serde_json::Value =
900            serde_json::from_str(&std::fs::read_to_string(&pool_reg_path).expect("read registry"))
901                .expect("parse registry");
902        let sessions = on_disk["sessions"].as_array().expect("sessions array");
903        assert!(
904            sessions.is_empty(),
905            "pid=0 entry must be removed from on-disk registry"
906        );
907
908        // Assert (4): test process is still alive — trivially confirmed by
909        // reaching this line without being killed by SIGTERM.
910    }
911}