1use std::collections::HashMap;
2
3use algocline_core::{EngineApi, QueryResponse};
4use async_trait::async_trait;
5
6use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
7
8use super::list_opts::ListOpts;
9use super::AppService;
10
11#[async_trait]
18impl EngineApi for AppService {
19 async fn run(
22 &self,
23 code: Option<String>,
24 code_file: Option<String>,
25 ctx: Option<serde_json::Value>,
26 project_root: Option<String>,
27 host_mode: Option<bool>,
28 ) -> Result<String, String> {
29 AppService::run(self, code, code_file, ctx, project_root, host_mode).await
30 }
31
32 async fn advice(
33 &self,
34 strategy: &str,
35 task: Option<String>,
36 opts: Option<serde_json::Value>,
37 project_root: Option<String>,
38 ) -> Result<String, String> {
39 AppService::advice(self, strategy, task, opts, project_root).await
40 }
41
42 async fn continue_single(
43 &self,
44 session_id: &str,
45 response: String,
46 query_id: Option<&str>,
47 usage: Option<algocline_core::TokenUsage>,
48 ) -> Result<String, String> {
49 AppService::continue_single(self, session_id, response, query_id, usage).await
50 }
51
52 async fn continue_batch(
53 &self,
54 session_id: &str,
55 responses: Vec<QueryResponse>,
56 ) -> Result<String, String> {
57 AppService::continue_batch(self, session_id, responses).await
58 }
59
60 async fn status(
63 &self,
64 session_id: Option<&str>,
65 pending_filter: Option<serde_json::Value>,
66 include_history: bool,
67 ) -> Result<String, String> {
68 AppService::status(self, session_id, pending_filter, include_history).await
69 }
70
71 async fn eval(
74 &self,
75 scenario: Option<String>,
76 scenario_file: Option<String>,
77 scenario_name: Option<String>,
78 strategy: &str,
79 strategy_opts: Option<serde_json::Value>,
80 auto_card: bool,
81 ) -> Result<String, String> {
82 AppService::eval(
83 self,
84 scenario,
85 scenario_file,
86 scenario_name,
87 strategy,
88 strategy_opts,
89 auto_card,
90 )
91 .await
92 }
93
94 async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
95 AppService::eval_history(self, strategy, limit)
96 }
97
98 async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
99 AppService::eval_detail(self, eval_id)
100 }
101
102 async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
103 AppService::eval_compare(self, eval_id_a, eval_id_b).await
104 }
105
106 async fn scenario_list(&self) -> Result<String, String> {
109 AppService::scenario_list(self)
110 }
111
112 async fn scenario_show(&self, name: &str) -> Result<String, String> {
113 AppService::scenario_show(self, name)
114 }
115
116 async fn scenario_install(&self, url: String) -> Result<String, String> {
117 AppService::scenario_install(self, url).await
118 }
119
120 async fn pkg_link(
123 &self,
124 path: String,
125 name: Option<String>,
126 force: Option<bool>,
127 scope: Option<String>,
128 project_root: Option<String>,
129 ) -> Result<String, String> {
130 AppService::pkg_link(self, path, name, force, scope, project_root).await
131 }
132
133 async fn pkg_unlink(&self, name: String) -> Result<String, String> {
134 AppService::pkg_unlink(self, name).await
135 }
136
137 #[allow(clippy::too_many_arguments)]
138 async fn pkg_list(
139 &self,
140 project_root: Option<String>,
141 limit: Option<i32>,
142 sort: Option<String>,
143 filter: Option<serde_json::Value>,
144 fields: Option<Vec<String>>,
145 verbose: Option<String>,
146 ) -> Result<String, String> {
147 let filter_map = match filter {
153 None => None,
154 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
155 Ok(map) => Some(map),
156 Err(e) => {
157 tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
158 None
159 }
160 },
161 };
162
163 let opts = ListOpts {
168 limit: limit.map(|n| n.max(0) as usize),
169 sort,
170 filter: filter_map,
171 fields,
172 verbose,
173 };
174
175 AppService::pkg_list(self, project_root, opts)
176 .await
177 .map_err(|e| e.to_string())
178 }
179
180 async fn pkg_install(
181 &self,
182 url: String,
183 name: Option<String>,
184 force: Option<bool>,
185 ) -> Result<String, String> {
186 AppService::pkg_install(self, url, name, force).await
187 }
188
189 async fn pkg_remove(
190 &self,
191 name: &str,
192 project_root: Option<String>,
193 version: Option<String>,
194 scope: Option<String>,
195 ) -> Result<String, String> {
196 AppService::pkg_remove(self, name, project_root, version, scope).await
197 }
198
199 async fn pkg_repair(
200 &self,
201 name: Option<String>,
202 project_root: Option<String>,
203 ) -> Result<String, String> {
204 AppService::pkg_repair(self, name, project_root).await
205 }
206
207 async fn pkg_doctor(
208 &self,
209 name: Option<String>,
210 project_root: Option<String>,
211 ) -> Result<String, String> {
212 AppService::pkg_doctor(self, name, project_root).await
213 }
214
215 async fn add_note(
218 &self,
219 session_id: &str,
220 content: &str,
221 title: Option<&str>,
222 ) -> Result<String, String> {
223 AppService::add_note(self, session_id, content, title).await
224 }
225
226 async fn log_view(
227 &self,
228 session_id: Option<&str>,
229 limit: Option<usize>,
230 max_chars: Option<usize>,
231 ) -> Result<String, String> {
232 AppService::log_view(self, session_id, limit, max_chars).await
233 }
234
235 async fn stats(
236 &self,
237 strategy_filter: Option<&str>,
238 days: Option<u64>,
239 ) -> Result<String, String> {
240 AppService::stats(self, strategy_filter, days)
241 }
242
243 async fn init(&self, project_root: Option<String>) -> Result<String, String> {
246 AppService::init(self, project_root).await
247 }
248
249 async fn update(&self, project_root: Option<String>) -> Result<String, String> {
250 AppService::update(self, project_root).await
251 }
252
253 async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
254 AppService::migrate(self, project_root).await
255 }
256
257 async fn session_new(
260 &self,
261 project_root: Option<String>,
262 mode: Option<String>,
263 ) -> Result<String, String> {
264 let session = self.activate_session(project_root.as_deref(), mode.as_deref())?;
265 let result = serde_json::json!({
266 "session_id": session.session_id,
267 "project_root": session
268 .project_root
269 .as_ref()
270 .map(|p| p.to_string_lossy().to_string()),
271 "mode": session.mode.as_str(),
272 });
273 serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
274 }
275
276 async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
279 AppService::card_list(self, pkg.as_deref())
280 }
281
282 async fn card_get(&self, card_id: &str) -> Result<String, String> {
283 AppService::card_get(self, card_id)
284 }
285
286 async fn card_find(
287 &self,
288 pkg: Option<String>,
289 where_: Option<serde_json::Value>,
290 order_by: Option<serde_json::Value>,
291 limit: Option<usize>,
292 offset: Option<usize>,
293 ) -> Result<String, String> {
294 AppService::card_find(self, pkg, where_, order_by, limit, offset)
295 }
296
297 async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
298 AppService::card_alias_list(self, pkg.as_deref())
299 }
300
301 async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
302 AppService::card_get_by_alias(self, name)
303 }
304
305 async fn card_alias_set(
306 &self,
307 name: &str,
308 card_id: &str,
309 pkg: Option<String>,
310 note: Option<String>,
311 ) -> Result<String, String> {
312 AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
313 }
314
315 async fn card_append(
316 &self,
317 card_id: &str,
318 fields: serde_json::Value,
319 ) -> Result<String, String> {
320 AppService::card_append(self, card_id, fields)
321 }
322
323 async fn card_install(&self, url: String) -> Result<String, String> {
324 AppService::card_install(self, url).await
325 }
326
327 async fn card_samples(
328 &self,
329 card_id: &str,
330 offset: Option<usize>,
331 limit: Option<usize>,
332 where_: Option<serde_json::Value>,
333 ) -> Result<String, String> {
334 AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
335 }
336
337 async fn card_lineage(
338 &self,
339 card_id: &str,
340 direction: Option<String>,
341 depth: Option<usize>,
342 include_stats: Option<bool>,
343 relation_filter: Option<Vec<String>>,
344 ) -> Result<String, String> {
345 AppService::card_lineage(
346 self,
347 card_id,
348 direction.as_deref(),
349 depth,
350 include_stats,
351 relation_filter,
352 )
353 }
354
355 async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
356 AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
357 }
358
359 async fn card_analyze(&self, card_id: &str, pkg: Option<String>) -> Result<String, String> {
360 AppService::card_analyze(self, card_id, pkg).await
361 }
362
363 async fn hub_reindex(
366 &self,
367 output_path: Option<String>,
368 source_dir: Option<String>,
369 ) -> Result<String, String> {
370 let svc = self.clone();
371 tokio::task::spawn_blocking(move || {
372 AppService::hub_reindex(&svc, output_path.as_deref(), source_dir.as_deref())
373 })
374 .await
375 .map_err(|e| format!("hub_reindex task panicked: {e}"))?
376 }
377
378 async fn hub_gendoc(
379 &self,
380 source_dir: String,
381 out_dir: Option<String>,
382 projections: Option<Vec<String>>,
383 config_path: Option<String>,
384 lint_strict: Option<bool>,
385 ) -> Result<String, String> {
386 let svc = self.clone();
387 tokio::task::spawn_blocking(move || {
388 crate::AppService::hub_gendoc(
389 &svc,
390 &source_dir,
391 out_dir.as_deref(),
392 projections.as_deref(),
393 config_path.as_deref(),
394 lint_strict,
395 )
396 })
397 .await
398 .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
399 }
400
401 async fn hub_dist(
402 &self,
403 source_dir: String,
404 output_path: Option<String>,
405 out_dir: Option<String>,
406 preset: Option<String>,
407 project_root: Option<String>,
408 projections: Option<Vec<String>>,
409 config_path: Option<String>,
410 lint_strict: Option<bool>,
411 ) -> Result<String, String> {
412 let svc = self.clone();
413 tokio::task::spawn_blocking(move || {
414 AppService::hub_dist(
415 &svc,
416 &source_dir,
417 output_path.as_deref(),
418 out_dir.as_deref(),
419 preset.as_deref(),
420 project_root.as_deref(),
421 projections.as_deref(),
422 config_path.as_deref(),
423 lint_strict,
424 )
425 })
426 .await
427 .map_err(|e| format!("hub_dist task panicked: {e}"))?
428 }
429
430 async fn hub_info(&self, pkg: String) -> Result<String, String> {
431 let svc = self.clone();
432 tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
433 .await
434 .map_err(|e| format!("hub_info task panicked: {e}"))?
435 }
436
437 #[allow(clippy::too_many_arguments)]
438 async fn hub_search(
439 &self,
440 query: Option<String>,
441 category: Option<String>,
442 installed_only: Option<bool>,
443 limit: Option<i32>,
444 sort: Option<String>,
445 filter: Option<serde_json::Value>,
446 fields: Option<Vec<String>>,
447 verbose: Option<String>,
448 local_indices: Option<Vec<String>>,
449 ) -> Result<String, String> {
450 let svc = self.clone();
451
452 let filter_map = match filter {
460 None => None,
461 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
462 Ok(map) => Some(map),
463 Err(e) => {
464 tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
465 None
466 }
467 },
468 };
469
470 let opts = ListOpts {
475 limit: limit.map(|n| n.max(0) as usize),
476 sort,
477 filter: filter_map,
478 fields,
479 verbose,
480 };
481
482 tokio::task::spawn_blocking(move || {
483 AppService::hub_search(
484 &svc,
485 query.as_deref(),
486 category.as_deref(),
487 installed_only,
488 opts,
489 local_indices,
490 )
491 })
492 .await
493 .map_err(|e| format!("hub_search task panicked: {e}"))?
494 }
495
496 async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
499 AppService::pkg_read_init_lua(self, name, None)
500 }
501
502 async fn pkg_get_narrative_md(&self, name: &str) -> Result<Option<String>, String> {
503 AppService::pkg_get_narrative_md(self, name).await
504 }
505
506 async fn pkg_meta(&self, name: &str) -> Result<String, String> {
507 let filter = serde_json::json!({ "name": name });
508 let json_str = EngineApi::pkg_list(
509 self,
510 None,
511 None,
512 None,
513 Some(filter),
514 None,
515 Some("full".to_string()),
516 )
517 .await?;
518 let val: serde_json::Value = serde_json::from_str(&json_str)
519 .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
520 let pkgs = val
521 .get("packages")
522 .and_then(|p| p.as_array())
523 .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
524 if pkgs.is_empty() {
525 return Err(format!("pkg not found: {name}"));
526 }
527 serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
528 }
529
530 async fn pkg_scaffold(
533 &self,
534 name: String,
535 target_dir: Option<String>,
536 category: Option<String>,
537 description: Option<String>,
538 ) -> Result<String, String> {
539 let svc = self.clone();
540 tokio::task::spawn_blocking(move || {
541 AppService::pkg_scaffold(
542 &svc,
543 &name,
544 target_dir.as_deref(),
545 category.as_deref(),
546 description.as_deref(),
547 )
548 })
549 .await
550 .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
551 }
552
553 async fn hub_index_aggregate(&self) -> Result<String, String> {
563 let svc = self.clone();
564 let (index, warnings) = tokio::task::spawn_blocking(move || {
565 AppService::aggregate_index(&svc).map_err(|e| e.to_string())
566 })
567 .await
568 .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
569
570 let mut json = serde_json::to_value(&index)
571 .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
572 if !warnings.is_empty() {
573 if let Some(obj) = json.as_object_mut() {
574 obj.insert("warnings".to_string(), serde_json::json!(warnings));
575 }
576 }
577 serde_json::to_string(&json)
578 .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
579 }
580
581 async fn info(&self) -> String {
584 AppService::info(self)
585 }
586
587 async fn pool_ensure(&self) -> Result<String, String> {
590 AppService::pool_ensure_impl(self).await
591 }
592
593 async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
594 AppService::pool_status_impl(self, sid).await
595 }
596
597 async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
598 AppService::pool_stop_impl(self, sid).await
599 }
600}
601
602impl AppService {
605 pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
610 let reg_path = self.pool_reg_path.clone();
611 let lock_path = self.pool_lock_path.clone();
612
613 let sessions =
614 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
615 with_registry_lock(&lock_path, || {
616 let mut reg = PoolRegistry::load_or_default(®_path)?;
617 let survivors = reg.scan_and_gc()?;
618 reg.save(®_path)?;
620 let entries = survivors
621 .iter()
622 .map(|e| {
623 serde_json::json!({
624 "sid": e.sid,
625 "pid": e.pid,
626 "sock": e.sock.to_string_lossy(),
627 "version": e.version,
628 })
629 })
630 .collect::<Vec<_>>();
631 Ok(entries)
632 })
633 })
634 .await
635 .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
636 .map_err(|e| e.to_string())?;
637
638 let pool_version = env!("CARGO_PKG_VERSION");
639 serde_json::to_string(&serde_json::json!({
640 "sessions": sessions,
641 "pool_version": pool_version,
642 }))
643 .map_err(|e| format!("pool_ensure: serialize: {e}"))
644 }
645
646 pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
651 let reg_path = self.pool_reg_path.clone();
652 let lock_path = self.pool_lock_path.clone();
653
654 let sessions =
655 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
656 with_registry_lock(&lock_path, || {
657 let mut reg = PoolRegistry::load_or_default(®_path)?;
658 let _ = reg.scan_and_gc()?;
660 reg.save(®_path)?;
661
662 let entries: Vec<serde_json::Value> = reg
663 .sessions
664 .iter()
665 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
666 .map(|e| {
667 serde_json::json!({
668 "sid": e.sid,
669 "pid": e.pid,
670 "sock": e.sock.to_string_lossy(),
671 "version": e.version,
672 "created_at": e.created_at,
673 "status": "running",
675 })
676 })
677 .collect();
678 Ok(entries)
679 })
680 })
681 .await
682 .map_err(|e| format!("pool_status: task panicked: {e}"))?
683 .map_err(|e| e.to_string())?;
684
685 let pool_version = env!("CARGO_PKG_VERSION");
686 serde_json::to_string(&serde_json::json!({
687 "sessions": sessions,
688 "pool_version": pool_version,
689 }))
690 .map_err(|e| format!("pool_status: serialize: {e}"))
691 }
692
693 pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
699 let reg_path = self.pool_reg_path.clone();
700 let lock_path = self.pool_lock_path.clone();
701
702 let result = tokio::task::spawn_blocking(
703 move || -> Result<(Vec<String>, Vec<String>), PoolError> {
704 with_registry_lock(&lock_path, || {
705 let mut reg = PoolRegistry::load_or_default(®_path)?;
706
707 let targets: Vec<_> = reg
709 .sessions
710 .iter()
711 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
712 .cloned()
713 .collect();
714
715 let mut stopped: Vec<String> = Vec::new();
716 let mut errors: Vec<String> = Vec::new();
717
718 for entry in &targets {
719 #[cfg(unix)]
720 {
721 let pid_t = match i32::try_from(entry.pid) {
725 Ok(p) if p > 0 => p,
726 Ok(_) => {
727 errors.push(format!(
728 "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
729 entry.sid, entry.pid
730 ));
731 reg.remove(&entry.sid);
732 continue;
733 }
734 Err(_) => {
735 errors.push(format!(
736 "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
737 entry.sid, entry.pid
738 ));
739 reg.remove(&entry.sid);
741 continue;
742 }
743 };
744
745 let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
749 if ret == 0 {
750 stopped.push(entry.sid.clone());
751 } else {
752 let os_err = std::io::Error::last_os_error();
753 if os_err.raw_os_error() == Some(libc::ESRCH) {
754 stopped.push(entry.sid.clone());
756 } else {
757 errors.push(format!(
758 "sid={}: SIGTERM failed: {}",
759 entry.sid, os_err
760 ));
761 }
762 }
763 }
764 #[cfg(not(unix))]
765 {
766 errors.push(format!(
768 "sid={}: SIGTERM not supported on this platform",
769 entry.sid
770 ));
771 }
772 reg.remove(&entry.sid);
775 }
776
777 reg.save(®_path)?;
779
780 Ok((stopped, errors))
781 })
782 },
783 )
784 .await
785 .map_err(|e| format!("pool_stop: task panicked: {e}"))?
786 .map_err(|e| e.to_string())?;
787
788 let (stopped, errors) = result;
789 serde_json::to_string(&serde_json::json!({
790 "stopped": stopped,
791 "errors": errors,
792 }))
793 .map_err(|e| format!("pool_stop: serialize: {e}"))
794 }
795}
796
797#[cfg(test)]
800mod tests {
801 use super::super::test_support::make_app_service_at;
802
803 #[tokio::test]
810 #[cfg(unix)]
811 async fn pool_stop_pid_zero_is_rejected() {
812 let tmp = tempfile::tempdir().expect("tempdir");
815 let root = tmp.path().to_path_buf();
816 let svc = make_app_service_at(root.clone()).await;
817
818 let pool_reg_path = root.join("state").join("pool").join("registry.json");
821 std::fs::create_dir_all(pool_reg_path.parent().unwrap()).expect("create pool dir");
822
823 let seeded = serde_json::json!({
824 "sessions": [{
825 "sid": "zero-pid-session",
826 "pid": 0u32,
827 "sock": "/tmp/alc-pool/zero.sock",
828 "version": "0.30.0",
829 "created_at": "2026-01-01T00:00:00Z"
830 }]
831 });
832 std::fs::write(&pool_reg_path, seeded.to_string()).expect("seed registry.json");
833
834 let json_str = svc.pool_stop_impl(None).await.expect("pool_stop_impl");
836 let result: serde_json::Value =
837 serde_json::from_str(&json_str).expect("response is valid JSON");
838
839 let errors = result["errors"].as_array().expect("errors array");
841 assert!(
842 !errors.is_empty(),
843 "expected at least one error for pid=0 entry"
844 );
845 let err_msg = errors[0].as_str().unwrap_or("");
846 assert!(
847 err_msg.contains("not a valid POSIX target pid"),
848 "unexpected error message: {err_msg}"
849 );
850
851 let stopped = result["stopped"].as_array().expect("stopped array");
853 assert!(
854 stopped.is_empty(),
855 "pid=0 entry must not appear in stopped list"
856 );
857
858 let on_disk: serde_json::Value =
860 serde_json::from_str(&std::fs::read_to_string(&pool_reg_path).expect("read registry"))
861 .expect("parse registry");
862 let sessions = on_disk["sessions"].as_array().expect("sessions array");
863 assert!(
864 sessions.is_empty(),
865 "pid=0 entry must be removed from on-disk registry"
866 );
867
868 }
871}