Skip to main content

algocline_engine/
session.rs

1//! Session-based Lua execution with pause/resume on alc.llm() calls.
2//!
3//! Runtime layer: ties Domain (ExecutionState) and Metrics (ExecutionMetrics)
4//! together with channel-based Lua pause/resume machinery.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use algocline_core::{
11    ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
12    TerminalState,
13};
14use mlua_isle::{AsyncIsleDriver, AsyncTask};
15use serde_json::json;
16use tokio::sync::Mutex;
17
18use crate::llm_bridge::LlmRequest;
19
20// ─── Error types (Runtime layer) ─────────────────────────────
21
/// Errors surfaced by session and registry operations.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// No session with the given id is currently held by the registry.
    #[error("session '{0}' not found")]
    NotFound(String),
    /// The domain state machine rejected a fed response; wraps the
    /// underlying `algocline_core::FeedError` unchanged.
    #[error(transparent)]
    Feed(#[from] algocline_core::FeedError),
    /// A state transition (complete / fail / pause / resume) was rejected,
    /// or a request could not be served in the session's current state.
    /// The payload is a human-readable description.
    #[error("invalid transition: {0}")]
    InvalidTransition(String),
}
31
32// ─── Result types (Runtime layer) ────────────────────────────
33
/// Session completion data: terminal state + metrics.
///
/// Produced once, when execution finishes; the metrics are moved out of the
/// session at that point.
#[derive(serde::Serialize)]
pub struct ExecutionResult {
    /// How the execution ended (completed, failed, or cancelled).
    pub state: TerminalState,
    /// Metrics collected over the lifetime of the session.
    pub metrics: ExecutionMetrics,
}
40
/// Result of a session interaction (start or feed).
///
/// See [`FeedResult::to_json`] for the wire representation of each variant.
#[derive(serde::Serialize)]
pub enum FeedResult {
    /// Partial feed accepted, still waiting for more responses.
    /// `remaining` is the number of queries that have not been answered yet.
    Accepted { remaining: usize },
    /// All queries answered, Lua re-paused with new queries.
    /// Also returned when a freshly started execution pauses for the first time.
    Paused { queries: Vec<LlmQuery> },
    /// Execution completed (success, failure, or cancellation).
    Finished(ExecutionResult),
}
51
52impl FeedResult {
53    /// Convert to JSON for MCP tool response.
54    pub fn to_json(&self, session_id: &str) -> serde_json::Value {
55        match self {
56            Self::Accepted { remaining } => json!({
57                "status": "accepted",
58                "remaining": remaining,
59            }),
60            Self::Paused { queries } => {
61                if queries.len() == 1 {
62                    let q = &queries[0];
63                    let mut obj = json!({
64                        "status": "needs_response",
65                        "session_id": session_id,
66                        "query_id": q.id.as_str(),
67                        "prompt": q.prompt,
68                        "system": q.system,
69                        "max_tokens": q.max_tokens,
70                    });
71                    if q.grounded {
72                        obj["grounded"] = json!(true);
73                    }
74                    if q.underspecified {
75                        obj["underspecified"] = json!(true);
76                    }
77                    obj
78                } else {
79                    let qs: Vec<_> = queries
80                        .iter()
81                        .map(|q| {
82                            let mut obj = json!({
83                                "id": q.id.as_str(),
84                                "prompt": q.prompt,
85                                "system": q.system,
86                                "max_tokens": q.max_tokens,
87                            });
88                            if q.grounded {
89                                obj["grounded"] = json!(true);
90                            }
91                            if q.underspecified {
92                                obj["underspecified"] = json!(true);
93                            }
94                            obj
95                        })
96                        .collect();
97                    json!({
98                        "status": "needs_response",
99                        "session_id": session_id,
100                        "queries": qs,
101                    })
102                }
103            }
104            Self::Finished(result) => match &result.state {
105                TerminalState::Completed { result: val } => json!({
106                    "status": "completed",
107                    "result": val,
108                    "stats": result.metrics.to_json(),
109                }),
110                TerminalState::Failed { error } => json!({
111                    "status": "error",
112                    "error": error,
113                }),
114                TerminalState::Cancelled => json!({
115                    "status": "cancelled",
116                    "stats": result.metrics.to_json(),
117                }),
118            },
119        }
120    }
121}
122
123// ─── PendingFilter (field-level filter for Session::snapshot) ────
124
125/// Default preview length (chars) used when `PendingFilter::preset_preview()`
126/// is constructed without an explicit length. Env var
127/// `ALC_PROMPT_PREVIEW_CHARS` (resolved in `AppConfig`) overrides this.
128pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
129
130/// Per-field filter controlling which `LlmQuery` attributes are projected
131/// into a Snapshot's `pending` array.
132///
133/// Adding a new field to `LlmQuery` only requires adding one matching
134/// `bool` here — the shape stays stable so API surface does not grow
135/// enum variants for every new attribute.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PendingFilter {
    /// Emit the query's id under the `query_id` key.
    #[serde(default)]
    pub query_id: bool,
    /// Emit the query's `max_tokens` value.
    #[serde(default)]
    pub max_tokens: bool,
    /// Emit the `system` prompt (JSON `null` when the query has none).
    #[serde(default)]
    pub system: bool,
    /// Emit the `grounded` flag as a boolean (both `true` and `false`).
    #[serde(default)]
    pub grounded: bool,
    /// Emit the `underspecified` flag as a boolean (both `true` and `false`).
    #[serde(default)]
    pub underspecified: bool,
    /// How much of the prompt text to emit; defaults to `Off`.
    #[serde(default)]
    pub prompt: PromptProjection,
}
151
152/// Prompt projection mode — 3 states rather than a bool so that truncation
153/// length can travel inside the filter object.
154///
155/// JSON tag is `mode`: `{"mode":"off"}` / `{"mode":"preview","chars":200}` /
156/// `{"mode":"full"}`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum PromptProjection {
    /// Do not emit any prompt text (the default).
    #[default]
    Off,
    /// Emit the first `chars` characters of the prompt under `prompt_preview`.
    Preview {
        /// Maximum number of characters (not bytes) to keep.
        chars: usize,
    },
    /// Emit the complete prompt under `prompt`.
    Full,
}
167
168impl PendingFilter {
169    /// Preset: query identification only (`query_id` + `max_tokens`).
170    pub fn preset_meta() -> Self {
171        Self {
172            query_id: true,
173            max_tokens: true,
174            ..Self::default()
175        }
176    }
177
178    /// Preset: meta + first N chars of the prompt. Uses the hard default
179    /// for N (`DEFAULT_PROMPT_PREVIEW_CHARS`).
180    pub fn preset_preview() -> Self {
181        Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
182    }
183
184    /// Preset: meta + first `chars` chars of the prompt. Lets callers
185    /// flow a config-resolved length (e.g. env var) into the preset.
186    pub fn preset_preview_with(chars: usize) -> Self {
187        Self {
188            query_id: true,
189            max_tokens: true,
190            prompt: PromptProjection::Preview { chars },
191            ..Self::default()
192        }
193    }
194
195    /// Preset: every field including the full prompt (debug use).
196    pub fn preset_full() -> Self {
197        Self {
198            query_id: true,
199            max_tokens: true,
200            system: true,
201            grounded: true,
202            underspecified: true,
203            prompt: PromptProjection::Full,
204        }
205    }
206
207    /// Resolve a preset by name. Unknown names return `None` so that
208    /// callers can surface a typed error rather than silently falling
209    /// back to a default projection.
210    pub fn from_preset(name: &str) -> Option<Self> {
211        match name {
212            "meta" => Some(Self::preset_meta()),
213            "preview" => Some(Self::preset_preview()),
214            "full" => Some(Self::preset_full()),
215            _ => None,
216        }
217    }
218
219    /// Same as [`Self::from_preset`] but lets `"preview"` pick up a
220    /// caller-supplied char count (config / env override).
221    pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
222        match name {
223            "meta" => Some(Self::preset_meta()),
224            "preview" => Some(Self::preset_preview_with(preview_chars)),
225            "full" => Some(Self::preset_full()),
226            _ => None,
227        }
228    }
229}
230
231/// Project a single `LlmQuery` into the JSON object requested by `filter`.
232///
233/// UTF-8 safety: `PromptProjection::Preview { chars }` uses `chars().take(N)`
234/// so the cut never splits a multi-byte code point.
235fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
236    let mut obj = serde_json::Map::new();
237    if f.query_id {
238        obj.insert("query_id".into(), q.id.as_str().into());
239    }
240    if f.max_tokens {
241        obj.insert("max_tokens".into(), q.max_tokens.into());
242    }
243    if f.system {
244        obj.insert(
245            "system".into(),
246            match &q.system {
247                Some(s) => serde_json::Value::String(s.clone()),
248                None => serde_json::Value::Null,
249            },
250        );
251    }
252    if f.grounded {
253        obj.insert("grounded".into(), q.grounded.into());
254    }
255    if f.underspecified {
256        obj.insert("underspecified".into(), q.underspecified.into());
257    }
258    match &f.prompt {
259        PromptProjection::Off => {}
260        PromptProjection::Full => {
261            obj.insert("prompt".into(), q.prompt.clone().into());
262        }
263        PromptProjection::Preview { chars } => {
264            let preview: String = q.prompt.chars().take(*chars).collect();
265            obj.insert("prompt_preview".into(), preview.into());
266        }
267    }
268    serde_json::Value::Object(obj)
269}
270
271// ─── Session ─────────────────────────────────────────────────
272
/// A Lua execution session with domain state tracking.
///
/// Each session owns a dedicated Lua VM via `_vm_driver`. The VM's OS thread
/// stays alive as long as the driver is held, and exits cleanly when the
/// session is dropped (channel closes → Lua thread drains and exits).
pub struct Session {
    /// Domain state machine (running / paused / terminal).
    state: ExecutionState,
    /// Metrics for this execution; moved out when the session finishes.
    metrics: ExecutionMetrics,
    /// Observer created from `metrics`; notified of every lifecycle event.
    observer: MetricsObserver,
    /// Receives batches of LLM queries emitted by the Lua side.
    llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
    /// The running Lua execution; resolves when the script ends.
    exec_task: AsyncTask,
    /// QueryId → resp_tx. Populated on Paused; each entry is removed as its
    /// response is fed.
    resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
    /// Per-session VM lifecycle driver. Keeps the Lua thread alive.
    /// Dropped when the session completes or is abandoned.
    _vm_driver: AsyncIsleDriver,
    /// Last activity timestamp, set at construction and refreshed by
    /// `feed_one()`. Used by GC to identify idle sessions for cleanup.
    last_active: std::time::Instant,
}
293
294impl Session {
295    pub fn new(
296        llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
297        exec_task: AsyncTask,
298        metrics: ExecutionMetrics,
299        vm_driver: AsyncIsleDriver,
300    ) -> Self {
301        let observer = metrics.create_observer();
302        Self {
303            state: ExecutionState::Running,
304            metrics,
305            observer,
306            llm_rx,
307            exec_task,
308            resp_txs: HashMap::new(),
309            _vm_driver: vm_driver,
310            last_active: std::time::Instant::now(),
311        }
312    }
313
314    /// Wait for the next event from Lua execution.
315    ///
316    /// Called after initial start or after feeding all responses.
317    /// State must be Running when called.
318    async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
319        tokio::select! {
320            result = &mut self.exec_task => {
321                match result {
322                    Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
323                        Ok(v) => {
324                            self.state.complete(v.clone()).map_err(|e| {
325                                SessionError::InvalidTransition(e.to_string())
326                            })?;
327                            self.observer.on_completed(&v);
328                            Ok(FeedResult::Finished(ExecutionResult {
329                                state: TerminalState::Completed { result: v },
330                                metrics: self.take_metrics(),
331                            }))
332                        }
333                        Err(e) => self.fail_with(format!("JSON parse: {e}")),
334                    },
335                    Err(e) => self.fail_with(e.to_string()),
336                }
337            }
338            Some(req) = self.llm_rx.recv() => {
339                let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
340                    id: qr.id.clone(),
341                    prompt: qr.prompt.clone(),
342                    system: qr.system.clone(),
343                    max_tokens: qr.max_tokens,
344                    grounded: qr.grounded,
345                    underspecified: qr.underspecified,
346                }).collect();
347
348                for qr in req.queries {
349                    self.resp_txs.insert(qr.id, qr.resp_tx);
350                }
351
352                self.state.pause(queries.clone()).map_err(|e| {
353                    SessionError::InvalidTransition(e.to_string())
354                })?;
355                self.observer.on_paused(&queries);
356                Ok(FeedResult::Paused { queries })
357            }
358        }
359    }
360
361    /// Feed one response by query_id.
362    ///
363    /// Returns Ok(true) if all queries are now complete, Ok(false) if still waiting.
364    fn feed_one(
365        &mut self,
366        query_id: &QueryId,
367        response: String,
368        usage: Option<&algocline_core::TokenUsage>,
369    ) -> Result<bool, SessionError> {
370        // Update activity timestamp on each feed.
371        self.last_active = std::time::Instant::now();
372
373        // Track response before ownership transfer.
374        self.observer.on_response_fed(query_id, &response, usage);
375
376        // Runtime: send response to Lua thread (unblocks resp_rx.recv())
377        if let Some(tx) = self.resp_txs.remove(query_id) {
378            let _ = tx.send(Ok(response.clone()));
379        }
380
381        // Domain: record in state machine
382        let complete = self
383            .state
384            .feed(query_id, response)
385            .map_err(SessionError::Feed)?;
386
387        if complete {
388            // Domain: transition Paused(complete) → Running
389            self.state
390                .take_responses()
391                .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
392            self.observer.on_resumed();
393        } else {
394            self.observer
395                .on_partial_feed(query_id, self.state.remaining());
396        }
397
398        Ok(complete)
399    }
400
401    fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
402        self.state
403            .fail(msg.clone())
404            .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
405        self.observer.on_failed(&msg);
406        Ok(FeedResult::Finished(ExecutionResult {
407            state: TerminalState::Failed { error: msg },
408            metrics: self.take_metrics(),
409        }))
410    }
411
412    fn take_metrics(&mut self) -> ExecutionMetrics {
413        std::mem::take(&mut self.metrics)
414    }
415
416    /// Lightweight snapshot for external observation (alc_status).
417    ///
418    /// Returns session state label and running metrics without consuming
419    /// or modifying the session.
420    ///
421    /// `pending_filter` is opt-in projection for the currently pending
422    /// LLM queries:
423    /// - `None` (default) → only `pending_queries: N` (integer count) is
424    ///   emitted, preserving the v0.x wire shape for light-weight polling.
425    /// - `Some(filter)` → adds a `pending: [...]` array whose entries are
426    ///   per-query objects projected through the filter's field flags.
427    ///
428    /// The integer `pending_queries` field is always retained when paused
429    /// so existing consumers do not break.
430    pub fn snapshot(&self, pending_filter: Option<&PendingFilter>) -> serde_json::Value {
431        let state_label = match &self.state {
432            ExecutionState::Running => "running",
433            ExecutionState::Paused(_) => "paused",
434            _ => "terminal",
435        };
436
437        let mut json = serde_json::json!({
438            "state": state_label,
439        });
440
441        let metrics = self.metrics.snapshot();
442        if !metrics.is_null() {
443            json["metrics"] = metrics;
444        }
445
446        // Pending query projection (additive; count is always present)
447        if let ExecutionState::Paused(pending) = &self.state {
448            json["pending_queries"] = pending.remaining().into();
449
450            if let Some(filter) = pending_filter {
451                let items: Vec<serde_json::Value> = pending
452                    .pending_queries()
453                    .iter()
454                    .map(|q| project_query(q, filter))
455                    .collect();
456                json["pending"] = serde_json::Value::Array(items);
457            }
458        }
459
460        json
461    }
462
463    /// Returns true if the session has been idle longer than `ttl`.
464    ///
465    /// Uses `saturating_duration_since` to avoid panics if the clock drifts
466    /// backwards (though this is extremely rare with monotonic clocks).
467    pub fn is_expired(&self, ttl: Duration) -> bool {
468        is_expired_impl(self.last_active, ttl)
469    }
470}
471
/// Expiry predicate behind `Session::is_expired`, kept free-standing so it
/// can be tested without constructing a session.
///
/// Returns `true` once the time elapsed since `last_active` is at least
/// `ttl`. A `last_active` that lies in the future counts as zero elapsed time.
fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
    let now = std::time::Instant::now();
    let idle = now.saturating_duration_since(last_active);
    idle >= ttl
}
476
477// ─── Registry ────────────────────────────────────────────────
478
479/// Manages active sessions.
480///
481/// # Locking design (lock **C**)
482///
483/// Uses `tokio::sync::Mutex` because `feed_response` holds the lock
484/// while calling `Session::feed_one()` (which itself acquires the
485/// per-session `std::sync::Mutex<SessionStatus>`, lock **A**). The lock
486/// ordering invariant is always **C → A** — no code path acquires A
487/// then C, so deadlock is structurally impossible.
488///
489/// `tokio::sync::Mutex` is chosen here (rather than `std::sync::Mutex`)
490/// because `feed_response` must take the session out of the map for
491/// the async `wait_event()` call. The two-phase pattern (lock → remove
492/// → unlock → await → lock → reinsert) requires an async-aware mutex
493/// to avoid holding the lock across the `wait_event().await`.
494///
495/// ## Contention
496///
497/// `list_snapshots()` (from `alc_status`) holds lock C while iterating
498/// all sessions. During this time, `feed_response` for any session is
499/// blocked. Given that snapshot iteration is O(n) with n = active
500/// sessions (typically 1–3) and each snapshot takes microseconds, this
501/// is acceptable. If session count grows significantly, consider
502/// switching to a concurrent map or per-session locks.
503///
504/// ## Interaction with lock A
505///
506/// `Session::snapshot()` (called under lock C in `list_snapshots`)
507/// acquires lock A via `ExecutionMetrics::snapshot()`. This is safe:
508/// - Lock order: C → A (consistent with `feed_response`)
509/// - Lock A hold time: microseconds (JSON field reads)
510/// - Lock A is per-session (no cross-session contention)
pub struct SessionRegistry {
    /// Session id → session currently held by the registry. Wrapped in an
    /// `Arc` so the background GC task can hold its own handle to the map.
    sessions: Arc<Mutex<HashMap<String, Session>>>,
}
514
515impl Default for SessionRegistry {
516    fn default() -> Self {
517        Self::new()
518    }
519}
520
521impl SessionRegistry {
522    pub fn new() -> Self {
523        Self {
524            sessions: Arc::new(Mutex::new(HashMap::new())),
525        }
526    }
527
528    /// Start execution and wait for first event (pause or completion).
529    pub async fn start_execution(
530        &self,
531        mut session: Session,
532    ) -> Result<(String, FeedResult), SessionError> {
533        let session_id = gen_session_id();
534        let result = session.wait_event().await?;
535
536        if matches!(result, FeedResult::Paused { .. }) {
537            self.sessions
538                .lock()
539                .await
540                .insert(session_id.clone(), session);
541        }
542
543        Ok((session_id, result))
544    }
545
546    /// Feed one response to a paused session by query_id.
547    ///
548    /// If this completes all pending queries, the session resumes and
549    /// returns the next event (Paused or Finished).
550    /// If queries remain, returns Accepted { remaining }.
551    pub async fn feed_response(
552        &self,
553        session_id: &str,
554        query_id: &QueryId,
555        response: String,
556        usage: Option<&algocline_core::TokenUsage>,
557    ) -> Result<FeedResult, SessionError> {
558        // 1. Feed under lock
559        let complete = {
560            let mut map = self.sessions.lock().await;
561            let session = map
562                .get_mut(session_id)
563                .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
564
565            let complete = session.feed_one(query_id, response, usage)?;
566
567            if !complete {
568                return Ok(FeedResult::Accepted {
569                    remaining: session.state.remaining(),
570                });
571            }
572
573            complete
574        };
575
576        // 2. All complete → take session out for async resume
577        debug_assert!(complete);
578        let mut session = {
579            let mut map = self.sessions.lock().await;
580            map.remove(session_id)
581                .ok_or_else(|| SessionError::NotFound(session_id.into()))?
582        };
583
584        let result = session.wait_event().await?;
585
586        if matches!(result, FeedResult::Paused { .. }) {
587            self.sessions
588                .lock()
589                .await
590                .insert(session_id.into(), session);
591        }
592
593        Ok(result)
594    }
595
596    /// Resolve the sole pending query ID for a session.
597    ///
598    /// When `alc_continue` is called without an explicit `query_id`, this
599    /// method checks if exactly one query is pending and returns its ID.
600    /// Returns an error if zero or multiple queries are pending.
601    pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
602        let map = self.sessions.lock().await;
603        let session = map
604            .get(session_id)
605            .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
606        let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
607        match keys.len() {
608            0 => Err(SessionError::InvalidTransition("no pending queries".into())),
609            1 => keys
610                .into_iter()
611                .next()
612                .ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
613            n => Err(SessionError::InvalidTransition(format!(
614                "{n} queries pending; specify query_id explicitly"
615            ))),
616        }
617    }
618
619    /// Snapshot all active sessions for external observation (alc_status).
620    ///
621    /// Returns a map of session_id → snapshot JSON. Only includes sessions
622    /// currently held in the registry (i.e. paused, awaiting responses).
623    /// Sessions that have completed are already removed from the registry.
624    ///
625    /// `pending_filter` is forwarded verbatim to each session's
626    /// [`Session::snapshot`]; see its docs for semantics.
627    pub async fn list_snapshots(
628        &self,
629        pending_filter: Option<&PendingFilter>,
630    ) -> HashMap<String, serde_json::Value> {
631        let map = self.sessions.lock().await;
632        map.iter()
633            .map(|(id, session)| (id.clone(), session.snapshot(pending_filter)))
634            .collect()
635    }
636
637    /// Spawn a background GC task that reaps sessions idle longer than `ttl`.
638    ///
639    /// The task runs every 60 seconds. When the process exits, the task is
640    /// naturally terminated. No `JoinHandle` is retained — process exit is
641    /// sufficient for cleanup in MCP server deployments.
642    pub fn spawn_gc_task(&self, ttl: Duration) {
643        let sessions = Arc::clone(&self.sessions);
644        tokio::spawn(async move {
645            let mut interval = tokio::time::interval(Duration::from_secs(60));
646            loop {
647                interval.tick().await;
648                let mut map = sessions.lock().await;
649                let expired: Vec<String> = map
650                    .iter()
651                    .filter(|(_, s)| s.is_expired(ttl))
652                    .map(|(id, _)| id.clone())
653                    .collect();
654                for id in &expired {
655                    tracing::info!(session_id = %id, "GC: reaping expired session");
656                    map.remove(id);
657                }
658            }
659        });
660    }
661}
662
663/// Generate a non-deterministic session ID.
664///
665/// MCP spec requires "secure, non-deterministic session IDs" to prevent
666/// session hijacking. Uses timestamp + random bytes for uniqueness and
667/// unpredictability.
668///
669/// # `unwrap_or_default` on `duration_since(UNIX_EPOCH)`
670///
671/// `SystemTime::now().duration_since(UNIX_EPOCH)` can fail if the system
672/// clock is set before 1970-01-01 (e.g. NTP drift, misconfigured VM).
673/// The Rust std docs recommend `expect()` or `match` for explicit handling,
674/// but `expect` would panic in library code (prohibited by project policy).
675///
676/// `unwrap_or_default` returns `Duration::ZERO` on failure, yielding
677/// timestamp `0`. This is acceptable here because the 8-byte random
678/// suffix (16 hex chars of entropy) independently guarantees uniqueness
679/// and unpredictability — the timestamp is a convenience prefix, not
680/// a security-critical component.
fn gen_session_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Nanoseconds since the Unix epoch; a clock set before the epoch yields
    // zero instead of an error.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    // 64-bit suffix (16 hex chars): the timestamp hashed with a newly
    // constructed `RandomState`.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(nanos);
    let suffix: u64 = hasher.finish();

    format!("s-{nanos:x}-{suffix:016x}")
}
698
699#[cfg(test)]
700mod tests {
701    use super::*;
702    use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
703    use serde_json::json;
704
705    fn make_query(index: usize) -> LlmQuery {
706        LlmQuery {
707            id: QueryId::batch(index),
708            prompt: format!("prompt-{index}"),
709            system: None,
710            max_tokens: 100,
711            grounded: false,
712            underspecified: false,
713        }
714    }
715
716    // ─── FeedResult::to_json tests ───
717
718    #[test]
719    fn to_json_accepted() {
720        let result = FeedResult::Accepted { remaining: 3 };
721        let json = result.to_json("s-123");
722        assert_eq!(json["status"], "accepted");
723        assert_eq!(json["remaining"], 3);
724    }
725
726    #[test]
727    fn to_json_paused_single_query() {
728        let query = LlmQuery {
729            id: QueryId::single(),
730            prompt: "What is 2+2?".into(),
731            system: Some("You are a calculator.".into()),
732            max_tokens: 50,
733            grounded: false,
734            underspecified: false,
735        };
736        let result = FeedResult::Paused {
737            queries: vec![query],
738        };
739        let json = result.to_json("s-abc");
740
741        assert_eq!(json["status"], "needs_response");
742        assert_eq!(json["session_id"], "s-abc");
743        assert_eq!(json["prompt"], "What is 2+2?");
744        assert_eq!(json["system"], "You are a calculator.");
745        assert_eq!(json["max_tokens"], 50);
746        // single query mode: no "queries" array
747        assert!(json.get("queries").is_none());
748        // grounded=false must be absent
749        assert!(
750            json.get("grounded").is_none(),
751            "grounded key must be absent when false"
752        );
753        // underspecified=false must be absent
754        assert!(
755            json.get("underspecified").is_none(),
756            "underspecified key must be absent when false"
757        );
758    }
759
760    #[test]
761    fn to_json_paused_single_query_grounded() {
762        let query = LlmQuery {
763            id: QueryId::single(),
764            prompt: "verify this claim".into(),
765            system: None,
766            max_tokens: 200,
767            grounded: true,
768            underspecified: false,
769        };
770        let result = FeedResult::Paused {
771            queries: vec![query],
772        };
773        let json = result.to_json("s-grounded");
774
775        assert_eq!(json["status"], "needs_response");
776        assert_eq!(
777            json["grounded"], true,
778            "grounded must appear in single-query MCP JSON"
779        );
780    }
781
782    #[test]
783    fn to_json_paused_single_query_underspecified() {
784        let query = LlmQuery {
785            id: QueryId::single(),
786            prompt: "what output format do you need?".into(),
787            system: None,
788            max_tokens: 200,
789            grounded: false,
790            underspecified: true,
791        };
792        let result = FeedResult::Paused {
793            queries: vec![query],
794        };
795        let json = result.to_json("s-underspec");
796
797        assert_eq!(json["status"], "needs_response");
798        assert_eq!(
799            json["underspecified"], true,
800            "underspecified must appear in single-query MCP JSON"
801        );
802        assert!(
803            json.get("grounded").is_none(),
804            "grounded must be absent when false"
805        );
806    }
807
808    #[test]
809    fn to_json_paused_multiple_queries_mixed_grounded() {
810        let grounded_query = LlmQuery {
811            id: QueryId::batch(0),
812            prompt: "verify".into(),
813            system: None,
814            max_tokens: 100,
815            grounded: true,
816            underspecified: false,
817        };
818        let normal_query = LlmQuery {
819            id: QueryId::batch(1),
820            prompt: "generate".into(),
821            system: None,
822            max_tokens: 100,
823            grounded: false,
824            underspecified: false,
825        };
826        let result = FeedResult::Paused {
827            queries: vec![grounded_query, normal_query],
828        };
829        let json = result.to_json("s-batch");
830
831        let qs = json["queries"].as_array().expect("queries should be array");
832        assert_eq!(
833            qs[0]["grounded"], true,
834            "grounded query must have grounded=true"
835        );
836        assert!(
837            qs[1].get("grounded").is_none(),
838            "non-grounded query must omit grounded key"
839        );
840    }
841
842    #[test]
843    fn to_json_paused_multiple_queries_mixed_underspecified() {
844        let underspec_query = LlmQuery {
845            id: QueryId::batch(0),
846            prompt: "clarify intent".into(),
847            system: None,
848            max_tokens: 100,
849            grounded: false,
850            underspecified: true,
851        };
852        let normal_query = LlmQuery {
853            id: QueryId::batch(1),
854            prompt: "generate".into(),
855            system: None,
856            max_tokens: 100,
857            grounded: false,
858            underspecified: false,
859        };
860        let result = FeedResult::Paused {
861            queries: vec![underspec_query, normal_query],
862        };
863        let json = result.to_json("s-batch-us");
864
865        let qs = json["queries"].as_array().expect("queries should be array");
866        assert_eq!(
867            qs[0]["underspecified"], true,
868            "underspecified query must have underspecified=true"
869        );
870        assert!(
871            qs[1].get("underspecified").is_none(),
872            "non-underspecified query must omit underspecified key"
873        );
874    }
875
876    #[test]
877    fn to_json_paused_single_query_no_system() {
878        let query = LlmQuery {
879            id: QueryId::single(),
880            prompt: "hello".into(),
881            system: None,
882            max_tokens: 1024,
883            grounded: false,
884            underspecified: false,
885        };
886        let result = FeedResult::Paused {
887            queries: vec![query],
888        };
889        let json = result.to_json("s-x");
890
891        assert_eq!(json["status"], "needs_response");
892        assert!(json["system"].is_null());
893    }
894
895    #[test]
896    fn to_json_paused_multiple_queries() {
897        let queries = vec![make_query(0), make_query(1), make_query(2)];
898        let result = FeedResult::Paused { queries };
899        let json = result.to_json("s-multi");
900
901        assert_eq!(json["status"], "needs_response");
902        assert_eq!(json["session_id"], "s-multi");
903
904        let qs = json["queries"].as_array().expect("queries should be array");
905        assert_eq!(qs.len(), 3);
906        assert_eq!(qs[0]["id"], "q-0");
907        assert_eq!(qs[0]["prompt"], "prompt-0");
908        assert_eq!(qs[1]["id"], "q-1");
909        assert_eq!(qs[2]["id"], "q-2");
910    }
911
912    #[test]
913    fn to_json_finished_completed() {
914        let result = FeedResult::Finished(ExecutionResult {
915            state: TerminalState::Completed {
916                result: json!({"answer": 42}),
917            },
918            metrics: ExecutionMetrics::new(),
919        });
920        let json = result.to_json("s-done");
921
922        assert_eq!(json["status"], "completed");
923        assert_eq!(json["result"]["answer"], 42);
924        assert!(json.get("stats").is_some());
925    }
926
927    #[test]
928    fn to_json_finished_failed() {
929        let result = FeedResult::Finished(ExecutionResult {
930            state: TerminalState::Failed {
931                error: "lua error: bad argument".into(),
932            },
933            metrics: ExecutionMetrics::new(),
934        });
935        let json = result.to_json("s-err");
936
937        assert_eq!(json["status"], "error");
938        assert_eq!(json["error"], "lua error: bad argument");
939    }
940
941    #[test]
942    fn to_json_finished_cancelled() {
943        let result = FeedResult::Finished(ExecutionResult {
944            state: TerminalState::Cancelled,
945            metrics: ExecutionMetrics::new(),
946        });
947        let json = result.to_json("s-cancel");
948
949        assert_eq!(json["status"], "cancelled");
950        assert!(json.get("stats").is_some());
951    }
952
953    // ─── gen_session_id tests ───
954
955    #[test]
956    fn session_id_starts_with_prefix() {
957        let id = gen_session_id();
958        assert!(id.starts_with("s-"), "id should start with 's-': {id}");
959    }
960
961    #[test]
962    fn session_id_uniqueness() {
963        let ids: Vec<String> = (0..10).map(|_| gen_session_id()).collect();
964        let set: std::collections::HashSet<&String> = ids.iter().collect();
965        assert_eq!(set.len(), 10, "10 IDs should all be unique");
966    }
967
968    // ─── is_expired_impl tests ───
969    //
970    // Session::is_expired delegates to is_expired_impl. Testing the impl
971    // directly avoids the need to construct a full Session (which requires
972    // a real Lua VM + channels).
973
974    #[test]
975    fn is_expired_impl_fresh_instant_not_expired() {
976        // A just-created instant should not be expired with a non-zero TTL
977        let now = std::time::Instant::now();
978        assert!(!is_expired_impl(now, Duration::from_secs(1)));
979    }
980
981    #[test]
982    fn is_expired_impl_old_instant_expired() {
983        // Simulate a session idle for 2 hours by backdating last_active
984        let two_hours_ago = std::time::Instant::now()
985            .checked_sub(Duration::from_secs(7200))
986            .expect("checked_sub should succeed with sane duration");
987        // TTL = 1 hour: should be expired
988        assert!(is_expired_impl(two_hours_ago, Duration::from_secs(3600)));
989    }
990
991    #[test]
992    fn is_expired_impl_not_yet_expired() {
993        // Simulate a session idle for 1 hour
994        let one_hour_ago = std::time::Instant::now()
995            .checked_sub(Duration::from_secs(3600))
996            .expect("checked_sub should succeed with sane duration");
997        // TTL = 3 hours: should NOT be expired yet
998        assert!(!is_expired_impl(one_hour_ago, Duration::from_secs(10800)));
999    }
1000
1001    #[test]
1002    fn is_expired_impl_zero_ttl_always_expired() {
1003        // TTL = 0: any instant is immediately expired (edge case)
1004        let now = std::time::Instant::now();
1005        assert!(is_expired_impl(now, Duration::ZERO));
1006    }
1007
1008    // ─── PendingFilter preset tests ───
1009
1010    #[test]
1011    fn pending_filter_default_is_all_off() {
1012        let f = PendingFilter::default();
1013        assert!(!f.query_id);
1014        assert!(!f.max_tokens);
1015        assert!(!f.system);
1016        assert!(!f.grounded);
1017        assert!(!f.underspecified);
1018        assert!(matches!(f.prompt, PromptProjection::Off));
1019    }
1020
1021    #[test]
1022    fn pending_filter_preset_meta_flags() {
1023        let f = PendingFilter::preset_meta();
1024        assert!(f.query_id);
1025        assert!(f.max_tokens);
1026        assert!(!f.system);
1027        assert!(!f.grounded);
1028        assert!(!f.underspecified);
1029        assert!(
1030            matches!(f.prompt, PromptProjection::Off),
1031            "meta preset must not project prompt content"
1032        );
1033    }
1034
1035    #[test]
1036    fn pending_filter_preset_preview_uses_default_chars() {
1037        let f = PendingFilter::preset_preview();
1038        assert!(f.query_id);
1039        assert!(f.max_tokens);
1040        match f.prompt {
1041            PromptProjection::Preview { chars } => {
1042                assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS);
1043            }
1044            other => panic!("expected Preview, got {other:?}"),
1045        }
1046    }
1047
1048    #[test]
1049    fn pending_filter_preset_preview_with_custom_chars() {
1050        let f = PendingFilter::preset_preview_with(42);
1051        match f.prompt {
1052            PromptProjection::Preview { chars } => assert_eq!(chars, 42),
1053            other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
1054        }
1055    }
1056
1057    #[test]
1058    fn pending_filter_preset_full_flags_all_on() {
1059        let f = PendingFilter::preset_full();
1060        assert!(f.query_id);
1061        assert!(f.max_tokens);
1062        assert!(f.system);
1063        assert!(f.grounded);
1064        assert!(f.underspecified);
1065        assert!(matches!(f.prompt, PromptProjection::Full));
1066    }
1067
1068    #[test]
1069    fn pending_filter_from_preset_known_names() {
1070        assert!(PendingFilter::from_preset("meta").is_some());
1071        assert!(PendingFilter::from_preset("preview").is_some());
1072        assert!(PendingFilter::from_preset("full").is_some());
1073    }
1074
1075    #[test]
1076    fn pending_filter_from_preset_unknown_returns_none() {
1077        // Typo-protection invariant: caller must surface an error, not
1078        // silently fall back to a default projection.
1079        assert!(PendingFilter::from_preset("").is_none());
1080        assert!(PendingFilter::from_preset("META").is_none());
1081        assert!(PendingFilter::from_preset("bogus").is_none());
1082    }
1083
1084    #[test]
1085    fn pending_filter_from_preset_with_overrides_preview_chars() {
1086        // "preview" respects the per-call chars count (flowed in from env
1087        // or config); other presets ignore it.
1088        let f = PendingFilter::from_preset_with("preview", 73).unwrap();
1089        match f.prompt {
1090            PromptProjection::Preview { chars } => assert_eq!(chars, 73),
1091            other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
1092        }
1093
1094        let f_meta = PendingFilter::from_preset_with("meta", 73).unwrap();
1095        assert!(matches!(f_meta.prompt, PromptProjection::Off));
1096
1097        let f_full = PendingFilter::from_preset_with("full", 73).unwrap();
1098        assert!(matches!(f_full.prompt, PromptProjection::Full));
1099    }
1100
1101    // ─── project_query tests ───
1102
1103    #[test]
1104    fn project_query_default_filter_produces_empty_object() {
1105        let q = make_query(0);
1106        let v = project_query(&q, &PendingFilter::default());
1107        let obj = v.as_object().expect("object");
1108        assert!(obj.is_empty(), "default filter should project nothing");
1109    }
1110
1111    #[test]
1112    fn project_query_meta_preset_has_id_and_max_tokens_only() {
1113        let q = make_query(0);
1114        let v = project_query(&q, &PendingFilter::preset_meta());
1115        let obj = v.as_object().expect("object");
1116        assert_eq!(obj.len(), 2);
1117        assert_eq!(v["query_id"], "q-0");
1118        assert_eq!(v["max_tokens"], 100);
1119        assert!(obj.get("prompt").is_none());
1120        assert!(obj.get("prompt_preview").is_none());
1121        assert!(obj.get("system").is_none());
1122        assert!(obj.get("grounded").is_none());
1123        assert!(obj.get("underspecified").is_none());
1124    }
1125
1126    #[test]
1127    fn project_query_full_preset_has_all_fields() {
1128        let q = LlmQuery {
1129            id: QueryId::batch(0),
1130            prompt: "hi".into(),
1131            system: Some("sys".into()),
1132            max_tokens: 100,
1133            grounded: true,
1134            underspecified: true,
1135        };
1136        let v = project_query(&q, &PendingFilter::preset_full());
1137        assert_eq!(v["query_id"], "q-0");
1138        assert_eq!(v["max_tokens"], 100);
1139        assert_eq!(v["system"], "sys");
1140        assert_eq!(v["grounded"], true);
1141        assert_eq!(v["underspecified"], true);
1142        assert_eq!(v["prompt"], "hi");
1143        assert!(v.get("prompt_preview").is_none());
1144    }
1145
1146    #[test]
1147    fn project_query_preview_truncates_at_char_count() {
1148        let q = LlmQuery {
1149            id: QueryId::batch(0),
1150            prompt: "abcdefghij".into(),
1151            system: None,
1152            max_tokens: 10,
1153            grounded: false,
1154            underspecified: false,
1155        };
1156        let v = project_query(&q, &PendingFilter::preset_preview_with(5));
1157        assert_eq!(v["prompt_preview"], "abcde");
1158        assert!(v.get("prompt").is_none());
1159    }
1160
1161    #[test]
1162    fn project_query_preview_utf8_multibyte_safe() {
1163        // Japanese characters are 3-byte UTF-8 each; chars().take(N) must
1164        // never split a codepoint. Taking 3 chars from 5 must yield exactly
1165        // 3 chars (not bytes), and the String must be valid UTF-8.
1166        let prompt = "あいうえお";
1167        let q = LlmQuery {
1168            id: QueryId::batch(0),
1169            prompt: prompt.to_string(),
1170            system: None,
1171            max_tokens: 10,
1172            grounded: false,
1173            underspecified: false,
1174        };
1175        let v = project_query(&q, &PendingFilter::preset_preview_with(3));
1176        let preview = v["prompt_preview"].as_str().expect("str");
1177        assert_eq!(preview, "あいう");
1178        assert_eq!(preview.chars().count(), 3);
1179    }
1180
1181    #[test]
1182    fn project_query_preview_chars_over_length_returns_whole_prompt() {
1183        let q = LlmQuery {
1184            id: QueryId::batch(0),
1185            prompt: "abc".into(),
1186            system: None,
1187            max_tokens: 10,
1188            grounded: false,
1189            underspecified: false,
1190        };
1191        let v = project_query(&q, &PendingFilter::preset_preview_with(100));
1192        assert_eq!(v["prompt_preview"], "abc");
1193    }
1194
1195    #[test]
1196    fn project_query_system_field_null_when_absent() {
1197        let q = LlmQuery {
1198            id: QueryId::batch(0),
1199            prompt: "p".into(),
1200            system: None,
1201            max_tokens: 10,
1202            grounded: false,
1203            underspecified: false,
1204        };
1205        let filter = PendingFilter {
1206            system: true,
1207            ..Default::default()
1208        };
1209        let v = project_query(&q, &filter);
1210        assert!(
1211            v["system"].is_null(),
1212            "absent system must serialize as null"
1213        );
1214    }
1215
1216    // ─── PendingFilter deserialization (MCP custom object path) ───
1217
1218    #[test]
1219    fn pending_filter_deserialize_custom_object_preview() {
1220        // MCP callers may pass a raw JSON filter rather than a preset name.
1221        let raw = serde_json::json!({
1222            "query_id": true,
1223            "prompt": { "mode": "preview", "chars": 50 }
1224        });
1225        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1226        assert!(f.query_id);
1227        match f.prompt {
1228            PromptProjection::Preview { chars } => assert_eq!(chars, 50),
1229            other => panic!("expected Preview, got {other:?}"),
1230        }
1231    }
1232
1233    #[test]
1234    fn pending_filter_deserialize_partial_object_uses_field_defaults() {
1235        // serde(default) on every field means a `{}` object is valid and
1236        // equivalent to PendingFilter::default().
1237        let raw = serde_json::json!({});
1238        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1239        assert!(!f.query_id);
1240        assert!(matches!(f.prompt, PromptProjection::Off));
1241    }
1242
1243    #[test]
1244    fn pending_filter_deserialize_prompt_full_tag() {
1245        let raw = serde_json::json!({ "prompt": { "mode": "full" } });
1246        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1247        assert!(matches!(f.prompt, PromptProjection::Full));
1248    }
1249}