1use std::sync::Arc;
2
3use algocline_core::QueryId;
4use algocline_engine::{FeedResult, VariantPkg};
5
6use super::eval_store::{splice_response_string, splice_response_warnings};
7use super::resolve::{is_package_installed, make_require_code, resolve_code, QueryResponse};
8use super::transcript::write_transcript_log;
9use super::AppService;
10use crate::pool::dispatch::{continue_via_pool, run_via_pool};
11
12fn splice_save_warning(result_json: &str, warning: Option<String>) -> String {
16 match warning {
17 Some(msg) => splice_response_string(result_json, "save_warning", &msg),
18 None => result_json.to_string(),
19 }
20}
21
22fn splice_transcript_warning(result_json: &str, warning: Option<String>) -> String {
26 match warning {
27 Some(msg) => splice_response_string(result_json, "transcript_warning", &msg),
28 None => result_json.to_string(),
29 }
30}
31
32impl AppService {
33 pub async fn run(
53 &self,
54 code: Option<String>,
55 code_file: Option<String>,
56 ctx: Option<serde_json::Value>,
57 project_root: Option<String>,
58 host_mode: Option<bool>,
59 ) -> Result<String, String> {
60 let code = resolve_code(code, code_file)?;
61 let ctx = ctx.unwrap_or(serde_json::Value::Null);
62 let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
63 let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
64 let mut warnings: Vec<String> = extra_warnings;
65 warnings.extend(variant_warnings);
66
67 if host_mode == Some(true) {
68 let (session_id, json, pool_save_error) = run_via_pool(
72 &self.pool_dir,
73 &self.pool_reg_path,
74 &self.pool_lock_path,
75 extra,
76 code,
77 ctx,
78 )
79 .await
80 .map_err(|e| e.to_string())?;
81
82 let cache_reload_warning: Option<String> =
92 match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
93 Ok(reg) => {
94 let mut guard = self.pool_registry.write().await;
95 *guard = reg;
96 None
97 }
98 Err(e) => {
99 tracing::warn!(
100 error = %e,
101 "failed to reload pool registry after run; in-memory cache may be stale"
102 );
103 Some(e.to_string())
104 }
105 };
106
107 let json = splice_response_warnings(&json, "lib_path_warnings", &warnings);
108 let json = match pool_save_error {
109 Some(msg) => splice_response_string(&json, "pool_save_error", &msg),
110 None => json,
111 };
112 let json = match cache_reload_warning {
113 Some(msg) => splice_response_string(&json, "pool_cache_reload_warning", &msg),
114 None => json,
115 };
116 let _ = session_id; return Ok(json);
118 }
119
120 let json = self
122 .start_and_tick(code, ctx, None, extra, variants)
123 .await?;
124 Ok(splice_response_warnings(
125 &json,
126 "lib_path_warnings",
127 &warnings,
128 ))
129 }
130
131 pub async fn advice(
139 &self,
140 strategy: &str,
141 task: Option<String>,
142 opts: Option<serde_json::Value>,
143 project_root: Option<String>,
144 ) -> Result<String, String> {
145 let app_dir = self.log_config.app_dir();
147 if !is_package_installed(&app_dir, strategy) {
148 self.auto_install_bundled_packages().await?;
149 if !is_package_installed(&app_dir, strategy) {
150 return Err(format!(
151 "Package '{strategy}' not found after installing bundled collection. \
152 Use alc_pkg_install to install it manually."
153 ));
154 }
155 }
156
157 let code = make_require_code(strategy);
158
159 let mut ctx_map = match opts {
160 Some(serde_json::Value::Object(m)) => m,
161 _ => serde_json::Map::new(),
162 };
163 if let Some(task) = task {
164 ctx_map.insert("task".into(), serde_json::Value::String(task));
165 }
166 let ctx = serde_json::Value::Object(ctx_map);
167
168 let (extra, extra_warnings) = self.resolve_extra_lib_paths(project_root.as_deref());
169 let (variants, variant_warnings) = self.resolve_variant_pkgs(project_root.as_deref());
170 let mut warnings: Vec<String> = extra_warnings;
171 warnings.extend(variant_warnings);
172 let json = self
173 .start_and_tick(code, ctx, Some(strategy), extra, variants)
174 .await?;
175 Ok(splice_response_warnings(
176 &json,
177 "lib_path_warnings",
178 &warnings,
179 ))
180 }
181
182 pub async fn continue_batch(
189 &self,
190 session_id: &str,
191 responses: Vec<QueryResponse>,
192 ) -> Result<String, String> {
193 let pool_entry = {
195 let reg = self.pool_registry.read().await;
196 reg.find(session_id).cloned()
197 };
198
199 let pool_entry = if pool_entry.is_some() {
200 pool_entry
201 } else {
202 match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
203 Ok(reg) => {
204 let entry = reg.find(session_id).cloned();
205 if entry.is_some() {
206 let mut guard = self.pool_registry.write().await;
207 *guard = reg;
208 }
209 entry
210 }
211 Err(e) => {
212 return Err(format!("Continue failed: {e}"));
213 }
214 }
215 };
216
217 if let Some(entry) = pool_entry {
218 let mut last_json = None;
220 for qr in responses {
221 let json =
222 continue_via_pool(&entry, session_id, qr.response, Some(qr.query_id), qr.usage)
223 .await
224 .map_err(|e| format!("Continue failed: {e}"))?;
225 last_json = Some(json);
226 }
227 return last_json.ok_or_else(|| "Empty responses array".to_string());
228 }
229
230 let mut last_result = None;
232 for qr in responses {
233 let qid = QueryId::parse(&qr.query_id);
234 let result = self
235 .registry
236 .feed_response(session_id, &qid, qr.response, qr.usage.as_ref())
237 .await
238 .map_err(|e| format!("Continue failed: {e}"))?;
239 last_result = Some(result);
240 }
241 let result = last_result.ok_or("Empty responses array")?;
242 let transcript_warning = self.maybe_log_transcript(&result, session_id);
243 let json = result.to_json(session_id).to_string();
244 let json = splice_transcript_warning(&json, transcript_warning);
245 let save_warning = self.maybe_save_eval(&result, session_id, &json);
246 Ok(splice_save_warning(&json, save_warning))
247 }
248
249 pub async fn continue_single(
270 &self,
271 session_id: &str,
272 response: String,
273 query_id: Option<&str>,
274 usage: Option<algocline_core::TokenUsage>,
275 ) -> Result<String, String> {
276 let pool_entry = {
279 let reg = self.pool_registry.read().await;
280 reg.find(session_id).cloned()
281 }; let pool_entry = if pool_entry.is_some() {
285 pool_entry
286 } else {
287 match crate::pool::PoolRegistry::load_or_default(&self.pool_reg_path) {
288 Ok(reg) => {
289 let entry = reg.find(session_id).cloned();
290 if entry.is_some() {
291 let mut guard = self.pool_registry.write().await;
293 *guard = reg;
294 }
295 entry
296 }
297 Err(e) => {
298 return Err(format!("Continue failed: {e}"));
300 }
301 }
302 };
303
304 if let Some(entry) = pool_entry {
305 let json = continue_via_pool(
307 &entry,
308 session_id,
309 response,
310 query_id.map(str::to_string),
311 usage,
312 )
313 .await
314 .map_err(|e| format!("Continue failed: {e}"))?;
315 return Ok(json);
316 }
317
318 let query_id = match query_id {
320 Some(qid) => QueryId::parse(qid),
321 None => self
322 .registry
323 .resolve_sole_pending_id(session_id)
324 .await
325 .map_err(|e| format!("Continue failed: {e}"))?,
326 };
327
328 let result = self
329 .registry
330 .feed_response(session_id, &query_id, response, usage.as_ref())
331 .await
332 .map_err(|e| format!("Continue failed: {e}"))?;
333
334 let transcript_warning = self.maybe_log_transcript(&result, session_id);
335 let json = result.to_json(session_id).to_string();
336 let json = splice_transcript_warning(&json, transcript_warning);
337 let save_warning = self.maybe_save_eval(&result, session_id, &json);
338 Ok(splice_save_warning(&json, save_warning))
339 }
340
341 pub(super) fn maybe_log_transcript(
344 &self,
345 result: &FeedResult,
346 session_id: &str,
347 ) -> Option<String> {
348 if let FeedResult::Finished(exec_result) = result {
349 let strategy = match self.session_strategies.lock() {
354 Ok(mut map) => map.remove(session_id),
355 Err(e) => {
356 tracing::warn!(
357 "session_strategies mutex poisoned for '{}': {}",
358 session_id,
359 e
360 );
361 return Some(format!(
364 "session_strategies mutex poisoned for '{session_id}': {e}"
365 ));
366 }
367 };
368 match write_transcript_log(
372 &self.log_config,
373 session_id,
374 &exec_result.metrics,
375 strategy.as_deref(),
376 ) {
377 Err(e) => Some(e.to_string()),
378 Ok(meta_warning) => meta_warning,
379 }
380 } else {
381 None
382 }
383 }
384
385 pub(super) fn maybe_save_eval(
391 &self,
392 result: &FeedResult,
393 session_id: &str,
394 result_json: &str,
395 ) -> Option<String> {
396 if !matches!(result, FeedResult::Finished(_)) {
397 return None;
398 }
399 let strategy = {
400 let mut map = self.eval_sessions.lock().unwrap_or_else(|e| e.into_inner());
401 map.remove(session_id)
402 };
403 strategy.and_then(|s| {
404 super::eval_store::save_eval_result(&self.log_config.app_dir(), &s, result_json).err()
405 })
406 }
407
408 pub(super) async fn start_and_tick(
409 &self,
410 code: String,
411 ctx: serde_json::Value,
412 strategy: Option<&str>,
413 extra_lib_paths: Vec<std::path::PathBuf>,
414 variant_pkgs: Vec<VariantPkg>,
415 ) -> Result<String, String> {
416 let scenarios_dir = self.log_config.app_dir().scenarios_dir();
417 let session = self
418 .executor
419 .start_session(
420 code,
421 ctx,
422 extra_lib_paths,
423 variant_pkgs,
424 Arc::clone(&self.state_store),
425 Arc::clone(&self.card_store),
426 scenarios_dir,
427 )
428 .await?;
429 let (session_id, result) = self
430 .registry
431 .start_execution(session)
432 .await
433 .map_err(|e| format!("Execution failed: {e}"))?;
434 if let Some(s) = strategy {
435 if let Ok(mut map) = self.session_strategies.lock() {
436 map.insert(session_id.clone(), s.to_string());
437 }
438 }
439 let transcript_warning = self.maybe_log_transcript(&result, &session_id);
440 let json = result.to_json(&session_id).to_string();
441 Ok(splice_transcript_warning(&json, transcript_warning))
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use std::path::PathBuf;
448 use std::sync::Arc;
449
450 use algocline_core::{
451 AppDir, ExecutionMetrics, ExecutionObserver, LlmQuery, QueryId, TerminalState,
452 };
453 use algocline_engine::{ExecutionResult, FeedResult};
454
455 use super::super::config::{AppConfig, LogDirSource};
456 use super::{splice_transcript_warning, AppService};
457
458 fn make_metrics_with_transcript() -> ExecutionMetrics {
459 let metrics = ExecutionMetrics::new();
460 let observer = metrics.create_observer();
461 observer.on_paused(&[LlmQuery {
462 id: QueryId::single(),
463 prompt: "test prompt".into(),
464 system: None,
465 max_tokens: 100,
466 grounded: false,
467 underspecified: false,
468 }]);
469 metrics
470 }
471
472 fn make_finished_result(metrics: ExecutionMetrics) -> FeedResult {
473 FeedResult::Finished(ExecutionResult {
474 state: TerminalState::Completed {
475 result: serde_json::json!({"ok": true}),
476 },
477 metrics,
478 })
479 }
480
481 async fn make_app_service_with_log_dir(log_dir: PathBuf) -> AppService {
483 let executor = Arc::new(
484 algocline_engine::Executor::new(vec![])
485 .await
486 .expect("executor"),
487 );
488 let tmp_app = tempfile::tempdir().expect("test tempdir");
489 let log_config = AppConfig {
490 log_dir: Some(log_dir),
491 log_dir_source: LogDirSource::EnvVar,
492 log_enabled: true,
493 prompt_preview_chars: 200,
494 app_dir: Arc::new(AppDir::new(tmp_app.path().to_path_buf())),
495 };
496 std::mem::forget(tmp_app);
497 AppService::new(executor, log_config, vec![])
498 }
499
500 #[tokio::test]
503 async fn maybe_log_transcript_returns_some_on_write_failure() {
504 let tmp = tempfile::tempdir().expect("test tempdir");
505 let log_dir = tmp.path().to_path_buf();
506 std::fs::create_dir_all(log_dir.join("fail-session.json"))
508 .expect("pre-create dir to block write");
509 let svc = make_app_service_with_log_dir(log_dir).await;
510 let metrics = make_metrics_with_transcript();
511 let result = make_finished_result(metrics);
512 let warning = svc.maybe_log_transcript(&result, "fail-session");
513 assert!(warning.is_some(), "expected Some warning on write failure");
514 let msg = warning.unwrap();
515 assert!(
516 msg.contains("transcript"),
517 "warning should mention 'transcript', got: {msg}"
518 );
519 }
520
521 #[tokio::test]
522 async fn maybe_log_transcript_returns_none_on_non_finished() {
523 let tmp = tempfile::tempdir().expect("test tempdir");
524 let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
525 let result = FeedResult::Accepted { remaining: 1 };
526 let warning = svc.maybe_log_transcript(&result, "any-session");
527 assert!(warning.is_none(), "Accepted result should return None");
528 }
529
530 #[test]
533 fn splice_transcript_warning_injects_field_when_some() {
534 let json = r#"{"status":"finished","result":{}}"#;
535 let out = splice_transcript_warning(json, Some("write failed".to_string()));
536 let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
537 assert_eq!(
538 v["transcript_warning"].as_str(),
539 Some("write failed"),
540 "transcript_warning field should be present"
541 );
542 assert_eq!(v["status"].as_str(), Some("finished"));
544 }
545
546 #[test]
547 fn splice_transcript_warning_passthrough_when_none() {
548 let json = r#"{"status":"finished"}"#;
549 let out = splice_transcript_warning(json, None);
550 let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
551 assert!(
552 v.get("transcript_warning").is_none(),
553 "transcript_warning must be absent when warning is None"
554 );
555 }
556
557 use crate::pool::PoolSessionEntry;
560
561 #[tokio::test]
567 async fn continue_single_in_mcp_path_on_registry_miss() {
568 let tmp = tempfile::tempdir().expect("test tempdir");
569 let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
570
571 let result = svc
575 .continue_single(
576 "nonexistent-session-id",
577 "some response".to_string(),
578 None,
579 None,
580 )
581 .await;
582 assert!(
583 result.is_err(),
584 "unknown session must return Err on in-MCP path"
585 );
586 let msg = result.unwrap_err();
587 assert!(
588 msg.contains("not found") || msg.contains("Continue failed"),
589 "error must indicate session not found, got: {msg}"
590 );
591 }
592
593 #[tokio::test]
598 async fn app_service_new_initialises_empty_pool_registry() {
599 let tmp = tempfile::tempdir().expect("test tempdir");
600 let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
601
602 let reg = svc.pool_registry.read().await;
603 assert!(
604 reg.sessions.is_empty(),
605 "pool registry must be empty on first-run (no registry.json)"
606 );
607 }
608
609 #[tokio::test]
614 async fn app_service_pool_paths_correctly_derived() {
615 let tmp = tempfile::tempdir().expect("test tempdir");
616 let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
617
618 assert!(
619 svc.pool_dir.ends_with("pool"),
620 "pool_dir must end in 'pool', got: {}",
621 svc.pool_dir.display()
622 );
623 assert!(
624 svc.pool_reg_path.ends_with("pool/registry.json"),
625 "pool_reg_path must end in 'pool/registry.json', got: {}",
626 svc.pool_reg_path.display()
627 );
628 assert!(
629 svc.pool_lock_path.ends_with("pool/registry.lock"),
630 "pool_lock_path must end in 'pool/registry.lock', got: {}",
631 svc.pool_lock_path.display()
632 );
633 }
634
635 #[tokio::test]
641 async fn continue_single_propagates_corrupted_registry_error() {
642 let tmp = tempfile::tempdir().expect("test tempdir");
643 let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
644
645 let pool_dir = svc.pool_dir.clone();
647 std::fs::create_dir_all(&pool_dir).expect("create pool dir");
648 std::fs::write(pool_dir.join("registry.json"), b"{ not valid json !!!")
649 .expect("write corrupt registry");
650
651 let result = svc
655 .continue_single("any-session-id", "response".to_string(), None, None)
656 .await;
657 assert!(
658 result.is_err(),
659 "corrupted registry must cause Err, not silent empty fallback"
660 );
661 let msg = result.unwrap_err();
662 assert!(
663 msg.contains("corrupted") || msg.contains("parse") || msg.contains("Continue failed"),
664 "error must mention registry problem, got: {msg}"
665 );
666 }
667
668 use super::super::eval_store::splice_response_string;
671
672 #[test]
678 fn splice_response_string_injects_cache_reload_warning() {
679 let json = r#"{"status":"finished","result":{"ok":true}}"#;
680 let msg = "failed to reload pool registry: No such file or directory";
681 let out = splice_response_string(json, "pool_cache_reload_warning", msg);
682 let v: serde_json::Value = serde_json::from_str(&out).expect("valid JSON");
683 assert_eq!(
684 v["pool_cache_reload_warning"].as_str(),
685 Some(msg),
686 "pool_cache_reload_warning must be present in response"
687 );
688 assert_eq!(v["status"].as_str(), Some("finished"));
690 }
691
692 #[test]
697 fn splice_response_string_passthrough_on_non_object_json() {
698 let non_object = r#""just a string""#;
699 let out = splice_response_string(non_object, "pool_cache_reload_warning", "err");
700 assert_eq!(out, non_object);
702 }
703
704 #[test]
710 fn splice_response_string_not_called_when_none() {
711 let json = r#"{"status":"finished"}"#;
712 let v: serde_json::Value = serde_json::from_str(json).expect("valid JSON");
714 assert!(
715 v.get("pool_cache_reload_warning").is_none(),
716 "pool_cache_reload_warning must be absent when no cache-reload error occurred"
717 );
718 }
719
720 #[tokio::test]
726 async fn continue_single_routes_to_pool_on_registry_hit() {
727 let tmp = tempfile::tempdir().expect("test tempdir");
728 let svc = make_app_service_with_log_dir(tmp.path().to_path_buf()).await;
729
730 let fake_sock = tmp.path().join("nonexistent.sock");
733 let entry = PoolSessionEntry::new(
734 "test-pool-session",
735 std::process::id(), fake_sock.clone(),
737 env!("CARGO_PKG_VERSION"),
738 );
739 {
740 let mut reg = svc.pool_registry.write().await;
741 reg.add(entry);
742 }
743
744 let result = svc
748 .continue_single("test-pool-session", "response".to_string(), None, None)
749 .await;
750 assert!(
751 result.is_err(),
752 "pool path must fail with connect error (no real worker)"
753 );
754 let msg = result.unwrap_err();
755 assert!(
758 !msg.contains("session not found") || msg.contains("Continue failed"),
759 "error must be from pool path (UDS connect), got: {msg}"
760 );
761 }
762}