1use std::collections::HashMap;
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use algocline_core::{
12 ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
13 TerminalState,
14};
15use mlua_isle::{AsyncIsleDriver, AsyncTask};
16use serde_json::json;
17use tokio::sync::Mutex;
18
19use crate::llm_bridge::LlmRequest;
20
21#[derive(Debug, thiserror::Error)]
24pub enum SessionError {
25 #[error("session '{0}' not found")]
26 NotFound(String),
27 #[error(transparent)]
28 Feed(#[from] algocline_core::FeedError),
29 #[error("invalid transition: {0}")]
30 InvalidTransition(String),
31}
32
33#[derive(serde::Serialize)]
37pub struct ExecutionResult {
38 pub state: TerminalState,
39 pub metrics: ExecutionMetrics,
40}
41
42#[derive(serde::Serialize)]
44pub enum FeedResult {
45 Accepted { remaining: usize },
47 Paused { queries: Vec<LlmQuery> },
49 Finished(ExecutionResult),
51}
52
53impl FeedResult {
54 pub fn to_json(&self, session_id: &str) -> serde_json::Value {
56 match self {
57 Self::Accepted { remaining } => json!({
58 "status": "accepted",
59 "remaining": remaining,
60 }),
61 Self::Paused { queries } => {
62 if queries.len() == 1 {
63 let q = &queries[0];
64 let mut obj = json!({
65 "status": "needs_response",
66 "session_id": session_id,
67 "query_id": q.id.as_str(),
68 "prompt": q.prompt,
69 "system": q.system,
70 "max_tokens": q.max_tokens,
71 });
72 if q.grounded {
73 obj["grounded"] = json!(true);
74 }
75 if q.underspecified {
76 obj["underspecified"] = json!(true);
77 }
78 obj
79 } else {
80 let qs: Vec<_> = queries
81 .iter()
82 .map(|q| {
83 let mut obj = json!({
84 "id": q.id.as_str(),
85 "prompt": q.prompt,
86 "system": q.system,
87 "max_tokens": q.max_tokens,
88 });
89 if q.grounded {
90 obj["grounded"] = json!(true);
91 }
92 if q.underspecified {
93 obj["underspecified"] = json!(true);
94 }
95 obj
96 })
97 .collect();
98 json!({
99 "status": "needs_response",
100 "session_id": session_id,
101 "queries": qs,
102 })
103 }
104 }
105 Self::Finished(result) => match &result.state {
106 TerminalState::Completed { result: val } => json!({
107 "status": "completed",
108 "result": val,
109 "stats": result.metrics.to_json(),
110 }),
111 TerminalState::Failed { error } => json!({
112 "status": "error",
113 "error": error,
114 }),
115 TerminalState::Cancelled => json!({
116 "status": "cancelled",
117 "stats": result.metrics.to_json(),
118 }),
119 },
120 }
121 }
122}
123
124pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
130
131#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
138pub struct PendingFilter {
139 #[serde(default)]
140 pub query_id: bool,
141 #[serde(default)]
142 pub max_tokens: bool,
143 #[serde(default)]
144 pub system: bool,
145 #[serde(default)]
146 pub grounded: bool,
147 #[serde(default)]
148 pub underspecified: bool,
149 #[serde(default)]
150 pub prompt: PromptProjection,
151}
152
153#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
159#[serde(tag = "mode", rename_all = "snake_case")]
160pub enum PromptProjection {
161 #[default]
162 Off,
163 Preview {
164 chars: usize,
165 },
166 Full,
167}
168
169impl PendingFilter {
170 pub fn preset_meta() -> Self {
172 Self {
173 query_id: true,
174 max_tokens: true,
175 ..Self::default()
176 }
177 }
178
179 pub fn preset_preview() -> Self {
182 Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
183 }
184
185 pub fn preset_preview_with(chars: usize) -> Self {
188 Self {
189 query_id: true,
190 max_tokens: true,
191 prompt: PromptProjection::Preview { chars },
192 ..Self::default()
193 }
194 }
195
196 pub fn preset_full() -> Self {
198 Self {
199 query_id: true,
200 max_tokens: true,
201 system: true,
202 grounded: true,
203 underspecified: true,
204 prompt: PromptProjection::Full,
205 }
206 }
207
208 pub fn from_preset(name: &str) -> Option<Self> {
212 match name {
213 "meta" => Some(Self::preset_meta()),
214 "preview" => Some(Self::preset_preview()),
215 "full" => Some(Self::preset_full()),
216 _ => None,
217 }
218 }
219
220 pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
223 match name {
224 "meta" => Some(Self::preset_meta()),
225 "preview" => Some(Self::preset_preview_with(preview_chars)),
226 "full" => Some(Self::preset_full()),
227 _ => None,
228 }
229 }
230}
231
232fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
237 let mut obj = serde_json::Map::new();
238 if f.query_id {
239 obj.insert("query_id".into(), q.id.as_str().into());
240 }
241 if f.max_tokens {
242 obj.insert("max_tokens".into(), q.max_tokens.into());
243 }
244 if f.system {
245 obj.insert(
246 "system".into(),
247 match &q.system {
248 Some(s) => serde_json::Value::String(s.clone()),
249 None => serde_json::Value::Null,
250 },
251 );
252 }
253 if f.grounded {
254 obj.insert("grounded".into(), q.grounded.into());
255 }
256 if f.underspecified {
257 obj.insert("underspecified".into(), q.underspecified.into());
258 }
259 match &f.prompt {
260 PromptProjection::Off => {}
261 PromptProjection::Full => {
262 obj.insert("prompt".into(), q.prompt.clone().into());
263 }
264 PromptProjection::Preview { chars } => {
265 let preview: String = q.prompt.chars().take(*chars).collect();
266 obj.insert("prompt_preview".into(), preview.into());
267 }
268 }
269 serde_json::Value::Object(obj)
270}
271
272pub struct Session {
280 state: ExecutionState,
281 metrics: ExecutionMetrics,
282 observer: MetricsObserver,
283 llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
284 exec_task: AsyncTask,
285 resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
287 _vm_driver: AsyncIsleDriver,
290 last_active: std::time::Instant,
293 started_at_ms: i64,
295 last_activity_ms: Arc<AtomicI64>,
298}
299
300impl Session {
301 pub fn new(
317 llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
318 exec_task: AsyncTask,
319 metrics: ExecutionMetrics,
320 vm_driver: AsyncIsleDriver,
321 ) -> Self {
322 let observer = metrics.create_observer();
323 let started_at_ms = SystemTime::now()
326 .duration_since(UNIX_EPOCH)
327 .unwrap_or_default()
328 .as_millis() as i64;
329 Self {
330 state: ExecutionState::Running,
331 metrics,
332 observer,
333 llm_rx,
334 exec_task,
335 resp_txs: HashMap::new(),
336 _vm_driver: vm_driver,
337 last_active: std::time::Instant::now(),
338 started_at_ms,
339 last_activity_ms: Arc::new(AtomicI64::new(started_at_ms)),
340 }
341 }
342
343 async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
348 tokio::select! {
349 result = &mut self.exec_task => {
350 match result {
351 Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
352 Ok(v) => {
353 self.state.complete(v.clone()).map_err(|e| {
354 SessionError::InvalidTransition(e.to_string())
355 })?;
356 self.observer.on_completed(&v);
357 Ok(FeedResult::Finished(ExecutionResult {
358 state: TerminalState::Completed { result: v },
359 metrics: self.take_metrics(),
360 }))
361 }
362 Err(e) => self.fail_with(format!("JSON parse: {e}")),
363 },
364 Err(e) => self.fail_with(e.to_string()),
365 }
366 }
367 Some(req) = self.llm_rx.recv() => {
368 let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
369 id: qr.id.clone(),
370 prompt: qr.prompt.clone(),
371 system: qr.system.clone(),
372 max_tokens: qr.max_tokens,
373 grounded: qr.grounded,
374 underspecified: qr.underspecified,
375 }).collect();
376
377 for qr in req.queries {
378 self.resp_txs.insert(qr.id, qr.resp_tx);
379 }
380
381 self.state.pause(queries.clone()).map_err(|e| {
382 SessionError::InvalidTransition(e.to_string())
383 })?;
384 self.observer.on_paused(&queries);
385 Ok(FeedResult::Paused { queries })
386 }
387 }
388 }
389
390 fn feed_one(
406 &mut self,
407 query_id: &QueryId,
408 response: String,
409 usage: Option<&algocline_core::TokenUsage>,
410 ) -> Result<bool, SessionError> {
411 self.last_active = std::time::Instant::now();
413 let now_ms = SystemTime::now()
416 .duration_since(UNIX_EPOCH)
417 .unwrap_or_default()
418 .as_millis() as i64;
419 self.last_activity_ms.store(now_ms, Ordering::Relaxed);
420
421 self.observer.on_response_fed(query_id, &response, usage);
423
424 if let Some(tx) = self.resp_txs.remove(query_id) {
426 let _ = tx.send(Ok(response.clone()));
427 }
428
429 let complete = self
431 .state
432 .feed(query_id, response)
433 .map_err(SessionError::Feed)?;
434
435 if complete {
436 self.state
438 .take_responses()
439 .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
440 self.observer.on_resumed();
441 } else {
442 self.observer
443 .on_partial_feed(query_id, self.state.remaining());
444 }
445
446 Ok(complete)
447 }
448
449 fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
450 self.state
451 .fail(msg.clone())
452 .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
453 self.observer.on_failed(&msg);
454 Ok(FeedResult::Finished(ExecutionResult {
455 state: TerminalState::Failed { error: msg },
456 metrics: self.take_metrics(),
457 }))
458 }
459
460 fn take_metrics(&mut self) -> ExecutionMetrics {
461 std::mem::take(&mut self.metrics)
462 }
463
464 pub fn snapshot(
491 &self,
492 pending_filter: Option<&PendingFilter>,
493 include_history: bool,
494 ) -> serde_json::Value {
495 let state_label = match &self.state {
496 ExecutionState::Running => "running",
497 ExecutionState::Paused(_) => "paused",
498 _ => "terminal",
499 };
500
501 let phase = match &self.state {
502 ExecutionState::Running => "running",
503 ExecutionState::Paused(_) => "paused",
504 ExecutionState::Completed { .. } => "completed",
505 ExecutionState::Failed { .. } => "failed",
506 ExecutionState::Cancelled => "cancelled",
507 };
508
509 let mut json = serde_json::json!({
510 "state": state_label,
511 "phase": phase,
512 "started_at": self.started_at_ms,
513 "last_activity_at": self.last_activity_ms.load(Ordering::Relaxed),
514 });
515
516 let metrics = self.metrics.snapshot(include_history);
517 if !metrics.is_null() {
518 json["metrics"] = metrics;
519 }
520
521 if let ExecutionState::Paused(pending) = &self.state {
523 json["pending_queries"] = pending.remaining().into();
524
525 if let Some(filter) = pending_filter {
526 let items: Vec<serde_json::Value> = pending
527 .pending_queries()
528 .iter()
529 .map(|q| project_query(q, filter))
530 .collect();
531 json["pending"] = serde_json::Value::Array(items);
532 }
533 }
534
535 json
536 }
537
538 pub fn is_expired(&self, ttl: Duration) -> bool {
543 is_expired_impl(self.last_active, ttl)
544 }
545
546 pub(crate) fn into_driver_parts(
554 self,
555 ) -> (
556 AsyncTask,
557 tokio::sync::mpsc::Receiver<LlmRequest>,
558 AsyncIsleDriver,
559 ExecutionMetrics,
560 ) {
561 (self.exec_task, self.llm_rx, self._vm_driver, self.metrics)
562 }
563}
564
565fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
567 std::time::Instant::now().saturating_duration_since(last_active) >= ttl
568}
569
570pub struct SessionRegistry {
605 sessions: Arc<Mutex<HashMap<String, Session>>>,
606}
607
608impl Default for SessionRegistry {
609 fn default() -> Self {
610 Self::new()
611 }
612}
613
614impl SessionRegistry {
615 pub fn new() -> Self {
616 Self {
617 sessions: Arc::new(Mutex::new(HashMap::new())),
618 }
619 }
620
621 pub async fn start_execution(
623 &self,
624 mut session: Session,
625 ) -> Result<(String, FeedResult), SessionError> {
626 let session_id = gen_session_id();
627 let result = session.wait_event().await?;
628
629 if matches!(result, FeedResult::Paused { .. }) {
630 self.sessions
631 .lock()
632 .await
633 .insert(session_id.clone(), session);
634 }
635
636 Ok((session_id, result))
637 }
638
639 pub async fn feed_response(
645 &self,
646 session_id: &str,
647 query_id: &QueryId,
648 response: String,
649 usage: Option<&algocline_core::TokenUsage>,
650 ) -> Result<FeedResult, SessionError> {
651 let complete = {
653 let mut map = self.sessions.lock().await;
654 let session = map
655 .get_mut(session_id)
656 .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
657
658 let complete = session.feed_one(query_id, response, usage)?;
659
660 if !complete {
661 return Ok(FeedResult::Accepted {
662 remaining: session.state.remaining(),
663 });
664 }
665
666 complete
667 };
668
669 debug_assert!(complete);
671 let mut session = {
672 let mut map = self.sessions.lock().await;
673 map.remove(session_id)
674 .ok_or_else(|| SessionError::NotFound(session_id.into()))?
675 };
676
677 let result = session.wait_event().await?;
678
679 if matches!(result, FeedResult::Paused { .. }) {
680 self.sessions
681 .lock()
682 .await
683 .insert(session_id.into(), session);
684 }
685
686 Ok(result)
687 }
688
689 pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
695 let map = self.sessions.lock().await;
696 let session = map
697 .get(session_id)
698 .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
699 let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
700 match keys.len() {
701 0 => Err(SessionError::InvalidTransition("no pending queries".into())),
702 1 => keys
703 .into_iter()
704 .next()
705 .ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
706 n => Err(SessionError::InvalidTransition(format!(
707 "{n} queries pending; specify query_id explicitly"
708 ))),
709 }
710 }
711
712 pub async fn list_snapshots(
728 &self,
729 pending_filter: Option<&PendingFilter>,
730 include_history: bool,
731 ) -> HashMap<String, serde_json::Value> {
732 let map = self.sessions.lock().await;
733 map.iter()
734 .map(|(id, session)| {
735 (
736 id.clone(),
737 session.snapshot(pending_filter, include_history),
738 )
739 })
740 .collect()
741 }
742
743 pub fn spawn_gc_task(&self, ttl: Duration) {
749 let sessions = Arc::clone(&self.sessions);
750 tokio::spawn(async move {
751 let mut interval = tokio::time::interval(Duration::from_secs(60));
752 loop {
753 interval.tick().await;
754 let mut map = sessions.lock().await;
755 let expired: Vec<String> = map
756 .iter()
757 .filter(|(_, s)| s.is_expired(ttl))
758 .map(|(id, _)| id.clone())
759 .collect();
760 for id in &expired {
761 tracing::info!(session_id = %id, "GC: reaping expired session");
762 map.remove(id);
763 }
764 }
765 });
766 }
767}
768
769fn gen_session_id() -> String {
788 use rand::RngExt;
789 use std::time::{SystemTime, UNIX_EPOCH};
790 let ts = SystemTime::now()
791 .duration_since(UNIX_EPOCH)
792 .unwrap_or_default()
793 .as_nanos();
794 let random: u64 = rand::rng().random();
795 format!("s-{ts:x}-{random:016x}")
796}
797
798#[cfg(test)]
799mod tests {
800 use super::*;
801 use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
802 use serde_json::json;
803
804 fn make_query(index: usize) -> LlmQuery {
805 LlmQuery {
806 id: QueryId::batch(index),
807 prompt: format!("prompt-{index}"),
808 system: None,
809 max_tokens: 100,
810 grounded: false,
811 underspecified: false,
812 }
813 }
814
815 #[test]
818 fn to_json_accepted() {
819 let result = FeedResult::Accepted { remaining: 3 };
820 let json = result.to_json("s-123");
821 assert_eq!(json["status"], "accepted");
822 assert_eq!(json["remaining"], 3);
823 }
824
825 #[test]
826 fn to_json_paused_single_query() {
827 let query = LlmQuery {
828 id: QueryId::single(),
829 prompt: "What is 2+2?".into(),
830 system: Some("You are a calculator.".into()),
831 max_tokens: 50,
832 grounded: false,
833 underspecified: false,
834 };
835 let result = FeedResult::Paused {
836 queries: vec![query],
837 };
838 let json = result.to_json("s-abc");
839
840 assert_eq!(json["status"], "needs_response");
841 assert_eq!(json["session_id"], "s-abc");
842 assert_eq!(json["prompt"], "What is 2+2?");
843 assert_eq!(json["system"], "You are a calculator.");
844 assert_eq!(json["max_tokens"], 50);
845 assert!(json.get("queries").is_none());
847 assert!(
849 json.get("grounded").is_none(),
850 "grounded key must be absent when false"
851 );
852 assert!(
854 json.get("underspecified").is_none(),
855 "underspecified key must be absent when false"
856 );
857 }
858
859 #[test]
860 fn to_json_paused_single_query_grounded() {
861 let query = LlmQuery {
862 id: QueryId::single(),
863 prompt: "verify this claim".into(),
864 system: None,
865 max_tokens: 200,
866 grounded: true,
867 underspecified: false,
868 };
869 let result = FeedResult::Paused {
870 queries: vec![query],
871 };
872 let json = result.to_json("s-grounded");
873
874 assert_eq!(json["status"], "needs_response");
875 assert_eq!(
876 json["grounded"], true,
877 "grounded must appear in single-query MCP JSON"
878 );
879 }
880
881 #[test]
882 fn to_json_paused_single_query_underspecified() {
883 let query = LlmQuery {
884 id: QueryId::single(),
885 prompt: "what output format do you need?".into(),
886 system: None,
887 max_tokens: 200,
888 grounded: false,
889 underspecified: true,
890 };
891 let result = FeedResult::Paused {
892 queries: vec![query],
893 };
894 let json = result.to_json("s-underspec");
895
896 assert_eq!(json["status"], "needs_response");
897 assert_eq!(
898 json["underspecified"], true,
899 "underspecified must appear in single-query MCP JSON"
900 );
901 assert!(
902 json.get("grounded").is_none(),
903 "grounded must be absent when false"
904 );
905 }
906
907 #[test]
908 fn to_json_paused_multiple_queries_mixed_grounded() {
909 let grounded_query = LlmQuery {
910 id: QueryId::batch(0),
911 prompt: "verify".into(),
912 system: None,
913 max_tokens: 100,
914 grounded: true,
915 underspecified: false,
916 };
917 let normal_query = LlmQuery {
918 id: QueryId::batch(1),
919 prompt: "generate".into(),
920 system: None,
921 max_tokens: 100,
922 grounded: false,
923 underspecified: false,
924 };
925 let result = FeedResult::Paused {
926 queries: vec![grounded_query, normal_query],
927 };
928 let json = result.to_json("s-batch");
929
930 let qs = json["queries"].as_array().expect("queries should be array");
931 assert_eq!(
932 qs[0]["grounded"], true,
933 "grounded query must have grounded=true"
934 );
935 assert!(
936 qs[1].get("grounded").is_none(),
937 "non-grounded query must omit grounded key"
938 );
939 }
940
941 #[test]
942 fn to_json_paused_multiple_queries_mixed_underspecified() {
943 let underspec_query = LlmQuery {
944 id: QueryId::batch(0),
945 prompt: "clarify intent".into(),
946 system: None,
947 max_tokens: 100,
948 grounded: false,
949 underspecified: true,
950 };
951 let normal_query = LlmQuery {
952 id: QueryId::batch(1),
953 prompt: "generate".into(),
954 system: None,
955 max_tokens: 100,
956 grounded: false,
957 underspecified: false,
958 };
959 let result = FeedResult::Paused {
960 queries: vec![underspec_query, normal_query],
961 };
962 let json = result.to_json("s-batch-us");
963
964 let qs = json["queries"].as_array().expect("queries should be array");
965 assert_eq!(
966 qs[0]["underspecified"], true,
967 "underspecified query must have underspecified=true"
968 );
969 assert!(
970 qs[1].get("underspecified").is_none(),
971 "non-underspecified query must omit underspecified key"
972 );
973 }
974
975 #[test]
976 fn to_json_paused_single_query_no_system() {
977 let query = LlmQuery {
978 id: QueryId::single(),
979 prompt: "hello".into(),
980 system: None,
981 max_tokens: 1024,
982 grounded: false,
983 underspecified: false,
984 };
985 let result = FeedResult::Paused {
986 queries: vec![query],
987 };
988 let json = result.to_json("s-x");
989
990 assert_eq!(json["status"], "needs_response");
991 assert!(json["system"].is_null());
992 }
993
994 #[test]
995 fn to_json_paused_multiple_queries() {
996 let queries = vec![make_query(0), make_query(1), make_query(2)];
997 let result = FeedResult::Paused { queries };
998 let json = result.to_json("s-multi");
999
1000 assert_eq!(json["status"], "needs_response");
1001 assert_eq!(json["session_id"], "s-multi");
1002
1003 let qs = json["queries"].as_array().expect("queries should be array");
1004 assert_eq!(qs.len(), 3);
1005 assert_eq!(qs[0]["id"], "q-0");
1006 assert_eq!(qs[0]["prompt"], "prompt-0");
1007 assert_eq!(qs[1]["id"], "q-1");
1008 assert_eq!(qs[2]["id"], "q-2");
1009 }
1010
1011 #[test]
1012 fn to_json_finished_completed() {
1013 let result = FeedResult::Finished(ExecutionResult {
1014 state: TerminalState::Completed {
1015 result: json!({"answer": 42}),
1016 },
1017 metrics: ExecutionMetrics::new(),
1018 });
1019 let json = result.to_json("s-done");
1020
1021 assert_eq!(json["status"], "completed");
1022 assert_eq!(json["result"]["answer"], 42);
1023 assert!(json.get("stats").is_some());
1024 }
1025
1026 #[test]
1027 fn to_json_finished_failed() {
1028 let result = FeedResult::Finished(ExecutionResult {
1029 state: TerminalState::Failed {
1030 error: "lua error: bad argument".into(),
1031 },
1032 metrics: ExecutionMetrics::new(),
1033 });
1034 let json = result.to_json("s-err");
1035
1036 assert_eq!(json["status"], "error");
1037 assert_eq!(json["error"], "lua error: bad argument");
1038 }
1039
1040 #[test]
1041 fn to_json_finished_cancelled() {
1042 let result = FeedResult::Finished(ExecutionResult {
1043 state: TerminalState::Cancelled,
1044 metrics: ExecutionMetrics::new(),
1045 });
1046 let json = result.to_json("s-cancel");
1047
1048 assert_eq!(json["status"], "cancelled");
1049 assert!(json.get("stats").is_some());
1050 }
1051
1052 #[test]
1055 fn session_id_starts_with_prefix() {
1056 let id = gen_session_id();
1057 assert!(id.starts_with("s-"), "id should start with 's-': {id}");
1058 }
1059
1060 #[test]
1061 fn session_id_uniqueness() {
1062 let ids: Vec<String> = (0..10).map(|_| gen_session_id()).collect();
1063 let set: std::collections::HashSet<&String> = ids.iter().collect();
1064 assert_eq!(set.len(), 10, "10 IDs should all be unique");
1065 }
1066
1067 #[test]
1074 fn is_expired_impl_fresh_instant_not_expired() {
1075 let now = std::time::Instant::now();
1077 assert!(!is_expired_impl(now, Duration::from_secs(1)));
1078 }
1079
1080 #[test]
1081 fn is_expired_impl_old_instant_expired() {
1082 let two_hours_ago = std::time::Instant::now()
1084 .checked_sub(Duration::from_secs(7200))
1085 .expect("checked_sub should succeed with sane duration");
1086 assert!(is_expired_impl(two_hours_ago, Duration::from_secs(3600)));
1088 }
1089
1090 #[test]
1091 fn is_expired_impl_not_yet_expired() {
1092 let one_hour_ago = std::time::Instant::now()
1094 .checked_sub(Duration::from_secs(3600))
1095 .expect("checked_sub should succeed with sane duration");
1096 assert!(!is_expired_impl(one_hour_ago, Duration::from_secs(10800)));
1098 }
1099
1100 #[test]
1101 fn is_expired_impl_zero_ttl_always_expired() {
1102 let now = std::time::Instant::now();
1104 assert!(is_expired_impl(now, Duration::ZERO));
1105 }
1106
1107 #[test]
1110 fn pending_filter_default_is_all_off() {
1111 let f = PendingFilter::default();
1112 assert!(!f.query_id);
1113 assert!(!f.max_tokens);
1114 assert!(!f.system);
1115 assert!(!f.grounded);
1116 assert!(!f.underspecified);
1117 assert!(matches!(f.prompt, PromptProjection::Off));
1118 }
1119
1120 #[test]
1121 fn pending_filter_preset_meta_flags() {
1122 let f = PendingFilter::preset_meta();
1123 assert!(f.query_id);
1124 assert!(f.max_tokens);
1125 assert!(!f.system);
1126 assert!(!f.grounded);
1127 assert!(!f.underspecified);
1128 assert!(
1129 matches!(f.prompt, PromptProjection::Off),
1130 "meta preset must not project prompt content"
1131 );
1132 }
1133
1134 #[test]
1135 fn pending_filter_preset_preview_uses_default_chars() {
1136 let f = PendingFilter::preset_preview();
1137 assert!(f.query_id);
1138 assert!(f.max_tokens);
1139 match f.prompt {
1140 PromptProjection::Preview { chars } => {
1141 assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS);
1142 }
1143 other => panic!("expected Preview, got {other:?}"),
1144 }
1145 }
1146
1147 #[test]
1148 fn pending_filter_preset_preview_with_custom_chars() {
1149 let f = PendingFilter::preset_preview_with(42);
1150 match f.prompt {
1151 PromptProjection::Preview { chars } => assert_eq!(chars, 42),
1152 other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
1153 }
1154 }
1155
1156 #[test]
1157 fn pending_filter_preset_full_flags_all_on() {
1158 let f = PendingFilter::preset_full();
1159 assert!(f.query_id);
1160 assert!(f.max_tokens);
1161 assert!(f.system);
1162 assert!(f.grounded);
1163 assert!(f.underspecified);
1164 assert!(matches!(f.prompt, PromptProjection::Full));
1165 }
1166
1167 #[test]
1168 fn pending_filter_from_preset_known_names() {
1169 assert!(PendingFilter::from_preset("meta").is_some());
1170 assert!(PendingFilter::from_preset("preview").is_some());
1171 assert!(PendingFilter::from_preset("full").is_some());
1172 }
1173
1174 #[test]
1175 fn pending_filter_from_preset_unknown_returns_none() {
1176 assert!(PendingFilter::from_preset("").is_none());
1179 assert!(PendingFilter::from_preset("META").is_none());
1180 assert!(PendingFilter::from_preset("bogus").is_none());
1181 }
1182
1183 #[test]
1184 fn pending_filter_from_preset_with_overrides_preview_chars() {
1185 let f = PendingFilter::from_preset_with("preview", 73).unwrap();
1188 match f.prompt {
1189 PromptProjection::Preview { chars } => assert_eq!(chars, 73),
1190 other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
1191 }
1192
1193 let f_meta = PendingFilter::from_preset_with("meta", 73).unwrap();
1194 assert!(matches!(f_meta.prompt, PromptProjection::Off));
1195
1196 let f_full = PendingFilter::from_preset_with("full", 73).unwrap();
1197 assert!(matches!(f_full.prompt, PromptProjection::Full));
1198 }
1199
1200 #[test]
1203 fn project_query_default_filter_produces_empty_object() {
1204 let q = make_query(0);
1205 let v = project_query(&q, &PendingFilter::default());
1206 let obj = v.as_object().expect("object");
1207 assert!(obj.is_empty(), "default filter should project nothing");
1208 }
1209
1210 #[test]
1211 fn project_query_meta_preset_has_id_and_max_tokens_only() {
1212 let q = make_query(0);
1213 let v = project_query(&q, &PendingFilter::preset_meta());
1214 let obj = v.as_object().expect("object");
1215 assert_eq!(obj.len(), 2);
1216 assert_eq!(v["query_id"], "q-0");
1217 assert_eq!(v["max_tokens"], 100);
1218 assert!(obj.get("prompt").is_none());
1219 assert!(obj.get("prompt_preview").is_none());
1220 assert!(obj.get("system").is_none());
1221 assert!(obj.get("grounded").is_none());
1222 assert!(obj.get("underspecified").is_none());
1223 }
1224
1225 #[test]
1226 fn project_query_full_preset_has_all_fields() {
1227 let q = LlmQuery {
1228 id: QueryId::batch(0),
1229 prompt: "hi".into(),
1230 system: Some("sys".into()),
1231 max_tokens: 100,
1232 grounded: true,
1233 underspecified: true,
1234 };
1235 let v = project_query(&q, &PendingFilter::preset_full());
1236 assert_eq!(v["query_id"], "q-0");
1237 assert_eq!(v["max_tokens"], 100);
1238 assert_eq!(v["system"], "sys");
1239 assert_eq!(v["grounded"], true);
1240 assert_eq!(v["underspecified"], true);
1241 assert_eq!(v["prompt"], "hi");
1242 assert!(v.get("prompt_preview").is_none());
1243 }
1244
1245 #[test]
1246 fn project_query_preview_truncates_at_char_count() {
1247 let q = LlmQuery {
1248 id: QueryId::batch(0),
1249 prompt: "abcdefghij".into(),
1250 system: None,
1251 max_tokens: 10,
1252 grounded: false,
1253 underspecified: false,
1254 };
1255 let v = project_query(&q, &PendingFilter::preset_preview_with(5));
1256 assert_eq!(v["prompt_preview"], "abcde");
1257 assert!(v.get("prompt").is_none());
1258 }
1259
1260 #[test]
1261 fn project_query_preview_utf8_multibyte_safe() {
1262 let prompt = "あいうえお";
1266 let q = LlmQuery {
1267 id: QueryId::batch(0),
1268 prompt: prompt.to_string(),
1269 system: None,
1270 max_tokens: 10,
1271 grounded: false,
1272 underspecified: false,
1273 };
1274 let v = project_query(&q, &PendingFilter::preset_preview_with(3));
1275 let preview = v["prompt_preview"].as_str().expect("str");
1276 assert_eq!(preview, "あいう");
1277 assert_eq!(preview.chars().count(), 3);
1278 }
1279
1280 #[test]
1281 fn project_query_preview_chars_over_length_returns_whole_prompt() {
1282 let q = LlmQuery {
1283 id: QueryId::batch(0),
1284 prompt: "abc".into(),
1285 system: None,
1286 max_tokens: 10,
1287 grounded: false,
1288 underspecified: false,
1289 };
1290 let v = project_query(&q, &PendingFilter::preset_preview_with(100));
1291 assert_eq!(v["prompt_preview"], "abc");
1292 }
1293
1294 #[test]
1295 fn project_query_system_field_null_when_absent() {
1296 let q = LlmQuery {
1297 id: QueryId::batch(0),
1298 prompt: "p".into(),
1299 system: None,
1300 max_tokens: 10,
1301 grounded: false,
1302 underspecified: false,
1303 };
1304 let filter = PendingFilter {
1305 system: true,
1306 ..Default::default()
1307 };
1308 let v = project_query(&q, &filter);
1309 assert!(
1310 v["system"].is_null(),
1311 "absent system must serialize as null"
1312 );
1313 }
1314
1315 #[test]
1318 fn pending_filter_deserialize_custom_object_preview() {
1319 let raw = serde_json::json!({
1321 "query_id": true,
1322 "prompt": { "mode": "preview", "chars": 50 }
1323 });
1324 let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1325 assert!(f.query_id);
1326 match f.prompt {
1327 PromptProjection::Preview { chars } => assert_eq!(chars, 50),
1328 other => panic!("expected Preview, got {other:?}"),
1329 }
1330 }
1331
1332 #[test]
1333 fn pending_filter_deserialize_partial_object_uses_field_defaults() {
1334 let raw = serde_json::json!({});
1337 let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1338 assert!(!f.query_id);
1339 assert!(matches!(f.prompt, PromptProjection::Off));
1340 }
1341
1342 #[test]
1343 fn pending_filter_deserialize_prompt_full_tag() {
1344 let raw = serde_json::json!({ "prompt": { "mode": "full" } });
1345 let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1346 assert!(matches!(f.prompt, PromptProjection::Full));
1347 }
1348
1349 fn tmp_dirs() -> (
1357 std::sync::Arc<crate::state::JsonFileStore>,
1358 std::sync::Arc<crate::card::FileCardStore>,
1359 std::path::PathBuf,
1360 ) {
1361 let tmp = tempfile::tempdir().expect("test tempdir");
1362 let root = tmp.path().to_path_buf();
1363 std::mem::forget(tmp);
1364 (
1365 std::sync::Arc::new(crate::state::JsonFileStore::new(root.join("state"))),
1366 std::sync::Arc::new(crate::card::FileCardStore::new(root.join("cards"))),
1367 root.join("scenarios"),
1368 )
1369 }
1370
1371 #[tokio::test]
1378 async fn snapshot_v2_contains_phase_and_timestamps() {
1379 let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1380 let (state_store, card_store, scenarios_dir) = tmp_dirs();
1381
1382 let code = r#"
1384 local response = alc.llm("what is 2+2?")
1385 return response
1386 "#
1387 .to_string();
1388
1389 let session = executor
1390 .start_session(
1391 code,
1392 serde_json::json!({}),
1393 vec![],
1394 vec![],
1395 state_store,
1396 card_store,
1397 scenarios_dir,
1398 )
1399 .await
1400 .unwrap();
1401
1402 let snap = session.snapshot(None, false);
1405
1406 assert!(
1408 snap.get("phase").is_some(),
1409 "snapshot must have 'phase' field"
1410 );
1411 assert_eq!(snap["phase"], "running", "initial state must be running");
1412
1413 assert_eq!(snap["state"], "running");
1415
1416 let started_at = snap["started_at"].as_i64().expect("started_at must be i64");
1418 assert!(started_at > 0, "started_at must be > 0 (unix ms)");
1419
1420 let last_activity = snap["last_activity_at"]
1422 .as_i64()
1423 .expect("last_activity_at must be i64");
1424 assert_eq!(
1425 started_at, last_activity,
1426 "last_activity_at should equal started_at before any feed"
1427 );
1428 }
1429
1430 #[test]
1434 fn snapshot_phase_running_state_label() {
1435 let cases: &[(&str, &str)] = &[
1439 ("running", "running"),
1440 ("paused", "paused"),
1441 ("completed", "completed"),
1442 ("failed", "failed"),
1443 ("cancelled", "cancelled"),
1444 ];
1445 for (state_str, expected_phase) in cases {
1446 let three_value_state = match *state_str {
1450 "running" => "running",
1451 "paused" => "paused",
1452 _ => "terminal",
1453 };
1454 assert_eq!(
1456 *expected_phase, *state_str,
1457 "phase for {state_str} must be the same string"
1458 );
1459 if *state_str != "running" && *state_str != "paused" {
1460 assert_eq!(
1461 three_value_state, "terminal",
1462 "{state_str} must map to 'terminal' in 3-value state"
1463 );
1464 }
1465 }
1466 }
1467
1468 #[tokio::test]
1470 async fn snapshot_conversation_history_opt_in() {
1471 let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1472 let (state_store, card_store, scenarios_dir) = tmp_dirs();
1473
1474 let code = r#"
1475 local response = alc.llm("explain recursion")
1476 return response
1477 "#
1478 .to_string();
1479
1480 let session = executor
1481 .start_session(
1482 code,
1483 serde_json::json!({}),
1484 vec![],
1485 vec![],
1486 state_store,
1487 card_store,
1488 scenarios_dir,
1489 )
1490 .await
1491 .unwrap();
1492
1493 let snap_false = session.snapshot(None, false);
1495 assert!(
1496 snap_false
1497 .get("metrics")
1498 .and_then(|m| m.get("conversation_history"))
1499 .is_none(),
1500 "conversation_history must be absent with include_history=false"
1501 );
1502
1503 let snap_true = session.snapshot(None, true);
1505 if let Some(metrics) = snap_true.get("metrics") {
1508 let _ = metrics.get("conversation_history");
1512 }
1513 }
1514
1515 #[tokio::test]
1517 async fn snapshot_last_activity_at_starts_equal_to_started_at() {
1518 let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1519 let (state_store, card_store, scenarios_dir) = tmp_dirs();
1520
1521 let code = r#"
1522 local response = alc.llm("test query")
1523 return response
1524 "#
1525 .to_string();
1526
1527 let session = executor
1528 .start_session(
1529 code,
1530 serde_json::json!({}),
1531 vec![],
1532 vec![],
1533 state_store,
1534 card_store,
1535 scenarios_dir,
1536 )
1537 .await
1538 .unwrap();
1539
1540 let snap = session.snapshot(None, false);
1541 let started_at = snap["started_at"].as_i64().unwrap_or(-1);
1542 let last_activity = snap["last_activity_at"].as_i64().unwrap_or(-2);
1543
1544 assert_eq!(
1545 started_at, last_activity,
1546 "last_activity_at must equal started_at before any feed_one"
1547 );
1548 assert!(started_at > 0, "started_at must be positive unix ms");
1549 }
1550}