1use std::collections::HashMap;
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use algocline_core::{
12 ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
13 TerminalState,
14};
15use mlua_isle::{AsyncIsleDriver, AsyncTask};
16use serde_json::json;
17use tokio::sync::Mutex;
18
19use crate::llm_bridge::LlmRequest;
20
21#[derive(Debug, thiserror::Error)]
24pub enum SessionError {
25 #[error("session '{0}' not found")]
26 NotFound(String),
27 #[error(transparent)]
28 Feed(#[from] algocline_core::FeedError),
29 #[error("invalid transition: {0}")]
30 InvalidTransition(String),
31}
32
33#[derive(serde::Serialize)]
37pub struct ExecutionResult {
38 pub state: TerminalState,
39 pub metrics: ExecutionMetrics,
40}
41
42#[derive(serde::Serialize)]
44pub enum FeedResult {
45 Accepted { remaining: usize },
47 Paused { queries: Vec<LlmQuery> },
49 Finished(ExecutionResult),
51}
52
53impl FeedResult {
54 pub fn to_json(&self, session_id: &str) -> serde_json::Value {
56 match self {
57 Self::Accepted { remaining } => json!({
58 "status": "accepted",
59 "remaining": remaining,
60 }),
61 Self::Paused { queries } => {
62 if queries.len() == 1 {
63 let q = &queries[0];
64 let mut obj = json!({
65 "status": "needs_response",
66 "session_id": session_id,
67 "query_id": q.id.as_str(),
68 "prompt": q.prompt,
69 "system": q.system,
70 "max_tokens": q.max_tokens,
71 });
72 if q.grounded {
73 obj["grounded"] = json!(true);
74 }
75 if q.underspecified {
76 obj["underspecified"] = json!(true);
77 }
78 obj
79 } else {
80 let qs: Vec<_> = queries
81 .iter()
82 .map(|q| {
83 let mut obj = json!({
84 "id": q.id.as_str(),
85 "prompt": q.prompt,
86 "system": q.system,
87 "max_tokens": q.max_tokens,
88 });
89 if q.grounded {
90 obj["grounded"] = json!(true);
91 }
92 if q.underspecified {
93 obj["underspecified"] = json!(true);
94 }
95 obj
96 })
97 .collect();
98 json!({
99 "status": "needs_response",
100 "session_id": session_id,
101 "queries": qs,
102 })
103 }
104 }
105 Self::Finished(result) => match &result.state {
106 TerminalState::Completed { result: val } => json!({
107 "status": "completed",
108 "result": val,
109 "stats": result.metrics.to_json(),
110 }),
111 TerminalState::Failed { error } => json!({
112 "status": "error",
113 "error": error,
114 }),
115 TerminalState::Cancelled => json!({
116 "status": "cancelled",
117 "stats": result.metrics.to_json(),
118 }),
119 },
120 }
121 }
122}
123
124pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
130
131#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
138pub struct PendingFilter {
139 #[serde(default)]
140 pub query_id: bool,
141 #[serde(default)]
142 pub max_tokens: bool,
143 #[serde(default)]
144 pub system: bool,
145 #[serde(default)]
146 pub grounded: bool,
147 #[serde(default)]
148 pub underspecified: bool,
149 #[serde(default)]
150 pub prompt: PromptProjection,
151}
152
153#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
159#[serde(tag = "mode", rename_all = "snake_case")]
160pub enum PromptProjection {
161 #[default]
162 Off,
163 Preview {
164 chars: usize,
165 },
166 Full,
167}
168
169impl PendingFilter {
170 pub fn preset_meta() -> Self {
172 Self {
173 query_id: true,
174 max_tokens: true,
175 ..Self::default()
176 }
177 }
178
179 pub fn preset_preview() -> Self {
182 Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
183 }
184
185 pub fn preset_preview_with(chars: usize) -> Self {
188 Self {
189 query_id: true,
190 max_tokens: true,
191 prompt: PromptProjection::Preview { chars },
192 ..Self::default()
193 }
194 }
195
196 pub fn preset_full() -> Self {
198 Self {
199 query_id: true,
200 max_tokens: true,
201 system: true,
202 grounded: true,
203 underspecified: true,
204 prompt: PromptProjection::Full,
205 }
206 }
207
208 pub fn from_preset(name: &str) -> Option<Self> {
212 match name {
213 "meta" => Some(Self::preset_meta()),
214 "preview" => Some(Self::preset_preview()),
215 "full" => Some(Self::preset_full()),
216 _ => None,
217 }
218 }
219
220 pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
223 match name {
224 "meta" => Some(Self::preset_meta()),
225 "preview" => Some(Self::preset_preview_with(preview_chars)),
226 "full" => Some(Self::preset_full()),
227 _ => None,
228 }
229 }
230}
231
232fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
237 let mut obj = serde_json::Map::new();
238 if f.query_id {
239 obj.insert("query_id".into(), q.id.as_str().into());
240 }
241 if f.max_tokens {
242 obj.insert("max_tokens".into(), q.max_tokens.into());
243 }
244 if f.system {
245 obj.insert(
246 "system".into(),
247 match &q.system {
248 Some(s) => serde_json::Value::String(s.clone()),
249 None => serde_json::Value::Null,
250 },
251 );
252 }
253 if f.grounded {
254 obj.insert("grounded".into(), q.grounded.into());
255 }
256 if f.underspecified {
257 obj.insert("underspecified".into(), q.underspecified.into());
258 }
259 match &f.prompt {
260 PromptProjection::Off => {}
261 PromptProjection::Full => {
262 obj.insert("prompt".into(), q.prompt.clone().into());
263 }
264 PromptProjection::Preview { chars } => {
265 let preview: String = q.prompt.chars().take(*chars).collect();
266 obj.insert("prompt_preview".into(), preview.into());
267 }
268 }
269 serde_json::Value::Object(obj)
270}
271
272pub struct Session {
280 state: ExecutionState,
281 metrics: ExecutionMetrics,
282 observer: MetricsObserver,
283 llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
284 exec_task: AsyncTask,
285 resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
287 _vm_driver: AsyncIsleDriver,
290 last_active: std::time::Instant,
293 started_at_ms: i64,
295 last_activity_ms: Arc<AtomicI64>,
298}
299
300impl Session {
301 pub fn new(
317 llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
318 exec_task: AsyncTask,
319 metrics: ExecutionMetrics,
320 vm_driver: AsyncIsleDriver,
321 ) -> Self {
322 let observer = metrics.create_observer();
323 let started_at_ms = SystemTime::now()
326 .duration_since(UNIX_EPOCH)
327 .unwrap_or_default()
328 .as_millis() as i64;
329 Self {
330 state: ExecutionState::Running,
331 metrics,
332 observer,
333 llm_rx,
334 exec_task,
335 resp_txs: HashMap::new(),
336 _vm_driver: vm_driver,
337 last_active: std::time::Instant::now(),
338 started_at_ms,
339 last_activity_ms: Arc::new(AtomicI64::new(started_at_ms)),
340 }
341 }
342
343 async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
348 tokio::select! {
349 result = &mut self.exec_task => {
350 match result {
351 Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
352 Ok(v) => {
353 self.state.complete(v.clone()).map_err(|e| {
354 SessionError::InvalidTransition(e.to_string())
355 })?;
356 self.observer.on_completed(&v);
357 Ok(FeedResult::Finished(ExecutionResult {
358 state: TerminalState::Completed { result: v },
359 metrics: self.take_metrics(),
360 }))
361 }
362 Err(e) => self.fail_with(format!("JSON parse: {e}")),
363 },
364 Err(e) => self.fail_with(e.to_string()),
365 }
366 }
367 Some(req) = self.llm_rx.recv() => {
368 let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
369 id: qr.id.clone(),
370 prompt: qr.prompt.clone(),
371 system: qr.system.clone(),
372 max_tokens: qr.max_tokens,
373 grounded: qr.grounded,
374 underspecified: qr.underspecified,
375 }).collect();
376
377 for qr in req.queries {
378 self.resp_txs.insert(qr.id, qr.resp_tx);
379 }
380
381 self.state.pause(queries.clone()).map_err(|e| {
382 SessionError::InvalidTransition(e.to_string())
383 })?;
384 self.observer.on_paused(&queries);
385 Ok(FeedResult::Paused { queries })
386 }
387 }
388 }
389
390 fn feed_one(
406 &mut self,
407 query_id: &QueryId,
408 response: String,
409 usage: Option<&algocline_core::TokenUsage>,
410 ) -> Result<bool, SessionError> {
411 self.last_active = std::time::Instant::now();
413 let now_ms = SystemTime::now()
416 .duration_since(UNIX_EPOCH)
417 .unwrap_or_default()
418 .as_millis() as i64;
419 self.last_activity_ms.store(now_ms, Ordering::Relaxed);
420
421 self.observer.on_response_fed(query_id, &response, usage);
423
424 if let Some(tx) = self.resp_txs.remove(query_id) {
426 let _ = tx.send(Ok(response.clone()));
427 }
428
429 let complete = self
431 .state
432 .feed(query_id, response)
433 .map_err(SessionError::Feed)?;
434
435 if complete {
436 self.state
438 .take_responses()
439 .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
440 self.observer.on_resumed();
441 } else {
442 self.observer
443 .on_partial_feed(query_id, self.state.remaining());
444 }
445
446 Ok(complete)
447 }
448
449 fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
450 self.state
451 .fail(msg.clone())
452 .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
453 self.observer.on_failed(&msg);
454 Ok(FeedResult::Finished(ExecutionResult {
455 state: TerminalState::Failed { error: msg },
456 metrics: self.take_metrics(),
457 }))
458 }
459
460 fn take_metrics(&mut self) -> ExecutionMetrics {
461 std::mem::take(&mut self.metrics)
462 }
463
464 pub fn snapshot(
491 &self,
492 pending_filter: Option<&PendingFilter>,
493 include_history: bool,
494 ) -> serde_json::Value {
495 let state_label = match &self.state {
496 ExecutionState::Running => "running",
497 ExecutionState::Paused(_) => "paused",
498 _ => "terminal",
499 };
500
501 let phase = match &self.state {
502 ExecutionState::Running => "running",
503 ExecutionState::Paused(_) => "paused",
504 ExecutionState::Completed { .. } => "completed",
505 ExecutionState::Failed { .. } => "failed",
506 ExecutionState::Cancelled => "cancelled",
507 };
508
509 let mut json = serde_json::json!({
510 "state": state_label,
511 "phase": phase,
512 "started_at": self.started_at_ms,
513 "last_activity_at": self.last_activity_ms.load(Ordering::Relaxed),
514 });
515
516 let metrics = self.metrics.snapshot(include_history);
517 if !metrics.is_null() {
518 json["metrics"] = metrics;
519 }
520
521 if let ExecutionState::Paused(pending) = &self.state {
523 json["pending_queries"] = pending.remaining().into();
524
525 if let Some(filter) = pending_filter {
526 let items: Vec<serde_json::Value> = pending
527 .pending_queries()
528 .iter()
529 .map(|q| project_query(q, filter))
530 .collect();
531 json["pending"] = serde_json::Value::Array(items);
532 }
533 }
534
535 json
536 }
537
538 pub fn is_expired(&self, ttl: Duration) -> bool {
543 is_expired_impl(self.last_active, ttl)
544 }
545
546 pub(crate) fn into_driver_parts(
555 self,
556 ) -> (
557 AsyncTask,
558 tokio::sync::mpsc::Receiver<LlmRequest>,
559 AsyncIsleDriver,
560 ) {
561 (self.exec_task, self.llm_rx, self._vm_driver)
562 }
563}
564
565fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
567 std::time::Instant::now().saturating_duration_since(last_active) >= ttl
568}
569
570pub struct SessionRegistry {
605 sessions: Arc<Mutex<HashMap<String, Session>>>,
606}
607
608impl Default for SessionRegistry {
609 fn default() -> Self {
610 Self::new()
611 }
612}
613
614impl SessionRegistry {
615 pub fn new() -> Self {
616 Self {
617 sessions: Arc::new(Mutex::new(HashMap::new())),
618 }
619 }
620
621 pub async fn start_execution(
623 &self,
624 mut session: Session,
625 ) -> Result<(String, FeedResult), SessionError> {
626 let session_id = gen_session_id();
627 let result = session.wait_event().await?;
628
629 if matches!(result, FeedResult::Paused { .. }) {
630 self.sessions
631 .lock()
632 .await
633 .insert(session_id.clone(), session);
634 }
635
636 Ok((session_id, result))
637 }
638
639 pub async fn feed_response(
645 &self,
646 session_id: &str,
647 query_id: &QueryId,
648 response: String,
649 usage: Option<&algocline_core::TokenUsage>,
650 ) -> Result<FeedResult, SessionError> {
651 let complete = {
653 let mut map = self.sessions.lock().await;
654 let session = map
655 .get_mut(session_id)
656 .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
657
658 let complete = session.feed_one(query_id, response, usage)?;
659
660 if !complete {
661 return Ok(FeedResult::Accepted {
662 remaining: session.state.remaining(),
663 });
664 }
665
666 complete
667 };
668
669 debug_assert!(complete);
671 let mut session = {
672 let mut map = self.sessions.lock().await;
673 map.remove(session_id)
674 .ok_or_else(|| SessionError::NotFound(session_id.into()))?
675 };
676
677 let result = session.wait_event().await?;
678
679 if matches!(result, FeedResult::Paused { .. }) {
680 self.sessions
681 .lock()
682 .await
683 .insert(session_id.into(), session);
684 }
685
686 Ok(result)
687 }
688
689 pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
695 let map = self.sessions.lock().await;
696 let session = map
697 .get(session_id)
698 .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
699 let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
700 match keys.len() {
701 0 => Err(SessionError::InvalidTransition("no pending queries".into())),
702 1 => keys
703 .into_iter()
704 .next()
705 .ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
706 n => Err(SessionError::InvalidTransition(format!(
707 "{n} queries pending; specify query_id explicitly"
708 ))),
709 }
710 }
711
712 pub async fn list_snapshots(
728 &self,
729 pending_filter: Option<&PendingFilter>,
730 include_history: bool,
731 ) -> HashMap<String, serde_json::Value> {
732 let map = self.sessions.lock().await;
733 map.iter()
734 .map(|(id, session)| {
735 (
736 id.clone(),
737 session.snapshot(pending_filter, include_history),
738 )
739 })
740 .collect()
741 }
742
743 pub fn spawn_gc_task(&self, ttl: Duration) {
749 let sessions = Arc::clone(&self.sessions);
750 tokio::spawn(async move {
751 let mut interval = tokio::time::interval(Duration::from_secs(60));
752 loop {
753 interval.tick().await;
754 let mut map = sessions.lock().await;
755 let expired: Vec<String> = map
756 .iter()
757 .filter(|(_, s)| s.is_expired(ttl))
758 .map(|(id, _)| id.clone())
759 .collect();
760 for id in &expired {
761 tracing::info!(session_id = %id, "GC: reaping expired session");
762 map.remove(id);
763 }
764 }
765 });
766 }
767}
768
769fn gen_session_id() -> String {
788 use std::time::{SystemTime, UNIX_EPOCH};
789 let ts = SystemTime::now()
790 .duration_since(UNIX_EPOCH)
791 .unwrap_or_default()
792 .as_nanos();
793 let random: u64 = {
795 use std::collections::hash_map::RandomState;
796 use std::hash::{BuildHasher, Hasher};
797 let s = RandomState::new();
798 let mut h = s.build_hasher();
799 h.write_u128(ts);
800 h.finish()
801 };
802 format!("s-{ts:x}-{random:016x}")
803}
804
805#[cfg(test)]
806mod tests {
807 use super::*;
808 use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
809 use serde_json::json;
810
811 fn make_query(index: usize) -> LlmQuery {
812 LlmQuery {
813 id: QueryId::batch(index),
814 prompt: format!("prompt-{index}"),
815 system: None,
816 max_tokens: 100,
817 grounded: false,
818 underspecified: false,
819 }
820 }
821
822 #[test]
825 fn to_json_accepted() {
826 let result = FeedResult::Accepted { remaining: 3 };
827 let json = result.to_json("s-123");
828 assert_eq!(json["status"], "accepted");
829 assert_eq!(json["remaining"], 3);
830 }
831
832 #[test]
833 fn to_json_paused_single_query() {
834 let query = LlmQuery {
835 id: QueryId::single(),
836 prompt: "What is 2+2?".into(),
837 system: Some("You are a calculator.".into()),
838 max_tokens: 50,
839 grounded: false,
840 underspecified: false,
841 };
842 let result = FeedResult::Paused {
843 queries: vec![query],
844 };
845 let json = result.to_json("s-abc");
846
847 assert_eq!(json["status"], "needs_response");
848 assert_eq!(json["session_id"], "s-abc");
849 assert_eq!(json["prompt"], "What is 2+2?");
850 assert_eq!(json["system"], "You are a calculator.");
851 assert_eq!(json["max_tokens"], 50);
852 assert!(json.get("queries").is_none());
854 assert!(
856 json.get("grounded").is_none(),
857 "grounded key must be absent when false"
858 );
859 assert!(
861 json.get("underspecified").is_none(),
862 "underspecified key must be absent when false"
863 );
864 }
865
866 #[test]
867 fn to_json_paused_single_query_grounded() {
868 let query = LlmQuery {
869 id: QueryId::single(),
870 prompt: "verify this claim".into(),
871 system: None,
872 max_tokens: 200,
873 grounded: true,
874 underspecified: false,
875 };
876 let result = FeedResult::Paused {
877 queries: vec![query],
878 };
879 let json = result.to_json("s-grounded");
880
881 assert_eq!(json["status"], "needs_response");
882 assert_eq!(
883 json["grounded"], true,
884 "grounded must appear in single-query MCP JSON"
885 );
886 }
887
888 #[test]
889 fn to_json_paused_single_query_underspecified() {
890 let query = LlmQuery {
891 id: QueryId::single(),
892 prompt: "what output format do you need?".into(),
893 system: None,
894 max_tokens: 200,
895 grounded: false,
896 underspecified: true,
897 };
898 let result = FeedResult::Paused {
899 queries: vec![query],
900 };
901 let json = result.to_json("s-underspec");
902
903 assert_eq!(json["status"], "needs_response");
904 assert_eq!(
905 json["underspecified"], true,
906 "underspecified must appear in single-query MCP JSON"
907 );
908 assert!(
909 json.get("grounded").is_none(),
910 "grounded must be absent when false"
911 );
912 }
913
914 #[test]
915 fn to_json_paused_multiple_queries_mixed_grounded() {
916 let grounded_query = LlmQuery {
917 id: QueryId::batch(0),
918 prompt: "verify".into(),
919 system: None,
920 max_tokens: 100,
921 grounded: true,
922 underspecified: false,
923 };
924 let normal_query = LlmQuery {
925 id: QueryId::batch(1),
926 prompt: "generate".into(),
927 system: None,
928 max_tokens: 100,
929 grounded: false,
930 underspecified: false,
931 };
932 let result = FeedResult::Paused {
933 queries: vec![grounded_query, normal_query],
934 };
935 let json = result.to_json("s-batch");
936
937 let qs = json["queries"].as_array().expect("queries should be array");
938 assert_eq!(
939 qs[0]["grounded"], true,
940 "grounded query must have grounded=true"
941 );
942 assert!(
943 qs[1].get("grounded").is_none(),
944 "non-grounded query must omit grounded key"
945 );
946 }
947
948 #[test]
949 fn to_json_paused_multiple_queries_mixed_underspecified() {
950 let underspec_query = LlmQuery {
951 id: QueryId::batch(0),
952 prompt: "clarify intent".into(),
953 system: None,
954 max_tokens: 100,
955 grounded: false,
956 underspecified: true,
957 };
958 let normal_query = LlmQuery {
959 id: QueryId::batch(1),
960 prompt: "generate".into(),
961 system: None,
962 max_tokens: 100,
963 grounded: false,
964 underspecified: false,
965 };
966 let result = FeedResult::Paused {
967 queries: vec![underspec_query, normal_query],
968 };
969 let json = result.to_json("s-batch-us");
970
971 let qs = json["queries"].as_array().expect("queries should be array");
972 assert_eq!(
973 qs[0]["underspecified"], true,
974 "underspecified query must have underspecified=true"
975 );
976 assert!(
977 qs[1].get("underspecified").is_none(),
978 "non-underspecified query must omit underspecified key"
979 );
980 }
981
982 #[test]
983 fn to_json_paused_single_query_no_system() {
984 let query = LlmQuery {
985 id: QueryId::single(),
986 prompt: "hello".into(),
987 system: None,
988 max_tokens: 1024,
989 grounded: false,
990 underspecified: false,
991 };
992 let result = FeedResult::Paused {
993 queries: vec![query],
994 };
995 let json = result.to_json("s-x");
996
997 assert_eq!(json["status"], "needs_response");
998 assert!(json["system"].is_null());
999 }
1000
1001 #[test]
1002 fn to_json_paused_multiple_queries() {
1003 let queries = vec![make_query(0), make_query(1), make_query(2)];
1004 let result = FeedResult::Paused { queries };
1005 let json = result.to_json("s-multi");
1006
1007 assert_eq!(json["status"], "needs_response");
1008 assert_eq!(json["session_id"], "s-multi");
1009
1010 let qs = json["queries"].as_array().expect("queries should be array");
1011 assert_eq!(qs.len(), 3);
1012 assert_eq!(qs[0]["id"], "q-0");
1013 assert_eq!(qs[0]["prompt"], "prompt-0");
1014 assert_eq!(qs[1]["id"], "q-1");
1015 assert_eq!(qs[2]["id"], "q-2");
1016 }
1017
1018 #[test]
1019 fn to_json_finished_completed() {
1020 let result = FeedResult::Finished(ExecutionResult {
1021 state: TerminalState::Completed {
1022 result: json!({"answer": 42}),
1023 },
1024 metrics: ExecutionMetrics::new(),
1025 });
1026 let json = result.to_json("s-done");
1027
1028 assert_eq!(json["status"], "completed");
1029 assert_eq!(json["result"]["answer"], 42);
1030 assert!(json.get("stats").is_some());
1031 }
1032
1033 #[test]
1034 fn to_json_finished_failed() {
1035 let result = FeedResult::Finished(ExecutionResult {
1036 state: TerminalState::Failed {
1037 error: "lua error: bad argument".into(),
1038 },
1039 metrics: ExecutionMetrics::new(),
1040 });
1041 let json = result.to_json("s-err");
1042
1043 assert_eq!(json["status"], "error");
1044 assert_eq!(json["error"], "lua error: bad argument");
1045 }
1046
1047 #[test]
1048 fn to_json_finished_cancelled() {
1049 let result = FeedResult::Finished(ExecutionResult {
1050 state: TerminalState::Cancelled,
1051 metrics: ExecutionMetrics::new(),
1052 });
1053 let json = result.to_json("s-cancel");
1054
1055 assert_eq!(json["status"], "cancelled");
1056 assert!(json.get("stats").is_some());
1057 }
1058
1059 #[test]
1062 fn session_id_starts_with_prefix() {
1063 let id = gen_session_id();
1064 assert!(id.starts_with("s-"), "id should start with 's-': {id}");
1065 }
1066
1067 #[test]
1068 fn session_id_uniqueness() {
1069 let ids: Vec<String> = (0..10).map(|_| gen_session_id()).collect();
1070 let set: std::collections::HashSet<&String> = ids.iter().collect();
1071 assert_eq!(set.len(), 10, "10 IDs should all be unique");
1072 }
1073
1074 #[test]
1081 fn is_expired_impl_fresh_instant_not_expired() {
1082 let now = std::time::Instant::now();
1084 assert!(!is_expired_impl(now, Duration::from_secs(1)));
1085 }
1086
1087 #[test]
1088 fn is_expired_impl_old_instant_expired() {
1089 let two_hours_ago = std::time::Instant::now()
1091 .checked_sub(Duration::from_secs(7200))
1092 .expect("checked_sub should succeed with sane duration");
1093 assert!(is_expired_impl(two_hours_ago, Duration::from_secs(3600)));
1095 }
1096
1097 #[test]
1098 fn is_expired_impl_not_yet_expired() {
1099 let one_hour_ago = std::time::Instant::now()
1101 .checked_sub(Duration::from_secs(3600))
1102 .expect("checked_sub should succeed with sane duration");
1103 assert!(!is_expired_impl(one_hour_ago, Duration::from_secs(10800)));
1105 }
1106
1107 #[test]
1108 fn is_expired_impl_zero_ttl_always_expired() {
1109 let now = std::time::Instant::now();
1111 assert!(is_expired_impl(now, Duration::ZERO));
1112 }
1113
1114 #[test]
1117 fn pending_filter_default_is_all_off() {
1118 let f = PendingFilter::default();
1119 assert!(!f.query_id);
1120 assert!(!f.max_tokens);
1121 assert!(!f.system);
1122 assert!(!f.grounded);
1123 assert!(!f.underspecified);
1124 assert!(matches!(f.prompt, PromptProjection::Off));
1125 }
1126
1127 #[test]
1128 fn pending_filter_preset_meta_flags() {
1129 let f = PendingFilter::preset_meta();
1130 assert!(f.query_id);
1131 assert!(f.max_tokens);
1132 assert!(!f.system);
1133 assert!(!f.grounded);
1134 assert!(!f.underspecified);
1135 assert!(
1136 matches!(f.prompt, PromptProjection::Off),
1137 "meta preset must not project prompt content"
1138 );
1139 }
1140
1141 #[test]
1142 fn pending_filter_preset_preview_uses_default_chars() {
1143 let f = PendingFilter::preset_preview();
1144 assert!(f.query_id);
1145 assert!(f.max_tokens);
1146 match f.prompt {
1147 PromptProjection::Preview { chars } => {
1148 assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS);
1149 }
1150 other => panic!("expected Preview, got {other:?}"),
1151 }
1152 }
1153
1154 #[test]
1155 fn pending_filter_preset_preview_with_custom_chars() {
1156 let f = PendingFilter::preset_preview_with(42);
1157 match f.prompt {
1158 PromptProjection::Preview { chars } => assert_eq!(chars, 42),
1159 other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
1160 }
1161 }
1162
1163 #[test]
1164 fn pending_filter_preset_full_flags_all_on() {
1165 let f = PendingFilter::preset_full();
1166 assert!(f.query_id);
1167 assert!(f.max_tokens);
1168 assert!(f.system);
1169 assert!(f.grounded);
1170 assert!(f.underspecified);
1171 assert!(matches!(f.prompt, PromptProjection::Full));
1172 }
1173
1174 #[test]
1175 fn pending_filter_from_preset_known_names() {
1176 assert!(PendingFilter::from_preset("meta").is_some());
1177 assert!(PendingFilter::from_preset("preview").is_some());
1178 assert!(PendingFilter::from_preset("full").is_some());
1179 }
1180
1181 #[test]
1182 fn pending_filter_from_preset_unknown_returns_none() {
1183 assert!(PendingFilter::from_preset("").is_none());
1186 assert!(PendingFilter::from_preset("META").is_none());
1187 assert!(PendingFilter::from_preset("bogus").is_none());
1188 }
1189
1190 #[test]
1191 fn pending_filter_from_preset_with_overrides_preview_chars() {
1192 let f = PendingFilter::from_preset_with("preview", 73).unwrap();
1195 match f.prompt {
1196 PromptProjection::Preview { chars } => assert_eq!(chars, 73),
1197 other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
1198 }
1199
1200 let f_meta = PendingFilter::from_preset_with("meta", 73).unwrap();
1201 assert!(matches!(f_meta.prompt, PromptProjection::Off));
1202
1203 let f_full = PendingFilter::from_preset_with("full", 73).unwrap();
1204 assert!(matches!(f_full.prompt, PromptProjection::Full));
1205 }
1206
1207 #[test]
1210 fn project_query_default_filter_produces_empty_object() {
1211 let q = make_query(0);
1212 let v = project_query(&q, &PendingFilter::default());
1213 let obj = v.as_object().expect("object");
1214 assert!(obj.is_empty(), "default filter should project nothing");
1215 }
1216
1217 #[test]
1218 fn project_query_meta_preset_has_id_and_max_tokens_only() {
1219 let q = make_query(0);
1220 let v = project_query(&q, &PendingFilter::preset_meta());
1221 let obj = v.as_object().expect("object");
1222 assert_eq!(obj.len(), 2);
1223 assert_eq!(v["query_id"], "q-0");
1224 assert_eq!(v["max_tokens"], 100);
1225 assert!(obj.get("prompt").is_none());
1226 assert!(obj.get("prompt_preview").is_none());
1227 assert!(obj.get("system").is_none());
1228 assert!(obj.get("grounded").is_none());
1229 assert!(obj.get("underspecified").is_none());
1230 }
1231
1232 #[test]
1233 fn project_query_full_preset_has_all_fields() {
1234 let q = LlmQuery {
1235 id: QueryId::batch(0),
1236 prompt: "hi".into(),
1237 system: Some("sys".into()),
1238 max_tokens: 100,
1239 grounded: true,
1240 underspecified: true,
1241 };
1242 let v = project_query(&q, &PendingFilter::preset_full());
1243 assert_eq!(v["query_id"], "q-0");
1244 assert_eq!(v["max_tokens"], 100);
1245 assert_eq!(v["system"], "sys");
1246 assert_eq!(v["grounded"], true);
1247 assert_eq!(v["underspecified"], true);
1248 assert_eq!(v["prompt"], "hi");
1249 assert!(v.get("prompt_preview").is_none());
1250 }
1251
1252 #[test]
1253 fn project_query_preview_truncates_at_char_count() {
1254 let q = LlmQuery {
1255 id: QueryId::batch(0),
1256 prompt: "abcdefghij".into(),
1257 system: None,
1258 max_tokens: 10,
1259 grounded: false,
1260 underspecified: false,
1261 };
1262 let v = project_query(&q, &PendingFilter::preset_preview_with(5));
1263 assert_eq!(v["prompt_preview"], "abcde");
1264 assert!(v.get("prompt").is_none());
1265 }
1266
1267 #[test]
1268 fn project_query_preview_utf8_multibyte_safe() {
1269 let prompt = "あいうえお";
1273 let q = LlmQuery {
1274 id: QueryId::batch(0),
1275 prompt: prompt.to_string(),
1276 system: None,
1277 max_tokens: 10,
1278 grounded: false,
1279 underspecified: false,
1280 };
1281 let v = project_query(&q, &PendingFilter::preset_preview_with(3));
1282 let preview = v["prompt_preview"].as_str().expect("str");
1283 assert_eq!(preview, "あいう");
1284 assert_eq!(preview.chars().count(), 3);
1285 }
1286
1287 #[test]
1288 fn project_query_preview_chars_over_length_returns_whole_prompt() {
1289 let q = LlmQuery {
1290 id: QueryId::batch(0),
1291 prompt: "abc".into(),
1292 system: None,
1293 max_tokens: 10,
1294 grounded: false,
1295 underspecified: false,
1296 };
1297 let v = project_query(&q, &PendingFilter::preset_preview_with(100));
1298 assert_eq!(v["prompt_preview"], "abc");
1299 }
1300
1301 #[test]
1302 fn project_query_system_field_null_when_absent() {
1303 let q = LlmQuery {
1304 id: QueryId::batch(0),
1305 prompt: "p".into(),
1306 system: None,
1307 max_tokens: 10,
1308 grounded: false,
1309 underspecified: false,
1310 };
1311 let filter = PendingFilter {
1312 system: true,
1313 ..Default::default()
1314 };
1315 let v = project_query(&q, &filter);
1316 assert!(
1317 v["system"].is_null(),
1318 "absent system must serialize as null"
1319 );
1320 }
1321
1322 #[test]
1325 fn pending_filter_deserialize_custom_object_preview() {
1326 let raw = serde_json::json!({
1328 "query_id": true,
1329 "prompt": { "mode": "preview", "chars": 50 }
1330 });
1331 let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1332 assert!(f.query_id);
1333 match f.prompt {
1334 PromptProjection::Preview { chars } => assert_eq!(chars, 50),
1335 other => panic!("expected Preview, got {other:?}"),
1336 }
1337 }
1338
1339 #[test]
1340 fn pending_filter_deserialize_partial_object_uses_field_defaults() {
1341 let raw = serde_json::json!({});
1344 let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1345 assert!(!f.query_id);
1346 assert!(matches!(f.prompt, PromptProjection::Off));
1347 }
1348
1349 #[test]
1350 fn pending_filter_deserialize_prompt_full_tag() {
1351 let raw = serde_json::json!({ "prompt": { "mode": "full" } });
1352 let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1353 assert!(matches!(f.prompt, PromptProjection::Full));
1354 }
1355
1356 fn tmp_dirs() -> (
1364 std::sync::Arc<crate::state::JsonFileStore>,
1365 std::sync::Arc<crate::card::FileCardStore>,
1366 std::path::PathBuf,
1367 ) {
1368 let tmp = tempfile::tempdir().expect("test tempdir");
1369 let root = tmp.path().to_path_buf();
1370 std::mem::forget(tmp);
1371 (
1372 std::sync::Arc::new(crate::state::JsonFileStore::new(root.join("state"))),
1373 std::sync::Arc::new(crate::card::FileCardStore::new(root.join("cards"))),
1374 root.join("scenarios"),
1375 )
1376 }
1377
1378 #[tokio::test]
1385 async fn snapshot_v2_contains_phase_and_timestamps() {
1386 let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1387 let (state_store, card_store, scenarios_dir) = tmp_dirs();
1388
1389 let code = r#"
1391 local response = alc.llm("what is 2+2?")
1392 return response
1393 "#
1394 .to_string();
1395
1396 let session = executor
1397 .start_session(
1398 code,
1399 serde_json::json!({}),
1400 vec![],
1401 vec![],
1402 state_store,
1403 card_store,
1404 scenarios_dir,
1405 )
1406 .await
1407 .unwrap();
1408
1409 let snap = session.snapshot(None, false);
1412
1413 assert!(
1415 snap.get("phase").is_some(),
1416 "snapshot must have 'phase' field"
1417 );
1418 assert_eq!(snap["phase"], "running", "initial state must be running");
1419
1420 assert_eq!(snap["state"], "running");
1422
1423 let started_at = snap["started_at"].as_i64().expect("started_at must be i64");
1425 assert!(started_at > 0, "started_at must be > 0 (unix ms)");
1426
1427 let last_activity = snap["last_activity_at"]
1429 .as_i64()
1430 .expect("last_activity_at must be i64");
1431 assert_eq!(
1432 started_at, last_activity,
1433 "last_activity_at should equal started_at before any feed"
1434 );
1435 }
1436
1437 #[test]
1441 fn snapshot_phase_running_state_label() {
1442 let cases: &[(&str, &str)] = &[
1446 ("running", "running"),
1447 ("paused", "paused"),
1448 ("completed", "completed"),
1449 ("failed", "failed"),
1450 ("cancelled", "cancelled"),
1451 ];
1452 for (state_str, expected_phase) in cases {
1453 let three_value_state = match *state_str {
1457 "running" => "running",
1458 "paused" => "paused",
1459 _ => "terminal",
1460 };
1461 assert_eq!(
1463 *expected_phase, *state_str,
1464 "phase for {state_str} must be the same string"
1465 );
1466 if *state_str != "running" && *state_str != "paused" {
1467 assert_eq!(
1468 three_value_state, "terminal",
1469 "{state_str} must map to 'terminal' in 3-value state"
1470 );
1471 }
1472 }
1473 }
1474
1475 #[tokio::test]
1477 async fn snapshot_conversation_history_opt_in() {
1478 let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1479 let (state_store, card_store, scenarios_dir) = tmp_dirs();
1480
1481 let code = r#"
1482 local response = alc.llm("explain recursion")
1483 return response
1484 "#
1485 .to_string();
1486
1487 let session = executor
1488 .start_session(
1489 code,
1490 serde_json::json!({}),
1491 vec![],
1492 vec![],
1493 state_store,
1494 card_store,
1495 scenarios_dir,
1496 )
1497 .await
1498 .unwrap();
1499
1500 let snap_false = session.snapshot(None, false);
1502 assert!(
1503 snap_false
1504 .get("metrics")
1505 .and_then(|m| m.get("conversation_history"))
1506 .is_none(),
1507 "conversation_history must be absent with include_history=false"
1508 );
1509
1510 let snap_true = session.snapshot(None, true);
1512 if let Some(metrics) = snap_true.get("metrics") {
1515 let _ = metrics.get("conversation_history");
1519 }
1520 }
1521
1522 #[tokio::test]
1524 async fn snapshot_last_activity_at_starts_equal_to_started_at() {
1525 let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1526 let (state_store, card_store, scenarios_dir) = tmp_dirs();
1527
1528 let code = r#"
1529 local response = alc.llm("test query")
1530 return response
1531 "#
1532 .to_string();
1533
1534 let session = executor
1535 .start_session(
1536 code,
1537 serde_json::json!({}),
1538 vec![],
1539 vec![],
1540 state_store,
1541 card_store,
1542 scenarios_dir,
1543 )
1544 .await
1545 .unwrap();
1546
1547 let snap = session.snapshot(None, false);
1548 let started_at = snap["started_at"].as_i64().unwrap_or(-1);
1549 let last_activity = snap["last_activity_at"].as_i64().unwrap_or(-2);
1550
1551 assert_eq!(
1552 started_at, last_activity,
1553 "last_activity_at must equal started_at before any feed_one"
1554 );
1555 assert!(started_at > 0, "started_at must be positive unix ms");
1556 }
1557}