use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use algocline_core::{
ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
TerminalState,
};
use mlua_isle::{AsyncIsleDriver, AsyncTask};
use serde_json::json;
use tokio::sync::Mutex;
use crate::llm_bridge::LlmRequest;
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
#[error("session '{0}' not found")]
NotFound(String),
#[error(transparent)]
Feed(#[from] algocline_core::FeedError),
#[error("invalid transition: {0}")]
InvalidTransition(String),
}
#[derive(serde::Serialize)]
pub struct ExecutionResult {
pub state: TerminalState,
pub metrics: ExecutionMetrics,
}
#[derive(serde::Serialize)]
pub enum FeedResult {
Accepted { remaining: usize },
Paused { queries: Vec<LlmQuery> },
Finished(ExecutionResult),
}
impl FeedResult {
pub fn to_json(&self, session_id: &str) -> serde_json::Value {
match self {
Self::Accepted { remaining } => json!({
"status": "accepted",
"remaining": remaining,
}),
Self::Paused { queries } => {
if queries.len() == 1 {
let q = &queries[0];
let mut obj = json!({
"status": "needs_response",
"session_id": session_id,
"query_id": q.id.as_str(),
"prompt": q.prompt,
"system": q.system,
"max_tokens": q.max_tokens,
});
if q.grounded {
obj["grounded"] = json!(true);
}
if q.underspecified {
obj["underspecified"] = json!(true);
}
obj
} else {
let qs: Vec<_> = queries
.iter()
.map(|q| {
let mut obj = json!({
"id": q.id.as_str(),
"prompt": q.prompt,
"system": q.system,
"max_tokens": q.max_tokens,
});
if q.grounded {
obj["grounded"] = json!(true);
}
if q.underspecified {
obj["underspecified"] = json!(true);
}
obj
})
.collect();
json!({
"status": "needs_response",
"session_id": session_id,
"queries": qs,
})
}
}
Self::Finished(result) => match &result.state {
TerminalState::Completed { result: val } => json!({
"status": "completed",
"result": val,
"stats": result.metrics.to_json(),
}),
TerminalState::Failed { error } => json!({
"status": "error",
"error": error,
}),
TerminalState::Cancelled => json!({
"status": "cancelled",
"stats": result.metrics.to_json(),
}),
},
}
}
}
pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PendingFilter {
#[serde(default)]
pub query_id: bool,
#[serde(default)]
pub max_tokens: bool,
#[serde(default)]
pub system: bool,
#[serde(default)]
pub grounded: bool,
#[serde(default)]
pub underspecified: bool,
#[serde(default)]
pub prompt: PromptProjection,
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum PromptProjection {
#[default]
Off,
Preview {
chars: usize,
},
Full,
}
impl PendingFilter {
pub fn preset_meta() -> Self {
Self {
query_id: true,
max_tokens: true,
..Self::default()
}
}
pub fn preset_preview() -> Self {
Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
}
pub fn preset_preview_with(chars: usize) -> Self {
Self {
query_id: true,
max_tokens: true,
prompt: PromptProjection::Preview { chars },
..Self::default()
}
}
pub fn preset_full() -> Self {
Self {
query_id: true,
max_tokens: true,
system: true,
grounded: true,
underspecified: true,
prompt: PromptProjection::Full,
}
}
pub fn from_preset(name: &str) -> Option<Self> {
match name {
"meta" => Some(Self::preset_meta()),
"preview" => Some(Self::preset_preview()),
"full" => Some(Self::preset_full()),
_ => None,
}
}
pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
match name {
"meta" => Some(Self::preset_meta()),
"preview" => Some(Self::preset_preview_with(preview_chars)),
"full" => Some(Self::preset_full()),
_ => None,
}
}
}
fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
let mut obj = serde_json::Map::new();
if f.query_id {
obj.insert("query_id".into(), q.id.as_str().into());
}
if f.max_tokens {
obj.insert("max_tokens".into(), q.max_tokens.into());
}
if f.system {
obj.insert(
"system".into(),
match &q.system {
Some(s) => serde_json::Value::String(s.clone()),
None => serde_json::Value::Null,
},
);
}
if f.grounded {
obj.insert("grounded".into(), q.grounded.into());
}
if f.underspecified {
obj.insert("underspecified".into(), q.underspecified.into());
}
match &f.prompt {
PromptProjection::Off => {}
PromptProjection::Full => {
obj.insert("prompt".into(), q.prompt.clone().into());
}
PromptProjection::Preview { chars } => {
let preview: String = q.prompt.chars().take(*chars).collect();
obj.insert("prompt_preview".into(), preview.into());
}
}
serde_json::Value::Object(obj)
}
pub struct Session {
state: ExecutionState,
metrics: ExecutionMetrics,
observer: MetricsObserver,
llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
exec_task: AsyncTask,
resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
_vm_driver: AsyncIsleDriver,
last_active: std::time::Instant,
started_at_ms: i64,
last_activity_ms: Arc<AtomicI64>,
}
impl Session {
pub fn new(
llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
exec_task: AsyncTask,
metrics: ExecutionMetrics,
vm_driver: AsyncIsleDriver,
) -> Self {
let observer = metrics.create_observer();
let started_at_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
Self {
state: ExecutionState::Running,
metrics,
observer,
llm_rx,
exec_task,
resp_txs: HashMap::new(),
_vm_driver: vm_driver,
last_active: std::time::Instant::now(),
started_at_ms,
last_activity_ms: Arc::new(AtomicI64::new(started_at_ms)),
}
}
async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
tokio::select! {
result = &mut self.exec_task => {
match result {
Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
Ok(v) => {
self.state.complete(v.clone()).map_err(|e| {
SessionError::InvalidTransition(e.to_string())
})?;
self.observer.on_completed(&v);
Ok(FeedResult::Finished(ExecutionResult {
state: TerminalState::Completed { result: v },
metrics: self.take_metrics(),
}))
}
Err(e) => self.fail_with(format!("JSON parse: {e}")),
},
Err(e) => self.fail_with(e.to_string()),
}
}
Some(req) = self.llm_rx.recv() => {
let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
id: qr.id.clone(),
prompt: qr.prompt.clone(),
system: qr.system.clone(),
max_tokens: qr.max_tokens,
grounded: qr.grounded,
underspecified: qr.underspecified,
}).collect();
for qr in req.queries {
self.resp_txs.insert(qr.id, qr.resp_tx);
}
self.state.pause(queries.clone()).map_err(|e| {
SessionError::InvalidTransition(e.to_string())
})?;
self.observer.on_paused(&queries);
Ok(FeedResult::Paused { queries })
}
}
}
fn feed_one(
&mut self,
query_id: &QueryId,
response: String,
usage: Option<&algocline_core::TokenUsage>,
) -> Result<bool, SessionError> {
self.last_active = std::time::Instant::now();
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
self.last_activity_ms.store(now_ms, Ordering::Relaxed);
self.observer.on_response_fed(query_id, &response, usage);
if let Some(tx) = self.resp_txs.remove(query_id) {
let _ = tx.send(Ok(response.clone()));
}
let complete = self
.state
.feed(query_id, response)
.map_err(SessionError::Feed)?;
if complete {
self.state
.take_responses()
.map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
self.observer.on_resumed();
} else {
self.observer
.on_partial_feed(query_id, self.state.remaining());
}
Ok(complete)
}
fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
self.state
.fail(msg.clone())
.map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
self.observer.on_failed(&msg);
Ok(FeedResult::Finished(ExecutionResult {
state: TerminalState::Failed { error: msg },
metrics: self.take_metrics(),
}))
}
fn take_metrics(&mut self) -> ExecutionMetrics {
std::mem::take(&mut self.metrics)
}
pub fn snapshot(
&self,
pending_filter: Option<&PendingFilter>,
include_history: bool,
) -> serde_json::Value {
let state_label = match &self.state {
ExecutionState::Running => "running",
ExecutionState::Paused(_) => "paused",
_ => "terminal",
};
let phase = match &self.state {
ExecutionState::Running => "running",
ExecutionState::Paused(_) => "paused",
ExecutionState::Completed { .. } => "completed",
ExecutionState::Failed { .. } => "failed",
ExecutionState::Cancelled => "cancelled",
};
let mut json = serde_json::json!({
"state": state_label,
"phase": phase,
"started_at": self.started_at_ms,
"last_activity_at": self.last_activity_ms.load(Ordering::Relaxed),
});
let metrics = self.metrics.snapshot(include_history);
if !metrics.is_null() {
json["metrics"] = metrics;
}
if let ExecutionState::Paused(pending) = &self.state {
json["pending_queries"] = pending.remaining().into();
if let Some(filter) = pending_filter {
let items: Vec<serde_json::Value> = pending
.pending_queries()
.iter()
.map(|q| project_query(q, filter))
.collect();
json["pending"] = serde_json::Value::Array(items);
}
}
json
}
pub fn is_expired(&self, ttl: Duration) -> bool {
is_expired_impl(self.last_active, ttl)
}
pub(crate) fn into_driver_parts(
self,
) -> (
AsyncTask,
tokio::sync::mpsc::Receiver<LlmRequest>,
AsyncIsleDriver,
) {
(self.exec_task, self.llm_rx, self._vm_driver)
}
}
fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
std::time::Instant::now().saturating_duration_since(last_active) >= ttl
}
pub struct SessionRegistry {
sessions: Arc<Mutex<HashMap<String, Session>>>,
}
impl Default for SessionRegistry {
fn default() -> Self {
Self::new()
}
}
impl SessionRegistry {
pub fn new() -> Self {
Self {
sessions: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn start_execution(
&self,
mut session: Session,
) -> Result<(String, FeedResult), SessionError> {
let session_id = gen_session_id();
let result = session.wait_event().await?;
if matches!(result, FeedResult::Paused { .. }) {
self.sessions
.lock()
.await
.insert(session_id.clone(), session);
}
Ok((session_id, result))
}
pub async fn feed_response(
&self,
session_id: &str,
query_id: &QueryId,
response: String,
usage: Option<&algocline_core::TokenUsage>,
) -> Result<FeedResult, SessionError> {
let complete = {
let mut map = self.sessions.lock().await;
let session = map
.get_mut(session_id)
.ok_or_else(|| SessionError::NotFound(session_id.into()))?;
let complete = session.feed_one(query_id, response, usage)?;
if !complete {
return Ok(FeedResult::Accepted {
remaining: session.state.remaining(),
});
}
complete
};
debug_assert!(complete);
let mut session = {
let mut map = self.sessions.lock().await;
map.remove(session_id)
.ok_or_else(|| SessionError::NotFound(session_id.into()))?
};
let result = session.wait_event().await?;
if matches!(result, FeedResult::Paused { .. }) {
self.sessions
.lock()
.await
.insert(session_id.into(), session);
}
Ok(result)
}
pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
let map = self.sessions.lock().await;
let session = map
.get(session_id)
.ok_or_else(|| SessionError::NotFound(session_id.into()))?;
let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
match keys.len() {
0 => Err(SessionError::InvalidTransition("no pending queries".into())),
1 => keys
.into_iter()
.next()
.ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
n => Err(SessionError::InvalidTransition(format!(
"{n} queries pending; specify query_id explicitly"
))),
}
}
pub async fn list_snapshots(
&self,
pending_filter: Option<&PendingFilter>,
include_history: bool,
) -> HashMap<String, serde_json::Value> {
let map = self.sessions.lock().await;
map.iter()
.map(|(id, session)| {
(
id.clone(),
session.snapshot(pending_filter, include_history),
)
})
.collect()
}
pub fn spawn_gc_task(&self, ttl: Duration) {
let sessions = Arc::clone(&self.sessions);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
let mut map = sessions.lock().await;
let expired: Vec<String> = map
.iter()
.filter(|(_, s)| s.is_expired(ttl))
.map(|(id, _)| id.clone())
.collect();
for id in &expired {
tracing::info!(session_id = %id, "GC: reaping expired session");
map.remove(id);
}
}
});
}
}
fn gen_session_id() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
let random: u64 = {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
let s = RandomState::new();
let mut h = s.build_hasher();
h.write_u128(ts);
h.finish()
};
format!("s-{ts:x}-{random:016x}")
}
#[cfg(test)]
mod tests {
use super::*;
use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
use serde_json::json;
fn make_query(index: usize) -> LlmQuery {
LlmQuery {
id: QueryId::batch(index),
prompt: format!("prompt-{index}"),
system: None,
max_tokens: 100,
grounded: false,
underspecified: false,
}
}
#[test]
fn to_json_accepted() {
let result = FeedResult::Accepted { remaining: 3 };
let json = result.to_json("s-123");
assert_eq!(json["status"], "accepted");
assert_eq!(json["remaining"], 3);
}
#[test]
fn to_json_paused_single_query() {
let query = LlmQuery {
id: QueryId::single(),
prompt: "What is 2+2?".into(),
system: Some("You are a calculator.".into()),
max_tokens: 50,
grounded: false,
underspecified: false,
};
let result = FeedResult::Paused {
queries: vec![query],
};
let json = result.to_json("s-abc");
assert_eq!(json["status"], "needs_response");
assert_eq!(json["session_id"], "s-abc");
assert_eq!(json["prompt"], "What is 2+2?");
assert_eq!(json["system"], "You are a calculator.");
assert_eq!(json["max_tokens"], 50);
assert!(json.get("queries").is_none());
assert!(
json.get("grounded").is_none(),
"grounded key must be absent when false"
);
assert!(
json.get("underspecified").is_none(),
"underspecified key must be absent when false"
);
}
#[test]
fn to_json_paused_single_query_grounded() {
let query = LlmQuery {
id: QueryId::single(),
prompt: "verify this claim".into(),
system: None,
max_tokens: 200,
grounded: true,
underspecified: false,
};
let result = FeedResult::Paused {
queries: vec![query],
};
let json = result.to_json("s-grounded");
assert_eq!(json["status"], "needs_response");
assert_eq!(
json["grounded"], true,
"grounded must appear in single-query MCP JSON"
);
}
#[test]
fn to_json_paused_single_query_underspecified() {
let query = LlmQuery {
id: QueryId::single(),
prompt: "what output format do you need?".into(),
system: None,
max_tokens: 200,
grounded: false,
underspecified: true,
};
let result = FeedResult::Paused {
queries: vec![query],
};
let json = result.to_json("s-underspec");
assert_eq!(json["status"], "needs_response");
assert_eq!(
json["underspecified"], true,
"underspecified must appear in single-query MCP JSON"
);
assert!(
json.get("grounded").is_none(),
"grounded must be absent when false"
);
}
#[test]
fn to_json_paused_multiple_queries_mixed_grounded() {
let grounded_query = LlmQuery {
id: QueryId::batch(0),
prompt: "verify".into(),
system: None,
max_tokens: 100,
grounded: true,
underspecified: false,
};
let normal_query = LlmQuery {
id: QueryId::batch(1),
prompt: "generate".into(),
system: None,
max_tokens: 100,
grounded: false,
underspecified: false,
};
let result = FeedResult::Paused {
queries: vec![grounded_query, normal_query],
};
let json = result.to_json("s-batch");
let qs = json["queries"].as_array().expect("queries should be array");
assert_eq!(
qs[0]["grounded"], true,
"grounded query must have grounded=true"
);
assert!(
qs[1].get("grounded").is_none(),
"non-grounded query must omit grounded key"
);
}
#[test]
fn to_json_paused_multiple_queries_mixed_underspecified() {
let underspec_query = LlmQuery {
id: QueryId::batch(0),
prompt: "clarify intent".into(),
system: None,
max_tokens: 100,
grounded: false,
underspecified: true,
};
let normal_query = LlmQuery {
id: QueryId::batch(1),
prompt: "generate".into(),
system: None,
max_tokens: 100,
grounded: false,
underspecified: false,
};
let result = FeedResult::Paused {
queries: vec![underspec_query, normal_query],
};
let json = result.to_json("s-batch-us");
let qs = json["queries"].as_array().expect("queries should be array");
assert_eq!(
qs[0]["underspecified"], true,
"underspecified query must have underspecified=true"
);
assert!(
qs[1].get("underspecified").is_none(),
"non-underspecified query must omit underspecified key"
);
}
#[test]
fn to_json_paused_single_query_no_system() {
let query = LlmQuery {
id: QueryId::single(),
prompt: "hello".into(),
system: None,
max_tokens: 1024,
grounded: false,
underspecified: false,
};
let result = FeedResult::Paused {
queries: vec![query],
};
let json = result.to_json("s-x");
assert_eq!(json["status"], "needs_response");
assert!(json["system"].is_null());
}
#[test]
fn to_json_paused_multiple_queries() {
let queries = vec![make_query(0), make_query(1), make_query(2)];
let result = FeedResult::Paused { queries };
let json = result.to_json("s-multi");
assert_eq!(json["status"], "needs_response");
assert_eq!(json["session_id"], "s-multi");
let qs = json["queries"].as_array().expect("queries should be array");
assert_eq!(qs.len(), 3);
assert_eq!(qs[0]["id"], "q-0");
assert_eq!(qs[0]["prompt"], "prompt-0");
assert_eq!(qs[1]["id"], "q-1");
assert_eq!(qs[2]["id"], "q-2");
}
#[test]
fn to_json_finished_completed() {
let result = FeedResult::Finished(ExecutionResult {
state: TerminalState::Completed {
result: json!({"answer": 42}),
},
metrics: ExecutionMetrics::new(),
});
let json = result.to_json("s-done");
assert_eq!(json["status"], "completed");
assert_eq!(json["result"]["answer"], 42);
assert!(json.get("stats").is_some());
}
#[test]
fn to_json_finished_failed() {
let result = FeedResult::Finished(ExecutionResult {
state: TerminalState::Failed {
error: "lua error: bad argument".into(),
},
metrics: ExecutionMetrics::new(),
});
let json = result.to_json("s-err");
assert_eq!(json["status"], "error");
assert_eq!(json["error"], "lua error: bad argument");
}
#[test]
fn to_json_finished_cancelled() {
let result = FeedResult::Finished(ExecutionResult {
state: TerminalState::Cancelled,
metrics: ExecutionMetrics::new(),
});
let json = result.to_json("s-cancel");
assert_eq!(json["status"], "cancelled");
assert!(json.get("stats").is_some());
}
#[test]
fn session_id_starts_with_prefix() {
let id = gen_session_id();
assert!(id.starts_with("s-"), "id should start with 's-': {id}");
}
#[test]
fn session_id_uniqueness() {
let ids: Vec<String> = (0..10).map(|_| gen_session_id()).collect();
let set: std::collections::HashSet<&String> = ids.iter().collect();
assert_eq!(set.len(), 10, "10 IDs should all be unique");
}
#[test]
fn is_expired_impl_fresh_instant_not_expired() {
let now = std::time::Instant::now();
assert!(!is_expired_impl(now, Duration::from_secs(1)));
}
#[test]
fn is_expired_impl_old_instant_expired() {
let two_hours_ago = std::time::Instant::now()
.checked_sub(Duration::from_secs(7200))
.expect("checked_sub should succeed with sane duration");
assert!(is_expired_impl(two_hours_ago, Duration::from_secs(3600)));
}
#[test]
fn is_expired_impl_not_yet_expired() {
let one_hour_ago = std::time::Instant::now()
.checked_sub(Duration::from_secs(3600))
.expect("checked_sub should succeed with sane duration");
assert!(!is_expired_impl(one_hour_ago, Duration::from_secs(10800)));
}
#[test]
fn is_expired_impl_zero_ttl_always_expired() {
let now = std::time::Instant::now();
assert!(is_expired_impl(now, Duration::ZERO));
}
#[test]
fn pending_filter_default_is_all_off() {
let f = PendingFilter::default();
assert!(!f.query_id);
assert!(!f.max_tokens);
assert!(!f.system);
assert!(!f.grounded);
assert!(!f.underspecified);
assert!(matches!(f.prompt, PromptProjection::Off));
}
#[test]
fn pending_filter_preset_meta_flags() {
let f = PendingFilter::preset_meta();
assert!(f.query_id);
assert!(f.max_tokens);
assert!(!f.system);
assert!(!f.grounded);
assert!(!f.underspecified);
assert!(
matches!(f.prompt, PromptProjection::Off),
"meta preset must not project prompt content"
);
}
#[test]
fn pending_filter_preset_preview_uses_default_chars() {
let f = PendingFilter::preset_preview();
assert!(f.query_id);
assert!(f.max_tokens);
match f.prompt {
PromptProjection::Preview { chars } => {
assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS);
}
other => panic!("expected Preview, got {other:?}"),
}
}
#[test]
fn pending_filter_preset_preview_with_custom_chars() {
let f = PendingFilter::preset_preview_with(42);
match f.prompt {
PromptProjection::Preview { chars } => assert_eq!(chars, 42),
other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
}
}
#[test]
fn pending_filter_preset_full_flags_all_on() {
let f = PendingFilter::preset_full();
assert!(f.query_id);
assert!(f.max_tokens);
assert!(f.system);
assert!(f.grounded);
assert!(f.underspecified);
assert!(matches!(f.prompt, PromptProjection::Full));
}
#[test]
fn pending_filter_from_preset_known_names() {
assert!(PendingFilter::from_preset("meta").is_some());
assert!(PendingFilter::from_preset("preview").is_some());
assert!(PendingFilter::from_preset("full").is_some());
}
#[test]
fn pending_filter_from_preset_unknown_returns_none() {
assert!(PendingFilter::from_preset("").is_none());
assert!(PendingFilter::from_preset("META").is_none());
assert!(PendingFilter::from_preset("bogus").is_none());
}
#[test]
fn pending_filter_from_preset_with_overrides_preview_chars() {
let f = PendingFilter::from_preset_with("preview", 73).unwrap();
match f.prompt {
PromptProjection::Preview { chars } => assert_eq!(chars, 73),
other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
}
let f_meta = PendingFilter::from_preset_with("meta", 73).unwrap();
assert!(matches!(f_meta.prompt, PromptProjection::Off));
let f_full = PendingFilter::from_preset_with("full", 73).unwrap();
assert!(matches!(f_full.prompt, PromptProjection::Full));
}
#[test]
fn project_query_default_filter_produces_empty_object() {
let q = make_query(0);
let v = project_query(&q, &PendingFilter::default());
let obj = v.as_object().expect("object");
assert!(obj.is_empty(), "default filter should project nothing");
}
#[test]
fn project_query_meta_preset_has_id_and_max_tokens_only() {
let q = make_query(0);
let v = project_query(&q, &PendingFilter::preset_meta());
let obj = v.as_object().expect("object");
assert_eq!(obj.len(), 2);
assert_eq!(v["query_id"], "q-0");
assert_eq!(v["max_tokens"], 100);
assert!(obj.get("prompt").is_none());
assert!(obj.get("prompt_preview").is_none());
assert!(obj.get("system").is_none());
assert!(obj.get("grounded").is_none());
assert!(obj.get("underspecified").is_none());
}
#[test]
fn project_query_full_preset_has_all_fields() {
let q = LlmQuery {
id: QueryId::batch(0),
prompt: "hi".into(),
system: Some("sys".into()),
max_tokens: 100,
grounded: true,
underspecified: true,
};
let v = project_query(&q, &PendingFilter::preset_full());
assert_eq!(v["query_id"], "q-0");
assert_eq!(v["max_tokens"], 100);
assert_eq!(v["system"], "sys");
assert_eq!(v["grounded"], true);
assert_eq!(v["underspecified"], true);
assert_eq!(v["prompt"], "hi");
assert!(v.get("prompt_preview").is_none());
}
#[test]
fn project_query_preview_truncates_at_char_count() {
let q = LlmQuery {
id: QueryId::batch(0),
prompt: "abcdefghij".into(),
system: None,
max_tokens: 10,
grounded: false,
underspecified: false,
};
let v = project_query(&q, &PendingFilter::preset_preview_with(5));
assert_eq!(v["prompt_preview"], "abcde");
assert!(v.get("prompt").is_none());
}
#[test]
fn project_query_preview_utf8_multibyte_safe() {
let prompt = "あいうえお";
let q = LlmQuery {
id: QueryId::batch(0),
prompt: prompt.to_string(),
system: None,
max_tokens: 10,
grounded: false,
underspecified: false,
};
let v = project_query(&q, &PendingFilter::preset_preview_with(3));
let preview = v["prompt_preview"].as_str().expect("str");
assert_eq!(preview, "あいう");
assert_eq!(preview.chars().count(), 3);
}
#[test]
fn project_query_preview_chars_over_length_returns_whole_prompt() {
let q = LlmQuery {
id: QueryId::batch(0),
prompt: "abc".into(),
system: None,
max_tokens: 10,
grounded: false,
underspecified: false,
};
let v = project_query(&q, &PendingFilter::preset_preview_with(100));
assert_eq!(v["prompt_preview"], "abc");
}
#[test]
fn project_query_system_field_null_when_absent() {
let q = LlmQuery {
id: QueryId::batch(0),
prompt: "p".into(),
system: None,
max_tokens: 10,
grounded: false,
underspecified: false,
};
let filter = PendingFilter {
system: true,
..Default::default()
};
let v = project_query(&q, &filter);
assert!(
v["system"].is_null(),
"absent system must serialize as null"
);
}
#[test]
fn pending_filter_deserialize_custom_object_preview() {
let raw = serde_json::json!({
"query_id": true,
"prompt": { "mode": "preview", "chars": 50 }
});
let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
assert!(f.query_id);
match f.prompt {
PromptProjection::Preview { chars } => assert_eq!(chars, 50),
other => panic!("expected Preview, got {other:?}"),
}
}
#[test]
fn pending_filter_deserialize_partial_object_uses_field_defaults() {
let raw = serde_json::json!({});
let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
assert!(!f.query_id);
assert!(matches!(f.prompt, PromptProjection::Off));
}
#[test]
fn pending_filter_deserialize_prompt_full_tag() {
let raw = serde_json::json!({ "prompt": { "mode": "full" } });
let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
assert!(matches!(f.prompt, PromptProjection::Full));
}
fn tmp_dirs() -> (
std::sync::Arc<crate::state::JsonFileStore>,
std::sync::Arc<crate::card::FileCardStore>,
std::path::PathBuf,
) {
let tmp = tempfile::tempdir().expect("test tempdir");
let root = tmp.path().to_path_buf();
std::mem::forget(tmp);
(
std::sync::Arc::new(crate::state::JsonFileStore::new(root.join("state"))),
std::sync::Arc::new(crate::card::FileCardStore::new(root.join("cards"))),
root.join("scenarios"),
)
}
#[tokio::test]
async fn snapshot_v2_contains_phase_and_timestamps() {
let executor = crate::executor::Executor::new(vec![]).await.unwrap();
let (state_store, card_store, scenarios_dir) = tmp_dirs();
let code = r#"
local response = alc.llm("what is 2+2?")
return response
"#
.to_string();
let session = executor
.start_session(
code,
serde_json::json!({}),
vec![],
vec![],
state_store,
card_store,
scenarios_dir,
)
.await
.unwrap();
let snap = session.snapshot(None, false);
assert!(
snap.get("phase").is_some(),
"snapshot must have 'phase' field"
);
assert_eq!(snap["phase"], "running", "initial state must be running");
assert_eq!(snap["state"], "running");
let started_at = snap["started_at"].as_i64().expect("started_at must be i64");
assert!(started_at > 0, "started_at must be > 0 (unix ms)");
let last_activity = snap["last_activity_at"]
.as_i64()
.expect("last_activity_at must be i64");
assert_eq!(
started_at, last_activity,
"last_activity_at should equal started_at before any feed"
);
}
#[test]
fn snapshot_phase_running_state_label() {
let cases: &[(&str, &str)] = &[
("running", "running"),
("paused", "paused"),
("completed", "completed"),
("failed", "failed"),
("cancelled", "cancelled"),
];
for (state_str, expected_phase) in cases {
let three_value_state = match *state_str {
"running" => "running",
"paused" => "paused",
_ => "terminal",
};
assert_eq!(
*expected_phase, *state_str,
"phase for {state_str} must be the same string"
);
if *state_str != "running" && *state_str != "paused" {
assert_eq!(
three_value_state, "terminal",
"{state_str} must map to 'terminal' in 3-value state"
);
}
}
}
#[tokio::test]
async fn snapshot_conversation_history_opt_in() {
let executor = crate::executor::Executor::new(vec![]).await.unwrap();
let (state_store, card_store, scenarios_dir) = tmp_dirs();
let code = r#"
local response = alc.llm("explain recursion")
return response
"#
.to_string();
let session = executor
.start_session(
code,
serde_json::json!({}),
vec![],
vec![],
state_store,
card_store,
scenarios_dir,
)
.await
.unwrap();
let snap_false = session.snapshot(None, false);
assert!(
snap_false
.get("metrics")
.and_then(|m| m.get("conversation_history"))
.is_none(),
"conversation_history must be absent with include_history=false"
);
let snap_true = session.snapshot(None, true);
if let Some(metrics) = snap_true.get("metrics") {
let _ = metrics.get("conversation_history");
}
}
#[tokio::test]
async fn snapshot_last_activity_at_starts_equal_to_started_at() {
let executor = crate::executor::Executor::new(vec![]).await.unwrap();
let (state_store, card_store, scenarios_dir) = tmp_dirs();
let code = r#"
local response = alc.llm("test query")
return response
"#
.to_string();
let session = executor
.start_session(
code,
serde_json::json!({}),
vec![],
vec![],
state_store,
card_store,
scenarios_dir,
)
.await
.unwrap();
let snap = session.snapshot(None, false);
let started_at = snap["started_at"].as_i64().unwrap_or(-1);
let last_activity = snap["last_activity_at"].as_i64().unwrap_or(-2);
assert_eq!(
started_at, last_activity,
"last_activity_at must equal started_at before any feed_one"
);
assert!(started_at > 0, "started_at must be positive unix ms");
}
}