1use std::collections::HashMap;
2
3use algocline_core::{EngineApi, QueryResponse};
4use async_trait::async_trait;
5
6use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
7
8use super::list_opts::ListOpts;
9use super::AppService;
10
11#[async_trait]
18impl EngineApi for AppService {
19 async fn run(
22 &self,
23 code: Option<String>,
24 code_file: Option<String>,
25 ctx: Option<serde_json::Value>,
26 project_root: Option<String>,
27 host_mode: Option<bool>,
28 ) -> Result<String, String> {
29 AppService::run(self, code, code_file, ctx, project_root, host_mode).await
30 }
31
32 async fn advice(
33 &self,
34 strategy: &str,
35 task: Option<String>,
36 opts: Option<serde_json::Value>,
37 project_root: Option<String>,
38 ) -> Result<String, String> {
39 AppService::advice(self, strategy, task, opts, project_root).await
40 }
41
42 async fn continue_single(
43 &self,
44 session_id: &str,
45 response: String,
46 query_id: Option<&str>,
47 usage: Option<algocline_core::TokenUsage>,
48 ) -> Result<String, String> {
49 AppService::continue_single(self, session_id, response, query_id, usage).await
50 }
51
52 async fn continue_batch(
53 &self,
54 session_id: &str,
55 responses: Vec<QueryResponse>,
56 ) -> Result<String, String> {
57 AppService::continue_batch(self, session_id, responses).await
58 }
59
60 async fn status(
63 &self,
64 session_id: Option<&str>,
65 pending_filter: Option<serde_json::Value>,
66 include_history: bool,
67 ) -> Result<String, String> {
68 AppService::status(self, session_id, pending_filter, include_history).await
69 }
70
71 async fn eval(
74 &self,
75 scenario: Option<String>,
76 scenario_file: Option<String>,
77 scenario_name: Option<String>,
78 strategy: &str,
79 strategy_opts: Option<serde_json::Value>,
80 auto_card: bool,
81 ) -> Result<String, String> {
82 AppService::eval(
83 self,
84 scenario,
85 scenario_file,
86 scenario_name,
87 strategy,
88 strategy_opts,
89 auto_card,
90 )
91 .await
92 }
93
94 async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
95 AppService::eval_history(self, strategy, limit)
96 }
97
98 async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
99 AppService::eval_detail(self, eval_id)
100 }
101
102 async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
103 AppService::eval_compare(self, eval_id_a, eval_id_b).await
104 }
105
106 async fn scenario_list(&self) -> Result<String, String> {
109 AppService::scenario_list(self)
110 }
111
112 async fn scenario_show(&self, name: &str) -> Result<String, String> {
113 AppService::scenario_show(self, name)
114 }
115
116 async fn scenario_install(&self, url: String) -> Result<String, String> {
117 AppService::scenario_install(self, url).await
118 }
119
120 async fn pkg_link(
123 &self,
124 path: String,
125 name: Option<String>,
126 force: Option<bool>,
127 scope: Option<String>,
128 project_root: Option<String>,
129 ) -> Result<String, String> {
130 AppService::pkg_link(self, path, name, force, scope, project_root).await
131 }
132
133 async fn pkg_unlink(&self, name: String) -> Result<String, String> {
134 AppService::pkg_unlink(self, name).await
135 }
136
137 #[allow(clippy::too_many_arguments)]
138 async fn pkg_list(
139 &self,
140 project_root: Option<String>,
141 limit: Option<i32>,
142 sort: Option<String>,
143 filter: Option<serde_json::Value>,
144 fields: Option<Vec<String>>,
145 verbose: Option<String>,
146 ) -> Result<String, String> {
147 let filter_map = match filter {
153 None => None,
154 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
155 Ok(map) => Some(map),
156 Err(e) => {
157 tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
158 None
159 }
160 },
161 };
162
163 let opts = ListOpts {
168 limit: limit.map(|n| n.max(0) as usize),
169 sort,
170 filter: filter_map,
171 fields,
172 verbose,
173 };
174
175 AppService::pkg_list(self, project_root, opts)
176 .await
177 .map_err(|e| e.to_string())
178 }
179
180 async fn pkg_install(
181 &self,
182 url: String,
183 name: Option<String>,
184 force: Option<bool>,
185 ) -> Result<String, String> {
186 AppService::pkg_install(self, url, name, force).await
187 }
188
189 async fn pkg_remove(
190 &self,
191 name: &str,
192 project_root: Option<String>,
193 version: Option<String>,
194 scope: Option<String>,
195 ) -> Result<String, String> {
196 AppService::pkg_remove(self, name, project_root, version, scope).await
197 }
198
199 async fn pkg_repair(
200 &self,
201 name: Option<String>,
202 project_root: Option<String>,
203 ) -> Result<String, String> {
204 AppService::pkg_repair(self, name, project_root).await
205 }
206
207 async fn pkg_doctor(
208 &self,
209 name: Option<String>,
210 project_root: Option<String>,
211 ) -> Result<String, String> {
212 AppService::pkg_doctor(self, name, project_root).await
213 }
214
215 #[allow(clippy::too_many_arguments)]
219 async fn pkg_test(
220 &self,
221 pkg: Option<String>,
222 code_file: Option<String>,
223 code: Option<String>,
224 spec_dir: Option<String>,
225 filter: Option<String>,
226 search_paths: Option<Vec<String>>,
227 project_root: Option<String>,
228 auto_search_paths: Option<bool>,
229 ) -> Result<String, String> {
230 AppService::pkg_test(
231 self,
232 pkg,
233 code_file,
234 code,
235 spec_dir,
236 filter,
237 search_paths,
238 project_root,
239 auto_search_paths,
240 )
241 .await
242 }
243
244 async fn add_note(
247 &self,
248 session_id: &str,
249 content: &str,
250 title: Option<&str>,
251 ) -> Result<String, String> {
252 AppService::add_note(self, session_id, content, title).await
253 }
254
255 async fn log_view(
256 &self,
257 session_id: Option<&str>,
258 limit: Option<usize>,
259 max_chars: Option<usize>,
260 ) -> Result<String, String> {
261 AppService::log_view(self, session_id, limit, max_chars).await
262 }
263
264 async fn stats(
265 &self,
266 strategy_filter: Option<&str>,
267 days: Option<u64>,
268 ) -> Result<String, String> {
269 AppService::stats(self, strategy_filter, days)
270 }
271
272 async fn init(&self, project_root: Option<String>) -> Result<String, String> {
275 AppService::init(self, project_root).await
276 }
277
278 async fn update(&self, project_root: Option<String>) -> Result<String, String> {
279 AppService::update(self, project_root).await
280 }
281
282 async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
283 AppService::migrate(self, project_root).await
284 }
285
286 async fn session_new(
289 &self,
290 project_root: Option<String>,
291 mode: Option<String>,
292 ) -> Result<String, String> {
293 let session = self.activate_session(project_root.as_deref(), mode.as_deref())?;
294 let result = serde_json::json!({
295 "session_id": session.session_id,
296 "project_root": session
297 .project_root
298 .as_ref()
299 .map(|p| p.to_string_lossy().to_string()),
300 "mode": session.mode.as_str(),
301 });
302 serde_json::to_string_pretty(&result).map_err(|e| e.to_string())
303 }
304
305 async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
308 AppService::card_list(self, pkg.as_deref())
309 }
310
311 async fn card_get(&self, card_id: &str) -> Result<String, String> {
312 AppService::card_get(self, card_id)
313 }
314
315 async fn card_find(
316 &self,
317 pkg: Option<String>,
318 where_: Option<serde_json::Value>,
319 order_by: Option<serde_json::Value>,
320 limit: Option<usize>,
321 offset: Option<usize>,
322 ) -> Result<String, String> {
323 AppService::card_find(self, pkg, where_, order_by, limit, offset)
324 }
325
326 async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
327 AppService::card_alias_list(self, pkg.as_deref())
328 }
329
330 async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
331 AppService::card_get_by_alias(self, name)
332 }
333
334 async fn card_alias_set(
335 &self,
336 name: &str,
337 card_id: &str,
338 pkg: Option<String>,
339 note: Option<String>,
340 ) -> Result<String, String> {
341 AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
342 }
343
344 async fn card_append(
345 &self,
346 card_id: &str,
347 fields: serde_json::Value,
348 ) -> Result<String, String> {
349 AppService::card_append(self, card_id, fields)
350 }
351
352 async fn card_install(&self, url: String) -> Result<String, String> {
353 AppService::card_install(self, url).await
354 }
355
356 async fn card_samples(
357 &self,
358 card_id: &str,
359 offset: Option<usize>,
360 limit: Option<usize>,
361 where_: Option<serde_json::Value>,
362 ) -> Result<String, String> {
363 AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
364 }
365
366 async fn card_lineage(
367 &self,
368 card_id: &str,
369 direction: Option<String>,
370 depth: Option<usize>,
371 include_stats: Option<bool>,
372 relation_filter: Option<Vec<String>>,
373 ) -> Result<String, String> {
374 AppService::card_lineage(
375 self,
376 card_id,
377 direction.as_deref(),
378 depth,
379 include_stats,
380 relation_filter,
381 )
382 }
383
384 async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
385 AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
386 }
387
388 async fn card_analyze(&self, card_id: &str, pkg: Option<String>) -> Result<String, String> {
389 AppService::card_analyze(self, card_id, pkg).await
390 }
391
392 async fn hub_reindex(
395 &self,
396 output_path: Option<String>,
397 source_dir: Option<String>,
398 ) -> Result<String, String> {
399 self.hub_reindex(output_path.as_deref(), source_dir.as_deref())
400 .await
401 }
402
403 async fn hub_gendoc(
404 &self,
405 source_dir: String,
406 out_dir: Option<String>,
407 projections: Option<Vec<String>>,
408 config_path: Option<String>,
409 lint_strict: Option<bool>,
410 ) -> Result<String, String> {
411 let svc = self.clone();
412 tokio::task::spawn_blocking(move || {
413 crate::AppService::hub_gendoc(
414 &svc,
415 &source_dir,
416 out_dir.as_deref(),
417 projections.as_deref(),
418 config_path.as_deref(),
419 lint_strict,
420 )
421 })
422 .await
423 .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
424 }
425
426 async fn hub_dist(
427 &self,
428 source_dir: String,
429 output_path: Option<String>,
430 out_dir: Option<String>,
431 preset: Option<String>,
432 project_root: Option<String>,
433 projections: Option<Vec<String>>,
434 config_path: Option<String>,
435 lint_strict: Option<bool>,
436 ) -> Result<String, String> {
437 self.hub_dist(
438 &source_dir,
439 output_path.as_deref(),
440 out_dir.as_deref(),
441 preset.as_deref(),
442 project_root.as_deref(),
443 projections.as_deref(),
444 config_path.as_deref(),
445 lint_strict,
446 )
447 .await
448 }
449
450 async fn hub_info(&self, pkg: String) -> Result<String, String> {
451 let svc = self.clone();
452 tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
453 .await
454 .map_err(|e| format!("hub_info task panicked: {e}"))?
455 }
456
457 #[allow(clippy::too_many_arguments)]
458 async fn hub_search(
459 &self,
460 query: Option<String>,
461 category: Option<String>,
462 installed_only: Option<bool>,
463 limit: Option<i32>,
464 sort: Option<String>,
465 filter: Option<serde_json::Value>,
466 fields: Option<Vec<String>>,
467 verbose: Option<String>,
468 local_indices: Option<Vec<String>>,
469 ) -> Result<String, String> {
470 let svc = self.clone();
471
472 let filter_map = match filter {
480 None => None,
481 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
482 Ok(map) => Some(map),
483 Err(e) => {
484 tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
485 None
486 }
487 },
488 };
489
490 let opts = ListOpts {
495 limit: limit.map(|n| n.max(0) as usize),
496 sort,
497 filter: filter_map,
498 fields,
499 verbose,
500 };
501
502 tokio::task::spawn_blocking(move || {
503 AppService::hub_search(
504 &svc,
505 query.as_deref(),
506 category.as_deref(),
507 installed_only,
508 opts,
509 local_indices,
510 )
511 })
512 .await
513 .map_err(|e| format!("hub_search task panicked: {e}"))?
514 }
515
516 async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
519 AppService::pkg_read_init_lua(self, name, None)
520 }
521
522 async fn pkg_get_narrative_md(&self, name: &str) -> Result<Option<String>, String> {
523 AppService::pkg_get_narrative_md(self, name).await
524 }
525
526 async fn pkg_meta(&self, name: &str) -> Result<String, String> {
527 let filter = serde_json::json!({ "name": name });
528 let json_str = EngineApi::pkg_list(
529 self,
530 None,
531 None,
532 None,
533 Some(filter),
534 None,
535 Some("full".to_string()),
536 )
537 .await?;
538 let val: serde_json::Value = serde_json::from_str(&json_str)
539 .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
540 let pkgs = val
541 .get("packages")
542 .and_then(|p| p.as_array())
543 .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
544 if pkgs.is_empty() {
545 return Err(format!("pkg not found: {name}"));
546 }
547 serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
548 }
549
550 async fn pkg_scaffold(
553 &self,
554 name: String,
555 target_dir: Option<String>,
556 category: Option<String>,
557 description: Option<String>,
558 ) -> Result<String, String> {
559 let svc = self.clone();
560 tokio::task::spawn_blocking(move || {
561 AppService::pkg_scaffold(
562 &svc,
563 &name,
564 target_dir.as_deref(),
565 category.as_deref(),
566 description.as_deref(),
567 )
568 })
569 .await
570 .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
571 }
572
573 async fn hub_index_aggregate(&self) -> Result<String, String> {
583 let svc = self.clone();
584 let (index, warnings) = tokio::task::spawn_blocking(move || {
585 AppService::aggregate_index(&svc).map_err(|e| e.to_string())
586 })
587 .await
588 .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
589
590 let mut json = serde_json::to_value(&index)
591 .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
592 if !warnings.is_empty() {
593 if let Some(obj) = json.as_object_mut() {
594 obj.insert("warnings".to_string(), serde_json::json!(warnings));
595 }
596 }
597 serde_json::to_string(&json)
598 .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
599 }
600
601 async fn setting_resolve(&self, target: Option<String>) -> Result<String, String> {
604 let app_dir = self.log_config.app_dir();
605 let project_root = self.resolve_root(None);
606 tokio::task::spawn_blocking(move || {
607 crate::service::setting::resolve_setting(
608 &app_dir,
609 project_root.as_deref(),
610 target.as_deref(),
611 )
612 .map_err(|e| e.to_string())
613 .and_then(|r| {
614 serde_json::to_string(&r).map_err(|e| format!("setting_resolve: serialize: {e}"))
615 })
616 })
617 .await
618 .map_err(|e| format!("setting_resolve: task panicked: {e}"))?
619 }
620
621 async fn info(&self) -> String {
624 AppService::info(self)
625 }
626
627 async fn pool_ensure(&self) -> Result<String, String> {
630 AppService::pool_ensure_impl(self).await
631 }
632
633 async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
634 AppService::pool_status_impl(self, sid).await
635 }
636
637 async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
638 AppService::pool_stop_impl(self, sid).await
639 }
640}
641
642impl AppService {
645 pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
650 let reg_path = self.pool_reg_path.clone();
651 let lock_path = self.pool_lock_path.clone();
652
653 let sessions =
654 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
655 with_registry_lock(&lock_path, || {
656 let mut reg = PoolRegistry::load_or_default(®_path)?;
657 let survivors = reg.scan_and_gc()?;
658 reg.save(®_path)?;
660 let entries = survivors
661 .iter()
662 .map(|e| {
663 serde_json::json!({
664 "sid": e.sid,
665 "pid": e.pid,
666 "sock": e.sock.to_string_lossy(),
667 "version": e.version,
668 })
669 })
670 .collect::<Vec<_>>();
671 Ok(entries)
672 })
673 })
674 .await
675 .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
676 .map_err(|e| e.to_string())?;
677
678 let pool_version = env!("CARGO_PKG_VERSION");
679 serde_json::to_string(&serde_json::json!({
680 "sessions": sessions,
681 "pool_version": pool_version,
682 }))
683 .map_err(|e| format!("pool_ensure: serialize: {e}"))
684 }
685
686 pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
691 let reg_path = self.pool_reg_path.clone();
692 let lock_path = self.pool_lock_path.clone();
693
694 let sessions =
695 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
696 with_registry_lock(&lock_path, || {
697 let mut reg = PoolRegistry::load_or_default(®_path)?;
698 let _ = reg.scan_and_gc()?;
700 reg.save(®_path)?;
701
702 let entries: Vec<serde_json::Value> = reg
703 .sessions
704 .iter()
705 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
706 .map(|e| {
707 serde_json::json!({
708 "sid": e.sid,
709 "pid": e.pid,
710 "sock": e.sock.to_string_lossy(),
711 "version": e.version,
712 "created_at": e.created_at,
713 "status": "running",
715 })
716 })
717 .collect();
718 Ok(entries)
719 })
720 })
721 .await
722 .map_err(|e| format!("pool_status: task panicked: {e}"))?
723 .map_err(|e| e.to_string())?;
724
725 let pool_version = env!("CARGO_PKG_VERSION");
726 serde_json::to_string(&serde_json::json!({
727 "sessions": sessions,
728 "pool_version": pool_version,
729 }))
730 .map_err(|e| format!("pool_status: serialize: {e}"))
731 }
732
733 pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
739 let reg_path = self.pool_reg_path.clone();
740 let lock_path = self.pool_lock_path.clone();
741
742 let result = tokio::task::spawn_blocking(
743 move || -> Result<(Vec<String>, Vec<String>), PoolError> {
744 with_registry_lock(&lock_path, || {
745 let mut reg = PoolRegistry::load_or_default(®_path)?;
746
747 let targets: Vec<_> = reg
749 .sessions
750 .iter()
751 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
752 .cloned()
753 .collect();
754
755 let mut stopped: Vec<String> = Vec::new();
756 let mut errors: Vec<String> = Vec::new();
757
758 for entry in &targets {
759 #[cfg(unix)]
760 {
761 let pid_t = match i32::try_from(entry.pid) {
765 Ok(p) if p > 0 => p,
766 Ok(_) => {
767 errors.push(format!(
768 "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
769 entry.sid, entry.pid
770 ));
771 reg.remove(&entry.sid);
772 continue;
773 }
774 Err(_) => {
775 errors.push(format!(
776 "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
777 entry.sid, entry.pid
778 ));
779 reg.remove(&entry.sid);
781 continue;
782 }
783 };
784
785 let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
789 if ret == 0 {
790 stopped.push(entry.sid.clone());
791 } else {
792 let os_err = std::io::Error::last_os_error();
793 if os_err.raw_os_error() == Some(libc::ESRCH) {
794 stopped.push(entry.sid.clone());
796 } else {
797 errors.push(format!(
798 "sid={}: SIGTERM failed: {}",
799 entry.sid, os_err
800 ));
801 }
802 }
803 }
804 #[cfg(not(unix))]
805 {
806 errors.push(format!(
808 "sid={}: SIGTERM not supported on this platform",
809 entry.sid
810 ));
811 }
812 reg.remove(&entry.sid);
815 }
816
817 reg.save(®_path)?;
819
820 Ok((stopped, errors))
821 })
822 },
823 )
824 .await
825 .map_err(|e| format!("pool_stop: task panicked: {e}"))?
826 .map_err(|e| e.to_string())?;
827
828 let (stopped, errors) = result;
829 serde_json::to_string(&serde_json::json!({
830 "stopped": stopped,
831 "errors": errors,
832 }))
833 .map_err(|e| format!("pool_stop: serialize: {e}"))
834 }
835}
836
#[cfg(test)]
mod tests {
    use super::super::test_support::make_app_service_at;

    /// A registry entry with `pid = 0` must not be signalled: `pool_stop_impl`
    /// has to report it under `errors`, keep it out of `stopped`, and still
    /// remove it from the on-disk registry.
    #[tokio::test]
    #[cfg(unix)]
    async fn pool_stop_pid_zero_is_rejected() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let root = scratch.path().to_path_buf();
        let svc = make_app_service_at(root.clone()).await;

        // Seed a registry file containing a single session whose pid is zero.
        // NOTE(review): assumes the service reads `<root>/state/pool/registry.json` — confirm.
        let pool_dir = root.join("state").join("pool");
        let pool_reg_path = pool_dir.join("registry.json");
        std::fs::create_dir_all(pool_reg_path.parent().unwrap()).expect("create pool dir");

        let zero_pid_session = serde_json::json!({
            "sid": "zero-pid-session",
            "pid": 0u32,
            "sock": "/tmp/alc-pool/zero.sock",
            "version": "0.30.0",
            "created_at": "2026-01-01T00:00:00Z"
        });
        let seeded = serde_json::json!({ "sessions": [zero_pid_session] });
        std::fs::write(&pool_reg_path, seeded.to_string()).expect("seed registry.json");

        let response = svc.pool_stop_impl(None).await.expect("pool_stop_impl");
        let result: serde_json::Value =
            serde_json::from_str(&response).expect("response is valid JSON");

        // The entry must be reported as an error with the pid-validation message.
        let errors = result
            .get("errors")
            .and_then(|v| v.as_array())
            .expect("errors array");
        assert!(
            !errors.is_empty(),
            "expected at least one error for pid=0 entry"
        );
        let err_msg = errors.first().and_then(|v| v.as_str()).unwrap_or("");
        assert!(
            err_msg.contains("not a valid POSIX target pid"),
            "unexpected error message: {err_msg}"
        );

        // It must not be counted as stopped.
        let stopped = result
            .get("stopped")
            .and_then(|v| v.as_array())
            .expect("stopped array");
        assert!(
            stopped.is_empty(),
            "pid=0 entry must not appear in stopped list"
        );

        // It must also have been removed from the persisted registry.
        let registry_text = std::fs::read_to_string(&pool_reg_path).expect("read registry");
        let on_disk: serde_json::Value =
            serde_json::from_str(&registry_text).expect("parse registry");
        let sessions = on_disk
            .get("sessions")
            .and_then(|v| v.as_array())
            .expect("sessions array");
        assert!(
            sessions.is_empty(),
            "pid=0 entry must be removed from on-disk registry"
        );
    }
}