Skip to main content

algocline_app/service/
engine_api_impl.rs

1use std::collections::HashMap;
2
3use algocline_core::{EngineApi, QueryResponse};
4use async_trait::async_trait;
5
6use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
7
8use super::list_opts::ListOpts;
9use super::AppService;
10
11/// Delegates each [`EngineApi`] method to the corresponding `AppService`
12/// inherent method via fully-qualified syntax (`AppService::method(self, …)`).
13///
14/// This avoids ambiguity between the trait method and the inherent method
15/// of the same name, preventing accidental infinite recursion if the
16/// inherent method is ever removed or renamed.
17#[async_trait]
18impl EngineApi for AppService {
19    // ─── Core execution ──────────────────────────────────────
20
21    async fn run(
22        &self,
23        code: Option<String>,
24        code_file: Option<String>,
25        ctx: Option<serde_json::Value>,
26        project_root: Option<String>,
27        host_mode: Option<bool>,
28    ) -> Result<String, String> {
29        AppService::run(self, code, code_file, ctx, project_root, host_mode).await
30    }
31
32    async fn advice(
33        &self,
34        strategy: &str,
35        task: Option<String>,
36        opts: Option<serde_json::Value>,
37        project_root: Option<String>,
38    ) -> Result<String, String> {
39        AppService::advice(self, strategy, task, opts, project_root).await
40    }
41
42    async fn continue_single(
43        &self,
44        session_id: &str,
45        response: String,
46        query_id: Option<&str>,
47        usage: Option<algocline_core::TokenUsage>,
48    ) -> Result<String, String> {
49        AppService::continue_single(self, session_id, response, query_id, usage).await
50    }
51
52    async fn continue_batch(
53        &self,
54        session_id: &str,
55        responses: Vec<QueryResponse>,
56    ) -> Result<String, String> {
57        AppService::continue_batch(self, session_id, responses).await
58    }
59
60    // ─── Session status ──────────────────────────────────────
61
62    async fn status(
63        &self,
64        session_id: Option<&str>,
65        pending_filter: Option<serde_json::Value>,
66        include_history: bool,
67    ) -> Result<String, String> {
68        AppService::status(self, session_id, pending_filter, include_history).await
69    }
70
71    // ─── Evaluation ──────────────────────────────────────────
72
73    async fn eval(
74        &self,
75        scenario: Option<String>,
76        scenario_file: Option<String>,
77        scenario_name: Option<String>,
78        strategy: &str,
79        strategy_opts: Option<serde_json::Value>,
80        auto_card: bool,
81    ) -> Result<String, String> {
82        AppService::eval(
83            self,
84            scenario,
85            scenario_file,
86            scenario_name,
87            strategy,
88            strategy_opts,
89            auto_card,
90        )
91        .await
92    }
93
94    async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
95        AppService::eval_history(self, strategy, limit)
96    }
97
98    async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
99        AppService::eval_detail(self, eval_id)
100    }
101
102    async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
103        AppService::eval_compare(self, eval_id_a, eval_id_b).await
104    }
105
106    // ─── Scenarios ───────────────────────────────────────────
107
108    async fn scenario_list(&self) -> Result<String, String> {
109        AppService::scenario_list(self)
110    }
111
112    async fn scenario_show(&self, name: &str) -> Result<String, String> {
113        AppService::scenario_show(self, name)
114    }
115
116    async fn scenario_install(&self, url: String) -> Result<String, String> {
117        AppService::scenario_install(self, url).await
118    }
119
120    // ─── Packages ────────────────────────────────────────────
121
122    async fn pkg_link(
123        &self,
124        path: String,
125        name: Option<String>,
126        force: Option<bool>,
127        scope: Option<String>,
128        project_root: Option<String>,
129    ) -> Result<String, String> {
130        AppService::pkg_link(self, path, name, force, scope, project_root).await
131    }
132
133    async fn pkg_unlink(&self, name: String) -> Result<String, String> {
134        AppService::pkg_unlink(self, name).await
135    }
136
137    #[allow(clippy::too_many_arguments)]
138    async fn pkg_list(
139        &self,
140        project_root: Option<String>,
141        limit: Option<i32>,
142        sort: Option<String>,
143        filter: Option<serde_json::Value>,
144        fields: Option<Vec<String>>,
145        verbose: Option<String>,
146    ) -> Result<String, String> {
147        // `filter` is a free-form JSON Value at the MCP boundary (so the
148        // trait stays core-crate-pure). If the caller sends something
149        // that is not a JSON object we treat it as "no filter" and log
150        // the drop so operators can diagnose unexpected filter shapes
151        // in production.
152        let filter_map = match filter {
153            None => None,
154            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
155                Ok(map) => Some(map),
156                Err(e) => {
157                    tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
158                    None
159                }
160            },
161        };
162
163        // Negative limit values from MCP callers are clamped to 0 rather
164        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
165        // Downstream semantics: `Some(0)` means "no limit" (return all) —
166        // the truncate path in `AppService::pkg_list` short-circuits on 0.
167        let opts = ListOpts {
168            limit: limit.map(|n| n.max(0) as usize),
169            sort,
170            filter: filter_map,
171            fields,
172            verbose,
173        };
174
175        AppService::pkg_list(self, project_root, opts)
176            .await
177            .map_err(|e| e.to_string())
178    }
179
180    async fn pkg_install(&self, url: String, name: Option<String>) -> Result<String, String> {
181        AppService::pkg_install(self, url, name).await
182    }
183
184    async fn pkg_remove(
185        &self,
186        name: &str,
187        project_root: Option<String>,
188        version: Option<String>,
189        scope: Option<String>,
190    ) -> Result<String, String> {
191        AppService::pkg_remove(self, name, project_root, version, scope).await
192    }
193
194    async fn pkg_repair(
195        &self,
196        name: Option<String>,
197        project_root: Option<String>,
198    ) -> Result<String, String> {
199        AppService::pkg_repair(self, name, project_root).await
200    }
201
202    async fn pkg_doctor(
203        &self,
204        name: Option<String>,
205        project_root: Option<String>,
206    ) -> Result<String, String> {
207        AppService::pkg_doctor(self, name, project_root).await
208    }
209
210    // ─── Logging ─────────────────────────────────────────────
211
212    async fn add_note(
213        &self,
214        session_id: &str,
215        content: &str,
216        title: Option<&str>,
217    ) -> Result<String, String> {
218        AppService::add_note(self, session_id, content, title).await
219    }
220
221    async fn log_view(
222        &self,
223        session_id: Option<&str>,
224        limit: Option<usize>,
225        max_chars: Option<usize>,
226    ) -> Result<String, String> {
227        AppService::log_view(self, session_id, limit, max_chars).await
228    }
229
230    async fn stats(
231        &self,
232        strategy_filter: Option<&str>,
233        days: Option<u64>,
234    ) -> Result<String, String> {
235        AppService::stats(self, strategy_filter, days)
236    }
237
238    // ─── Project lifecycle ────────────────────────────────────
239
240    async fn init(&self, project_root: Option<String>) -> Result<String, String> {
241        AppService::init(self, project_root).await
242    }
243
244    async fn update(&self, project_root: Option<String>) -> Result<String, String> {
245        AppService::update(self, project_root).await
246    }
247
248    async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
249        AppService::migrate(self, project_root).await
250    }
251
252    // ─── Cards ───────────────────────────────────────────────
253
254    async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
255        AppService::card_list(self, pkg.as_deref())
256    }
257
258    async fn card_get(&self, card_id: &str) -> Result<String, String> {
259        AppService::card_get(self, card_id)
260    }
261
262    async fn card_find(
263        &self,
264        pkg: Option<String>,
265        where_: Option<serde_json::Value>,
266        order_by: Option<serde_json::Value>,
267        limit: Option<usize>,
268        offset: Option<usize>,
269    ) -> Result<String, String> {
270        AppService::card_find(self, pkg, where_, order_by, limit, offset)
271    }
272
273    async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
274        AppService::card_alias_list(self, pkg.as_deref())
275    }
276
277    async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
278        AppService::card_get_by_alias(self, name)
279    }
280
281    async fn card_alias_set(
282        &self,
283        name: &str,
284        card_id: &str,
285        pkg: Option<String>,
286        note: Option<String>,
287    ) -> Result<String, String> {
288        AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
289    }
290
291    async fn card_append(
292        &self,
293        card_id: &str,
294        fields: serde_json::Value,
295    ) -> Result<String, String> {
296        AppService::card_append(self, card_id, fields)
297    }
298
299    async fn card_install(&self, url: String) -> Result<String, String> {
300        AppService::card_install(self, url).await
301    }
302
303    async fn card_samples(
304        &self,
305        card_id: &str,
306        offset: Option<usize>,
307        limit: Option<usize>,
308        where_: Option<serde_json::Value>,
309    ) -> Result<String, String> {
310        AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
311    }
312
313    async fn card_lineage(
314        &self,
315        card_id: &str,
316        direction: Option<String>,
317        depth: Option<usize>,
318        include_stats: Option<bool>,
319        relation_filter: Option<Vec<String>>,
320    ) -> Result<String, String> {
321        AppService::card_lineage(
322            self,
323            card_id,
324            direction.as_deref(),
325            depth,
326            include_stats,
327            relation_filter,
328        )
329    }
330
331    async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
332        AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
333    }
334
335    // ─── Hub ─────────────────────────────────────────────────
336
337    async fn hub_reindex(
338        &self,
339        output_path: Option<String>,
340        source_dir: Option<String>,
341    ) -> Result<String, String> {
342        let svc = self.clone();
343        tokio::task::spawn_blocking(move || {
344            AppService::hub_reindex(&svc, output_path.as_deref(), source_dir.as_deref())
345        })
346        .await
347        .map_err(|e| format!("hub_reindex task panicked: {e}"))?
348    }
349
350    async fn hub_gendoc(
351        &self,
352        source_dir: String,
353        out_dir: Option<String>,
354        projections: Option<Vec<String>>,
355        config_path: Option<String>,
356        lint_strict: Option<bool>,
357    ) -> Result<String, String> {
358        let svc = self.clone();
359        tokio::task::spawn_blocking(move || {
360            crate::AppService::hub_gendoc(
361                &svc,
362                &source_dir,
363                out_dir.as_deref(),
364                projections.as_deref(),
365                config_path.as_deref(),
366                lint_strict,
367            )
368        })
369        .await
370        .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
371    }
372
373    async fn hub_dist(
374        &self,
375        source_dir: String,
376        output_path: Option<String>,
377        out_dir: Option<String>,
378        preset: Option<String>,
379        project_root: Option<String>,
380        projections: Option<Vec<String>>,
381        config_path: Option<String>,
382        lint_strict: Option<bool>,
383    ) -> Result<String, String> {
384        let svc = self.clone();
385        tokio::task::spawn_blocking(move || {
386            AppService::hub_dist(
387                &svc,
388                &source_dir,
389                output_path.as_deref(),
390                out_dir.as_deref(),
391                preset.as_deref(),
392                project_root.as_deref(),
393                projections.as_deref(),
394                config_path.as_deref(),
395                lint_strict,
396            )
397        })
398        .await
399        .map_err(|e| format!("hub_dist task panicked: {e}"))?
400    }
401
402    async fn hub_info(&self, pkg: String) -> Result<String, String> {
403        let svc = self.clone();
404        tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
405            .await
406            .map_err(|e| format!("hub_info task panicked: {e}"))?
407    }
408
409    #[allow(clippy::too_many_arguments)]
410    async fn hub_search(
411        &self,
412        query: Option<String>,
413        category: Option<String>,
414        installed_only: Option<bool>,
415        limit: Option<i32>,
416        sort: Option<String>,
417        filter: Option<serde_json::Value>,
418        fields: Option<Vec<String>>,
419        verbose: Option<String>,
420    ) -> Result<String, String> {
421        let svc = self.clone();
422
423        // `filter` is a free-form JSON Value at the MCP boundary (so the
424        // trait stays core-crate-pure). If the caller sends something
425        // that is not a JSON object we treat it as "no filter" — the
426        // explicit category/installed_only params still cover the common
427        // cases. The MCP `JsonSchema` layer will have already flagged
428        // hard type errors. We log the drop so operators can diagnose
429        // unexpected filter shapes in production.
430        let filter_map = match filter {
431            None => None,
432            Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
433                Ok(map) => Some(map),
434                Err(e) => {
435                    tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
436                    None
437                }
438            },
439        };
440
441        // Negative limit values from MCP callers are clamped to 0 rather
442        // than wrapping to a huge usize (unchecked-user-bound-input pattern).
443        // Downstream semantics: `Some(0)` means "no limit" (return all) —
444        // the truncate path in `AppService::hub_search` short-circuits on 0.
445        let opts = ListOpts {
446            limit: limit.map(|n| n.max(0) as usize),
447            sort,
448            filter: filter_map,
449            fields,
450            verbose,
451        };
452
453        tokio::task::spawn_blocking(move || {
454            AppService::hub_search(
455                &svc,
456                query.as_deref(),
457                category.as_deref(),
458                installed_only,
459                opts,
460            )
461        })
462        .await
463        .map_err(|e| format!("hub_search task panicked: {e}"))?
464    }
465
466    // ─── Package read ─────────────────────────────────────────
467
468    async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
469        AppService::pkg_read_init_lua(self, name, None)
470    }
471
472    async fn pkg_meta(&self, name: &str) -> Result<String, String> {
473        let filter = serde_json::json!({ "name": name });
474        let json_str = EngineApi::pkg_list(
475            self,
476            None,
477            None,
478            None,
479            Some(filter),
480            None,
481            Some("full".to_string()),
482        )
483        .await?;
484        let val: serde_json::Value = serde_json::from_str(&json_str)
485            .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
486        let pkgs = val
487            .get("packages")
488            .and_then(|p| p.as_array())
489            .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
490        if pkgs.is_empty() {
491            return Err(format!("pkg not found: {name}"));
492        }
493        serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
494    }
495
496    // ─── Package scaffold ─────────────────────────────────────
497
498    async fn pkg_scaffold(
499        &self,
500        name: String,
501        target_dir: Option<String>,
502        category: Option<String>,
503        description: Option<String>,
504    ) -> Result<String, String> {
505        let svc = self.clone();
506        tokio::task::spawn_blocking(move || {
507            AppService::pkg_scaffold(
508                &svc,
509                &name,
510                target_dir.as_deref(),
511                category.as_deref(),
512                description.as_deref(),
513            )
514        })
515        .await
516        .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
517    }
518
519    // ─── Hub resources ───────────────────────────────────────
520
521    /// Aggregate hub index across all registered cache sources.
522    ///
523    /// Delegates to `AppService::aggregate_index`, then serializes the
524    /// result to a JSON string. Individual source failures and registry-load
525    /// failures are embedded in the response JSON under a `"warnings"` field
526    /// so the MCP caller can observe partial failures without losing the
527    /// aggregate result.
528    async fn hub_index_aggregate(&self) -> Result<String, String> {
529        let svc = self.clone();
530        let (index, warnings) = tokio::task::spawn_blocking(move || {
531            AppService::aggregate_index(&svc).map_err(|e| e.to_string())
532        })
533        .await
534        .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
535
536        let mut json = serde_json::to_value(&index)
537            .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
538        if !warnings.is_empty() {
539            if let Some(obj) = json.as_object_mut() {
540                obj.insert("warnings".to_string(), serde_json::json!(warnings));
541            }
542        }
543        serde_json::to_string(&json)
544            .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
545    }
546
547    // ─── Diagnostics ─────────────────────────────────────────
548
549    async fn info(&self) -> String {
550        AppService::info(self)
551    }
552
553    // ─── Pool management ─────────────────────────────────────
554
555    async fn pool_ensure(&self) -> Result<String, String> {
556        AppService::pool_ensure_impl(self).await
557    }
558
559    async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
560        AppService::pool_status_impl(self, sid).await
561    }
562
563    async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
564        AppService::pool_stop_impl(self, sid).await
565    }
566}
567
568// ─── Pool inherent helpers ────────────────────────────────────────────────────
569
570impl AppService {
571    /// Scan registry.json, GC dead workers, and return live sessions.
572    ///
573    /// Idempotent: calling twice produces the same result when no workers change
574    /// state between calls.  Does NOT spawn new workers.
575    pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
576        let reg_path = self.pool_reg_path.clone();
577        let lock_path = self.pool_lock_path.clone();
578
579        let sessions =
580            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
581                with_registry_lock(&lock_path, || {
582                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
583                    let survivors = reg.scan_and_gc()?;
584                    // Persist GC result back to disk.
585                    reg.save(&reg_path)?;
586                    let entries = survivors
587                        .iter()
588                        .map(|e| {
589                            serde_json::json!({
590                                "sid": e.sid,
591                                "pid": e.pid,
592                                "sock": e.sock.to_string_lossy(),
593                                "version": e.version,
594                            })
595                        })
596                        .collect::<Vec<_>>();
597                    Ok(entries)
598                })
599            })
600            .await
601            .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
602            .map_err(|e| e.to_string())?;
603
604        let pool_version = env!("CARGO_PKG_VERSION");
605        serde_json::to_string(&serde_json::json!({
606            "sessions": sessions,
607            "pool_version": pool_version,
608        }))
609        .map_err(|e| format!("pool_ensure: serialize: {e}"))
610    }
611
612    /// Return pool worker status from registry.json.
613    ///
614    /// When `sid` is `Some`, restricts output to that single worker.
615    /// Uses kill -0 liveness check for each returned entry.
616    pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
617        let reg_path = self.pool_reg_path.clone();
618        let lock_path = self.pool_lock_path.clone();
619
620        let sessions =
621            tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
622                with_registry_lock(&lock_path, || {
623                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
624                    // GC dead entries in-place so status reflects reality.
625                    let _ = reg.scan_and_gc()?;
626                    reg.save(&reg_path)?;
627
628                    let entries: Vec<serde_json::Value> = reg
629                        .sessions
630                        .iter()
631                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
632                        .map(|e| {
633                            serde_json::json!({
634                                "sid": e.sid,
635                                "pid": e.pid,
636                                "sock": e.sock.to_string_lossy(),
637                                "version": e.version,
638                                "created_at": e.created_at,
639                                // Status is "running" for all live entries (UDS ping not required in POC).
640                                "status": "running",
641                            })
642                        })
643                        .collect();
644                    Ok(entries)
645                })
646            })
647            .await
648            .map_err(|e| format!("pool_status: task panicked: {e}"))?
649            .map_err(|e| e.to_string())?;
650
651        let pool_version = env!("CARGO_PKG_VERSION");
652        serde_json::to_string(&serde_json::json!({
653            "sessions": sessions,
654            "pool_version": pool_version,
655        }))
656        .map_err(|e| format!("pool_status: serialize: {e}"))
657    }
658
659    /// Send SIGTERM to all workers or a single worker identified by `sid`.
660    ///
661    /// After SIGTERM, removes the entry from registry.json.
662    /// Returns `{"stopped": [...], "errors": [...]}`.
663    /// SIGTERM send failures are surfaced in the `errors` array (not dropped silently).
664    pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
665        let reg_path = self.pool_reg_path.clone();
666        let lock_path = self.pool_lock_path.clone();
667
668        let result = tokio::task::spawn_blocking(
669            move || -> Result<(Vec<String>, Vec<String>), PoolError> {
670                with_registry_lock(&lock_path, || {
671                    let mut reg = PoolRegistry::load_or_default(&reg_path)?;
672
673                    // Determine targets.
674                    let targets: Vec<_> = reg
675                        .sessions
676                        .iter()
677                        .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
678                        .cloned()
679                        .collect();
680
681                    let mut stopped: Vec<String> = Vec::new();
682                    let mut errors: Vec<String> = Vec::new();
683
684                    for entry in &targets {
685                        #[cfg(unix)]
686                        {
687                            // K-52: guard u32 → i32 (pid_t) overflow; also reject pid == 0
688                            // (POSIX kill(2): pid=0 signals every process in the calling process
689                            // group, pid<0 signals a process group — both are unsafe here).
690                            let pid_t = match i32::try_from(entry.pid) {
691                                Ok(p) if p > 0 => p,
692                                Ok(_) => {
693                                    errors.push(format!(
694                                        "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
695                                        entry.sid, entry.pid
696                                    ));
697                                    reg.remove(&entry.sid);
698                                    continue;
699                                }
700                                Err(_) => {
701                                    errors.push(format!(
702                                        "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
703                                        entry.sid, entry.pid
704                                    ));
705                                    // Remove the entry anyway (PID is invalid, worker is unreachable).
706                                    reg.remove(&entry.sid);
707                                    continue;
708                                }
709                            };
710
711                            // SAFETY: libc::kill(pid, SIGTERM) is a thin syscall wrapper.
712                            // pid_t > 0, verified by the match arm above.
713                            // pid fits in i32 (verified above).
714                            let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
715                            if ret == 0 {
716                                stopped.push(entry.sid.clone());
717                            } else {
718                                let os_err = std::io::Error::last_os_error();
719                                if os_err.raw_os_error() == Some(libc::ESRCH) {
720                                    // Process already dead — treat as stopped (idempotent).
721                                    stopped.push(entry.sid.clone());
722                                } else {
723                                    errors.push(format!(
724                                        "sid={}: SIGTERM failed: {}",
725                                        entry.sid, os_err
726                                    ));
727                                }
728                            }
729                        }
730                        #[cfg(not(unix))]
731                        {
732                            // Non-Unix: cannot send SIGTERM; report as unsupported.
733                            errors.push(format!(
734                                "sid={}: SIGTERM not supported on this platform",
735                                entry.sid
736                            ));
737                        }
738                        // Remove from registry regardless of SIGTERM result
739                        // (dead or dying, we no longer track it).
740                        reg.remove(&entry.sid);
741                    }
742
743                    // Persist updated registry (entries removed).
744                    reg.save(&reg_path)?;
745
746                    Ok((stopped, errors))
747                })
748            },
749        )
750        .await
751        .map_err(|e| format!("pool_stop: task panicked: {e}"))?
752        .map_err(|e| e.to_string())?;
753
754        let (stopped, errors) = result;
755        serde_json::to_string(&serde_json::json!({
756            "stopped": stopped,
757            "errors": errors,
758        }))
759        .map_err(|e| format!("pool_stop: serialize: {e}"))
760    }
761}
762
763// ─── Tests ────────────────────────────────────────────────────────────────────
764
#[cfg(test)]
mod tests {
    use super::super::test_support::make_app_service_at;

    /// `pool_stop_impl` must refuse to signal a registry entry whose pid is 0.
    ///
    /// With `"pid": 0` seeded into registry.json, stopping all sessions has
    /// to: report the entry in `errors` as an invalid POSIX target, leave
    /// `stopped` empty, and drop the entry from the on-disk registry. The
    /// test process surviving to the end shows that no SIGTERM reached its
    /// own process group.
    #[tokio::test]
    #[cfg(unix)]
    async fn pool_stop_pid_zero_is_rejected() {
        // Arrange — a service rooted in a tempdir (no real $HOME involved).
        let sandbox = tempfile::tempdir().expect("tempdir");
        let app_root = sandbox.path().to_path_buf();
        let service = make_app_service_at(app_root.clone()).await;

        // The registry is expected at {root}/state/pool/registry.json.
        let registry_file = app_root.join("state").join("pool").join("registry.json");
        let pool_dir = registry_file.parent().unwrap();
        std::fs::create_dir_all(pool_dir).expect("create pool dir");

        let zero_pid_registry = serde_json::json!({
            "sessions": [{
                "sid": "zero-pid-session",
                "pid": 0u32,
                "sock": "/tmp/alc-pool/zero.sock",
                "version": "0.30.0",
                "created_at": "2026-01-01T00:00:00Z"
            }]
        });
        std::fs::write(&registry_file, zero_pid_registry.to_string())
            .expect("seed registry.json");

        // Act — stop every session.
        let raw_response = service.pool_stop_impl(None).await.expect("pool_stop_impl");
        let response: serde_json::Value =
            serde_json::from_str(&raw_response).expect("response is valid JSON");

        // Assert (1) — the first error names the invalid pid.
        let reported = response["errors"].as_array().expect("errors array");
        assert!(
            !reported.is_empty(),
            "expected at least one error for pid=0 entry"
        );
        let err_msg = reported[0].as_str().unwrap_or("");
        assert!(
            err_msg.contains("not a valid POSIX target pid"),
            "unexpected error message: {err_msg}"
        );

        // Assert (2) — nothing is reported as stopped.
        let halted = response["stopped"].as_array().expect("stopped array");
        assert!(
            halted.is_empty(),
            "pid=0 entry must not appear in stopped list"
        );

        // Assert (3) — the entry is gone from the registry on disk.
        let persisted_text = std::fs::read_to_string(&registry_file).expect("read registry");
        let persisted: serde_json::Value =
            serde_json::from_str(&persisted_text).expect("parse registry");
        let remaining = persisted["sessions"].as_array().expect("sessions array");
        assert!(
            remaining.is_empty(),
            "pid=0 entry must be removed from on-disk registry"
        );

        // Assert (4) — implicit: reaching this point means the test process
        // was not terminated by a stray SIGTERM.
    }
}