1use std::collections::HashMap;
2
3use algocline_core::{EngineApi, QueryResponse};
4use async_trait::async_trait;
5
6use crate::pool::{registry::with_registry_lock, PoolError, PoolRegistry};
7
8use super::list_opts::ListOpts;
9use super::AppService;
10
11#[async_trait]
18impl EngineApi for AppService {
19 async fn run(
22 &self,
23 code: Option<String>,
24 code_file: Option<String>,
25 ctx: Option<serde_json::Value>,
26 project_root: Option<String>,
27 host_mode: Option<bool>,
28 ) -> Result<String, String> {
29 AppService::run(self, code, code_file, ctx, project_root, host_mode).await
30 }
31
32 async fn advice(
33 &self,
34 strategy: &str,
35 task: Option<String>,
36 opts: Option<serde_json::Value>,
37 project_root: Option<String>,
38 ) -> Result<String, String> {
39 AppService::advice(self, strategy, task, opts, project_root).await
40 }
41
42 async fn continue_single(
43 &self,
44 session_id: &str,
45 response: String,
46 query_id: Option<&str>,
47 usage: Option<algocline_core::TokenUsage>,
48 ) -> Result<String, String> {
49 AppService::continue_single(self, session_id, response, query_id, usage).await
50 }
51
52 async fn continue_batch(
53 &self,
54 session_id: &str,
55 responses: Vec<QueryResponse>,
56 ) -> Result<String, String> {
57 AppService::continue_batch(self, session_id, responses).await
58 }
59
60 async fn status(
63 &self,
64 session_id: Option<&str>,
65 pending_filter: Option<serde_json::Value>,
66 include_history: bool,
67 ) -> Result<String, String> {
68 AppService::status(self, session_id, pending_filter, include_history).await
69 }
70
71 async fn eval(
74 &self,
75 scenario: Option<String>,
76 scenario_file: Option<String>,
77 scenario_name: Option<String>,
78 strategy: &str,
79 strategy_opts: Option<serde_json::Value>,
80 auto_card: bool,
81 ) -> Result<String, String> {
82 AppService::eval(
83 self,
84 scenario,
85 scenario_file,
86 scenario_name,
87 strategy,
88 strategy_opts,
89 auto_card,
90 )
91 .await
92 }
93
94 async fn eval_history(&self, strategy: Option<&str>, limit: usize) -> Result<String, String> {
95 AppService::eval_history(self, strategy, limit)
96 }
97
98 async fn eval_detail(&self, eval_id: &str) -> Result<String, String> {
99 AppService::eval_detail(self, eval_id)
100 }
101
102 async fn eval_compare(&self, eval_id_a: &str, eval_id_b: &str) -> Result<String, String> {
103 AppService::eval_compare(self, eval_id_a, eval_id_b).await
104 }
105
106 async fn scenario_list(&self) -> Result<String, String> {
109 AppService::scenario_list(self)
110 }
111
112 async fn scenario_show(&self, name: &str) -> Result<String, String> {
113 AppService::scenario_show(self, name)
114 }
115
116 async fn scenario_install(&self, url: String) -> Result<String, String> {
117 AppService::scenario_install(self, url).await
118 }
119
120 async fn pkg_link(
123 &self,
124 path: String,
125 name: Option<String>,
126 force: Option<bool>,
127 scope: Option<String>,
128 project_root: Option<String>,
129 ) -> Result<String, String> {
130 AppService::pkg_link(self, path, name, force, scope, project_root).await
131 }
132
133 async fn pkg_unlink(&self, name: String) -> Result<String, String> {
134 AppService::pkg_unlink(self, name).await
135 }
136
137 #[allow(clippy::too_many_arguments)]
138 async fn pkg_list(
139 &self,
140 project_root: Option<String>,
141 limit: Option<i32>,
142 sort: Option<String>,
143 filter: Option<serde_json::Value>,
144 fields: Option<Vec<String>>,
145 verbose: Option<String>,
146 ) -> Result<String, String> {
147 let filter_map = match filter {
153 None => None,
154 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
155 Ok(map) => Some(map),
156 Err(e) => {
157 tracing::warn!(error = %e, "pkg_list: filter value is not a JSON object — treating as no filter");
158 None
159 }
160 },
161 };
162
163 let opts = ListOpts {
168 limit: limit.map(|n| n.max(0) as usize),
169 sort,
170 filter: filter_map,
171 fields,
172 verbose,
173 };
174
175 AppService::pkg_list(self, project_root, opts)
176 .await
177 .map_err(|e| e.to_string())
178 }
179
180 async fn pkg_install(&self, url: String, name: Option<String>) -> Result<String, String> {
181 AppService::pkg_install(self, url, name).await
182 }
183
184 async fn pkg_remove(
185 &self,
186 name: &str,
187 project_root: Option<String>,
188 version: Option<String>,
189 scope: Option<String>,
190 ) -> Result<String, String> {
191 AppService::pkg_remove(self, name, project_root, version, scope).await
192 }
193
194 async fn pkg_repair(
195 &self,
196 name: Option<String>,
197 project_root: Option<String>,
198 ) -> Result<String, String> {
199 AppService::pkg_repair(self, name, project_root).await
200 }
201
202 async fn pkg_doctor(
203 &self,
204 name: Option<String>,
205 project_root: Option<String>,
206 ) -> Result<String, String> {
207 AppService::pkg_doctor(self, name, project_root).await
208 }
209
210 async fn add_note(
213 &self,
214 session_id: &str,
215 content: &str,
216 title: Option<&str>,
217 ) -> Result<String, String> {
218 AppService::add_note(self, session_id, content, title).await
219 }
220
221 async fn log_view(
222 &self,
223 session_id: Option<&str>,
224 limit: Option<usize>,
225 max_chars: Option<usize>,
226 ) -> Result<String, String> {
227 AppService::log_view(self, session_id, limit, max_chars).await
228 }
229
230 async fn stats(
231 &self,
232 strategy_filter: Option<&str>,
233 days: Option<u64>,
234 ) -> Result<String, String> {
235 AppService::stats(self, strategy_filter, days)
236 }
237
238 async fn init(&self, project_root: Option<String>) -> Result<String, String> {
241 AppService::init(self, project_root).await
242 }
243
244 async fn update(&self, project_root: Option<String>) -> Result<String, String> {
245 AppService::update(self, project_root).await
246 }
247
248 async fn migrate(&self, project_root: Option<String>) -> Result<String, String> {
249 AppService::migrate(self, project_root).await
250 }
251
252 async fn card_list(&self, pkg: Option<String>) -> Result<String, String> {
255 AppService::card_list(self, pkg.as_deref())
256 }
257
258 async fn card_get(&self, card_id: &str) -> Result<String, String> {
259 AppService::card_get(self, card_id)
260 }
261
262 async fn card_find(
263 &self,
264 pkg: Option<String>,
265 where_: Option<serde_json::Value>,
266 order_by: Option<serde_json::Value>,
267 limit: Option<usize>,
268 offset: Option<usize>,
269 ) -> Result<String, String> {
270 AppService::card_find(self, pkg, where_, order_by, limit, offset)
271 }
272
273 async fn card_alias_list(&self, pkg: Option<String>) -> Result<String, String> {
274 AppService::card_alias_list(self, pkg.as_deref())
275 }
276
277 async fn card_get_by_alias(&self, name: &str) -> Result<String, String> {
278 AppService::card_get_by_alias(self, name)
279 }
280
281 async fn card_alias_set(
282 &self,
283 name: &str,
284 card_id: &str,
285 pkg: Option<String>,
286 note: Option<String>,
287 ) -> Result<String, String> {
288 AppService::card_alias_set(self, name, card_id, pkg.as_deref(), note.as_deref())
289 }
290
291 async fn card_append(
292 &self,
293 card_id: &str,
294 fields: serde_json::Value,
295 ) -> Result<String, String> {
296 AppService::card_append(self, card_id, fields)
297 }
298
299 async fn card_install(&self, url: String) -> Result<String, String> {
300 AppService::card_install(self, url).await
301 }
302
303 async fn card_samples(
304 &self,
305 card_id: &str,
306 offset: Option<usize>,
307 limit: Option<usize>,
308 where_: Option<serde_json::Value>,
309 ) -> Result<String, String> {
310 AppService::card_samples(self, card_id, offset.unwrap_or(0), limit, where_)
311 }
312
313 async fn card_lineage(
314 &self,
315 card_id: &str,
316 direction: Option<String>,
317 depth: Option<usize>,
318 include_stats: Option<bool>,
319 relation_filter: Option<Vec<String>>,
320 ) -> Result<String, String> {
321 AppService::card_lineage(
322 self,
323 card_id,
324 direction.as_deref(),
325 depth,
326 include_stats,
327 relation_filter,
328 )
329 }
330
331 async fn card_sink_backfill(&self, sink: String, dry_run: bool) -> Result<String, String> {
332 AppService::card_sink_backfill(self, super::card::SinkBackfillParams { sink, dry_run })
333 }
334
335 async fn hub_reindex(
338 &self,
339 output_path: Option<String>,
340 source_dir: Option<String>,
341 ) -> Result<String, String> {
342 let svc = self.clone();
343 tokio::task::spawn_blocking(move || {
344 AppService::hub_reindex(&svc, output_path.as_deref(), source_dir.as_deref())
345 })
346 .await
347 .map_err(|e| format!("hub_reindex task panicked: {e}"))?
348 }
349
350 async fn hub_gendoc(
351 &self,
352 source_dir: String,
353 out_dir: Option<String>,
354 projections: Option<Vec<String>>,
355 config_path: Option<String>,
356 lint_strict: Option<bool>,
357 ) -> Result<String, String> {
358 let svc = self.clone();
359 tokio::task::spawn_blocking(move || {
360 crate::AppService::hub_gendoc(
361 &svc,
362 &source_dir,
363 out_dir.as_deref(),
364 projections.as_deref(),
365 config_path.as_deref(),
366 lint_strict,
367 )
368 })
369 .await
370 .map_err(|e| format!("hub_gendoc task panicked: {e}"))?
371 }
372
373 async fn hub_dist(
374 &self,
375 source_dir: String,
376 output_path: Option<String>,
377 out_dir: Option<String>,
378 preset: Option<String>,
379 project_root: Option<String>,
380 projections: Option<Vec<String>>,
381 config_path: Option<String>,
382 lint_strict: Option<bool>,
383 ) -> Result<String, String> {
384 let svc = self.clone();
385 tokio::task::spawn_blocking(move || {
386 AppService::hub_dist(
387 &svc,
388 &source_dir,
389 output_path.as_deref(),
390 out_dir.as_deref(),
391 preset.as_deref(),
392 project_root.as_deref(),
393 projections.as_deref(),
394 config_path.as_deref(),
395 lint_strict,
396 )
397 })
398 .await
399 .map_err(|e| format!("hub_dist task panicked: {e}"))?
400 }
401
402 async fn hub_info(&self, pkg: String) -> Result<String, String> {
403 let svc = self.clone();
404 tokio::task::spawn_blocking(move || AppService::hub_info(&svc, &pkg))
405 .await
406 .map_err(|e| format!("hub_info task panicked: {e}"))?
407 }
408
409 #[allow(clippy::too_many_arguments)]
410 async fn hub_search(
411 &self,
412 query: Option<String>,
413 category: Option<String>,
414 installed_only: Option<bool>,
415 limit: Option<i32>,
416 sort: Option<String>,
417 filter: Option<serde_json::Value>,
418 fields: Option<Vec<String>>,
419 verbose: Option<String>,
420 ) -> Result<String, String> {
421 let svc = self.clone();
422
423 let filter_map = match filter {
431 None => None,
432 Some(v) => match serde_json::from_value::<HashMap<String, serde_json::Value>>(v) {
433 Ok(map) => Some(map),
434 Err(e) => {
435 tracing::warn!(error = %e, "hub_search: filter value is not a JSON object — treating as no filter");
436 None
437 }
438 },
439 };
440
441 let opts = ListOpts {
446 limit: limit.map(|n| n.max(0) as usize),
447 sort,
448 filter: filter_map,
449 fields,
450 verbose,
451 };
452
453 tokio::task::spawn_blocking(move || {
454 AppService::hub_search(
455 &svc,
456 query.as_deref(),
457 category.as_deref(),
458 installed_only,
459 opts,
460 )
461 })
462 .await
463 .map_err(|e| format!("hub_search task panicked: {e}"))?
464 }
465
466 async fn pkg_read_init_lua(&self, name: &str) -> Result<String, String> {
469 AppService::pkg_read_init_lua(self, name, None)
470 }
471
472 async fn pkg_meta(&self, name: &str) -> Result<String, String> {
473 let filter = serde_json::json!({ "name": name });
474 let json_str = EngineApi::pkg_list(
475 self,
476 None,
477 None,
478 None,
479 Some(filter),
480 None,
481 Some("full".to_string()),
482 )
483 .await?;
484 let val: serde_json::Value = serde_json::from_str(&json_str)
485 .map_err(|e| format!("pkg_meta: failed to parse pkg_list response: {e}"))?;
486 let pkgs = val
487 .get("packages")
488 .and_then(|p| p.as_array())
489 .ok_or_else(|| "pkg_meta: pkg_list response missing 'packages' field".to_string())?;
490 if pkgs.is_empty() {
491 return Err(format!("pkg not found: {name}"));
492 }
493 serde_json::to_string(&pkgs[0]).map_err(|e| format!("pkg_meta: serialize entry: {e}"))
494 }
495
496 async fn pkg_scaffold(
499 &self,
500 name: String,
501 target_dir: Option<String>,
502 category: Option<String>,
503 description: Option<String>,
504 ) -> Result<String, String> {
505 let svc = self.clone();
506 tokio::task::spawn_blocking(move || {
507 AppService::pkg_scaffold(
508 &svc,
509 &name,
510 target_dir.as_deref(),
511 category.as_deref(),
512 description.as_deref(),
513 )
514 })
515 .await
516 .map_err(|e| format!("pkg_scaffold task panicked: {e}"))?
517 }
518
519 async fn hub_index_aggregate(&self) -> Result<String, String> {
529 let svc = self.clone();
530 let (index, warnings) = tokio::task::spawn_blocking(move || {
531 AppService::aggregate_index(&svc).map_err(|e| e.to_string())
532 })
533 .await
534 .map_err(|e| format!("hub_index_aggregate task panicked: {e}"))??;
535
536 let mut json = serde_json::to_value(&index)
537 .map_err(|e| format!("hub_index_aggregate: serialize index: {e}"))?;
538 if !warnings.is_empty() {
539 if let Some(obj) = json.as_object_mut() {
540 obj.insert("warnings".to_string(), serde_json::json!(warnings));
541 }
542 }
543 serde_json::to_string(&json)
544 .map_err(|e| format!("hub_index_aggregate: serialize final: {e}"))
545 }
546
547 async fn info(&self) -> String {
550 AppService::info(self)
551 }
552
553 async fn pool_ensure(&self) -> Result<String, String> {
556 AppService::pool_ensure_impl(self).await
557 }
558
559 async fn pool_status(&self, sid: Option<String>) -> Result<String, String> {
560 AppService::pool_status_impl(self, sid).await
561 }
562
563 async fn pool_stop(&self, sid: Option<String>) -> Result<String, String> {
564 AppService::pool_stop_impl(self, sid).await
565 }
566}
567
568impl AppService {
571 pub(crate) async fn pool_ensure_impl(&self) -> Result<String, String> {
576 let reg_path = self.pool_reg_path.clone();
577 let lock_path = self.pool_lock_path.clone();
578
579 let sessions =
580 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
581 with_registry_lock(&lock_path, || {
582 let mut reg = PoolRegistry::load_or_default(®_path)?;
583 let survivors = reg.scan_and_gc()?;
584 reg.save(®_path)?;
586 let entries = survivors
587 .iter()
588 .map(|e| {
589 serde_json::json!({
590 "sid": e.sid,
591 "pid": e.pid,
592 "sock": e.sock.to_string_lossy(),
593 "version": e.version,
594 })
595 })
596 .collect::<Vec<_>>();
597 Ok(entries)
598 })
599 })
600 .await
601 .map_err(|e| format!("pool_ensure: task panicked: {e}"))?
602 .map_err(|e| e.to_string())?;
603
604 let pool_version = env!("CARGO_PKG_VERSION");
605 serde_json::to_string(&serde_json::json!({
606 "sessions": sessions,
607 "pool_version": pool_version,
608 }))
609 .map_err(|e| format!("pool_ensure: serialize: {e}"))
610 }
611
612 pub(crate) async fn pool_status_impl(&self, sid: Option<String>) -> Result<String, String> {
617 let reg_path = self.pool_reg_path.clone();
618 let lock_path = self.pool_lock_path.clone();
619
620 let sessions =
621 tokio::task::spawn_blocking(move || -> Result<Vec<serde_json::Value>, PoolError> {
622 with_registry_lock(&lock_path, || {
623 let mut reg = PoolRegistry::load_or_default(®_path)?;
624 let _ = reg.scan_and_gc()?;
626 reg.save(®_path)?;
627
628 let entries: Vec<serde_json::Value> = reg
629 .sessions
630 .iter()
631 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
632 .map(|e| {
633 serde_json::json!({
634 "sid": e.sid,
635 "pid": e.pid,
636 "sock": e.sock.to_string_lossy(),
637 "version": e.version,
638 "created_at": e.created_at,
639 "status": "running",
641 })
642 })
643 .collect();
644 Ok(entries)
645 })
646 })
647 .await
648 .map_err(|e| format!("pool_status: task panicked: {e}"))?
649 .map_err(|e| e.to_string())?;
650
651 let pool_version = env!("CARGO_PKG_VERSION");
652 serde_json::to_string(&serde_json::json!({
653 "sessions": sessions,
654 "pool_version": pool_version,
655 }))
656 .map_err(|e| format!("pool_status: serialize: {e}"))
657 }
658
659 pub(crate) async fn pool_stop_impl(&self, sid: Option<String>) -> Result<String, String> {
665 let reg_path = self.pool_reg_path.clone();
666 let lock_path = self.pool_lock_path.clone();
667
668 let result = tokio::task::spawn_blocking(
669 move || -> Result<(Vec<String>, Vec<String>), PoolError> {
670 with_registry_lock(&lock_path, || {
671 let mut reg = PoolRegistry::load_or_default(®_path)?;
672
673 let targets: Vec<_> = reg
675 .sessions
676 .iter()
677 .filter(|e| sid.as_deref().map(|s| e.sid == s).unwrap_or(true))
678 .cloned()
679 .collect();
680
681 let mut stopped: Vec<String> = Vec::new();
682 let mut errors: Vec<String> = Vec::new();
683
684 for entry in &targets {
685 #[cfg(unix)]
686 {
687 let pid_t = match i32::try_from(entry.pid) {
691 Ok(p) if p > 0 => p,
692 Ok(_) => {
693 errors.push(format!(
694 "sid={}: pid={} is not a valid POSIX target pid (must be > 0); skipping SIGTERM",
695 entry.sid, entry.pid
696 ));
697 reg.remove(&entry.sid);
698 continue;
699 }
700 Err(_) => {
701 errors.push(format!(
702 "sid={}: pid={} exceeds i32::MAX, cannot send SIGTERM (K-52)",
703 entry.sid, entry.pid
704 ));
705 reg.remove(&entry.sid);
707 continue;
708 }
709 };
710
711 let ret = unsafe { libc::kill(pid_t, libc::SIGTERM) };
715 if ret == 0 {
716 stopped.push(entry.sid.clone());
717 } else {
718 let os_err = std::io::Error::last_os_error();
719 if os_err.raw_os_error() == Some(libc::ESRCH) {
720 stopped.push(entry.sid.clone());
722 } else {
723 errors.push(format!(
724 "sid={}: SIGTERM failed: {}",
725 entry.sid, os_err
726 ));
727 }
728 }
729 }
730 #[cfg(not(unix))]
731 {
732 errors.push(format!(
734 "sid={}: SIGTERM not supported on this platform",
735 entry.sid
736 ));
737 }
738 reg.remove(&entry.sid);
741 }
742
743 reg.save(®_path)?;
745
746 Ok((stopped, errors))
747 })
748 },
749 )
750 .await
751 .map_err(|e| format!("pool_stop: task panicked: {e}"))?
752 .map_err(|e| e.to_string())?;
753
754 let (stopped, errors) = result;
755 serde_json::to_string(&serde_json::json!({
756 "stopped": stopped,
757 "errors": errors,
758 }))
759 .map_err(|e| format!("pool_stop: serialize: {e}"))
760 }
761}
762
763#[cfg(test)]
766mod tests {
767 use super::super::test_support::make_app_service_at;
768
769 #[tokio::test]
776 #[cfg(unix)]
777 async fn pool_stop_pid_zero_is_rejected() {
778 let tmp = tempfile::tempdir().expect("tempdir");
781 let root = tmp.path().to_path_buf();
782 let svc = make_app_service_at(root.clone()).await;
783
784 let pool_reg_path = root.join("state").join("pool").join("registry.json");
787 std::fs::create_dir_all(pool_reg_path.parent().unwrap()).expect("create pool dir");
788
789 let seeded = serde_json::json!({
790 "sessions": [{
791 "sid": "zero-pid-session",
792 "pid": 0u32,
793 "sock": "/tmp/alc-pool/zero.sock",
794 "version": "0.30.0",
795 "created_at": "2026-01-01T00:00:00Z"
796 }]
797 });
798 std::fs::write(&pool_reg_path, seeded.to_string()).expect("seed registry.json");
799
800 let json_str = svc.pool_stop_impl(None).await.expect("pool_stop_impl");
802 let result: serde_json::Value =
803 serde_json::from_str(&json_str).expect("response is valid JSON");
804
805 let errors = result["errors"].as_array().expect("errors array");
807 assert!(
808 !errors.is_empty(),
809 "expected at least one error for pid=0 entry"
810 );
811 let err_msg = errors[0].as_str().unwrap_or("");
812 assert!(
813 err_msg.contains("not a valid POSIX target pid"),
814 "unexpected error message: {err_msg}"
815 );
816
817 let stopped = result["stopped"].as_array().expect("stopped array");
819 assert!(
820 stopped.is_empty(),
821 "pid=0 entry must not appear in stopped list"
822 );
823
824 let on_disk: serde_json::Value =
826 serde_json::from_str(&std::fs::read_to_string(&pool_reg_path).expect("read registry"))
827 .expect("parse registry");
828 let sessions = on_disk["sessions"].as_array().expect("sessions array");
829 assert!(
830 sessions.is_empty(),
831 "pid=0 entry must be removed from on-disk registry"
832 );
833
834 }
837}