1use std::collections::HashMap;
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use algocline_core::{
12 ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
13 TerminalState,
14};
15use mlua_isle::{AsyncIsleDriver, AsyncTask};
16use serde_json::json;
17use tokio::sync::Mutex;
18
19use crate::llm_bridge::LlmRequest;
20
/// Errors produced by the session layer in this module.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// No session is registered under the given id (the payload is that id).
    #[error("session '{0}' not found")]
    NotFound(String),
    /// A response feed was rejected by the execution state; the core error is
    /// forwarded unchanged (`transparent`).
    #[error(transparent)]
    Feed(#[from] algocline_core::FeedError),
    /// A state transition was refused, or the request did not fit the current
    /// state; the payload is a human-readable description.
    #[error("invalid transition: {0}")]
    InvalidTransition(String),
}
32
/// Final outcome of an execution: the terminal state plus the metrics that
/// were collected for the run.
#[derive(serde::Serialize)]
pub struct ExecutionResult {
    /// How the execution ended (completed, failed or cancelled).
    pub state: TerminalState,
    /// Metrics handed over by the session when it finished.
    pub metrics: ExecutionMetrics,
}
41
/// What the caller gets back after starting an execution or feeding a response.
#[derive(serde::Serialize)]
pub enum FeedResult {
    /// The response was recorded but other queries are still unanswered.
    Accepted { remaining: usize },
    /// The execution is waiting for answers to these LLM queries.
    Paused { queries: Vec<LlmQuery> },
    /// The execution reached a terminal state.
    Finished(ExecutionResult),
}
52
53impl FeedResult {
54 pub fn to_json(&self, session_id: &str) -> serde_json::Value {
56 match self {
57 Self::Accepted { remaining } => json!({
58 "status": "accepted",
59 "remaining": remaining,
60 }),
61 Self::Paused { queries } => {
62 if queries.len() == 1 {
63 let q = &queries[0];
64 let mut obj = json!({
65 "status": "needs_response",
66 "session_id": session_id,
67 "query_id": q.id.as_str(),
68 "prompt": q.prompt,
69 "system": q.system,
70 "max_tokens": q.max_tokens,
71 });
72 if q.grounded {
73 obj["grounded"] = json!(true);
74 }
75 if q.underspecified {
76 obj["underspecified"] = json!(true);
77 }
78 obj
79 } else {
80 let qs: Vec<_> = queries
81 .iter()
82 .map(|q| {
83 let mut obj = json!({
84 "id": q.id.as_str(),
85 "prompt": q.prompt,
86 "system": q.system,
87 "max_tokens": q.max_tokens,
88 });
89 if q.grounded {
90 obj["grounded"] = json!(true);
91 }
92 if q.underspecified {
93 obj["underspecified"] = json!(true);
94 }
95 obj
96 })
97 .collect();
98 json!({
99 "status": "needs_response",
100 "session_id": session_id,
101 "queries": qs,
102 })
103 }
104 }
105 Self::Finished(result) => match &result.state {
106 TerminalState::Completed { result: val } => json!({
107 "status": "completed",
108 "result": val,
109 "stats": result.metrics.to_json(),
110 }),
111 TerminalState::Failed { error } => json!({
112 "status": "error",
113 "error": error,
114 }),
115 TerminalState::Cancelled => json!({
116 "status": "cancelled",
117 "stats": result.metrics.to_json(),
118 }),
119 },
120 }
121 }
122}
123
/// Number of prompt characters kept by the `"preview"` preset when the caller
/// does not supply its own limit.
pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
130
/// Selects which fields of a pending query are included in a snapshot.
///
/// Every flag defaults to `false` and `prompt` defaults to
/// [`PromptProjection::Off`], so a default filter projects nothing. Each field
/// is `#[serde(default)]`, so a partial JSON object deserializes with the
/// missing fields switched off.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PendingFilter {
    /// Emit the query id under `query_id`.
    #[serde(default)]
    pub query_id: bool,
    /// Emit `max_tokens`.
    #[serde(default)]
    pub max_tokens: bool,
    /// Emit `system` (JSON `null` when the query has no system prompt).
    #[serde(default)]
    pub system: bool,
    /// Emit the `grounded` flag.
    #[serde(default)]
    pub grounded: bool,
    /// Emit the `underspecified` flag.
    #[serde(default)]
    pub underspecified: bool,
    /// How much of the prompt text to emit.
    #[serde(default)]
    pub prompt: PromptProjection,
}
152
/// How the prompt text of a pending query is projected.
///
/// Serialized as an internally tagged object keyed by `mode`, with snake_case
/// variant names (`{"mode": "preview", "chars": 50}`).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum PromptProjection {
    /// Do not emit any prompt text (the default).
    #[default]
    Off,
    /// Emit at most `chars` leading characters under `prompt_preview`.
    Preview {
        chars: usize,
    },
    /// Emit the whole prompt under `prompt`.
    Full,
}
168
169impl PendingFilter {
170 pub fn preset_meta() -> Self {
172 Self {
173 query_id: true,
174 max_tokens: true,
175 ..Self::default()
176 }
177 }
178
179 pub fn preset_preview() -> Self {
182 Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
183 }
184
185 pub fn preset_preview_with(chars: usize) -> Self {
188 Self {
189 query_id: true,
190 max_tokens: true,
191 prompt: PromptProjection::Preview { chars },
192 ..Self::default()
193 }
194 }
195
196 pub fn preset_full() -> Self {
198 Self {
199 query_id: true,
200 max_tokens: true,
201 system: true,
202 grounded: true,
203 underspecified: true,
204 prompt: PromptProjection::Full,
205 }
206 }
207
208 pub fn from_preset(name: &str) -> Option<Self> {
212 match name {
213 "meta" => Some(Self::preset_meta()),
214 "preview" => Some(Self::preset_preview()),
215 "full" => Some(Self::preset_full()),
216 _ => None,
217 }
218 }
219
220 pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
223 match name {
224 "meta" => Some(Self::preset_meta()),
225 "preview" => Some(Self::preset_preview_with(preview_chars)),
226 "full" => Some(Self::preset_full()),
227 _ => None,
228 }
229 }
230}
231
232fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
237 let mut obj = serde_json::Map::new();
238 if f.query_id {
239 obj.insert("query_id".into(), q.id.as_str().into());
240 }
241 if f.max_tokens {
242 obj.insert("max_tokens".into(), q.max_tokens.into());
243 }
244 if f.system {
245 obj.insert(
246 "system".into(),
247 match &q.system {
248 Some(s) => serde_json::Value::String(s.clone()),
249 None => serde_json::Value::Null,
250 },
251 );
252 }
253 if f.grounded {
254 obj.insert("grounded".into(), q.grounded.into());
255 }
256 if f.underspecified {
257 obj.insert("underspecified".into(), q.underspecified.into());
258 }
259 match &f.prompt {
260 PromptProjection::Off => {}
261 PromptProjection::Full => {
262 obj.insert("prompt".into(), q.prompt.clone().into());
263 }
264 PromptProjection::Preview { chars } => {
265 let preview: String = q.prompt.chars().take(*chars).collect();
266 obj.insert("prompt_preview".into(), preview.into());
267 }
268 }
269 serde_json::Value::Object(obj)
270}
271
/// One in-flight execution together with the channels used to exchange LLM
/// queries and responses with it.
pub struct Session {
    /// Current execution state (starts as `Running`).
    state: ExecutionState,
    /// Metrics for this run; moved out when the session finishes.
    metrics: ExecutionMetrics,
    /// Observer created from `metrics`; notified on every state change.
    observer: MetricsObserver,
    /// Incoming LLM requests issued by the running code.
    llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
    /// The execution itself; resolves to the result as a JSON string.
    exec_task: AsyncTask,
    /// Reply channels of the queries that have not been answered yet, by id.
    resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
    /// Never read here; presumably held so the VM driver lives as long as the
    /// session — TODO confirm against `mlua_isle`.
    _vm_driver: AsyncIsleDriver,
    /// Monotonic time of the last feed; drives TTL expiry.
    last_active: std::time::Instant,
    /// Creation time in Unix milliseconds.
    started_at_ms: i64,
    /// Time of the last feed in Unix milliseconds (equals `started_at_ms`
    /// until the first feed).
    last_activity_ms: Arc<AtomicI64>,
}
299
300impl Session {
301 pub fn new(
317 llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
318 exec_task: AsyncTask,
319 metrics: ExecutionMetrics,
320 vm_driver: AsyncIsleDriver,
321 ) -> Self {
322 let observer = metrics.create_observer();
323 let started_at_ms = SystemTime::now()
326 .duration_since(UNIX_EPOCH)
327 .unwrap_or_default()
328 .as_millis() as i64;
329 Self {
330 state: ExecutionState::Running,
331 metrics,
332 observer,
333 llm_rx,
334 exec_task,
335 resp_txs: HashMap::new(),
336 _vm_driver: vm_driver,
337 last_active: std::time::Instant::now(),
338 started_at_ms,
339 last_activity_ms: Arc::new(AtomicI64::new(started_at_ms)),
340 }
341 }
342
343 async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
348 tokio::select! {
349 result = &mut self.exec_task => {
350 match result {
351 Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
352 Ok(v) => {
353 self.state.complete(v.clone()).map_err(|e| {
354 SessionError::InvalidTransition(e.to_string())
355 })?;
356 self.observer.on_completed(&v);
357 Ok(FeedResult::Finished(ExecutionResult {
358 state: TerminalState::Completed { result: v },
359 metrics: self.take_metrics(),
360 }))
361 }
362 Err(e) => self.fail_with(format!("JSON parse: {e}")),
363 },
364 Err(e) => self.fail_with(e.to_string()),
365 }
366 }
367 Some(req) = self.llm_rx.recv() => {
368 let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
369 id: qr.id.clone(),
370 prompt: qr.prompt.clone(),
371 system: qr.system.clone(),
372 max_tokens: qr.max_tokens,
373 grounded: qr.grounded,
374 underspecified: qr.underspecified,
375 }).collect();
376
377 for qr in req.queries {
378 self.resp_txs.insert(qr.id, qr.resp_tx);
379 }
380
381 self.state.pause(queries.clone()).map_err(|e| {
382 SessionError::InvalidTransition(e.to_string())
383 })?;
384 self.observer.on_paused(&queries);
385 Ok(FeedResult::Paused { queries })
386 }
387 }
388 }
389
390 fn feed_one(
406 &mut self,
407 query_id: &QueryId,
408 response: String,
409 usage: Option<&algocline_core::TokenUsage>,
410 ) -> Result<bool, SessionError> {
411 self.last_active = std::time::Instant::now();
413 let now_ms = SystemTime::now()
416 .duration_since(UNIX_EPOCH)
417 .unwrap_or_default()
418 .as_millis() as i64;
419 self.last_activity_ms.store(now_ms, Ordering::Relaxed);
420
421 self.observer.on_response_fed(query_id, &response, usage);
423
424 if let Some(tx) = self.resp_txs.remove(query_id) {
426 let _ = tx.send(Ok(response.clone()));
427 }
428
429 let complete = self
431 .state
432 .feed(query_id, response)
433 .map_err(SessionError::Feed)?;
434
435 if complete {
436 self.state
438 .take_responses()
439 .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
440 self.observer.on_resumed();
441 } else {
442 self.observer
443 .on_partial_feed(query_id, self.state.remaining());
444 }
445
446 Ok(complete)
447 }
448
449 fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
450 self.state
451 .fail(msg.clone())
452 .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
453 self.observer.on_failed(&msg);
454 Ok(FeedResult::Finished(ExecutionResult {
455 state: TerminalState::Failed { error: msg },
456 metrics: self.take_metrics(),
457 }))
458 }
459
460 fn take_metrics(&mut self) -> ExecutionMetrics {
461 std::mem::take(&mut self.metrics)
462 }
463
464 pub fn snapshot(
491 &self,
492 pending_filter: Option<&PendingFilter>,
493 include_history: bool,
494 ) -> serde_json::Value {
495 let state_label = match &self.state {
496 ExecutionState::Running => "running",
497 ExecutionState::Paused(_) => "paused",
498 _ => "terminal",
499 };
500
501 let phase = match &self.state {
502 ExecutionState::Running => "running",
503 ExecutionState::Paused(_) => "paused",
504 ExecutionState::Completed { .. } => "completed",
505 ExecutionState::Failed { .. } => "failed",
506 ExecutionState::Cancelled => "cancelled",
507 };
508
509 let mut json = serde_json::json!({
510 "state": state_label,
511 "phase": phase,
512 "started_at": self.started_at_ms,
513 "last_activity_at": self.last_activity_ms.load(Ordering::Relaxed),
514 });
515
516 let metrics = self.metrics.snapshot(include_history);
517 if !metrics.is_null() {
518 json["metrics"] = metrics;
519 }
520
521 if let ExecutionState::Paused(pending) = &self.state {
523 json["pending_queries"] = pending.remaining().into();
524
525 if let Some(filter) = pending_filter {
526 let items: Vec<serde_json::Value> = pending
527 .pending_queries()
528 .iter()
529 .map(|q| project_query(q, filter))
530 .collect();
531 json["pending"] = serde_json::Value::Array(items);
532 }
533 }
534
535 json
536 }
537
538 pub fn is_expired(&self, ttl: Duration) -> bool {
543 is_expired_impl(self.last_active, ttl)
544 }
545}
546
/// Returns `true` when at least `ttl` has elapsed since `last_active`.
///
/// The elapsed time saturates at zero if `last_active` lies in the future, so
/// a zero `ttl` is always reported as expired.
fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
    let idle_for = std::time::Instant::now().saturating_duration_since(last_active);
    ttl <= idle_for
}
551
/// Registry of the sessions that are currently waiting for LLM responses,
/// keyed by session id.
pub struct SessionRegistry {
    /// Shared map behind an async mutex; the GC task holds a clone of this `Arc`.
    sessions: Arc<Mutex<HashMap<String, Session>>>,
}
589
impl Default for SessionRegistry {
    /// Same as [`SessionRegistry::new`]: an empty registry.
    fn default() -> Self {
        Self::new()
    }
}
595
596impl SessionRegistry {
597 pub fn new() -> Self {
598 Self {
599 sessions: Arc::new(Mutex::new(HashMap::new())),
600 }
601 }
602
603 pub async fn start_execution(
605 &self,
606 mut session: Session,
607 ) -> Result<(String, FeedResult), SessionError> {
608 let session_id = gen_session_id();
609 let result = session.wait_event().await?;
610
611 if matches!(result, FeedResult::Paused { .. }) {
612 self.sessions
613 .lock()
614 .await
615 .insert(session_id.clone(), session);
616 }
617
618 Ok((session_id, result))
619 }
620
621 pub async fn feed_response(
627 &self,
628 session_id: &str,
629 query_id: &QueryId,
630 response: String,
631 usage: Option<&algocline_core::TokenUsage>,
632 ) -> Result<FeedResult, SessionError> {
633 let complete = {
635 let mut map = self.sessions.lock().await;
636 let session = map
637 .get_mut(session_id)
638 .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
639
640 let complete = session.feed_one(query_id, response, usage)?;
641
642 if !complete {
643 return Ok(FeedResult::Accepted {
644 remaining: session.state.remaining(),
645 });
646 }
647
648 complete
649 };
650
651 debug_assert!(complete);
653 let mut session = {
654 let mut map = self.sessions.lock().await;
655 map.remove(session_id)
656 .ok_or_else(|| SessionError::NotFound(session_id.into()))?
657 };
658
659 let result = session.wait_event().await?;
660
661 if matches!(result, FeedResult::Paused { .. }) {
662 self.sessions
663 .lock()
664 .await
665 .insert(session_id.into(), session);
666 }
667
668 Ok(result)
669 }
670
671 pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
677 let map = self.sessions.lock().await;
678 let session = map
679 .get(session_id)
680 .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
681 let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
682 match keys.len() {
683 0 => Err(SessionError::InvalidTransition("no pending queries".into())),
684 1 => keys
685 .into_iter()
686 .next()
687 .ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
688 n => Err(SessionError::InvalidTransition(format!(
689 "{n} queries pending; specify query_id explicitly"
690 ))),
691 }
692 }
693
694 pub async fn list_snapshots(
710 &self,
711 pending_filter: Option<&PendingFilter>,
712 include_history: bool,
713 ) -> HashMap<String, serde_json::Value> {
714 let map = self.sessions.lock().await;
715 map.iter()
716 .map(|(id, session)| {
717 (
718 id.clone(),
719 session.snapshot(pending_filter, include_history),
720 )
721 })
722 .collect()
723 }
724
725 pub fn spawn_gc_task(&self, ttl: Duration) {
731 let sessions = Arc::clone(&self.sessions);
732 tokio::spawn(async move {
733 let mut interval = tokio::time::interval(Duration::from_secs(60));
734 loop {
735 interval.tick().await;
736 let mut map = sessions.lock().await;
737 let expired: Vec<String> = map
738 .iter()
739 .filter(|(_, s)| s.is_expired(ttl))
740 .map(|(id, _)| id.clone())
741 .collect();
742 for id in &expired {
743 tracing::info!(session_id = %id, "GC: reaping expired session");
744 map.remove(id);
745 }
746 }
747 });
748 }
749}
750
/// Generates a session id of the form `s-<nanos-hex>-<16 hex digits>`.
///
/// The first part is the current Unix time in nanoseconds (0 if the clock is
/// before the epoch); the second is that timestamp hashed with a freshly
/// created `RandomState`, which makes two ids from the same instant differ.
fn gen_session_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(nanos);
    let salt: u64 = hasher.finish();

    format!("s-{nanos:x}-{salt:016x}")
}
786
// Unit tests for JSON rendering, session ids, TTL expiry, pending-query
// filters and session snapshots.
#[cfg(test)]
mod tests {
    use super::*;
    use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
    use serde_json::json;

    /// Builds a plain batch query (`prompt-<index>`, 100 max tokens, no flags).
    fn make_query(index: usize) -> LlmQuery {
        LlmQuery {
            id: QueryId::batch(index),
            prompt: format!("prompt-{index}"),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        }
    }

    // ---- FeedResult::to_json ----

    // `Accepted` renders its status and the remaining count.
    #[test]
    fn to_json_accepted() {
        let result = FeedResult::Accepted { remaining: 3 };
        let json = result.to_json("s-123");
        assert_eq!(json["status"], "accepted");
        assert_eq!(json["remaining"], 3);
    }

    // A single query is flattened into the top-level object; false flags are omitted.
    #[test]
    fn to_json_paused_single_query() {
        let query = LlmQuery {
            id: QueryId::single(),
            prompt: "What is 2+2?".into(),
            system: Some("You are a calculator.".into()),
            max_tokens: 50,
            grounded: false,
            underspecified: false,
        };
        let result = FeedResult::Paused {
            queries: vec![query],
        };
        let json = result.to_json("s-abc");

        assert_eq!(json["status"], "needs_response");
        assert_eq!(json["session_id"], "s-abc");
        assert_eq!(json["prompt"], "What is 2+2?");
        assert_eq!(json["system"], "You are a calculator.");
        assert_eq!(json["max_tokens"], 50);
        assert!(json.get("queries").is_none());
        assert!(
            json.get("grounded").is_none(),
            "grounded key must be absent when false"
        );
        assert!(
            json.get("underspecified").is_none(),
            "underspecified key must be absent when false"
        );
    }

    // `grounded: true` appears in the single-query form.
    #[test]
    fn to_json_paused_single_query_grounded() {
        let query = LlmQuery {
            id: QueryId::single(),
            prompt: "verify this claim".into(),
            system: None,
            max_tokens: 200,
            grounded: true,
            underspecified: false,
        };
        let result = FeedResult::Paused {
            queries: vec![query],
        };
        let json = result.to_json("s-grounded");

        assert_eq!(json["status"], "needs_response");
        assert_eq!(
            json["grounded"], true,
            "grounded must appear in single-query MCP JSON"
        );
    }

    // `underspecified: true` appears in the single-query form, independent of `grounded`.
    #[test]
    fn to_json_paused_single_query_underspecified() {
        let query = LlmQuery {
            id: QueryId::single(),
            prompt: "what output format do you need?".into(),
            system: None,
            max_tokens: 200,
            grounded: false,
            underspecified: true,
        };
        let result = FeedResult::Paused {
            queries: vec![query],
        };
        let json = result.to_json("s-underspec");

        assert_eq!(json["status"], "needs_response");
        assert_eq!(
            json["underspecified"], true,
            "underspecified must appear in single-query MCP JSON"
        );
        assert!(
            json.get("grounded").is_none(),
            "grounded must be absent when false"
        );
    }

    // In a batch, `grounded` is emitted per query and only when true.
    #[test]
    fn to_json_paused_multiple_queries_mixed_grounded() {
        let grounded_query = LlmQuery {
            id: QueryId::batch(0),
            prompt: "verify".into(),
            system: None,
            max_tokens: 100,
            grounded: true,
            underspecified: false,
        };
        let normal_query = LlmQuery {
            id: QueryId::batch(1),
            prompt: "generate".into(),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        };
        let result = FeedResult::Paused {
            queries: vec![grounded_query, normal_query],
        };
        let json = result.to_json("s-batch");

        let qs = json["queries"].as_array().expect("queries should be array");
        assert_eq!(
            qs[0]["grounded"], true,
            "grounded query must have grounded=true"
        );
        assert!(
            qs[1].get("grounded").is_none(),
            "non-grounded query must omit grounded key"
        );
    }

    // In a batch, `underspecified` is emitted per query and only when true.
    #[test]
    fn to_json_paused_multiple_queries_mixed_underspecified() {
        let underspec_query = LlmQuery {
            id: QueryId::batch(0),
            prompt: "clarify intent".into(),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: true,
        };
        let normal_query = LlmQuery {
            id: QueryId::batch(1),
            prompt: "generate".into(),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        };
        let result = FeedResult::Paused {
            queries: vec![underspec_query, normal_query],
        };
        let json = result.to_json("s-batch-us");

        let qs = json["queries"].as_array().expect("queries should be array");
        assert_eq!(
            qs[0]["underspecified"], true,
            "underspecified query must have underspecified=true"
        );
        assert!(
            qs[1].get("underspecified").is_none(),
            "non-underspecified query must omit underspecified key"
        );
    }

    // A missing system prompt is rendered as JSON null.
    #[test]
    fn to_json_paused_single_query_no_system() {
        let query = LlmQuery {
            id: QueryId::single(),
            prompt: "hello".into(),
            system: None,
            max_tokens: 1024,
            grounded: false,
            underspecified: false,
        };
        let result = FeedResult::Paused {
            queries: vec![query],
        };
        let json = result.to_json("s-x");

        assert_eq!(json["status"], "needs_response");
        assert!(json["system"].is_null());
    }

    // Several queries are rendered as a `queries` array in input order.
    #[test]
    fn to_json_paused_multiple_queries() {
        let queries = vec![make_query(0), make_query(1), make_query(2)];
        let result = FeedResult::Paused { queries };
        let json = result.to_json("s-multi");

        assert_eq!(json["status"], "needs_response");
        assert_eq!(json["session_id"], "s-multi");

        let qs = json["queries"].as_array().expect("queries should be array");
        assert_eq!(qs.len(), 3);
        assert_eq!(qs[0]["id"], "q-0");
        assert_eq!(qs[0]["prompt"], "prompt-0");
        assert_eq!(qs[1]["id"], "q-1");
        assert_eq!(qs[2]["id"], "q-2");
    }

    // A completed run carries `result` and `stats`.
    #[test]
    fn to_json_finished_completed() {
        let result = FeedResult::Finished(ExecutionResult {
            state: TerminalState::Completed {
                result: json!({"answer": 42}),
            },
            metrics: ExecutionMetrics::new(),
        });
        let json = result.to_json("s-done");

        assert_eq!(json["status"], "completed");
        assert_eq!(json["result"]["answer"], 42);
        assert!(json.get("stats").is_some());
    }

    // A failed run carries the error text.
    #[test]
    fn to_json_finished_failed() {
        let result = FeedResult::Finished(ExecutionResult {
            state: TerminalState::Failed {
                error: "lua error: bad argument".into(),
            },
            metrics: ExecutionMetrics::new(),
        });
        let json = result.to_json("s-err");

        assert_eq!(json["status"], "error");
        assert_eq!(json["error"], "lua error: bad argument");
    }

    // A cancelled run carries `stats`.
    #[test]
    fn to_json_finished_cancelled() {
        let result = FeedResult::Finished(ExecutionResult {
            state: TerminalState::Cancelled,
            metrics: ExecutionMetrics::new(),
        });
        let json = result.to_json("s-cancel");

        assert_eq!(json["status"], "cancelled");
        assert!(json.get("stats").is_some());
    }

    // ---- gen_session_id ----

    // Ids always start with the `s-` prefix.
    #[test]
    fn session_id_starts_with_prefix() {
        let id = gen_session_id();
        assert!(id.starts_with("s-"), "id should start with 's-': {id}");
    }

    // Ten consecutive ids are pairwise distinct.
    #[test]
    fn session_id_uniqueness() {
        let ids: Vec<String> = (0..10).map(|_| gen_session_id()).collect();
        let set: std::collections::HashSet<&String> = ids.iter().collect();
        assert_eq!(set.len(), 10, "10 IDs should all be unique");
    }

    // ---- is_expired_impl ----

    // A timestamp taken just now is not expired under a 1 s TTL.
    #[test]
    fn is_expired_impl_fresh_instant_not_expired() {
        let now = std::time::Instant::now();
        assert!(!is_expired_impl(now, Duration::from_secs(1)));
    }

    // Two hours idle exceeds a one-hour TTL.
    #[test]
    fn is_expired_impl_old_instant_expired() {
        let two_hours_ago = std::time::Instant::now()
            .checked_sub(Duration::from_secs(7200))
            .expect("checked_sub should succeed with sane duration");
        assert!(is_expired_impl(two_hours_ago, Duration::from_secs(3600)));
    }

    // One hour idle does not exceed a three-hour TTL.
    #[test]
    fn is_expired_impl_not_yet_expired() {
        let one_hour_ago = std::time::Instant::now()
            .checked_sub(Duration::from_secs(3600))
            .expect("checked_sub should succeed with sane duration");
        assert!(!is_expired_impl(one_hour_ago, Duration::from_secs(10800)));
    }

    // The comparison is `>=`, so a zero TTL expires immediately.
    #[test]
    fn is_expired_impl_zero_ttl_always_expired() {
        let now = std::time::Instant::now();
        assert!(is_expired_impl(now, Duration::ZERO));
    }

    // ---- PendingFilter presets ----

    // The default filter has every projection switched off.
    #[test]
    fn pending_filter_default_is_all_off() {
        let f = PendingFilter::default();
        assert!(!f.query_id);
        assert!(!f.max_tokens);
        assert!(!f.system);
        assert!(!f.grounded);
        assert!(!f.underspecified);
        assert!(matches!(f.prompt, PromptProjection::Off));
    }

    // `meta` enables only the id and `max_tokens`.
    #[test]
    fn pending_filter_preset_meta_flags() {
        let f = PendingFilter::preset_meta();
        assert!(f.query_id);
        assert!(f.max_tokens);
        assert!(!f.system);
        assert!(!f.grounded);
        assert!(!f.underspecified);
        assert!(
            matches!(f.prompt, PromptProjection::Off),
            "meta preset must not project prompt content"
        );
    }

    // `preview` uses the default character limit.
    #[test]
    fn pending_filter_preset_preview_uses_default_chars() {
        let f = PendingFilter::preset_preview();
        assert!(f.query_id);
        assert!(f.max_tokens);
        match f.prompt {
            PromptProjection::Preview { chars } => {
                assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS);
            }
            other => panic!("expected Preview, got {other:?}"),
        }
    }

    // `preset_preview_with` honours a custom limit.
    #[test]
    fn pending_filter_preset_preview_with_custom_chars() {
        let f = PendingFilter::preset_preview_with(42);
        match f.prompt {
            PromptProjection::Preview { chars } => assert_eq!(chars, 42),
            other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
        }
    }

    // `full` enables every flag and the complete prompt.
    #[test]
    fn pending_filter_preset_full_flags_all_on() {
        let f = PendingFilter::preset_full();
        assert!(f.query_id);
        assert!(f.max_tokens);
        assert!(f.system);
        assert!(f.grounded);
        assert!(f.underspecified);
        assert!(matches!(f.prompt, PromptProjection::Full));
    }

    // The three documented preset names resolve.
    #[test]
    fn pending_filter_from_preset_known_names() {
        assert!(PendingFilter::from_preset("meta").is_some());
        assert!(PendingFilter::from_preset("preview").is_some());
        assert!(PendingFilter::from_preset("full").is_some());
    }

    // Unknown names — including the empty string and a wrong case — yield `None`.
    #[test]
    fn pending_filter_from_preset_unknown_returns_none() {
        assert!(PendingFilter::from_preset("").is_none());
        assert!(PendingFilter::from_preset("META").is_none());
        assert!(PendingFilter::from_preset("bogus").is_none());
    }

    // `preview_chars` affects only the `preview` preset.
    #[test]
    fn pending_filter_from_preset_with_overrides_preview_chars() {
        let f = PendingFilter::from_preset_with("preview", 73).unwrap();
        match f.prompt {
            PromptProjection::Preview { chars } => assert_eq!(chars, 73),
            other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
        }

        let f_meta = PendingFilter::from_preset_with("meta", 73).unwrap();
        assert!(matches!(f_meta.prompt, PromptProjection::Off));

        let f_full = PendingFilter::from_preset_with("full", 73).unwrap();
        assert!(matches!(f_full.prompt, PromptProjection::Full));
    }

    // ---- project_query ----

    // With nothing enabled the projection is an empty object.
    #[test]
    fn project_query_default_filter_produces_empty_object() {
        let q = make_query(0);
        let v = project_query(&q, &PendingFilter::default());
        let obj = v.as_object().expect("object");
        assert!(obj.is_empty(), "default filter should project nothing");
    }

    // `meta` projects exactly two keys.
    #[test]
    fn project_query_meta_preset_has_id_and_max_tokens_only() {
        let q = make_query(0);
        let v = project_query(&q, &PendingFilter::preset_meta());
        let obj = v.as_object().expect("object");
        assert_eq!(obj.len(), 2);
        assert_eq!(v["query_id"], "q-0");
        assert_eq!(v["max_tokens"], 100);
        assert!(obj.get("prompt").is_none());
        assert!(obj.get("prompt_preview").is_none());
        assert!(obj.get("system").is_none());
        assert!(obj.get("grounded").is_none());
        assert!(obj.get("underspecified").is_none());
    }

    // `full` projects every field and uses `prompt`, not `prompt_preview`.
    #[test]
    fn project_query_full_preset_has_all_fields() {
        let q = LlmQuery {
            id: QueryId::batch(0),
            prompt: "hi".into(),
            system: Some("sys".into()),
            max_tokens: 100,
            grounded: true,
            underspecified: true,
        };
        let v = project_query(&q, &PendingFilter::preset_full());
        assert_eq!(v["query_id"], "q-0");
        assert_eq!(v["max_tokens"], 100);
        assert_eq!(v["system"], "sys");
        assert_eq!(v["grounded"], true);
        assert_eq!(v["underspecified"], true);
        assert_eq!(v["prompt"], "hi");
        assert!(v.get("prompt_preview").is_none());
    }

    // A preview keeps only the first `chars` characters.
    #[test]
    fn project_query_preview_truncates_at_char_count() {
        let q = LlmQuery {
            id: QueryId::batch(0),
            prompt: "abcdefghij".into(),
            system: None,
            max_tokens: 10,
            grounded: false,
            underspecified: false,
        };
        let v = project_query(&q, &PendingFilter::preset_preview_with(5));
        assert_eq!(v["prompt_preview"], "abcde");
        assert!(v.get("prompt").is_none());
    }

    // Truncation counts characters, so multi-byte text is never split mid-character.
    #[test]
    fn project_query_preview_utf8_multibyte_safe() {
        let prompt = "あいうえお";
        let q = LlmQuery {
            id: QueryId::batch(0),
            prompt: prompt.to_string(),
            system: None,
            max_tokens: 10,
            grounded: false,
            underspecified: false,
        };
        let v = project_query(&q, &PendingFilter::preset_preview_with(3));
        let preview = v["prompt_preview"].as_str().expect("str");
        assert_eq!(preview, "あいう");
        assert_eq!(preview.chars().count(), 3);
    }

    // A limit longer than the prompt returns the prompt unchanged.
    #[test]
    fn project_query_preview_chars_over_length_returns_whole_prompt() {
        let q = LlmQuery {
            id: QueryId::batch(0),
            prompt: "abc".into(),
            system: None,
            max_tokens: 10,
            grounded: false,
            underspecified: false,
        };
        let v = project_query(&q, &PendingFilter::preset_preview_with(100));
        assert_eq!(v["prompt_preview"], "abc");
    }

    // With `system` enabled, a missing system prompt is projected as null.
    #[test]
    fn project_query_system_field_null_when_absent() {
        let q = LlmQuery {
            id: QueryId::batch(0),
            prompt: "p".into(),
            system: None,
            max_tokens: 10,
            grounded: false,
            underspecified: false,
        };
        let filter = PendingFilter {
            system: true,
            ..Default::default()
        };
        let v = project_query(&q, &filter);
        assert!(
            v["system"].is_null(),
            "absent system must serialize as null"
        );
    }

    // ---- PendingFilter deserialization ----

    // A custom object with a tagged preview projection deserializes.
    #[test]
    fn pending_filter_deserialize_custom_object_preview() {
        let raw = serde_json::json!({
            "query_id": true,
            "prompt": { "mode": "preview", "chars": 50 }
        });
        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
        assert!(f.query_id);
        match f.prompt {
            PromptProjection::Preview { chars } => assert_eq!(chars, 50),
            other => panic!("expected Preview, got {other:?}"),
        }
    }

    // An empty object falls back to the per-field defaults.
    #[test]
    fn pending_filter_deserialize_partial_object_uses_field_defaults() {
        let raw = serde_json::json!({});
        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
        assert!(!f.query_id);
        assert!(matches!(f.prompt, PromptProjection::Off));
    }

    // The `full` tag selects `PromptProjection::Full`.
    #[test]
    fn pending_filter_deserialize_prompt_full_tag() {
        let raw = serde_json::json!({ "prompt": { "mode": "full" } });
        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
        assert!(matches!(f.prompt, PromptProjection::Full));
    }

    // ---- Session snapshots (need a real executor) ----

    /// Creates the state store, card store and scenarios path under a fresh
    /// temporary directory.
    ///
    /// NOTE(review): `mem::forget` skips the `TempDir` destructor, so the
    /// directory is never cleaned up — one leaked directory per test run.
    fn tmp_dirs() -> (
        std::sync::Arc<crate::state::JsonFileStore>,
        std::sync::Arc<crate::card::FileCardStore>,
        std::path::PathBuf,
    ) {
        let tmp = tempfile::tempdir().expect("test tempdir");
        let root = tmp.path().to_path_buf();
        std::mem::forget(tmp);
        (
            std::sync::Arc::new(crate::state::JsonFileStore::new(root.join("state"))),
            std::sync::Arc::new(crate::card::FileCardStore::new(root.join("cards"))),
            root.join("scenarios"),
        )
    }

    // A fresh session reports `running` for both `state` and `phase`, and its
    // two timestamps are equal.
    #[tokio::test]
    async fn snapshot_v2_contains_phase_and_timestamps() {
        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
        let (state_store, card_store, scenarios_dir) = tmp_dirs();

        let code = r#"
            local response = alc.llm("what is 2+2?")
            return response
        "#
        .to_string();

        let session = executor
            .start_session(
                code,
                serde_json::json!({}),
                vec![],
                vec![],
                state_store,
                card_store,
                scenarios_dir,
            )
            .await
            .unwrap();

        let snap = session.snapshot(None, false);

        assert!(
            snap.get("phase").is_some(),
            "snapshot must have 'phase' field"
        );
        assert_eq!(snap["phase"], "running", "initial state must be running");

        assert_eq!(snap["state"], "running");

        let started_at = snap["started_at"].as_i64().expect("started_at must be i64");
        assert!(started_at > 0, "started_at must be > 0 (unix ms)");

        let last_activity = snap["last_activity_at"]
            .as_i64()
            .expect("last_activity_at must be i64");
        assert_eq!(
            started_at, last_activity,
            "last_activity_at should equal started_at before any feed"
        );
    }

    // NOTE(review): this test never calls `Session::snapshot`. It re-implements
    // the three-value mapping locally and compares each table entry with
    // itself, so it cannot fail if the production mapping changes.
    #[test]
    fn snapshot_phase_running_state_label() {
        let cases: &[(&str, &str)] = &[
            ("running", "running"),
            ("paused", "paused"),
            ("completed", "completed"),
            ("failed", "failed"),
            ("cancelled", "cancelled"),
        ];
        for (state_str, expected_phase) in cases {
            let three_value_state = match *state_str {
                "running" => "running",
                "paused" => "paused",
                _ => "terminal",
            };
            assert_eq!(
                *expected_phase, *state_str,
                "phase for {state_str} must be the same string"
            );
            if *state_str != "running" && *state_str != "paused" {
                assert_eq!(
                    three_value_state, "terminal",
                    "{state_str} must map to 'terminal' in 3-value state"
                );
            }
        }
    }

    // Conversation history must be absent unless explicitly requested.
    #[tokio::test]
    async fn snapshot_conversation_history_opt_in() {
        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
        let (state_store, card_store, scenarios_dir) = tmp_dirs();

        let code = r#"
            local response = alc.llm("explain recursion")
            return response
        "#
        .to_string();

        let session = executor
            .start_session(
                code,
                serde_json::json!({}),
                vec![],
                vec![],
                state_store,
                card_store,
                scenarios_dir,
            )
            .await
            .unwrap();

        let snap_false = session.snapshot(None, false);
        assert!(
            snap_false
                .get("metrics")
                .and_then(|m| m.get("conversation_history"))
                .is_none(),
            "conversation_history must be absent with include_history=false"
        );

        let snap_true = session.snapshot(None, true);
        // NOTE(review): nothing is asserted for `include_history=true`; the
        // lookup result is discarded, so this branch only checks that the call
        // does not panic.
        if let Some(metrics) = snap_true.get("metrics") {
            let _ = metrics.get("conversation_history");
        }
    }

    // Before any feed, `last_activity_at` equals `started_at`.
    #[tokio::test]
    async fn snapshot_last_activity_at_starts_equal_to_started_at() {
        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
        let (state_store, card_store, scenarios_dir) = tmp_dirs();

        let code = r#"
            local response = alc.llm("test query")
            return response
        "#
        .to_string();

        let session = executor
            .start_session(
                code,
                serde_json::json!({}),
                vec![],
                vec![],
                state_store,
                card_store,
                scenarios_dir,
            )
            .await
            .unwrap();

        let snap = session.snapshot(None, false);
        // Different fallbacks (-1 / -2) make a missing field fail the equality check.
        let started_at = snap["started_at"].as_i64().unwrap_or(-1);
        let last_activity = snap["last_activity_at"].as_i64().unwrap_or(-2);

        assert_eq!(
            started_at, last_activity,
            "last_activity_at must equal started_at before any feed_one"
        );
        assert!(started_at > 0, "started_at must be positive unix ms");
    }
}