1use std::collections::HashMap;
2use std::sync::Arc;
3
4use algocline_core::{EngineApi, QueryResponse};
5use algocline_engine::state::{ResetReport, StateError};
6use async_trait::async_trait;
7
8use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
9
10use super::list_opts::ListOpts;
11use super::AppService;
12
13#[async_trait]
20impl EngineApi for AppService {
21 async fn run(
24 &self,
25 code: Option<String>,
26 code_file: Option<String>,
27 ctx: Option<serde_json::Value>,
28 project_root: Option<String>,
29 host_mode: Option<bool>,
30 ) -> Result<String, String> {
31 AppService::run(self, code, code_file, ctx, project_root, host_mode).await
32 }
33
34 async fn advice(
35 &self,
36 strategy: &str,
37 task: Option<String>,
38 opts: Option<serde_json::Value>,
39 project_root: Option<String>,
40 ) -> Result<String, String> {
41 AppService::advice(self, strategy, task, opts, project_root).await
42 }
43
44 async fn continue_single(
45 &self,
46 session_id: &str,
47 response: String,
48 query_id: Option<&str>,
49 usage: Option<algocline_core::TokenUsage>,
50 ) -> Result<String, String> {
51 AppService::continue_single(self, session_id, response, query_id, usage).await
52 }
53
54 async fn continue_batch(
55 &self,
56 session_id: &str,
57 responses: Vec<QueryResponse>,
58 ) -> Result<String, String> {
59 AppService::continue_batch(self, session_id, responses).await
60 }
61
62 async fn status(
65 &self,
66 session_id: Option<&str>,
67 pending_filter: Option<serde_json::Value>,
68 include_history: bool,
69 ) -> Result<String, String> {
70 AppService::status(self, session_id, pending_filter, include_history).await
71 }
72
73 async fn eval(
76 &self,
77 scenario: Option<String>,
78 scenario_file: Option<String>,
79 scenario_name: Option<String>,
80 strategy: &str,
81 strategy_opts: Option<serde_json::Value>,
82 auto_card: bool,
83 ) -> Result<String, String> {
84 AppService::eval(
85 self,
86 scenario,
87 scenario_file,
88 scenario_name,
89 strategy,
90 strategy_opts,
91 auto_card,
92 )
93 .await
94 }
95
96 async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
97 AppService::eval_history(self, strategy, limit)
98 }
99
100 async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
101 AppService::eval_detail(self, eval_id)
102 }
103
104 async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
105 AppService::eval_compare(self, eval_id_a, eval_id_b).await
106 }
107
108 async fn scenario_list(&self) -> Result<String, String> {
111 AppService::scenario_list(self)
112 }
113
114 async fn scenario_show(&self, name: &str) -> Result<String, String> {
115 AppService::scenario_show(self, name)
116 }
117
118 async fn scenario_install(&self, url: String) -> Result<String, String> {
119 AppService::scenario_install(self, url).await
120 }
121
122 async fn pkg_link(
125 &self,
126 path: String,
127 name: Option<String>,
128 force: Option<bool>,
129 scope: Option<String>,
130 project_root: Option<String>,
131 ) -> Result<String, String> {
132 AppService::pkg_link(self, path, name, force, scope, project_root).await
133 }
134
135 async fn pkg_unlink(&self, name: String) -> Result<String, String> {
136 AppService::pkg_unlink(self, name).await
137 }
138
139 #[allow(clippy::too_many_arguments)]
140 async fn pkg_list(
141 &self,
142 project_root: Option<String>,
143 limit: Option<i32>,
144 sort: Option<String>,
145 filter: Option<serde_json::Value>,
146 fields: Option<Vec<String>>,
147 verbose: Option<String>,
148 ) -> Result<String, String> {
149 let filter_map = match filter {
155 None => None,
156 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
157 Ok(map) => Some(map),
158 Err(e) => {
159 tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
160 None
161 }
162 },
163 };
164
165 let opts = ListOpts {
170 limit: limit.map(|n| n.max(0) as usize),
171 sort,
172 filter: filter_map,
173 fields,
174 verbose,
175 };
176
177 AppService::pkg_list(self, project_root, opts)
178 .await
179 .map_err(|e| e.to_string())
180 }
181
182 async fn pkg_install(
183 &self,
184 url: String,
185 name: Option<String>,
186 force: Option<bool>,
187 ) -> Result<String, String> {
188 AppService::pkg_install(self, url, name, force).await
189 }
190
191 async fn pkg_remove(
192 &self,
193 name: &str,
194 project_root: Option<String>,
195 version: Option<String>,
196 scope: Option<String>,
197 ) -> Result<String, String> {
198 AppService::pkg_remove(self, name, project_root, version, scope).await
199 }
200
201 async fn pkg_repair(
202 &self,
203 name: Option<String>,
204 project_root: Option<String>,
205 ) -> Result<String, String> {
206 AppService::pkg_repair(self, name, project_root).await
207 }
208
209 async fn pkg_doctor(
210 &self,
211 name: Option<String>,
212 project_root: Option<String>,
213 ) -> Result<String, String> {
214 AppService::pkg_doctor(self, name, project_root).await
215 }
216
217 #[allow(clippy::too_many_arguments)]
221 async fn pkg_test(
222 &self,
223 pkg: Option<String>,
224 code_file: Option<String>,
225 code: Option<String>,
226 spec_dir: Option<String>,
227 filter: Option<String>,
228 search_paths: Option<Vec<String>>,
229 project_root: Option<String>,
230 auto_search_paths: Option<bool>,
231 ) -> Result<String, String> {
232 AppService::pkg_test(
233 self,
234 pkg,
235 code_file,
236 code,
237 spec_dir,
238 filter,
239 search_paths,
240 project_root,
241 auto_search_paths,
242 )
243 .await
244 }
245
246 async fn add_note(
249 &self,
250 session_id: &str,
251 content: &str,
252 title: Option<&str>,
253 ) -> Result<String, String> {
254 AppService::add_note(self, session_id, content, title).await
255 }
256
257 async fn log_view(
258 &self,
259 session_id: Option<&str>,
260 limit: Option<usize>,
261 max_chars: Option<usize>,
262 ) -> Result<String, String> {
263 AppService::log_view(self, session_id, limit, max_chars).await
264 }
265
266 async fn stats(
267 &self,
268 strategy_filter: Option<&str>,
269 days: Option<u64>,
270 ) -> Result<String, String> {
271 AppService::stats(self, strategy_filter, days)
272 }
273
274 async fn init(&self, project_root: Option<String>) -> Result<String, String> {
277 AppService::init(self, project_root).await
278 }
279
280 async fn update(&self, project_root: Option<String>) -> Result<String, String> {
281 AppService::update(self, project_root).await
282 }
283
284 async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
285 AppService::migrate(self, project_root).await
286 }
287
288 async fn session_new(
291 &self,
292 project_root: Option<String>,
293 mode: Option<String>,
294 ) -> Result<String, String> {
295 let session = self.activate_session(project_root.as_deref(), mode.as_deref())?;
296 let result = serde_json::json!({
297 "session_id": session.session_id,
298 "project_root": session
299 .project_root
300 .as_ref()
301 .map(|p| p.to_string_lossy().to_string()),
302 "mode": session.mode.as_str(),
303 });
304 serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
305 }
306
307 async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
310 AppService::card_list(self, pkg.as_deref())
311 }
312
313 async fn card_get(&self, card_id: &str) -> Result<String, String> {
314 AppService::card_get(self, card_id)
315 }
316
317 async fn card_find(
318 &self,
319 pkg: Option<String>,
320 where_: Option<serde_json::Value>,
321 order_by: Option<serde_json::Value>,
322 limit: Option<usize>,
323 offset: Option<usize>,
324 ) -> Result<String, String> {
325 AppService::card_find(self, pkg, where_, order_by, limit, offset)
326 }
327
328 async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
329 AppService::card_alias_list(self, pkg.as_deref())
330 }
331
332 async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
333 AppService::card_get_by_alias(self, name)
334 }
335
336 async fn card_alias_set(
337 &self,
338 name: &str,
339 card_id: &str,
340 pkg: Option<String>,
341 note: Option<String>,
342 ) -> Result<String, String> {
343 AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
344 }
345
346 async fn card_append(
347 &self,
348 card_id: &str,
349 fields: serde_json::Value,
350 ) -> Result<String, String> {
351 AppService::card_append(self, card_id, fields)
352 }
353
354 async fn card_install(&self, url: String) -> Result<String, String> {
355 AppService::card_install(self, url).await
356 }
357
358 async fn card_samples(
359 &self,
360 card_id: &str,
361 offset: Option<usize>,
362 limit: Option<usize>,
363 where_: Option<serde_json::Value>,
364 ) -> Result<String, String> {
365 AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
366 }
367
368 async fn card_lineage(
369 &self,
370 card_id: &str,
371 direction: Option<String>,
372 depth: Option<usize>,
373 include_stats: Option<bool>,
374 relation_filter: Option<Vec<String>>,
375 ) -> Result<String, String> {
376 AppService::card_lineage(
377 self,
378 card_id,
379 direction.as_deref(),
380 depth,
381 include_stats,
382 relation_filter,
383 )
384 }
385
386 async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
387 AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
388 }
389
390 async fn card_analyze(&self, card_id: &str, pkg: Option<String>) -> Result<String, String> {
391 AppService::card_analyze(self, card_id, pkg).await
392 }
393
394 async fn card_publish(
395 &self,
396 card_id: &str,
397 target_repo: &str,
398 commit_message: Option<&str>,
399 ) -> Result<String, String> {
400 AppService::card_publish(self, card_id, target_repo, commit_message).await
401 }
402
403 async fn hub_reindex(
406 &self,
407 output_path: Option<String>,
408 source_dir: Option<String>,
409 ) -> Result<String, String> {
410 self.hub_reindex(output_path.as_deref(), source_dir.as_deref())
411 .await
412 }
413
414 async fn hub_gendoc(
415 &self,
416 source_dir: String,
417 out_dir: Option<String>,
418 projections: Option<Vec<String>>,
419 config_path: Option<String>,
420 lint_strict: Option<bool>,
421 ) -> Result<String, String> {
422 let svc = self.clone();
423 tokio::task::spawn_blocking(move || {
424 crate::AppService::hub_gendoc(
425 &svc,
426 &source_dir,
427 out_dir.as_deref(),
428 projections.as_deref(),
429 config_path.as_deref(),
430 lint_strict,
431 )
432 })
433 .await
434 .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
435 }
436
437 async fn hub_dist(
438 &self,
439 source_dir: String,
440 output_path: Option<String>,
441 out_dir: Option<String>,
442 preset: Option<String>,
443 project_root: Option<String>,
444 projections: Option<Vec<String>>,
445 config_path: Option<String>,
446 lint_strict: Option<bool>,
447 ) -> Result<String, String> {
448 self.hub_dist(
449 &source_dir,
450 output_path.as_deref(),
451 out_dir.as_deref(),
452 preset.as_deref(),
453 project_root.as_deref(),
454 projections.as_deref(),
455 config_path.as_deref(),
456 lint_strict,
457 )
458 .await
459 }
460
461 async fn hub_info(&self, pkg: String) -> Result<String, String> {
462 let svc = self.clone();
463 tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
464 .await
465 .map_err(|e| format!("hub_info task panicked: {e}"))?
466 }
467
468 #[allow(clippy::too_many_arguments)]
469 async fn hub_search(
470 &self,
471 query: Option<String>,
472 category: Option<String>,
473 installed_only: Option<bool>,
474 limit: Option<i32>,
475 sort: Option<String>,
476 filter: Option<serde_json::Value>,
477 fields: Option<Vec<String>>,
478 verbose: Option<String>,
479 local_indices: Option<Vec<String>>,
480 ) -> Result<String, String> {
481 let svc = self.clone();
482
483 let filter_map = match filter {
491 None => None,
492 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
493 Ok(map) => Some(map),
494 Err(e) => {
495 tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
496 None
497 }
498 },
499 };
500
501 let opts = ListOpts {
506 limit: limit.map(|n| n.max(0) as usize),
507 sort,
508 filter: filter_map,
509 fields,
510 verbose,
511 };
512
513 tokio::task::spawn_blocking(move || {
514 AppService::hub_search(
515 &svc,
516 query.as_deref(),
517 category.as_deref(),
518 installed_only,
519 opts,
520 local_indices,
521 )
522 })
523 .await
524 .map_err(|e| format!("hub_search task panicked: {e}"))?
525 }
526
527 async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
530 AppService::pkg_read_init_lua(self, name, None)
531 }
532
533 async fn pkg_get_narrative_md(&self, name: &str) -> Result<Option<String>, String> {
534 AppService::pkg_get_narrative_md(self, name).await
535 }
536
537 async fn pkg_meta(&self, name: &str) -> Result<String, String> {
538 let filter = serde_json::json!({ "name": name });
539 let json_str = EngineApi::pkg_list(
540 self,
541 None,
542 None,
543 None,
544 Some(filter),
545 None,
546 Some("full".to_string()),
547 )
548 .await?;
549 let val: serde_json::Value = serde_json::from_str(&json_str)
550 .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
551 let pkgs = val
552 .get("packages")
553 .and_then(|p| p.as_array())
554 .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
555 if pkgs.is_empty() {
556 return Err(format!("pkg not found: {name}"));
557 }
558 serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
559 }
560
561 async fn pkg_scaffold(
564 &self,
565 name: String,
566 target_dir: Option<String>,
567 category: Option<String>,
568 description: Option<String>,
569 ) -> Result<String, String> {
570 let svc = self.clone();
571 tokio::task::spawn_blocking(move || {
572 AppService::pkg_scaffold(
573 &svc,
574 &name,
575 target_dir.as_deref(),
576 category.as_deref(),
577 description.as_deref(),
578 )
579 })
580 .await
581 .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
582 }
583
584 async fn hub_index_aggregate(&self) -> Result<String, String> {
594 let svc = self.clone();
595 let (index, warnings) = tokio::task::spawn_blocking(move || {
596 AppService::aggregate_index(&svc).map_err(|e| e.to_string())
597 })
598 .await
599 .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
600
601 let mut json = serde_json::to_value(&index)
602 .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
603 if !warnings.is_empty() {
604 if let Some(obj) = json.as_object_mut() {
605 obj.insert("warnings".to_string(), serde_json::json!(warnings));
606 }
607 }
608 serde_json::to_string(&json)
609 .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
610 }
611
612 async fn setting_resolve(&self, target: Option<String>) -> Result<String, String> {
615 let app_dir = self.log_config.app_dir();
616 let project_root = self.resolve_root(None);
617 tokio::task::spawn_blocking(move || {
618 crate::service::setting::resolve_setting(
619 &app_dir,
620 project_root.as_deref(),
621 target.as_deref(),
622 )
623 .map_err(|e| e.to_string())
624 .and_then(|r| {
625 serde_json::to_string(&r).map_err(|e| format!("setting_resolve: serialize: {e}"))
626 })
627 })
628 .await
629 .map_err(|e| format!("setting_resolve: task panicked: {e}"))?
630 }
631
632 async fn state_list(&self, namespace: String) -> Result<String, String> {
635 let store = Arc::clone(&self.state_store);
636 tokio::task::spawn_blocking(move || {
637 store
638 .list_dispatched(&namespace)
639 .map_err(AppService::state_err_to_wire)
640 .and_then(|keys| {
641 serde_json::to_string(&serde_json::json!({"keys": keys}))
645 .map_err(|e| format!("state_list: serialize: {e}"))
646 })
647 })
648 .await
649 .map_err(|e| format!("state_list: task panicked: {e}"))?
650 }
651
652 async fn state_show(&self, namespace: String, key: String) -> Result<String, String> {
653 let store = Arc::clone(&self.state_store);
654 tokio::task::spawn_blocking(move || {
655 store
656 .show_dispatched(&namespace, &key)
657 .map_err(AppService::state_err_to_wire)
658 .and_then(|value| {
659 serde_json::to_string(&value).map_err(|e| format!("state_show: serialize: {e}"))
660 })
661 })
662 .await
663 .map_err(|e| format!("state_show: task panicked: {e}"))?
664 }
665
666 async fn state_reset(
667 &self,
668 namespace: String,
669 key: String,
670 steps: Option<Vec<String>>,
671 fields: Option<Vec<String>>,
672 ) -> Result<String, String> {
673 let store = Arc::clone(&self.state_store);
674 let steps_input: Vec<String> = steps.clone().unwrap_or_default();
676 let fields_input: Vec<String> = fields.clone().unwrap_or_default();
677 tokio::task::spawn_blocking(move || {
678 let steps_slice: Vec<String> = steps.unwrap_or_default();
679 let fields_slice: Vec<String> = fields.unwrap_or_default();
680 store
681 .reset_dispatched_with_backup(&namespace, &key, &steps_slice, &fields_slice)
682 .map_err(AppService::state_err_to_wire)
683 .and_then(|report: ResetReport| {
684 let v = serde_json::json!({
685 "ok": true,
686 "backup_path": report.backup_path.to_string_lossy(),
687 "steps_removed": report.steps_removed,
688 "steps_input": steps_input,
689 "fields_removed": report.fields_removed,
690 "fields_input": fields_input,
691 });
692 serde_json::to_string(&v).map_err(|e| format!("state_reset: serialize: {e}"))
693 })
694 })
695 .await
696 .map_err(|e| format!("state_reset: task panicked: {e}"))?
697 }
698
699 async fn state_set(
700 &self,
701 namespace: String,
702 key: String,
703 value: serde_json::Value,
704 ) -> Result<String, String> {
705 let store = Arc::clone(&self.state_store);
706 tokio::task::spawn_blocking(move || {
707 store
708 .set_dispatched(&namespace, &key, &value)
709 .map_err(AppService::state_err_to_wire)
710 .map(|_| r#"{"ok":true}"#.to_string())
711 })
712 .await
713 .map_err(|e| format!("state_set: task panicked: {e}"))?
714 }
715
716 async fn state_delete(&self, namespace: String, key: String) -> Result<String, String> {
717 let store = Arc::clone(&self.state_store);
718 tokio::task::spawn_blocking(move || {
719 store
720 .delete_dispatched(&namespace, &key)
721 .map_err(AppService::state_err_to_wire)
722 .and_then(|existed| {
723 serde_json::to_string(&serde_json::json!({"ok": true, "existed": existed}))
724 .map_err(|e| format!("state_delete: serialize: {e}"))
725 })
726 })
727 .await
728 .map_err(|e| format!("state_delete: task panicked: {e}"))?
729 }
730
731 async fn info(&self) -> String {
734 let svc = self.clone();
735 tokio::task::spawn_blocking(move || AppService::info(&svc))
736 .await
737 .unwrap_or_else(|e| format!("{{\"error\": \"info: task panicked: {e}\"}}"))
738 }
739
740 async fn pool_ensure(&self) -> Result<String, String> {
743 AppService::pool_ensure_impl(self).await
744 }
745
746 async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
747 AppService::pool_status_impl(self, sid).await
748 }
749
750 async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
751 AppService::pool_stop_impl(self, sid).await
752 }
753}
754
755impl AppService {
758 fn state_err_to_wire(e: StateError) -> String {
770 let v = match e {
771 StateError::KeyNotFound { namespace, key } => {
772 serde_json::json!({"error": "NOT_FOUND", "namespace": namespace, "key": key})
773 }
774 StateError::UnsafeSegment { which, value } => {
775 serde_json::json!({"error": "UNSAFE_SEGMENT", "which": which, "value": value})
776 }
777 StateError::IoBackup(io_err) => {
778 serde_json::json!({"error": "IO_BACKUP", "message": io_err.to_string()})
779 }
780 StateError::IoRead(io_err) => {
781 serde_json::json!({"error": "IO_READ", "message": io_err.to_string()})
782 }
783 StateError::IoWrite(io_err) => {
784 serde_json::json!({"error": "IO_WRITE", "message": io_err.to_string()})
785 }
786 StateError::Serde(serde_err) => {
787 serde_json::json!({"error": "SERDE", "message": serde_err.to_string()})
788 }
789 StateError::ShapeInvalid { reason } => {
790 serde_json::json!({"error": "SHAPE_INVALID", "reason": reason})
791 }
792 };
793 serde_json::to_string(&v).unwrap_or_else(|e| {
794 format!("{{\"error\":\"INTERNAL\",\"message\":\"serialize failed: {e}\"}}")
798 })
799 }
800}
801
802impl AppService {
805 pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
810 let reg_path = self.pool_reg_path.clone();
811 let lock_path = self.pool_lock_path.clone();
812
813 let sessions =
814 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
815 with_registry_lock(&lock_path, || {
816 let mut reg = PoolRegistry::load_or_default(®_path)?;
817 let survivors = reg.scan_and_gc()?;
818 reg.save(®_path)?;
820 let entries = survivors
821 .iter()
822 .map(|e| {
823 serde_json::json!({
824 "sid": e.sid,
825 "pid": e.pid,
826 "sock": e.sock.to_string_lossy(),
827 "version": e.version,
828 })
829 })
830 .collect::<Vec<_>>();
831 Ok(entries)
832 })
833 })
834 .await
835 .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
836 .map_err(|e| e.to_string())?;
837
838 let pool_version = env!("CARGO_PKG_VERSION");
839 serde_json::to_string(&serde_json::json!({
840 "sessions": sessions,
841 "pool_version": pool_version,
842 }))
843 .map_err(|e| format!("pool_ensure: serialize: {e}"))
844 }
845
846 pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
851 let reg_path = self.pool_reg_path.clone();
852 let lock_path = self.pool_lock_path.clone();
853
854 let sessions =
855 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
856 with_registry_lock(&lock_path, || {
857 let mut reg = PoolRegistry::load_or_default(®_path)?;
858 let _ = reg.scan_and_gc()?;
860 reg.save(®_path)?;
861
862 let entries: Vec<serde_json::Value> = reg
863 .sessions
864 .iter()
865 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
866 .map(|e| {
867 serde_json::json!({
868 "sid": e.sid,
869 "pid": e.pid,
870 "sock": e.sock.to_string_lossy(),
871 "version": e.version,
872 "created_at": e.created_at,
873 "status": "running",
875 })
876 })
877 .collect();
878 Ok(entries)
879 })
880 })
881 .await
882 .map_err(|e| format!("pool_status: task panicked: {e}"))?
883 .map_err(|e| e.to_string())?;
884
885 let pool_version = env!("CARGO_PKG_VERSION");
886 serde_json::to_string(&serde_json::json!({
887 "sessions": sessions,
888 "pool_version": pool_version,
889 }))
890 .map_err(|e| format!("pool_status: serialize: {e}"))
891 }
892
893 pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
899 let reg_path = self.pool_reg_path.clone();
900 let lock_path = self.pool_lock_path.clone();
901
902 let result = tokio::task::spawn_blocking(
903 move || -> Result<(Vec<String>, Vec<String>), PoolError> {
904 with_registry_lock(&lock_path, || {
905 let mut reg = PoolRegistry::load_or_default(®_path)?;
906
907 let targets: Vec<_> = reg
909 .sessions
910 .iter()
911 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
912 .cloned()
913 .collect();
914
915 let mut stopped: Vec<String> = Vec::new();
916 let mut errors: Vec<String> = Vec::new();
917
918 for entry in &targets {
919 #[cfg(unix)]
920 {
921 let pid_t = match i32::try_from(entry.pid) {
925 Ok(p) if p > 0 => p,
926 Ok(_) => {
927 errors.push(format!(
928 "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
929 entry.sid, entry.pid
930 ));
931 reg.remove(&entry.sid);
932 continue;
933 }
934 Err(_) => {
935 errors.push(format!(
936 "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
937 entry.sid, entry.pid
938 ));
939 reg.remove(&entry.sid);
941 continue;
942 }
943 };
944
945 let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
949 if ret == 0 {
950 stopped.push(entry.sid.clone());
951 } else {
952 let os_err = std::io::Error::last_os_error();
953 if os_err.raw_os_error() == Some(libc::ESRCH) {
954 stopped.push(entry.sid.clone());
956 } else {
957 errors.push(format!(
958 "sid={}: SIGTERM failed: {}",
959 entry.sid, os_err
960 ));
961 }
962 }
963 }
964 #[cfg(not(unix))]
965 {
966 errors.push(format!(
968 "sid={}: SIGTERM not supported on this platform",
969 entry.sid
970 ));
971 }
972 reg.remove(&entry.sid);
975 }
976
977 reg.save(®_path)?;
979
980 Ok((stopped, errors))
981 })
982 },
983 )
984 .await
985 .map_err(|e| format!("pool_stop: task panicked: {e}"))?
986 .map_err(|e| e.to_string())?;
987
988 let (stopped, errors) = result;
989 serde_json::to_string(&serde_json::json!({
990 "stopped": stopped,
991 "errors": errors,
992 }))
993 .map_err(|e| format!("pool_stop: serialize: {e}"))
994 }
995}
996
997#[cfg(test)]
1000mod tests {
1001 use super::super::test_support::make_app_service_at;
1002
1003 #[tokio::test]
1010 #[cfg(unix)]
1011 async fn pool_stop_pid_zero_is_rejected() {
1012 let tmp = tempfile::tempdir().expect("tempdir");
1015 let root = tmp.path().to_path_buf();
1016 let svc = make_app_service_at(root.clone()).await;
1017
1018 let pool_reg_path = root.join("state").join("pool").join("registry.json");
1021 std::fs::create_dir_all(pool_reg_path.parent().unwrap()).expect("create pool dir");
1022
1023 let seeded = serde_json::json!({
1024 "sessions": [{
1025 "sid": "zero-pid-session",
1026 "pid": 0u32,
1027 "sock": "/tmp/alc-pool/zero.sock",
1028 "version": "0.30.0",
1029 "created_at": "2026-01-01T00:00:00Z"
1030 }]
1031 });
1032 std::fs::write(&pool_reg_path, seeded.to_string()).expect("seed registry.json");
1033
1034 let json_str = svc.pool_stop_impl(None).await.expect("pool_stop_impl");
1036 let result: serde_json::Value =
1037 serde_json::from_str(&json_str).expect("response is valid JSON");
1038
1039 let errors = result["errors"].as_array().expect("errors array");
1041 assert!(
1042 !errors.is_empty(),
1043 "expected at least one error for pid=0 entry"
1044 );
1045 let err_msg = errors[0].as_str().unwrap_or("");
1046 assert!(
1047 err_msg.contains("not a valid POSIX target pid"),
1048 "unexpected error message: {err_msg}"
1049 );
1050
1051 let stopped = result["stopped"].as_array().expect("stopped array");
1053 assert!(
1054 stopped.is_empty(),
1055 "pid=0 entry must not appear in stopped list"
1056 );
1057
1058 let on_disk: serde_json::Value =
1060 serde_json::from_str(&std::fs::read_to_string(&pool_reg_path).expect("read registry"))
1061 .expect("parse registry");
1062 let sessions = on_disk["sessions"].as_array().expect("sessions array");
1063 assert!(
1064 sessions.is_empty(),
1065 "pid=0 entry must be removed from on-disk registry"
1066 );
1067
1068 }
1071}