1use std::collections::HashMap;
2
3use algocline_core::{EngineApi, QueryResponse};
4use async_trait::async_trait;
5
6use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
7
8use super::list_opts::ListOpts;
9use super::AppService;
10
11#[async_trait]
18impl EngineApi for AppService {
19 async fn run(
22 &self,
23 code: Option<String>,
24 code_file: Option<String>,
25 ctx: Option<serde_json::Value>,
26 project_root: Option<String>,
27 host_mode: Option<bool>,
28 ) -> Result<String, String> {
29 AppService::run(self, code, code_file, ctx, project_root, host_mode).await
30 }
31
32 async fn advice(
33 &self,
34 strategy: &str,
35 task: Option<String>,
36 opts: Option<serde_json::Value>,
37 project_root: Option<String>,
38 ) -> Result<String, String> {
39 AppService::advice(self, strategy, task, opts, project_root).await
40 }
41
42 async fn continue_single(
43 &self,
44 session_id: &str,
45 response: String,
46 query_id: Option<&str>,
47 usage: Option<algocline_core::TokenUsage>,
48 ) -> Result<String, String> {
49 AppService::continue_single(self, session_id, response, query_id, usage).await
50 }
51
52 async fn continue_batch(
53 &self,
54 session_id: &str,
55 responses: Vec<QueryResponse>,
56 ) -> Result<String, String> {
57 AppService::continue_batch(self, session_id, responses).await
58 }
59
60 async fn status(
63 &self,
64 session_id: Option<&str>,
65 pending_filter: Option<serde_json::Value>,
66 include_history: bool,
67 ) -> Result<String, String> {
68 AppService::status(self, session_id, pending_filter, include_history).await
69 }
70
71 async fn eval(
74 &self,
75 scenario: Option<String>,
76 scenario_file: Option<String>,
77 scenario_name: Option<String>,
78 strategy: &str,
79 strategy_opts: Option<serde_json::Value>,
80 auto_card: bool,
81 ) -> Result<String, String> {
82 AppService::eval(
83 self,
84 scenario,
85 scenario_file,
86 scenario_name,
87 strategy,
88 strategy_opts,
89 auto_card,
90 )
91 .await
92 }
93
94 async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
95 AppService::eval_history(self, strategy, limit)
96 }
97
98 async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
99 AppService::eval_detail(self, eval_id)
100 }
101
102 async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
103 AppService::eval_compare(self, eval_id_a, eval_id_b).await
104 }
105
106 async fn scenario_list(&self) -> Result<String, String> {
109 AppService::scenario_list(self)
110 }
111
112 async fn scenario_show(&self, name: &str) -> Result<String, String> {
113 AppService::scenario_show(self, name)
114 }
115
116 async fn scenario_install(&self, url: String) -> Result<String, String> {
117 AppService::scenario_install(self, url).await
118 }
119
120 async fn pkg_link(
123 &self,
124 path: String,
125 name: Option<String>,
126 force: Option<bool>,
127 scope: Option<String>,
128 project_root: Option<String>,
129 ) -> Result<String, String> {
130 AppService::pkg_link(self, path, name, force, scope, project_root).await
131 }
132
133 async fn pkg_unlink(&self, name: String) -> Result<String, String> {
134 AppService::pkg_unlink(self, name).await
135 }
136
137 #[allow(clippy::too_many_arguments)]
138 async fn pkg_list(
139 &self,
140 project_root: Option<String>,
141 limit: Option<i32>,
142 sort: Option<String>,
143 filter: Option<serde_json::Value>,
144 fields: Option<Vec<String>>,
145 verbose: Option<String>,
146 ) -> Result<String, String> {
147 let filter_map = match filter {
153 None => None,
154 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
155 Ok(map) => Some(map),
156 Err(e) => {
157 tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
158 None
159 }
160 },
161 };
162
163 let opts = ListOpts {
168 limit: limit.map(|n| n.max(0) as usize),
169 sort,
170 filter: filter_map,
171 fields,
172 verbose,
173 };
174
175 AppService::pkg_list(self, project_root, opts)
176 .await
177 .map_err(|e| e.to_string())
178 }
179
180 async fn pkg_install(
181 &self,
182 url: String,
183 name: Option<String>,
184 force: Option<bool>,
185 ) -> Result<String, String> {
186 AppService::pkg_install(self, url, name, force).await
187 }
188
189 async fn pkg_remove(
190 &self,
191 name: &str,
192 project_root: Option<String>,
193 version: Option<String>,
194 scope: Option<String>,
195 ) -> Result<String, String> {
196 AppService::pkg_remove(self, name, project_root, version, scope).await
197 }
198
199 async fn pkg_repair(
200 &self,
201 name: Option<String>,
202 project_root: Option<String>,
203 ) -> Result<String, String> {
204 AppService::pkg_repair(self, name, project_root).await
205 }
206
207 async fn pkg_doctor(
208 &self,
209 name: Option<String>,
210 project_root: Option<String>,
211 ) -> Result<String, String> {
212 AppService::pkg_doctor(self, name, project_root).await
213 }
214
215 #[allow(clippy::too_many_arguments)]
219 async fn pkg_test(
220 &self,
221 pkg: Option<String>,
222 code_file: Option<String>,
223 code: Option<String>,
224 spec_dir: Option<String>,
225 filter: Option<String>,
226 search_paths: Option<Vec<String>>,
227 project_root: Option<String>,
228 ) -> Result<String, String> {
229 AppService::pkg_test(
230 self,
231 pkg,
232 code_file,
233 code,
234 spec_dir,
235 filter,
236 search_paths,
237 project_root,
238 )
239 .await
240 }
241
242 async fn add_note(
245 &self,
246 session_id: &str,
247 content: &str,
248 title: Option<&str>,
249 ) -> Result<String, String> {
250 AppService::add_note(self, session_id, content, title).await
251 }
252
253 async fn log_view(
254 &self,
255 session_id: Option<&str>,
256 limit: Option<usize>,
257 max_chars: Option<usize>,
258 ) -> Result<String, String> {
259 AppService::log_view(self, session_id, limit, max_chars).await
260 }
261
262 async fn stats(
263 &self,
264 strategy_filter: Option<&str>,
265 days: Option<u64>,
266 ) -> Result<String, String> {
267 AppService::stats(self, strategy_filter, days)
268 }
269
270 async fn init(&self, project_root: Option<String>) -> Result<String, String> {
273 AppService::init(self, project_root).await
274 }
275
276 async fn update(&self, project_root: Option<String>) -> Result<String, String> {
277 AppService::update(self, project_root).await
278 }
279
280 async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
281 AppService::migrate(self, project_root).await
282 }
283
284 async fn session_new(
287 &self,
288 project_root: Option<String>,
289 mode: Option<String>,
290 ) -> Result<String, String> {
291 let session = self.activate_session(project_root.as_deref(), mode.as_deref())?;
292 let result = serde_json::json!({
293 "session_id": session.session_id,
294 "project_root": session
295 .project_root
296 .as_ref()
297 .map(|p| p.to_string_lossy().to_string()),
298 "mode": session.mode.as_str(),
299 });
300 serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
301 }
302
303 async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
306 AppService::card_list(self, pkg.as_deref())
307 }
308
309 async fn card_get(&self, card_id: &str) -> Result<String, String> {
310 AppService::card_get(self, card_id)
311 }
312
313 async fn card_find(
314 &self,
315 pkg: Option<String>,
316 where_: Option<serde_json::Value>,
317 order_by: Option<serde_json::Value>,
318 limit: Option<usize>,
319 offset: Option<usize>,
320 ) -> Result<String, String> {
321 AppService::card_find(self, pkg, where_, order_by, limit, offset)
322 }
323
324 async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
325 AppService::card_alias_list(self, pkg.as_deref())
326 }
327
328 async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
329 AppService::card_get_by_alias(self, name)
330 }
331
332 async fn card_alias_set(
333 &self,
334 name: &str,
335 card_id: &str,
336 pkg: Option<String>,
337 note: Option<String>,
338 ) -> Result<String, String> {
339 AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
340 }
341
342 async fn card_append(
343 &self,
344 card_id: &str,
345 fields: serde_json::Value,
346 ) -> Result<String, String> {
347 AppService::card_append(self, card_id, fields)
348 }
349
350 async fn card_install(&self, url: String) -> Result<String, String> {
351 AppService::card_install(self, url).await
352 }
353
354 async fn card_samples(
355 &self,
356 card_id: &str,
357 offset: Option<usize>,
358 limit: Option<usize>,
359 where_: Option<serde_json::Value>,
360 ) -> Result<String, String> {
361 AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
362 }
363
364 async fn card_lineage(
365 &self,
366 card_id: &str,
367 direction: Option<String>,
368 depth: Option<usize>,
369 include_stats: Option<bool>,
370 relation_filter: Option<Vec<String>>,
371 ) -> Result<String, String> {
372 AppService::card_lineage(
373 self,
374 card_id,
375 direction.as_deref(),
376 depth,
377 include_stats,
378 relation_filter,
379 )
380 }
381
382 async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
383 AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
384 }
385
386 async fn card_analyze(&self, card_id: &str, pkg: Option<String>) -> Result<String, String> {
387 AppService::card_analyze(self, card_id, pkg).await
388 }
389
390 async fn hub_reindex(
393 &self,
394 output_path: Option<String>,
395 source_dir: Option<String>,
396 ) -> Result<String, String> {
397 let svc = self.clone();
398 tokio::task::spawn_blocking(move || {
399 AppService::hub_reindex(&svc, output_path.as_deref(), source_dir.as_deref())
400 })
401 .await
402 .map_err(|e| format!("hub_reindex task panicked: {e}"))?
403 }
404
405 async fn hub_gendoc(
406 &self,
407 source_dir: String,
408 out_dir: Option<String>,
409 projections: Option<Vec<String>>,
410 config_path: Option<String>,
411 lint_strict: Option<bool>,
412 ) -> Result<String, String> {
413 let svc = self.clone();
414 tokio::task::spawn_blocking(move || {
415 crate::AppService::hub_gendoc(
416 &svc,
417 &source_dir,
418 out_dir.as_deref(),
419 projections.as_deref(),
420 config_path.as_deref(),
421 lint_strict,
422 )
423 })
424 .await
425 .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
426 }
427
428 async fn hub_dist(
429 &self,
430 source_dir: String,
431 output_path: Option<String>,
432 out_dir: Option<String>,
433 preset: Option<String>,
434 project_root: Option<String>,
435 projections: Option<Vec<String>>,
436 config_path: Option<String>,
437 lint_strict: Option<bool>,
438 ) -> Result<String, String> {
439 let svc = self.clone();
440 tokio::task::spawn_blocking(move || {
441 AppService::hub_dist(
442 &svc,
443 &source_dir,
444 output_path.as_deref(),
445 out_dir.as_deref(),
446 preset.as_deref(),
447 project_root.as_deref(),
448 projections.as_deref(),
449 config_path.as_deref(),
450 lint_strict,
451 )
452 })
453 .await
454 .map_err(|e| format!("hub_dist task panicked: {e}"))?
455 }
456
457 async fn hub_info(&self, pkg: String) -> Result<String, String> {
458 let svc = self.clone();
459 tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
460 .await
461 .map_err(|e| format!("hub_info task panicked: {e}"))?
462 }
463
464 #[allow(clippy::too_many_arguments)]
465 async fn hub_search(
466 &self,
467 query: Option<String>,
468 category: Option<String>,
469 installed_only: Option<bool>,
470 limit: Option<i32>,
471 sort: Option<String>,
472 filter: Option<serde_json::Value>,
473 fields: Option<Vec<String>>,
474 verbose: Option<String>,
475 local_indices: Option<Vec<String>>,
476 ) -> Result<String, String> {
477 let svc = self.clone();
478
479 let filter_map = match filter {
487 None => None,
488 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
489 Ok(map) => Some(map),
490 Err(e) => {
491 tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
492 None
493 }
494 },
495 };
496
497 let opts = ListOpts {
502 limit: limit.map(|n| n.max(0) as usize),
503 sort,
504 filter: filter_map,
505 fields,
506 verbose,
507 };
508
509 tokio::task::spawn_blocking(move || {
510 AppService::hub_search(
511 &svc,
512 query.as_deref(),
513 category.as_deref(),
514 installed_only,
515 opts,
516 local_indices,
517 )
518 })
519 .await
520 .map_err(|e| format!("hub_search task panicked: {e}"))?
521 }
522
523 async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
526 AppService::pkg_read_init_lua(self, name, None)
527 }
528
529 async fn pkg_get_narrative_md(&self, name: &str) -> Result<Option<String>, String> {
530 AppService::pkg_get_narrative_md(self, name).await
531 }
532
533 async fn pkg_meta(&self, name: &str) -> Result<String, String> {
534 let filter = serde_json::json!({ "name": name });
535 let json_str = EngineApi::pkg_list(
536 self,
537 None,
538 None,
539 None,
540 Some(filter),
541 None,
542 Some("full".to_string()),
543 )
544 .await?;
545 let val: serde_json::Value = serde_json::from_str(&json_str)
546 .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
547 let pkgs = val
548 .get("packages")
549 .and_then(|p| p.as_array())
550 .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
551 if pkgs.is_empty() {
552 return Err(format!("pkg not found: {name}"));
553 }
554 serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
555 }
556
557 async fn pkg_scaffold(
560 &self,
561 name: String,
562 target_dir: Option<String>,
563 category: Option<String>,
564 description: Option<String>,
565 ) -> Result<String, String> {
566 let svc = self.clone();
567 tokio::task::spawn_blocking(move || {
568 AppService::pkg_scaffold(
569 &svc,
570 &name,
571 target_dir.as_deref(),
572 category.as_deref(),
573 description.as_deref(),
574 )
575 })
576 .await
577 .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
578 }
579
580 async fn hub_index_aggregate(&self) -> Result<String, String> {
590 let svc = self.clone();
591 let (index, warnings) = tokio::task::spawn_blocking(move || {
592 AppService::aggregate_index(&svc).map_err(|e| e.to_string())
593 })
594 .await
595 .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
596
597 let mut json = serde_json::to_value(&index)
598 .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
599 if !warnings.is_empty() {
600 if let Some(obj) = json.as_object_mut() {
601 obj.insert("warnings".to_string(), serde_json::json!(warnings));
602 }
603 }
604 serde_json::to_string(&json)
605 .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
606 }
607
608 async fn info(&self) -> String {
611 AppService::info(self)
612 }
613
614 async fn pool_ensure(&self) -> Result<String, String> {
617 AppService::pool_ensure_impl(self).await
618 }
619
620 async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
621 AppService::pool_status_impl(self, sid).await
622 }
623
624 async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
625 AppService::pool_stop_impl(self, sid).await
626 }
627}
628
629impl AppService {
632 pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
637 let reg_path = self.pool_reg_path.clone();
638 let lock_path = self.pool_lock_path.clone();
639
640 let sessions =
641 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
642 with_registry_lock(&lock_path, || {
643 let mut reg = PoolRegistry::load_or_default(®_path)?;
644 let survivors = reg.scan_and_gc()?;
645 reg.save(®_path)?;
647 let entries = survivors
648 .iter()
649 .map(|e| {
650 serde_json::json!({
651 "sid": e.sid,
652 "pid": e.pid,
653 "sock": e.sock.to_string_lossy(),
654 "version": e.version,
655 })
656 })
657 .collect::<Vec<_>>();
658 Ok(entries)
659 })
660 })
661 .await
662 .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
663 .map_err(|e| e.to_string())?;
664
665 let pool_version = env!("CARGO_PKG_VERSION");
666 serde_json::to_string(&serde_json::json!({
667 "sessions": sessions,
668 "pool_version": pool_version,
669 }))
670 .map_err(|e| format!("pool_ensure: serialize: {e}"))
671 }
672
673 pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
678 let reg_path = self.pool_reg_path.clone();
679 let lock_path = self.pool_lock_path.clone();
680
681 let sessions =
682 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
683 with_registry_lock(&lock_path, || {
684 let mut reg = PoolRegistry::load_or_default(®_path)?;
685 let _ = reg.scan_and_gc()?;
687 reg.save(®_path)?;
688
689 let entries: Vec<serde_json::Value> = reg
690 .sessions
691 .iter()
692 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
693 .map(|e| {
694 serde_json::json!({
695 "sid": e.sid,
696 "pid": e.pid,
697 "sock": e.sock.to_string_lossy(),
698 "version": e.version,
699 "created_at": e.created_at,
700 "status": "running",
702 })
703 })
704 .collect();
705 Ok(entries)
706 })
707 })
708 .await
709 .map_err(|e| format!("pool_status: task panicked: {e}"))?
710 .map_err(|e| e.to_string())?;
711
712 let pool_version = env!("CARGO_PKG_VERSION");
713 serde_json::to_string(&serde_json::json!({
714 "sessions": sessions,
715 "pool_version": pool_version,
716 }))
717 .map_err(|e| format!("pool_status: serialize: {e}"))
718 }
719
720 pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
726 let reg_path = self.pool_reg_path.clone();
727 let lock_path = self.pool_lock_path.clone();
728
729 let result = tokio::task::spawn_blocking(
730 move || -> Result<(Vec<String>, Vec<String>), PoolError> {
731 with_registry_lock(&lock_path, || {
732 let mut reg = PoolRegistry::load_or_default(®_path)?;
733
734 let targets: Vec<_> = reg
736 .sessions
737 .iter()
738 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
739 .cloned()
740 .collect();
741
742 let mut stopped: Vec<String> = Vec::new();
743 let mut errors: Vec<String> = Vec::new();
744
745 for entry in &targets {
746 #[cfg(unix)]
747 {
748 let pid_t = match i32::try_from(entry.pid) {
752 Ok(p) if p > 0 => p,
753 Ok(_) => {
754 errors.push(format!(
755 "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
756 entry.sid, entry.pid
757 ));
758 reg.remove(&entry.sid);
759 continue;
760 }
761 Err(_) => {
762 errors.push(format!(
763 "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
764 entry.sid, entry.pid
765 ));
766 reg.remove(&entry.sid);
768 continue;
769 }
770 };
771
772 let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
776 if ret == 0 {
777 stopped.push(entry.sid.clone());
778 } else {
779 let os_err = std::io::Error::last_os_error();
780 if os_err.raw_os_error() == Some(libc::ESRCH) {
781 stopped.push(entry.sid.clone());
783 } else {
784 errors.push(format!(
785 "sid={}: SIGTERM failed: {}",
786 entry.sid, os_err
787 ));
788 }
789 }
790 }
791 #[cfg(not(unix))]
792 {
793 errors.push(format!(
795 "sid={}: SIGTERM not supported on this platform",
796 entry.sid
797 ));
798 }
799 reg.remove(&entry.sid);
802 }
803
804 reg.save(®_path)?;
806
807 Ok((stopped, errors))
808 })
809 },
810 )
811 .await
812 .map_err(|e| format!("pool_stop: task panicked: {e}"))?
813 .map_err(|e| e.to_string())?;
814
815 let (stopped, errors) = result;
816 serde_json::to_string(&serde_json::json!({
817 "stopped": stopped,
818 "errors": errors,
819 }))
820 .map_err(|e| format!("pool_stop: serialize: {e}"))
821 }
822}
823
#[cfg(test)]
mod tests {
    use super::super::test_support::make_app_service_at;

    /// A registry entry with `pid = 0` must never be signalled: it has to be
    /// reported under `errors`, stay out of `stopped`, and be purged from the
    /// registry file on disk.
    #[tokio::test]
    #[cfg(unix)]
    async fn pool_stop_pid_zero_is_rejected() {
        let workdir = tempfile::tempdir().expect("tempdir");
        let base = workdir.path().to_path_buf();
        let service = make_app_service_at(base.clone()).await;

        // Seed the registry file with a single pid=0 session.
        let pool_dir = base.join("state").join("pool");
        std::fs::create_dir_all(&pool_dir).expect("create pool dir");
        let registry_file = pool_dir.join("registry.json");

        let fixture = serde_json::json!({
            "sessions": [{
                "sid": "zero-pid-session",
                "pid": 0u32,
                "sock": "/tmp/alc-pool/zero.sock",
                "version": "0.30.0",
                "created_at": "2026-01-01T00:00:00Z"
            }]
        });
        std::fs::write(&registry_file, fixture.to_string()).expect("seed registry.json");

        // Stop everything (no sid filter) and decode the JSON reply.
        let reply = service.pool_stop_impl(None).await.expect("pool_stop_impl");
        let parsed: serde_json::Value =
            serde_json::from_str(&reply).expect("response is valid JSON");

        // The entry is reported as an error with the pid-validation message.
        let reported = parsed
            .get("errors")
            .and_then(|v| v.as_array())
            .expect("errors array");
        assert!(
            !reported.is_empty(),
            "expected at least one error for pid=0 entry"
        );
        let err_msg = reported
            .first()
            .and_then(|v| v.as_str())
            .unwrap_or("");
        assert!(
            err_msg.contains("not a valid POSIX target pid"),
            "unexpected error message: {err_msg}"
        );

        // It must not be counted as stopped.
        let halted = parsed
            .get("stopped")
            .and_then(|v| v.as_array())
            .expect("stopped array");
        assert!(
            halted.is_empty(),
            "pid=0 entry must not appear in stopped list"
        );

        // The persisted registry no longer contains it.
        let raw = std::fs::read_to_string(&registry_file).expect("read registry");
        let persisted: serde_json::Value = serde_json::from_str(&raw).expect("parse registry");
        let remaining = persisted
            .get("sessions")
            .and_then(|v| v.as_array())
            .expect("sessions array");
        assert!(
            remaining.is_empty(),
            "pid=0 entry must be removed from on-disk registry"
        );
    }
}