1use std::sync::Arc;
2
3use algocline_core::QueryId;
4use algocline_engine::{FeedResult, VariantPkg};
5
6use super::eval_store::{splice_response_string, splice_response_warnings};
7use super::resolve::{is_package_installed, make_require_code, resolve_code, QueryResponse};
8use super::transcript::write_transcript_log;
9use super::AppService;
10use crate::pool::dispatch::{continue_via_pool, run_via_pool};
11
12pub(crate) fn normalize_stringified_json_object(v: serde_json::Value) -> serde_json::Value {
23 match v {
24 serde_json::Value::String(ref s) => match serde_json::from_str::<serde_json::Value>(s) {
25 Ok(parsed @ serde_json::Value::Object(_)) => parsed,
26 Ok(parsed @ serde_json::Value::Array(_)) => parsed,
27 _ => v,
28 },
29 other => other,
30 }
31}
32
33fn splice_save_warning(result_json: &str, warning: Option<String>) -> String {
37 match warning {
38 Some(msg) => splice_response_string(result_json, "save_warning", &msg),
39 None => result_json.to_string(),
40 }
41}
42
43fn splice_transcript_warning(result_json: &str, warning: Option<String>) -> String {
47 match warning {
48 Some(msg) => splice_response_string(result_json, "transcript_warning", &msg),
49 None => result_json.to_string(),
50 }
51}
52
53impl AppService {
54 pub async fn run(
74 &self,
75 code: Option<String>,
76 code_file: Option<String>,
77 ctx: Option<serde_json::Value>,
78 project_root: Option<String>,
79 host_mode: Option<bool>,
80 ) -> Result<String, String> {
81 let code = resolve_code(code, code_file)?;
82 let ctx = normalize_stringified_json_object(ctx.unwrap_or(serde_json::Value::Null));
83 let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
84 let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
85 let mut warnings: Vec<String> = extra_warnings;
86 warnings.extend(variant_warnings);
87
88 if host_mode == Some(true) {
89 let (session_id, json, pool_save_error) = run_via_pool(
93 &self.pool_dir,
94 &self.pool_reg_path,
95 &self.pool_lock_path,
96 extra,
97 code,
98 ctx,
99 )
100 .await
101 .map_err(|e| e.to_string())?;
102
103 let cache_reload_warning: Option<String> =
113 match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
114 Ok(reg) => {
115 let mut guard = self.pool_registry.write().await;
116 *guard = reg;
117 None
118 }
119 Err(e) => {
120 tracing::warn!(
121 error = %e,
122 "failed to reload pool registry after run; in-memory cache may be stale"
123 );
124 Some(e.to_string())
125 }
126 };
127
128 let json = splice_response_warnings(&json, "lib_path_warnings", &warnings);
129 let json = match pool_save_error {
130 Some(msg) => splice_response_string(&json, "pool_save_error", &msg),
131 None => json,
132 };
133 let json = match cache_reload_warning {
134 Some(msg) => splice_response_string(&json, "pool_cache_reload_warning", &msg),
135 None => json,
136 };
137 let _ = session_id; return Ok(json);
139 }
140
141 let json = self
143 .start_and_tick(code, ctx, None, extra, variants)
144 .await?;
145 Ok(splice_response_warnings(
146 &json,
147 "lib_path_warnings",
148 &warnings,
149 ))
150 }
151
152 pub async fn advice(
160 &self,
161 strategy: &str,
162 task: Option<String>,
163 opts: Option<serde_json::Value>,
164 project_root: Option<String>,
165 ) -> Result<String, String> {
166 let app_dir = self.log_config.app_dir();
168 if !is_package_installed(&app_dir, strategy) {
169 self.auto_install_bundled_packages().await?;
170 if !is_package_installed(&app_dir, strategy) {
171 return Err(format!(
172 "Package '{strategy}' not found after installing bundled collection. \
173 Use alc_pkg_install to install it manually."
174 ));
175 }
176 }
177
178 let code = make_require_code(strategy);
179
180 let opts = opts.map(normalize_stringified_json_object);
181 let mut ctx_map = match opts {
182 Some(serde_json::Value::Object(m)) => m,
183 _ => serde_json::Map::new(),
184 };
185 if let Some(task) = task {
186 ctx_map.insert("task".into(), serde_json::Value::String(task));
187 }
188 let ctx = serde_json::Value::Object(ctx_map);
189
190 let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
191 let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
192 let mut warnings: Vec<String> = extra_warnings;
193 warnings.extend(variant_warnings);
194 let json = self
195 .start_and_tick(code, ctx, Some(strategy), extra, variants)
196 .await?;
197 Ok(splice_response_warnings(
198 &json,
199 "lib_path_warnings",
200 &warnings,
201 ))
202 }
203
204 pub async fn continue_batch(
211 &self,
212 session_id: &str,
213 responses: Vec<QueryResponse>,
214 ) -> Result<String, String> {
215 let pool_entry = {
217 let reg = self.pool_registry.read().await;
218 reg.find(session_id).cloned()
219 };
220
221 let pool_entry = if pool_entry.is_some() {
222 pool_entry
223 } else {
224 match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
225 Ok(reg) => {
226 let entry = reg.find(session_id).cloned();
227 if entry.is_some() {
228 let mut guard = self.pool_registry.write().await;
229 *guard = reg;
230 }
231 entry
232 }
233 Err(e) => {
234 return Err(format!("Continue failed: {e}"));
235 }
236 }
237 };
238
239 if let Some(entry) = pool_entry {
240 let mut last_json = None;
242 for qr in responses {
243 let json =
244 continue_via_pool(&entry, session_id, qr.response, Some(qr.query_id), qr.usage)
245 .await
246 .map_err(|e| format!("Continue failed: {e}"))?;
247 last_json = Some(json);
248 }
249 return last_json.ok_or_else(|| "Empty responses array".to_string());
250 }
251
252 let mut last_result = None;
254 for qr in responses {
255 let qid = QueryId::parse(&qr.query_id);
256 let result = self
257 .registry
258 .feed_response(session_id, &qid, qr.response, qr.usage.as_ref())
259 .await
260 .map_err(|e| format!("Continue failed: {e}"))?;
261 last_result = Some(result);
262 }
263 let result = last_result.ok_or("Empty responses array")?;
264 let transcript_warning = self.maybe_log_transcript(&result, session_id);
265 let json = result.to_json(session_id).to_string();
266 let json = splice_transcript_warning(&json, transcript_warning);
267 let save_warning = self.maybe_save_eval(&result, session_id, &json);
268 Ok(splice_save_warning(&json, save_warning))
269 }
270
271 pub async fn continue_single(
292 &self,
293 session_id: &str,
294 response: String,
295 query_id: Option<&str>,
296 usage: Option<algocline_core::TokenUsage>,
297 ) -> Result<String, String> {
298 let pool_entry = {
301 let reg = self.pool_registry.read().await;
302 reg.find(session_id).cloned()
303 }; let pool_entry = if pool_entry.is_some() {
307 pool_entry
308 } else {
309 match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
310 Ok(reg) => {
311 let entry = reg.find(session_id).cloned();
312 if entry.is_some() {
313 let mut guard = self.pool_registry.write().await;
315 *guard = reg;
316 }
317 entry
318 }
319 Err(e) => {
320 return Err(format!("Continue failed: {e}"));
322 }
323 }
324 };
325
326 if let Some(entry) = pool_entry {
327 let json = continue_via_pool(
329 &entry,
330 session_id,
331 response,
332 query_id.map(str::to_string),
333 usage,
334 )
335 .await
336 .map_err(|e| format!("Continue failed: {e}"))?;
337 return Ok(json);
338 }
339
340 let query_id = match query_id {
342 Some(qid) => QueryId::parse(qid),
343 None => self
344 .registry
345 .resolve_sole_pending_id(session_id)
346 .await
347 .map_err(|e| format!("Continue failed: {e}"))?,
348 };
349
350 let result = self
351 .registry
352 .feed_response(session_id, &query_id, response, usage.as_ref())
353 .await
354 .map_err(|e| format!("Continue failed: {e}"))?;
355
356 let transcript_warning = self.maybe_log_transcript(&result, session_id);
357 let json = result.to_json(session_id).to_string();
358 let json = splice_transcript_warning(&json, transcript_warning);
359 let save_warning = self.maybe_save_eval(&result, session_id, &json);
360 Ok(splice_save_warning(&json, save_warning))
361 }
362
363 pub(super) fn maybe_log_transcript(
366 &self,
367 result: &FeedResult,
368 session_id: &str,
369 ) -> Option<String> {
370 if let FeedResult::Finished(exec_result) = result {
371 let strategy = match self.session_strategies.lock() {
376 Ok(mut map) => map.remove(session_id),
377 Err(e) => {
378 tracing::warn!(
379 "session_strategies mutex poisoned for '{}': {}",
380 session_id,
381 e
382 );
383 return Some(format!(
386 "session_strategies mutex poisoned for '{session_id}': {e}"
387 ));
388 }
389 };
390 match write_transcript_log(
394 &self.log_config,
395 session_id,
396 &exec_result.metrics,
397 strategy.as_deref(),
398 ) {
399 Err(e) => Some(e.to_string()),
400 Ok(meta_warning) => meta_warning,
401 }
402 } else {
403 None
404 }
405 }
406
407 pub(super) fn maybe_save_eval(
413 &self,
414 result: &FeedResult,
415 session_id: &str,
416 result_json: &str,
417 ) -> Option<String> {
418 if !matches!(result, FeedResult::Finished(_)) {
419 return None;
420 }
421 let strategy = {
422 let mut map = self.eval_sessions.lock().unwrap_or_else(|e| e.into_inner());
423 map.remove(session_id)
424 };
425 strategy.and_then(|s| {
426 super::eval_store::save_eval_result(&self.log_config.app_dir(), &s, result_json).err()
427 })
428 }
429
430 pub(super) async fn start_and_tick(
431 &self,
432 code: String,
433 ctx: serde_json::Value,
434 strategy: Option<&str>,
435 extra_lib_paths: Vec<std::path::PathBuf>,
436 variant_pkgs: Vec<VariantPkg>,
437 ) -> Result<String, String> {
438 let scenarios_dir = self.log_config.app_dir().scenarios_dir();
439 let session = self
440 .executor
441 .start_session(
442 code,
443 ctx,
444 extra_lib_paths,
445 variant_pkgs,
446 Arc::clone(&self.state_store),
447 Arc::clone(&self.card_store),
448 scenarios_dir,
449 )
450 .await?;
451 let (session_id, result) = self
452 .registry
453 .start_execution(session)
454 .await
455 .map_err(|e| format!("Execution failed: {e}"))?;
456 if let Some(s) = strategy {
457 if let Ok(mut map) = self.session_strategies.lock() {
458 map.insert(session_id.clone(), s.to_string());
459 }
460 }
461 let transcript_warning = self.maybe_log_transcript(&result, &session_id);
462 let json = result.to_json(&session_id).to_string();
463 Ok(splice_transcript_warning(&json, transcript_warning))
464 }
465}
466
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;

    use algocline_core::{
        AppDir, ExecutionMetrics, ExecutionObserver, LlmQuery, QueryId, TerminalState,
    };
    use algocline_engine::{ExecutionResult, FeedResult};

    use super::super::config::{AppConfig, LogDirSource};
    use super::super::eval_store::splice_response_string;
    use super::{splice_transcript_warning, AppService};
    use crate::pool::PoolSessionEntry;

    /// Builds metrics whose observer has seen one paused query.
    fn make_metrics_with_transcript() -> ExecutionMetrics {
        let metrics = ExecutionMetrics::new();
        let observer = metrics.create_observer();
        let query = LlmQuery {
            id: QueryId::single(),
            prompt: "test prompt".into(),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        };
        observer.on_paused(&[query]);
        metrics
    }

    /// Wraps `metrics` in a finished, completed result with payload `{"ok": true}`.
    fn make_finished_result(metrics: ExecutionMetrics) -> FeedResult {
        let state = TerminalState::Completed {
            result: serde_json::json!({"ok": true}),
        };
        FeedResult::Finished(ExecutionResult { state, metrics })
    }

    /// Builds an `AppService` that logs into `log_dir` and uses a fresh
    /// temporary directory as its app dir.
    async fn make_app_service_with_log_dir(log_dir: PathBuf) -> AppService {
        let executor = algocline_engine::Executor::new(vec![])
            .await
            .expect("executor");
        let app_root = tempfile::tempdir().expect("test tempdir");
        let log_config = AppConfig {
            log_dir: Some(log_dir),
            log_dir_source: LogDirSource::EnvVar,
            log_enabled: true,
            prompt_preview_chars: 200,
            app_dir: Arc::new(AppDir::new(app_root.path().to_path_buf())),
        };
        // The handle is deliberately never dropped — presumably so the
        // directory is still present while the returned service is in use.
        std::mem::forget(app_root);
        AppService::new(Arc::new(executor), log_config, vec![])
    }

    /// A directory sitting where the transcript file would go must surface
    /// as a warning rather than be swallowed.
    #[tokio::test]
    async fn maybe_log_transcript_returns_some_on_write_failure() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let log_dir = tmp.path().to_path_buf();
        std::fs::create_dir_all(log_dir.join("fail-session.json"))
            .expect("pre-create dir to block write");
        let svc = make_app_service_with_log_dir(log_dir).await;
        let finished = make_finished_result(make_metrics_with_transcript());

        let msg = svc
            .maybe_log_transcript(&finished, "fail-session")
            .expect("expected Some warning on write failure");
        assert!(
            msg.contains("transcript"),
            "warning should mention 'transcript', got: {msg}"
        );
    }

    /// Non-finished results never produce a transcript warning.
    #[tokio::test]
    async fn maybe_log_transcript_returns_none_on_non_finished() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
        let accepted = FeedResult::Accepted { remaining: 1 };

        let warning = svc.maybe_log_transcript(&accepted, "any-session");
        assert!(warning.is_none(), "Accepted result should return None");
    }

    /// A present warning is added as `transcript_warning`, leaving other fields intact.
    #[test]
    fn splice_transcript_warning_injects_field_when_some() {
        let out = splice_transcript_warning(
            r#"{"status":"finished","result":{}}"#,
            Some("write failed".to_string()),
        );
        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
        assert_eq!(
            v["transcript_warning"].as_str(),
            Some("write failed"),
            "transcript_warning field should be present"
        );
        assert_eq!(v["status"].as_str(), Some("finished"));
    }

    /// Without a warning, no `transcript_warning` field appears.
    #[test]
    fn splice_transcript_warning_passthrough_when_none() {
        let out = splice_transcript_warning(r#"{"status":"finished"}"#, None);
        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
        assert!(
            v.get("transcript_warning").is_none(),
            "transcript_warning must be absent when warning is None"
        );
    }

    /// A session unknown to both the pool registry and the service registry is an error.
    #[tokio::test]
    async fn continue_single_in_mcp_path_on_registry_miss() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        let outcome = svc
            .continue_single(
                "nonexistent-session-id",
                "some response".to_string(),
                None,
                None,
            )
            .await;
        let msg = outcome.expect_err("unknown session must return Err on in-MCP path");
        assert!(
            msg.contains("not found") || msg.contains("Continue failed"),
            "error must indicate session not found, got: {msg}"
        );
    }

    /// A freshly constructed service starts with no pool sessions.
    #[tokio::test]
    async fn app_service_new_initialises_empty_pool_registry() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        let registry = svc.pool_registry.read().await;
        assert!(
            registry.sessions.is_empty(),
            "pool registry must be empty on first-run (no registry.json)"
        );
    }

    /// The pool directory, registry file and lock file paths have the expected tails.
    #[tokio::test]
    async fn app_service_pool_paths_correctly_derived() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        assert!(
            svc.pool_dir.ends_with("pool"),
            "pool_dir must end in 'pool', got: {}",
            svc.pool_dir.display()
        );
        assert!(
            svc.pool_reg_path.ends_with("pool/registry.json"),
            "pool_reg_path must end in 'pool/registry.json', got: {}",
            svc.pool_reg_path.display()
        );
        assert!(
            svc.pool_lock_path.ends_with("pool/registry.lock"),
            "pool_lock_path must end in 'pool/registry.lock', got: {}",
            svc.pool_lock_path.display()
        );
    }

    /// An unparsable on-disk registry must fail the call instead of being
    /// treated as an empty registry.
    #[tokio::test]
    async fn continue_single_propagates_corrupted_registry_error() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        std::fs::create_dir_all(&svc.pool_dir).expect("create pool dir");
        std::fs::write(svc.pool_dir.join("registry.json"), b"{ not valid json !!!")
            .expect("write corrupt registry");

        let outcome = svc
            .continue_single("any-session-id", "response".to_string(), None, None)
            .await;
        let msg =
            outcome.expect_err("corrupted registry must cause Err, not silent empty fallback");
        assert!(
            msg.contains("corrupted") || msg.contains("parse") || msg.contains("Continue failed"),
            "error must mention registry problem, got: {msg}"
        );
    }

    /// The cache-reload warning is injected under its own key, leaving other fields intact.
    #[test]
    fn splice_response_string_injects_cache_reload_warning() {
        let msg = "failed to reload pool registry: No such file or directory";
        let out = splice_response_string(
            r#"{"status":"finished","result":{"ok":true}}"#,
            "pool_cache_reload_warning",
            msg,
        );
        let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
        assert_eq!(
            v["pool_cache_reload_warning"].as_str(),
            Some(msg),
            "pool_cache_reload_warning must be present in response"
        );
        assert_eq!(v["status"].as_str(), Some("finished"));
    }

    /// Non-object JSON is returned untouched.
    #[test]
    fn splice_response_string_passthrough_on_non_object_json() {
        let non_object = r#""just a string""#;
        let out = splice_response_string(non_object, "pool_cache_reload_warning", "err");
        assert_eq!(out, non_object);
    }

    /// JSON that was never spliced carries no cache-reload warning key.
    #[test]
    fn splice_response_string_not_called_when_none() {
        let v: serde_json::Value =
            serde_json::from_str(r#"{"status":"finished"}"#).expect("valid JSON");
        assert!(
            v.get("pool_cache_reload_warning").is_none(),
            "pool_cache_reload_warning must be absent when no cache-reload error occurred"
        );
    }

    /// A session present in the in-memory pool registry is routed to the pool
    /// path; with no worker behind the socket path, the call must fail.
    #[tokio::test]
    async fn continue_single_routes_to_pool_on_registry_hit() {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;

        let entry = PoolSessionEntry::new(
            "test-pool-session",
            std::process::id(),
            tmp.path().join("nonexistent.sock"),
            env!("CARGO_PKG_VERSION"),
        );
        svc.pool_registry.write().await.add(entry);

        let outcome = svc
            .continue_single("test-pool-session", "response".to_string(), None, None)
            .await;
        let msg = outcome.expect_err("pool path must fail with connect error (no real worker)");
        assert!(
            !msg.contains("session not found") || msg.contains("Continue failed"),
            "error must be from pool path (UDS connect), got: {msg}"
        );
    }
}