Skip to main content

algocline_app/service/
engine_api_impl.rs

1use std::collections::HashMap;
2
3use algocline_core::{EngineApi, QueryResponse};
4use async_trait::async_trait;
5
6use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
7
8use super::list_opts::ListOpts;
9use super::AppService;
10
11/// Delegates each [`EngineApi`] method to the corresponding `AppService`
12/// inherent method via fully-qualified syntax (`AppService::method(self, …)`).
13///
14/// This avoids ambiguity between the trait method and the inherent method
15/// of the same name, preventing accidental infinite recursion if the
16/// inherent method is ever removed or renamed.
17#[async_trait]
18impl EngineApi for AppService {
19    // ─── Core execution ──────────────────────────────────────
20
21    async fn run(
22        &self,
23        code: Option<String>,
24        code_file: Option<String>,
25        ctx: Option<serde_json::Value>,
26        project_root: Option<String>,
27        host_mode: Option<bool>,
28    ) -> Result<String, String> {
29        AppService::run(self, code, code_file, ctx, project_root, host_mode).await
30    }
31
32    async fn advice(
33        &self,
34        strategy: &str,
35        task: Option<String>,
36        opts: Option<serde_json::Value>,
37        project_root: Option<String>,
38    ) -> Result<String, String> {
39        AppService::advice(self, strategy, task, opts, project_root).await
40    }
41
42    async fn continue_single(
43        &self,
44        session_id: &str,
45        response: String,
46        query_id: Option<&str>,
47        usage: Option<algocline_core::TokenUsage>,
48    ) -> Result<String, String> {
49        AppService::continue_single(self, session_id, response, query_id, usage).await
50    }
51
52    async fn continue_batch(
53        &self,
54        session_id: &str,
55        responses: Vec<QueryResponse>,
56    ) -> Result<String, String> {
57        AppService::continue_batch(self, session_id, responses).await
58    }
59
60    // ─── Session status ──────────────────────────────────────
61
62    async fn status(
63        &self,
64        session_id: Option<&str>,
65        pending_filter: Option<serde_json::Value>,
66        include_history: bool,
67    ) -> Result<String, String> {
68        AppService::status(self, session_id, pending_filter, include_history).await
69    }
70
71    // ─── Evaluation ──────────────────────────────────────────
72
73    async fn eval(
74        &self,
75        scenario: Option<String>,
76        scenario_file: Option<String>,
77        scenario_name: Option<String>,
78        strategy: &str,
79        strategy_opts: Option<serde_json::Value>,
80        auto_card: bool,
81    ) -> Result<String, String> {
82        AppService::eval(
83            self,
84            scenario,
85            scenario_file,
86            scenario_name,
87            strategy,
88            strategy_opts,
89            auto_card,
90        )
91        .await
92    }
93
94    async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
95        AppService::eval_history(self, strategy, limit)
96    }
97
98    async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
99        AppService::eval_detail(self, eval_id)
100    }
101
102    async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
103        AppService::eval_compare(self, eval_id_a, eval_id_b).await
104    }
105
106    // ─── Scenarios ───────────────────────────────────────────
107
108    async fn scenario_list(&self) -> Result<String, String> {
109        AppService::scenario_list(self)
110    }
111
112    async fn scenario_show(&self, name: &str) -> Result<String, String> {
113        AppService::scenario_show(self, name)
114    }
115
116    async fn scenario_install(&self, url: String) -> Result<String, String> {
117        AppService::scenario_install(self, url).await
118    }
119
120    // ─── Packages ────────────────────────────────────────────
121
122    async fn pkg_link(
123        &self,
124        path: String,
125        name: Option<String>,
126        force: Option<bool>,
127        scope: Option<String>,
128        project_root: Option<String>,
129    ) -> Result<String, String> {
130        AppService::pkg_link(self, path, name, force, scope, project_root).await
131    }
132
133    async fn pkg_unlink(&self, name: String) -> Result<String, String> {
134        AppService::pkg_unlink(self, name).await
135    }
136
137    #[allow(clippy::too_many_arguments)]
138    async fn pkg_list(
139        &self,
140        project_root: Option<String>,
141        limit: Option<i32>,
142        sort: Option<String>,
143        filter: Option<serde_json::Value>,
144        fields: Option<Vec<String>>,
145        verbose: Option<String>,
146    ) -> Result<String, String> {
147        // `filter` is a free-form JSON Value at the MCP boundary (so the
148        // trait stays core-crate-pure). If the caller sends something
149        // that is not a JSON object we treat it as "no filter" and log
150        // the drop so operators can diagnose unexpected filter shapes
151        // in production.
152        let filter_map = match filter {
153            None => None,
154            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
155                Ok(map) => Some(map),
156                Err(e) => {
157                    tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
158                    None
159                }
160            },
161        };
162
163        // Negative limit values from MCP callers are clamped to 0 rather
164        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
165        // Downstream semantics: `Some(0)` means "no limit" (return all) —
166        // the truncate path in `AppService::pkg_list` short-circuits on 0.
167        let opts = ListOpts {
168            limit: limit.map(|n| n.max(0) as usize),
169            sort,
170            filter: filter_map,
171            fields,
172            verbose,
173        };
174
175        AppService::pkg_list(self, project_root, opts)
176            .await
177            .map_err(|e| e.to_string())
178    }
179
180    async fn pkg_install(
181        &self,
182        url: String,
183        name: Option<String>,
184        force: Option<bool>,
185    ) -> Result<String, String> {
186        AppService::pkg_install(self, url, name, force).await
187    }
188
189    async fn pkg_remove(
190        &self,
191        name: &str,
192        project_root: Option<String>,
193        version: Option<String>,
194        scope: Option<String>,
195    ) -> Result<String, String> {
196        AppService::pkg_remove(self, name, project_root, version, scope).await
197    }
198
199    async fn pkg_repair(
200        &self,
201        name: Option<String>,
202        project_root: Option<String>,
203    ) -> Result<String, String> {
204        AppService::pkg_repair(self, name, project_root).await
205    }
206
207    async fn pkg_doctor(
208        &self,
209        name: Option<String>,
210        project_root: Option<String>,
211    ) -> Result<String, String> {
212        AppService::pkg_doctor(self, name, project_root).await
213    }
214
215    // ─── Logging ─────────────────────────────────────────────
216
217    async fn add_note(
218        &self,
219        session_id: &str,
220        content: &str,
221        title: Option<&str>,
222    ) -> Result<String, String> {
223        AppService::add_note(self, session_id, content, title).await
224    }
225
226    async fn log_view(
227        &self,
228        session_id: Option<&str>,
229        limit: Option<usize>,
230        max_chars: Option<usize>,
231    ) -> Result<String, String> {
232        AppService::log_view(self, session_id, limit, max_chars).await
233    }
234
235    async fn stats(
236        &self,
237        strategy_filter: Option<&str>,
238        days: Option<u64>,
239    ) -> Result<String, String> {
240        AppService::stats(self, strategy_filter, days)
241    }
242
243    // ─── Project lifecycle ────────────────────────────────────
244
245    async fn init(&self, project_root: Option<String>) -> Result<String, String> {
246        AppService::init(self, project_root).await
247    }
248
249    async fn update(&self, project_root: Option<String>) -> Result<String, String> {
250        AppService::update(self, project_root).await
251    }
252
253    async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
254        AppService::migrate(self, project_root).await
255    }
256
257    // ─── Cards ───────────────────────────────────────────────
258
259    async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
260        AppService::card_list(self, pkg.as_deref())
261    }
262
263    async fn card_get(&self, card_id: &str) -> Result<String, String> {
264        AppService::card_get(self, card_id)
265    }
266
267    async fn card_find(
268        &self,
269        pkg: Option<String>,
270        where_: Option<serde_json::Value>,
271        order_by: Option<serde_json::Value>,
272        limit: Option<usize>,
273        offset: Option<usize>,
274    ) -> Result<String, String> {
275        AppService::card_find(self, pkg, where_, order_by, limit, offset)
276    }
277
278    async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
279        AppService::card_alias_list(self, pkg.as_deref())
280    }
281
282    async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
283        AppService::card_get_by_alias(self, name)
284    }
285
286    async fn card_alias_set(
287        &self,
288        name: &str,
289        card_id: &str,
290        pkg: Option<String>,
291        note: Option<String>,
292    ) -> Result<String, String> {
293        AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
294    }
295
296    async fn card_append(
297        &self,
298        card_id: &str,
299        fields: serde_json::Value,
300    ) -> Result<String, String> {
301        AppService::card_append(self, card_id, fields)
302    }
303
304    async fn card_install(&self, url: String) -> Result<String, String> {
305        AppService::card_install(self, url).await
306    }
307
308    async fn card_samples(
309        &self,
310        card_id: &str,
311        offset: Option<usize>,
312        limit: Option<usize>,
313        where_: Option<serde_json::Value>,
314    ) -> Result<String, String> {
315        AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
316    }
317
318    async fn card_lineage(
319        &self,
320        card_id: &str,
321        direction: Option<String>,
322        depth: Option<usize>,
323        include_stats: Option<bool>,
324        relation_filter: Option<Vec<String>>,
325    ) -> Result<String, String> {
326        AppService::card_lineage(
327            self,
328            card_id,
329            direction.as_deref(),
330            depth,
331            include_stats,
332            relation_filter,
333        )
334    }
335
336    async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
337        AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
338    }
339
340    // ─── Hub ─────────────────────────────────────────────────
341
342    async fn hub_reindex(
343        &self,
344        output_path: Option<String>,
345        source_dir: Option<String>,
346    ) -> Result<String, String> {
347        let svc = self.clone();
348        tokio::task::spawn_blocking(move || {
349            AppService::hub_reindex(&svc, output_path.as_deref(), source_dir.as_deref())
350        })
351        .await
352        .map_err(|e| format!("hub_reindex task panicked: {e}"))?
353    }
354
355    async fn hub_gendoc(
356        &self,
357        source_dir: String,
358        out_dir: Option<String>,
359        projections: Option<Vec<String>>,
360        config_path: Option<String>,
361        lint_strict: Option<bool>,
362    ) -> Result<String, String> {
363        let svc = self.clone();
364        tokio::task::spawn_blocking(move || {
365            crate::AppService::hub_gendoc(
366                &svc,
367                &source_dir,
368                out_dir.as_deref(),
369                projections.as_deref(),
370                config_path.as_deref(),
371                lint_strict,
372            )
373        })
374        .await
375        .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
376    }
377
378    async fn hub_dist(
379        &self,
380        source_dir: String,
381        output_path: Option<String>,
382        out_dir: Option<String>,
383        preset: Option<String>,
384        project_root: Option<String>,
385        projections: Option<Vec<String>>,
386        config_path: Option<String>,
387        lint_strict: Option<bool>,
388    ) -> Result<String, String> {
389        let svc = self.clone();
390        tokio::task::spawn_blocking(move || {
391            AppService::hub_dist(
392                &svc,
393                &source_dir,
394                output_path.as_deref(),
395                out_dir.as_deref(),
396                preset.as_deref(),
397                project_root.as_deref(),
398                projections.as_deref(),
399                config_path.as_deref(),
400                lint_strict,
401            )
402        })
403        .await
404        .map_err(|e| format!("hub_dist task panicked: {e}"))?
405    }
406
407    async fn hub_info(&self, pkg: String) -> Result<String, String> {
408        let svc = self.clone();
409        tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
410            .await
411            .map_err(|e| format!("hub_info task panicked: {e}"))?
412    }
413
414    #[allow(clippy::too_many_arguments)]
415    async fn hub_search(
416        &self,
417        query: Option<String>,
418        category: Option<String>,
419        installed_only: Option<bool>,
420        limit: Option<i32>,
421        sort: Option<String>,
422        filter: Option<serde_json::Value>,
423        fields: Option<Vec<String>>,
424        verbose: Option<String>,
425    ) -> Result<String, String> {
426        let svc = self.clone();
427
428        // `filter` is a free-form JSON Value at the MCP boundary (so the
429        // trait stays core-crate-pure). If the caller sends something
430        // that is not a JSON object we treat it as "no filter" — the
431        // explicit category/installed_only params still cover the common
432        // cases. The MCP `JsonSchema` layer will have already flagged
433        // hard type errors. We log the drop so operators can diagnose
434        // unexpected filter shapes in production.
435        let filter_map = match filter {
436            None => None,
437            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
438                Ok(map) => Some(map),
439                Err(e) => {
440                    tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
441                    None
442                }
443            },
444        };
445
446        // Negative limit values from MCP callers are clamped to 0 rather
447        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
448        // Downstream semantics: `Some(0)` means "no limit" (return all) —
449        // the truncate path in `AppService::hub_search` short-circuits on 0.
450        let opts = ListOpts {
451            limit: limit.map(|n| n.max(0) as usize),
452            sort,
453            filter: filter_map,
454            fields,
455            verbose,
456        };
457
458        tokio::task::spawn_blocking(move || {
459            AppService::hub_search(
460                &svc,
461                query.as_deref(),
462                category.as_deref(),
463                installed_only,
464                opts,
465            )
466        })
467        .await
468        .map_err(|e| format!("hub_search task panicked: {e}"))?
469    }
470
471    // ─── Package read ─────────────────────────────────────────
472
473    async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
474        AppService::pkg_read_init_lua(self, name, None)
475    }
476
477    async fn pkg_meta(&self, name: &str) -> Result<String, String> {
478        let filter = serde_json::json!({ "name": name });
479        let json_str = EngineApi::pkg_list(
480            self,
481            None,
482            None,
483            None,
484            Some(filter),
485            None,
486            Some("full".to_string()),
487        )
488        .await?;
489        let val: serde_json::Value = serde_json::from_str(&json_str)
490            .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
491        let pkgs = val
492            .get("packages")
493            .and_then(|p| p.as_array())
494            .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
495        if pkgs.is_empty() {
496            return Err(format!("pkg not found: {name}"));
497        }
498        serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
499    }
500
501    // ─── Package scaffold ─────────────────────────────────────
502
503    async fn pkg_scaffold(
504        &self,
505        name: String,
506        target_dir: Option<String>,
507        category: Option<String>,
508        description: Option<String>,
509    ) -> Result<String, String> {
510        let svc = self.clone();
511        tokio::task::spawn_blocking(move || {
512            AppService::pkg_scaffold(
513                &svc,
514                &name,
515                target_dir.as_deref(),
516                category.as_deref(),
517                description.as_deref(),
518            )
519        })
520        .await
521        .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
522    }
523
524    // ─── Hub resources ───────────────────────────────────────
525
526    /// Aggregate hub index across all registered cache sources.
527    ///
528    /// Delegates to `AppService::aggregate_index`, then serializes the
529    /// result to a JSON string. Individual source failures and registry-load
530    /// failures are embedded in the response JSON under a `"warnings"` field
531    /// so the MCP caller can observe partial failures without losing the
532    /// aggregate result.
533    async fn hub_index_aggregate(&self) -> Result<String, String> {
534        let svc = self.clone();
535        let (index, warnings) = tokio::task::spawn_blocking(move || {
536            AppService::aggregate_index(&svc).map_err(|e| e.to_string())
537        })
538        .await
539        .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
540
541        let mut json = serde_json::to_value(&index)
542            .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
543        if !warnings.is_empty() {
544            if let Some(obj) = json.as_object_mut() {
545                obj.insert("warnings".to_string(), serde_json::json!(warnings));
546            }
547        }
548        serde_json::to_string(&json)
549            .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
550    }
551
552    // ─── Diagnostics ─────────────────────────────────────────
553
554    async fn info(&self) -> String {
555        AppService::info(self)
556    }
557
558    // ─── Pool management ─────────────────────────────────────
559
560    async fn pool_ensure(&self) -> Result<String, String> {
561        AppService::pool_ensure_impl(self).await
562    }
563
564    async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
565        AppService::pool_status_impl(self, sid).await
566    }
567
568    async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
569        AppService::pool_stop_impl(self, sid).await
570    }
571}
572
573// ─── Pool inherent helpers ────────────────────────────────────────────────────
574
575impl AppService {
576    /// Scan registry.json, GC dead workers, and return live sessions.
577    ///
578    /// Idempotent: calling twice produces the same result when no workers change
579    /// state between calls.  Does NOT spawn new workers.
580    pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
581        let reg_path = self.pool_reg_path.clone();
582        let lock_path = self.pool_lock_path.clone();
583
584        let sessions =
585            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
586                with_registry_lock(&lock_path, || {
587                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
588                    let survivors = reg.scan_and_gc()?;
589                    // Persist GC result back to disk.
590                    reg.save(&reg_path)?;
591                    let entries = survivors
592                        .iter()
593                        .map(|e| {
594                            serde_json::json!({
595                                "sid": e.sid,
596                                "pid": e.pid,
597                                "sock": e.sock.to_string_lossy(),
598                                "version": e.version,
599                            })
600                        })
601                        .collect::<Vec<_>>();
602                    Ok(entries)
603                })
604            })
605            .await
606            .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
607            .map_err(|e| e.to_string())?;
608
609        let pool_version = env!("CARGO_PKG_VERSION");
610        serde_json::to_string(&serde_json::json!({
611            "sessions": sessions,
612            "pool_version": pool_version,
613        }))
614        .map_err(|e| format!("pool_ensure: serialize: {e}"))
615    }
616
617    /// Return pool worker status from registry.json.
618    ///
619    /// When `sid` is `Some`, restricts output to that single worker.
620    /// Uses kill -0 liveness check for each returned entry.
621    pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
622        let reg_path = self.pool_reg_path.clone();
623        let lock_path = self.pool_lock_path.clone();
624
625        let sessions =
626            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
627                with_registry_lock(&lock_path, || {
628                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
629                    // GC dead entries in-place so status reflects reality.
630                    let _ = reg.scan_and_gc()?;
631                    reg.save(&reg_path)?;
632
633                    let entries: Vec<serde_json::Value> = reg
634                        .sessions
635                        .iter()
636                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
637                        .map(|e| {
638                            serde_json::json!({
639                                "sid": e.sid,
640                                "pid": e.pid,
641                                "sock": e.sock.to_string_lossy(),
642                                "version": e.version,
643                                "created_at": e.created_at,
644                                // Status is "running" for all live entries (UDS ping not required in POC).
645                                "status": "running",
646                            })
647                        })
648                        .collect();
649                    Ok(entries)
650                })
651            })
652            .await
653            .map_err(|e| format!("pool_status: task panicked: {e}"))?
654            .map_err(|e| e.to_string())?;
655
656        let pool_version = env!("CARGO_PKG_VERSION");
657        serde_json::to_string(&serde_json::json!({
658            "sessions": sessions,
659            "pool_version": pool_version,
660        }))
661        .map_err(|e| format!("pool_status: serialize: {e}"))
662    }
663
664    /// Send SIGTERM to all workers or a single worker identified by `sid`.
665    ///
666    /// After SIGTERM, removes the entry from registry.json.
667    /// Returns `{"stopped": [...], "errors": [...]}`.
668    /// SIGTERM send failures are surfaced in the `errors` array (not dropped silently).
669    pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
670        let reg_path = self.pool_reg_path.clone();
671        let lock_path = self.pool_lock_path.clone();
672
673        let result = tokio::task::spawn_blocking(
674            move || -> Result<(Vec<String>, Vec<String>), PoolError> {
675                with_registry_lock(&lock_path, || {
676                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
677
678                    // Determine targets.
679                    let targets: Vec<_> = reg
680                        .sessions
681                        .iter()
682                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
683                        .cloned()
684                        .collect();
685
686                    let mut stopped: Vec<String> = Vec::new();
687                    let mut errors: Vec<String> = Vec::new();
688
689                    for entry in &targets {
690                        #[cfg(unix)]
691                        {
692                            // K-52: guard u32 → i32 (pid_t) overflow; also reject pid == 0
693                            // (POSIX kill(2): pid=0 signals every process in the calling process
694                            // group, pid<0 signals a process group — both are unsafe here).
695                            let pid_t = match i32::try_from(entry.pid) {
696                                Ok(p) if p > 0 => p,
697                                Ok(_) => {
698                                    errors.push(format!(
699                                        "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
700                                        entry.sid, entry.pid
701                                    ));
702                                    reg.remove(&entry.sid);
703                                    continue;
704                                }
705                                Err(_) => {
706                                    errors.push(format!(
707                                        "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
708                                        entry.sid, entry.pid
709                                    ));
710                                    // Remove the entry anyway (PID is invalid, worker is unreachable).
711                                    reg.remove(&entry.sid);
712                                    continue;
713                                }
714                            };
715
716                            // SAFETY: libc::kill(pid, SIGTERM) is a thin syscall wrapper.
717                            // pid_t > 0, verified by the match arm above.
718                            // pid fits in i32 (verified above).
719                            let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
720                            if ret == 0 {
721                                stopped.push(entry.sid.clone());
722                            } else {
723                                let os_err = std::io::Error::last_os_error();
724                                if os_err.raw_os_error() == Some(libc::ESRCH) {
725                                    // Process already dead — treat as stopped (idempotent).
726                                    stopped.push(entry.sid.clone());
727                                } else {
728                                    errors.push(format!(
729                                        "sid={}: SIGTERM failed: {}",
730                                        entry.sid, os_err
731                                    ));
732                                }
733                            }
734                        }
735                        #[cfg(not(unix))]
736                        {
737                            // Non-Unix: cannot send SIGTERM; report as unsupported.
738                            errors.push(format!(
739                                "sid={}: SIGTERM not supported on this platform",
740                                entry.sid
741                            ));
742                        }
743                        // Remove from registry regardless of SIGTERM result
744                        // (dead or dying, we no longer track it).
745                        reg.remove(&entry.sid);
746                    }
747
748                    // Persist updated registry (entries removed).
749                    reg.save(&reg_path)?;
750
751                    Ok((stopped, errors))
752                })
753            },
754        )
755        .await
756        .map_err(|e| format!("pool_stop: task panicked: {e}"))?
757        .map_err(|e| e.to_string())?;
758
759        let (stopped, errors) = result;
760        serde_json::to_string(&serde_json::json!({
761            "stopped": stopped,
762            "errors": errors,
763        }))
764        .map_err(|e| format!("pool_stop: serialize: {e}"))
765    }
766}
767
768// ─── Tests ────────────────────────────────────────────────────────────────────
769
#[cfg(test)]
mod tests {
    use super::super::test_support::make_app_service_at;

    /// A registry entry with `pid = 0` must never be signalled.
    ///
    /// Seeds registry.json with one `"pid": 0` session, stops all sessions,
    /// and checks that the entry is reported under `errors` as an invalid
    /// POSIX target, is absent from `stopped`, and is gone from the on-disk
    /// registry. Reaching the end of the test also shows that no SIGTERM hit
    /// the test's own process group.
    #[tokio::test]
    #[cfg(unix)]
    async fn pool_stop_pid_zero_is_rejected() {
        // Arrange: a service rooted in a temp dir, so the real $HOME is left
        // alone.
        let sandbox = tempfile::tempdir().expect("tempdir");
        let app_root = sandbox.path().to_path_buf();
        let service = make_app_service_at(app_root.clone()).await;

        // Registry location used by this test: {root}/state/pool/registry.json
        // (the original author notes AppDir::state_dir() is {root}/state).
        let pool_dir = app_root.join("state").join("pool");
        std::fs::create_dir_all(&pool_dir).expect("create pool dir");
        let registry_file = pool_dir.join("registry.json");

        let seed = serde_json::json!({
            "sessions": [{
                "sid": "zero-pid-session",
                "pid": 0u32,
                "sock": "/tmp/alc-pool/zero.sock",
                "version": "0.30.0",
                "created_at": "2026-01-01T00:00:00Z"
            }]
        });
        std::fs::write(&registry_file, seed.to_string()).expect("seed registry.json");

        // Act: no sid filter, so every session is targeted.
        let raw_reply = service.pool_stop_impl(None).await.expect("pool_stop_impl");
        let reply: serde_json::Value =
            serde_json::from_str(&raw_reply).expect("response is valid JSON");

        // Assert (1): the pid=0 entry produced the "invalid target" error.
        let reported_errors = reply
            .get("errors")
            .and_then(|v| v.as_array())
            .expect("errors array");
        assert!(
            !reported_errors.is_empty(),
            "expected at least one error for pid=0 entry"
        );
        let err_msg = reported_errors
            .first()
            .and_then(|v| v.as_str())
            .unwrap_or("");
        assert!(
            err_msg.contains("not a valid POSIX target pid"),
            "unexpected error message: {err_msg}"
        );

        // Assert (2): nothing was reported as stopped.
        let reported_stopped = reply
            .get("stopped")
            .and_then(|v| v.as_array())
            .expect("stopped array");
        assert!(
            reported_stopped.is_empty(),
            "pid=0 entry must not appear in stopped list"
        );

        // Assert (3): the entry no longer exists on disk.
        let registry_text = std::fs::read_to_string(&registry_file).expect("read registry");
        let persisted: serde_json::Value =
            serde_json::from_str(&registry_text).expect("parse registry");
        let remaining = persisted
            .get("sessions")
            .and_then(|v| v.as_array())
            .expect("sessions array");
        assert!(
            remaining.is_empty(),
            "pid=0 entry must be removed from on-disk registry"
        );

        // Assert (4): still running here means no SIGTERM reached this
        // process.
    }
}