Skip to main content

algocline_engine/
session.rs

1//! Session-based Lua execution with pause/resume on alc.llm() calls.
2//!
3//! Runtime layer: ties Domain (ExecutionState) and Metrics (ExecutionMetrics)
4//! together with channel-based Lua pause/resume machinery.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use algocline_core::{
12    ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
13    TerminalState,
14};
15use mlua_isle::{AsyncIsleDriver, AsyncTask};
16use serde_json::json;
17use tokio::sync::Mutex;
18
19use crate::llm_bridge::LlmRequest;
20
21// ─── Error types (Runtime layer) ─────────────────────────────
22
/// Errors returned by `Session` and `SessionRegistry` operations.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// No session with the given ID is currently held in the registry.
    #[error("session '{0}' not found")]
    NotFound(String),
    /// The execution state machine rejected a fed response
    /// (wraps `algocline_core::FeedError` unchanged).
    #[error(transparent)]
    Feed(#[from] algocline_core::FeedError),
    /// A state transition (complete / pause / fail / resume) was refused, or a
    /// pending-query lookup was ambiguous. The payload is the human-readable reason.
    #[error("invalid transition: {0}")]
    InvalidTransition(String),
}
32
33// ─── Result types (Runtime layer) ────────────────────────────
34
/// Session completion data: terminal state + metrics.
///
/// Produced exactly once per session, when execution finishes; carried by
/// `FeedResult::Finished`.
#[derive(serde::Serialize)]
pub struct ExecutionResult {
    /// How the execution ended (completed, failed, or cancelled).
    pub state: TerminalState,
    /// Metrics moved out of the session at the moment it finished.
    pub metrics: ExecutionMetrics,
}
41
/// Result of a session interaction (start or feed).
///
/// Use `FeedResult::to_json` to obtain the wire representation handed back
/// to the MCP caller.
#[derive(serde::Serialize)]
pub enum FeedResult {
    /// Partial feed accepted, still waiting for more responses.
    /// `remaining` is the number of queries that still need a response.
    Accepted { remaining: usize },
    /// All queries answered, Lua re-paused with new queries.
    /// Also returned when a freshly started session pauses for the first time.
    Paused { queries: Vec<LlmQuery> },
    /// Execution completed (success, failure, or cancellation).
    Finished(ExecutionResult),
}
52
53impl FeedResult {
54    /// Convert to JSON for MCP tool response.
55    pub fn to_json(&self, session_id: &str) -> serde_json::Value {
56        match self {
57            Self::Accepted { remaining } => json!({
58                "status": "accepted",
59                "remaining": remaining,
60            }),
61            Self::Paused { queries } => {
62                if queries.len() == 1 {
63                    let q = &queries[0];
64                    let mut obj = json!({
65                        "status": "needs_response",
66                        "session_id": session_id,
67                        "query_id": q.id.as_str(),
68                        "prompt": q.prompt,
69                        "system": q.system,
70                        "max_tokens": q.max_tokens,
71                    });
72                    if q.grounded {
73                        obj["grounded"] = json!(true);
74                    }
75                    if q.underspecified {
76                        obj["underspecified"] = json!(true);
77                    }
78                    obj
79                } else {
80                    let qs: Vec<_> = queries
81                        .iter()
82                        .map(|q| {
83                            let mut obj = json!({
84                                "id": q.id.as_str(),
85                                "prompt": q.prompt,
86                                "system": q.system,
87                                "max_tokens": q.max_tokens,
88                            });
89                            if q.grounded {
90                                obj["grounded"] = json!(true);
91                            }
92                            if q.underspecified {
93                                obj["underspecified"] = json!(true);
94                            }
95                            obj
96                        })
97                        .collect();
98                    json!({
99                        "status": "needs_response",
100                        "session_id": session_id,
101                        "queries": qs,
102                    })
103                }
104            }
105            Self::Finished(result) => match &result.state {
106                TerminalState::Completed { result: val } => json!({
107                    "status": "completed",
108                    "result": val,
109                    "stats": result.metrics.to_json(),
110                }),
111                TerminalState::Failed { error } => json!({
112                    "status": "error",
113                    "error": error,
114                }),
115                TerminalState::Cancelled => json!({
116                    "status": "cancelled",
117                    "stats": result.metrics.to_json(),
118                }),
119            },
120        }
121    }
122}
123
124// ─── PendingFilter (field-level filter for Session::snapshot) ────
125
/// Default preview length, in `char`s (not bytes), used by
/// `PendingFilter::preset_preview()` and `PendingFilter::from_preset("preview")`
/// when no explicit length is supplied.
///
/// Per the original note, the env var `ALC_PROMPT_PREVIEW_CHARS` (resolved in
/// `AppConfig`, which is not part of this file) overrides this value; callers
/// pass the resolved length through `preset_preview_with` / `from_preset_with`.
pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
130
/// Per-field filter controlling which `LlmQuery` attributes are projected
/// into a Snapshot's `pending` array.
///
/// Each `bool` enables one output key; `prompt` selects how (and whether)
/// the prompt text is emitted. Every field is `#[serde(default)]`, so a
/// partially specified filter deserializes with the omitted fields off.
///
/// Exposing a new `LlmQuery` field means adding one matching `bool` here,
/// a corresponding branch in `project_query`, and an entry in
/// `preset_full` — the shape stays stable so the API surface does not grow
/// enum variants for every new attribute.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PendingFilter {
    /// Emit the `query_id` key.
    #[serde(default)]
    pub query_id: bool,
    /// Emit the `max_tokens` key.
    #[serde(default)]
    pub max_tokens: bool,
    /// Emit the `system` key (`null` when the query has no system prompt).
    #[serde(default)]
    pub system: bool,
    /// Emit the `grounded` key.
    #[serde(default)]
    pub grounded: bool,
    /// Emit the `underspecified` key.
    #[serde(default)]
    pub underspecified: bool,
    /// Prompt projection mode; defaults to `PromptProjection::Off`.
    #[serde(default)]
    pub prompt: PromptProjection,
}
152
/// Prompt projection mode — 3 states rather than a bool so that truncation
/// length can travel inside the filter object.
///
/// JSON tag is `mode`: `{"mode":"off"}` / `{"mode":"preview","chars":200}` /
/// `{"mode":"full"}`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum PromptProjection {
    /// Do not emit any prompt text (the default).
    #[default]
    Off,
    /// Emit the first `chars` characters of the prompt under the
    /// `prompt_preview` key. The cut is made on `char` boundaries.
    Preview {
        chars: usize,
    },
    /// Emit the whole prompt under the `prompt` key.
    Full,
}
168
169impl PendingFilter {
170    /// Preset: query identification only (`query_id` + `max_tokens`).
171    pub fn preset_meta() -> Self {
172        Self {
173            query_id: true,
174            max_tokens: true,
175            ..Self::default()
176        }
177    }
178
179    /// Preset: meta + first N chars of the prompt. Uses the hard default
180    /// for N (`DEFAULT_PROMPT_PREVIEW_CHARS`).
181    pub fn preset_preview() -> Self {
182        Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
183    }
184
185    /// Preset: meta + first `chars` chars of the prompt. Lets callers
186    /// flow a config-resolved length (e.g. env var) into the preset.
187    pub fn preset_preview_with(chars: usize) -> Self {
188        Self {
189            query_id: true,
190            max_tokens: true,
191            prompt: PromptProjection::Preview { chars },
192            ..Self::default()
193        }
194    }
195
196    /// Preset: every field including the full prompt (debug use).
197    pub fn preset_full() -> Self {
198        Self {
199            query_id: true,
200            max_tokens: true,
201            system: true,
202            grounded: true,
203            underspecified: true,
204            prompt: PromptProjection::Full,
205        }
206    }
207
208    /// Resolve a preset by name. Unknown names return `None` so that
209    /// callers can surface a typed error rather than silently falling
210    /// back to a default projection.
211    pub fn from_preset(name: &str) -> Option<Self> {
212        match name {
213            "meta" => Some(Self::preset_meta()),
214            "preview" => Some(Self::preset_preview()),
215            "full" => Some(Self::preset_full()),
216            _ => None,
217        }
218    }
219
220    /// Same as [`Self::from_preset`] but lets `"preview"` pick up a
221    /// caller-supplied char count (config / env override).
222    pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
223        match name {
224            "meta" => Some(Self::preset_meta()),
225            "preview" => Some(Self::preset_preview_with(preview_chars)),
226            "full" => Some(Self::preset_full()),
227            _ => None,
228        }
229    }
230}
231
232/// Project a single `LlmQuery` into the JSON object requested by `filter`.
233///
234/// UTF-8 safety: `PromptProjection::Preview { chars }` uses `chars().take(N)`
235/// so the cut never splits a multi-byte code point.
236fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
237    let mut obj = serde_json::Map::new();
238    if f.query_id {
239        obj.insert("query_id".into(), q.id.as_str().into());
240    }
241    if f.max_tokens {
242        obj.insert("max_tokens".into(), q.max_tokens.into());
243    }
244    if f.system {
245        obj.insert(
246            "system".into(),
247            match &q.system {
248                Some(s) => serde_json::Value::String(s.clone()),
249                None => serde_json::Value::Null,
250            },
251        );
252    }
253    if f.grounded {
254        obj.insert("grounded".into(), q.grounded.into());
255    }
256    if f.underspecified {
257        obj.insert("underspecified".into(), q.underspecified.into());
258    }
259    match &f.prompt {
260        PromptProjection::Off => {}
261        PromptProjection::Full => {
262            obj.insert("prompt".into(), q.prompt.clone().into());
263        }
264        PromptProjection::Preview { chars } => {
265            let preview: String = q.prompt.chars().take(*chars).collect();
266            obj.insert("prompt_preview".into(), preview.into());
267        }
268    }
269    serde_json::Value::Object(obj)
270}
271
272// ─── Session ─────────────────────────────────────────────────
273
/// A Lua execution session with domain state tracking.
///
/// Each session owns a dedicated Lua VM via `_vm_driver`. The VM's OS thread
/// stays alive as long as the driver is held, and exits cleanly when the
/// session is dropped (channel closes → Lua thread drains and exits).
pub struct Session {
    /// Domain state machine; starts as `Running` and is advanced by
    /// `wait_event`, `feed_one` and `fail_with`.
    state: ExecutionState,
    /// Running metrics for this session. Moved out (replaced by the default
    /// value) when the session finishes.
    metrics: ExecutionMetrics,
    /// Observer created from `metrics`; notified on pause, feed, resume,
    /// completion and failure.
    observer: MetricsObserver,
    /// Incoming LLM request batches from the Lua bridge.
    llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
    /// Handle to the Lua execution; awaited in `wait_event` and expected to
    /// yield the result as a JSON string.
    exec_task: AsyncTask,
    /// QueryId → resp_tx. Populated on Paused, cleared on resume.
    resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
    /// Per-session VM lifecycle driver. Keeps the Lua thread alive.
    /// Dropped when the session completes or is abandoned.
    _vm_driver: AsyncIsleDriver,
    /// Last activity timestamp (monotonic). Updated on creation and each feed_one().
    /// Used by GC to identify idle sessions for cleanup.
    last_active: std::time::Instant,
    /// Wall-clock Unix ms when the session was created (immutable after Session::new).
    started_at_ms: i64,
    /// Wall-clock Unix ms of the most recent activity (feed_one or session creation).
    /// Updated with `Relaxed` ordering — observability use only, no cross-thread invariant.
    last_activity_ms: Arc<AtomicI64>,
}
299
300impl Session {
301    /// Create a new session.
302    ///
303    /// # Arguments
304    ///
305    /// - `llm_rx` — Receiver for LLM requests from the Lua bridge.
306    /// - `exec_task` — The coroutine execution task handle.
307    /// - `metrics` — Session metrics (owns the LogSink ring buffer; the bridge
308    ///   reads its `log_sink_handle()` separately to wire `print()` / `alc.log()`
309    ///   into the same ring buffer that `metrics.snapshot()` exposes via
310    ///   `recent_logs` in `alc_status`).
311    /// - `vm_driver` — Keeps the Lua OS thread alive.
312    ///
313    /// # Returns
314    ///
315    /// A new `Session` in the `Running` state.
316    pub fn new(
317        llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
318        exec_task: AsyncTask,
319        metrics: ExecutionMetrics,
320        vm_driver: AsyncIsleDriver,
321    ) -> Self {
322        let observer = metrics.create_observer();
323        // Note: duration_since can only fail if the wall clock predates UNIX_EPOCH
324        // (broken system clock). Saturating to 0 is harmless for observability.
325        let started_at_ms = SystemTime::now()
326            .duration_since(UNIX_EPOCH)
327            .unwrap_or_default()
328            .as_millis() as i64;
329        Self {
330            state: ExecutionState::Running,
331            metrics,
332            observer,
333            llm_rx,
334            exec_task,
335            resp_txs: HashMap::new(),
336            _vm_driver: vm_driver,
337            last_active: std::time::Instant::now(),
338            started_at_ms,
339            last_activity_ms: Arc::new(AtomicI64::new(started_at_ms)),
340        }
341    }
342
343    /// Wait for the next event from Lua execution.
344    ///
345    /// Called after initial start or after feeding all responses.
346    /// State must be Running when called.
347    async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
348        tokio::select! {
349            result = &mut self.exec_task => {
350                match result {
351                    Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
352                        Ok(v) => {
353                            self.state.complete(v.clone()).map_err(|e| {
354                                SessionError::InvalidTransition(e.to_string())
355                            })?;
356                            self.observer.on_completed(&v);
357                            Ok(FeedResult::Finished(ExecutionResult {
358                                state: TerminalState::Completed { result: v },
359                                metrics: self.take_metrics(),
360                            }))
361                        }
362                        Err(e) => self.fail_with(format!("JSON parse: {e}")),
363                    },
364                    Err(e) => self.fail_with(e.to_string()),
365                }
366            }
367            Some(req) = self.llm_rx.recv() => {
368                let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
369                    id: qr.id.clone(),
370                    prompt: qr.prompt.clone(),
371                    system: qr.system.clone(),
372                    max_tokens: qr.max_tokens,
373                    grounded: qr.grounded,
374                    underspecified: qr.underspecified,
375                }).collect();
376
377                for qr in req.queries {
378                    self.resp_txs.insert(qr.id, qr.resp_tx);
379                }
380
381                self.state.pause(queries.clone()).map_err(|e| {
382                    SessionError::InvalidTransition(e.to_string())
383                })?;
384                self.observer.on_paused(&queries);
385                Ok(FeedResult::Paused { queries })
386            }
387        }
388    }
389
390    /// Feed one response by query_id.
391    ///
392    /// # Arguments
393    ///
394    /// - `query_id` — The query to respond to.
395    /// - `response` — The LLM response string.
396    /// - `usage` — Optional token usage from the host.
397    ///
398    /// # Returns
399    ///
400    /// `Ok(true)` if all queries are now complete; `Ok(false)` if more responses remain.
401    ///
402    /// # Errors
403    ///
404    /// Returns `SessionError::Feed` if the state machine rejects the feed.
405    fn feed_one(
406        &mut self,
407        query_id: &QueryId,
408        response: String,
409        usage: Option<&algocline_core::TokenUsage>,
410    ) -> Result<bool, SessionError> {
411        // Update both monotonic and wall-clock activity timestamps on each feed.
412        self.last_active = std::time::Instant::now();
413        // Note: duration_since can only fail if wall clock predates UNIX_EPOCH.
414        // Saturating to 0 is harmless for observability.
415        let now_ms = SystemTime::now()
416            .duration_since(UNIX_EPOCH)
417            .unwrap_or_default()
418            .as_millis() as i64;
419        self.last_activity_ms.store(now_ms, Ordering::Relaxed);
420
421        // Track response before ownership transfer.
422        self.observer.on_response_fed(query_id, &response, usage);
423
424        // Runtime: send response to Lua thread (unblocks resp_rx.recv())
425        if let Some(tx) = self.resp_txs.remove(query_id) {
426            let _ = tx.send(Ok(response.clone()));
427        }
428
429        // Domain: record in state machine
430        let complete = self
431            .state
432            .feed(query_id, response)
433            .map_err(SessionError::Feed)?;
434
435        if complete {
436            // Domain: transition Paused(complete) → Running
437            self.state
438                .take_responses()
439                .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
440            self.observer.on_resumed();
441        } else {
442            self.observer
443                .on_partial_feed(query_id, self.state.remaining());
444        }
445
446        Ok(complete)
447    }
448
449    fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
450        self.state
451            .fail(msg.clone())
452            .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
453        self.observer.on_failed(&msg);
454        Ok(FeedResult::Finished(ExecutionResult {
455            state: TerminalState::Failed { error: msg },
456            metrics: self.take_metrics(),
457        }))
458    }
459
    /// Move the metrics out of the session, leaving
    /// `ExecutionMetrics::default()` in their place.
    ///
    /// Used when building the terminal `ExecutionResult`, so the metrics are
    /// handed over without cloning.
    fn take_metrics(&mut self) -> ExecutionMetrics {
        std::mem::take(&mut self.metrics)
    }
463
464    /// Lightweight snapshot for external observation (alc_status).
465    ///
466    /// Returns session state label and running metrics without consuming
467    /// or modifying the session.
468    ///
469    /// # Arguments
470    ///
471    /// - `pending_filter` — Opt-in projection for the currently pending LLM queries.
472    ///   `None` emits only `pending_queries: N` (integer count), preserving the v0.x
473    ///   wire shape for light-weight polling.  `Some(filter)` adds a `pending: [...]`
474    ///   array projected through the filter's field flags.
475    /// - `include_history` — When `true`, `conversation_history` (≤10 entries) is
476    ///   included in the metrics output.  When `false` (default), the key is absent.
477    ///   High-frequency polling callers should leave this `false` to avoid wire bloat.
478    ///
479    /// # Returns
480    ///
481    /// A `serde_json::Value` snapshot with the following additive fields beyond v0.x:
482    /// - `phase` — 5-value string derived from `ExecutionState`:
483    ///   `"running"`, `"paused"`, `"completed"`, `"failed"`, `"cancelled"`.
484    ///   The existing `state` key is retained for backward compatibility (3-value).
485    /// - `started_at` — Unix millisecond timestamp when the session was created.
486    /// - `last_activity_at` — Unix millisecond timestamp of the most recent feed_one.
487    ///   Note: `started_at` and `last_activity_at` are wall-clock values while
488    ///   expiry GC uses the monotonic `last_active` Instant; they may skew slightly
489    ///   on NTP adjustments (acceptable for observability use).
490    pub fn snapshot(
491        &self,
492        pending_filter: Option<&PendingFilter>,
493        include_history: bool,
494    ) -> serde_json::Value {
495        let state_label = match &self.state {
496            ExecutionState::Running => "running",
497            ExecutionState::Paused(_) => "paused",
498            _ => "terminal",
499        };
500
501        let phase = match &self.state {
502            ExecutionState::Running => "running",
503            ExecutionState::Paused(_) => "paused",
504            ExecutionState::Completed { .. } => "completed",
505            ExecutionState::Failed { .. } => "failed",
506            ExecutionState::Cancelled => "cancelled",
507        };
508
509        let mut json = serde_json::json!({
510            "state": state_label,
511            "phase": phase,
512            "started_at": self.started_at_ms,
513            "last_activity_at": self.last_activity_ms.load(Ordering::Relaxed),
514        });
515
516        let metrics = self.metrics.snapshot(include_history);
517        if !metrics.is_null() {
518            json["metrics"] = metrics;
519        }
520
521        // Pending query projection (additive; count is always present)
522        if let ExecutionState::Paused(pending) = &self.state {
523            json["pending_queries"] = pending.remaining().into();
524
525            if let Some(filter) = pending_filter {
526                let items: Vec<serde_json::Value> = pending
527                    .pending_queries()
528                    .iter()
529                    .map(|q| project_query(q, filter))
530                    .collect();
531                json["pending"] = serde_json::Value::Array(items);
532            }
533        }
534
535        json
536    }
537
    /// Returns true if the session has been idle for at least `ttl`.
    ///
    /// Idle time is measured from `last_active` (set on creation and on every
    /// `feed_one`) using the monotonic clock. The comparison is delegated to
    /// `is_expired_impl`, which uses `saturating_duration_since` to avoid
    /// panics if the clock drifts backwards (though this is extremely rare
    /// with monotonic clocks).
    pub fn is_expired(&self, ttl: Duration) -> bool {
        is_expired_impl(self.last_active, ttl)
    }
545
546    /// Decompose the session into the parts needed by `driver_loop` (v2 path).
547    ///
548    /// Returns `(exec_task, llm_rx, vm_driver, metrics)`.  Observer/resp_txs/state
549    /// remain Session-internal and are not exposed.
550    ///
551    /// This accessor is `pub(crate)` so only the `execution` module (same crate)
552    /// can call it; the public API does not expose internal VM handles.
553    pub(crate) fn into_driver_parts(
554        self,
555    ) -> (
556        AsyncTask,
557        tokio::sync::mpsc::Receiver<LlmRequest>,
558        AsyncIsleDriver,
559        ExecutionMetrics,
560    ) {
561        (self.exec_task, self.llm_rx, self._vm_driver, self.metrics)
562    }
563}
564
/// Core expiry check, kept as a free function so it can be tested without
/// constructing a `Session`.
///
/// Returns `true` when the time elapsed since `last_active` is at least
/// `ttl`. A `last_active` that lies in the future counts as zero idle time
/// (saturating subtraction), so it never panics.
fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
    let now = std::time::Instant::now();
    let idle = now.saturating_duration_since(last_active);
    idle >= ttl
}
569
570// ─── Registry ────────────────────────────────────────────────
571
/// Manages active sessions, keyed by session ID.
///
/// Only paused sessions live in the map: a session is inserted when it
/// pauses for LLM responses and is taken out again while it runs or once it
/// has finished.
///
/// # Locking design (lock **C**)
///
/// The map is guarded by a `tokio::sync::Mutex` (lock **C**).
/// `feed_response` holds it while calling `Session::feed_one()`. Per the
/// original design notes, `feed_one()` in turn acquires a per-session
/// `std::sync::Mutex<SessionStatus>` (lock **A**) inside the metrics layer —
/// that lock is not visible in this file. The lock ordering invariant is
/// always **C → A**; no code path acquires A then C, so the two cannot
/// deadlock against each other.
///
/// Lock C is never held across `wait_event().await`: `feed_response` takes
/// the session out of the map first (lock → remove → unlock → await → lock
/// → reinsert). Every acquisition happens in async context via
/// `.lock().await`, so a contended lock suspends the waiting task instead of
/// blocking an executor thread.
///
/// ## Contention
///
/// `list_snapshots()` (from `alc_status`) holds lock C while iterating
/// all sessions. During this time, `feed_response` for any session is
/// blocked. Given that snapshot iteration is O(n) with n = active
/// sessions (typically 1–3) and each snapshot takes microseconds, this
/// is acceptable. If session count grows significantly, consider
/// switching to a concurrent map or per-session locks.
///
/// ## Interaction with lock A
///
/// `Session::snapshot()` (called under lock C in `list_snapshots`)
/// calls `ExecutionMetrics::snapshot()`, which per the original notes
/// acquires lock A. This is safe:
/// - Lock order: C → A (consistent with `feed_response`)
/// - Lock A hold time: microseconds (JSON field reads)
/// - Lock A is per-session (no cross-session contention)
pub struct SessionRegistry {
    /// Paused sessions by ID. Shared through `Arc` so the background GC task
    /// can hold its own handle to the same map.
    sessions: Arc<Mutex<HashMap<String, Session>>>,
}
607
/// `Default` is equivalent to [`SessionRegistry::new`]: an empty registry.
impl Default for SessionRegistry {
    fn default() -> Self {
        Self::new()
    }
}
613
614impl SessionRegistry {
    /// Create an empty registry with no sessions.
    ///
    /// No background work is started here; call `spawn_gc_task` separately to
    /// enable reaping of idle sessions.
    pub fn new() -> Self {
        Self {
            sessions: Arc::new(Mutex::new(HashMap::new())),
        }
    }
620
621    /// Start execution and wait for first event (pause or completion).
622    pub async fn start_execution(
623        &self,
624        mut session: Session,
625    ) -> Result<(String, FeedResult), SessionError> {
626        let session_id = gen_session_id();
627        let result = session.wait_event().await?;
628
629        if matches!(result, FeedResult::Paused { .. }) {
630            self.sessions
631                .lock()
632                .await
633                .insert(session_id.clone(), session);
634        }
635
636        Ok((session_id, result))
637    }
638
639    /// Feed one response to a paused session by query_id.
640    ///
641    /// If this completes all pending queries, the session resumes and
642    /// returns the next event (Paused or Finished).
643    /// If queries remain, returns Accepted { remaining }.
644    pub async fn feed_response(
645        &self,
646        session_id: &str,
647        query_id: &QueryId,
648        response: String,
649        usage: Option<&algocline_core::TokenUsage>,
650    ) -> Result<FeedResult, SessionError> {
651        // 1. Feed under lock
652        let complete = {
653            let mut map = self.sessions.lock().await;
654            let session = map
655                .get_mut(session_id)
656                .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
657
658            let complete = session.feed_one(query_id, response, usage)?;
659
660            if !complete {
661                return Ok(FeedResult::Accepted {
662                    remaining: session.state.remaining(),
663                });
664            }
665
666            complete
667        };
668
669        // 2. All complete → take session out for async resume
670        debug_assert!(complete);
671        let mut session = {
672            let mut map = self.sessions.lock().await;
673            map.remove(session_id)
674                .ok_or_else(|| SessionError::NotFound(session_id.into()))?
675        };
676
677        let result = session.wait_event().await?;
678
679        if matches!(result, FeedResult::Paused { .. }) {
680            self.sessions
681                .lock()
682                .await
683                .insert(session_id.into(), session);
684        }
685
686        Ok(result)
687    }
688
689    /// Resolve the sole pending query ID for a session.
690    ///
691    /// When `alc_continue` is called without an explicit `query_id`, this
692    /// method checks if exactly one query is pending and returns its ID.
693    /// Returns an error if zero or multiple queries are pending.
694    pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
695        let map = self.sessions.lock().await;
696        let session = map
697            .get(session_id)
698            .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
699        let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
700        match keys.len() {
701            0 => Err(SessionError::InvalidTransition("no pending queries".into())),
702            1 => keys
703                .into_iter()
704                .next()
705                .ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
706            n => Err(SessionError::InvalidTransition(format!(
707                "{n} queries pending; specify query_id explicitly"
708            ))),
709        }
710    }
711
712    /// Snapshot all active sessions for external observation (alc_status).
713    ///
714    /// Returns a map of session_id → snapshot JSON. Only includes sessions
715    /// currently held in the registry (i.e. paused, awaiting responses).
716    /// Sessions that have completed are already removed from the registry.
717    ///
718    /// # Arguments
719    ///
720    /// - `pending_filter` — Forwarded verbatim to each session's [`Session::snapshot`].
721    /// - `include_history` — When `true`, each snapshot includes `conversation_history`
722    ///   (≤10 entries).  Pass `false` for high-frequency polling to avoid wire bloat.
723    ///
724    /// # Returns
725    ///
726    /// A `HashMap` mapping session IDs to their JSON snapshots.
727    pub async fn list_snapshots(
728        &self,
729        pending_filter: Option<&PendingFilter>,
730        include_history: bool,
731    ) -> HashMap<String, serde_json::Value> {
732        let map = self.sessions.lock().await;
733        map.iter()
734            .map(|(id, session)| {
735                (
736                    id.clone(),
737                    session.snapshot(pending_filter, include_history),
738                )
739            })
740            .collect()
741    }
742
743    /// Spawn a background GC task that reaps sessions idle longer than `ttl`.
744    ///
745    /// The task runs every 60 seconds. When the process exits, the task is
746    /// naturally terminated. No `JoinHandle` is retained — process exit is
747    /// sufficient for cleanup in MCP server deployments.
748    pub fn spawn_gc_task(&self, ttl: Duration) {
749        let sessions = Arc::clone(&self.sessions);
750        tokio::spawn(async move {
751            let mut interval = tokio::time::interval(Duration::from_secs(60));
752            loop {
753                interval.tick().await;
754                let mut map = sessions.lock().await;
755                let expired: Vec<String> = map
756                    .iter()
757                    .filter(|(_, s)| s.is_expired(ttl))
758                    .map(|(id, _)| id.clone())
759                    .collect();
760                for id in &expired {
761                    tracing::info!(session_id = %id, "GC: reaping expired session");
762                    map.remove(id);
763                }
764            }
765        });
766    }
767}
768
769/// Generate a non-deterministic session ID.
770///
771/// MCP spec requires "secure, non-deterministic session IDs" to prevent
772/// session hijacking. Uses timestamp + random bytes for uniqueness and
773/// unpredictability.
774///
775/// # `unwrap_or_default` on `duration_since(UNIX_EPOCH)`
776///
777/// `SystemTime::now().duration_since(UNIX_EPOCH)` can fail if the system
778/// clock is set before 1970-01-01 (e.g. NTP drift, misconfigured VM).
779/// The Rust std docs recommend `expect()` or `match` for explicit handling,
780/// but `expect` would panic in library code (prohibited by project policy).
781///
782/// `unwrap_or_default` returns `Duration::ZERO` on failure, yielding
783/// timestamp `0`. This is acceptable here because the 8-byte random
784/// suffix (16 hex chars of entropy) independently guarantees uniqueness
785/// and unpredictability — the timestamp is a convenience prefix, not
786/// a security-critical component.
787fn gen_session_id() -> String {
788    use rand::RngExt;
789    use std::time::{SystemTime, UNIX_EPOCH};
790    let ts = SystemTime::now()
791        .duration_since(UNIX_EPOCH)
792        .unwrap_or_default()
793        .as_nanos();
794    let random: u64 = rand::rng().random();
795    format!("s-{ts:x}-{random:016x}")
796}
797
798#[cfg(test)]
799mod tests {
800    use super::*;
801    use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
802    use serde_json::json;
803
804    fn make_query(index: usize) -> LlmQuery {
805        LlmQuery {
806            id: QueryId::batch(index),
807            prompt: format!("prompt-{index}"),
808            system: None,
809            max_tokens: 100,
810            grounded: false,
811            underspecified: false,
812        }
813    }
814
815    // ─── FeedResult::to_json tests ───
816
// `Accepted` serializes to an "accepted" status plus the remaining count.
#[test]
fn to_json_accepted() {
    let payload = FeedResult::Accepted { remaining: 3 }.to_json("s-123");
    assert_eq!(payload["status"], "accepted");
    assert_eq!(payload["remaining"], 3);
}
824
// A lone paused query is flattened into top-level fields, and flags that are
// `false` are left out of the JSON entirely.
#[test]
fn to_json_paused_single_query() {
    let queries = vec![LlmQuery {
        id: QueryId::single(),
        prompt: String::from("What is 2+2?"),
        system: Some("You are a calculator.".into()),
        max_tokens: 50,
        grounded: false,
        underspecified: false,
    }];
    let payload = FeedResult::Paused { queries }.to_json("s-abc");

    assert_eq!(payload["status"], "needs_response");
    assert_eq!(payload["session_id"], "s-abc");
    assert_eq!(payload["prompt"], "What is 2+2?");
    assert_eq!(payload["system"], "You are a calculator.");
    assert_eq!(payload["max_tokens"], 50);

    // Single-query mode carries no "queries" array.
    assert!(payload.get("queries").is_none());

    // grounded=false is omitted rather than serialized.
    let grounded = payload.get("grounded");
    assert!(grounded.is_none(), "grounded key must be absent when false");

    // underspecified=false is omitted rather than serialized.
    let underspecified = payload.get("underspecified");
    assert!(
        underspecified.is_none(),
        "underspecified key must be absent when false"
    );
}
858
// A grounded single query must expose `grounded: true` in the MCP JSON.
#[test]
fn to_json_paused_single_query_grounded() {
    let grounded_query = LlmQuery {
        id: QueryId::single(),
        prompt: String::from("verify this claim"),
        system: None,
        max_tokens: 200,
        grounded: true,
        underspecified: false,
    };
    let paused = FeedResult::Paused {
        queries: vec![grounded_query],
    };
    let payload = paused.to_json("s-grounded");

    assert_eq!(payload["status"], "needs_response");
    assert_eq!(
        payload["grounded"], true,
        "grounded must appear in single-query MCP JSON"
    );
}
880
// An underspecified single query must expose `underspecified: true`, while the
// unset `grounded` flag stays out of the JSON.
#[test]
fn to_json_paused_single_query_underspecified() {
    let clarifying_query = LlmQuery {
        id: QueryId::single(),
        prompt: String::from("what output format do you need?"),
        system: None,
        max_tokens: 200,
        grounded: false,
        underspecified: true,
    };
    let paused = FeedResult::Paused {
        queries: vec![clarifying_query],
    };
    let payload = paused.to_json("s-underspec");

    assert_eq!(payload["status"], "needs_response");
    assert_eq!(
        payload["underspecified"], true,
        "underspecified must appear in single-query MCP JSON"
    );
    let grounded = payload.get("grounded");
    assert!(grounded.is_none(), "grounded must be absent when false");
}
906
// In a batch, `grounded` is emitted per query: present on the grounded entry,
// omitted on the ordinary one.
#[test]
fn to_json_paused_multiple_queries_mixed_grounded() {
    let queries = vec![
        LlmQuery {
            id: QueryId::batch(0),
            prompt: String::from("verify"),
            system: None,
            max_tokens: 100,
            grounded: true,
            underspecified: false,
        },
        LlmQuery {
            id: QueryId::batch(1),
            prompt: String::from("generate"),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        },
    ];
    let payload = FeedResult::Paused { queries }.to_json("s-batch");

    let entries = payload["queries"]
        .as_array()
        .expect("queries should be array");
    assert_eq!(
        entries[0]["grounded"], true,
        "grounded query must have grounded=true"
    );
    let second_flag = entries[1].get("grounded");
    assert!(
        second_flag.is_none(),
        "non-grounded query must omit grounded key"
    );
}
940
// In a batch, `underspecified` is emitted per query: present on the flagged
// entry, omitted on the ordinary one.
#[test]
fn to_json_paused_multiple_queries_mixed_underspecified() {
    let queries = vec![
        LlmQuery {
            id: QueryId::batch(0),
            prompt: String::from("clarify intent"),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: true,
        },
        LlmQuery {
            id: QueryId::batch(1),
            prompt: String::from("generate"),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        },
    ];
    let payload = FeedResult::Paused { queries }.to_json("s-batch-us");

    let entries = payload["queries"]
        .as_array()
        .expect("queries should be array");
    assert_eq!(
        entries[0]["underspecified"], true,
        "underspecified query must have underspecified=true"
    );
    let second_flag = entries[1].get("underspecified");
    assert!(
        second_flag.is_none(),
        "non-underspecified query must omit underspecified key"
    );
}
974
// Without a system prompt, the single-query JSON reports `system` as null.
#[test]
fn to_json_paused_single_query_no_system() {
    let queries = vec![LlmQuery {
        id: QueryId::single(),
        prompt: String::from("hello"),
        system: None,
        max_tokens: 1024,
        grounded: false,
        underspecified: false,
    }];
    let payload = FeedResult::Paused { queries }.to_json("s-x");

    assert_eq!(payload["status"], "needs_response");
    assert!(payload["system"].is_null());
}
993
// Several paused queries are reported as a "queries" array that keeps the
// batch order and the per-query ids.
#[test]
fn to_json_paused_multiple_queries() {
    let queries: Vec<LlmQuery> = (0..3).map(make_query).collect();
    let payload = FeedResult::Paused { queries }.to_json("s-multi");

    assert_eq!(payload["status"], "needs_response");
    assert_eq!(payload["session_id"], "s-multi");

    let entries = payload["queries"]
        .as_array()
        .expect("queries should be array");
    assert_eq!(entries.len(), 3);
    assert_eq!(entries[0]["id"], "q-0");
    assert_eq!(entries[0]["prompt"], "prompt-0");
    assert_eq!(entries[1]["id"], "q-1");
    assert_eq!(entries[2]["id"], "q-2");
}
1010
// A completed run reports status "completed", the Lua result, and stats.
#[test]
fn to_json_finished_completed() {
    let state = TerminalState::Completed {
        result: json!({"answer": 42}),
    };
    let finished = FeedResult::Finished(ExecutionResult {
        state,
        metrics: ExecutionMetrics::new(),
    });
    let payload = finished.to_json("s-done");

    assert_eq!(payload["status"], "completed");
    assert_eq!(payload["result"]["answer"], 42);
    assert!(payload.get("stats").is_some());
}
1025
// A failed run reports status "error" together with the error text.
#[test]
fn to_json_finished_failed() {
    let state = TerminalState::Failed {
        error: "lua error: bad argument".into(),
    };
    let finished = FeedResult::Finished(ExecutionResult {
        state,
        metrics: ExecutionMetrics::new(),
    });
    let payload = finished.to_json("s-err");

    assert_eq!(payload["status"], "error");
    assert_eq!(payload["error"], "lua error: bad argument");
}
1039
// A cancelled run reports status "cancelled" and still carries stats.
#[test]
fn to_json_finished_cancelled() {
    let finished = FeedResult::Finished(ExecutionResult {
        metrics: ExecutionMetrics::new(),
        state: TerminalState::Cancelled,
    });
    let payload = finished.to_json("s-cancel");

    assert_eq!(payload["status"], "cancelled");
    assert!(payload.get("stats").is_some());
}
1051
1052    // ─── gen_session_id tests ───
1053
// Every generated session ID carries the "s-" prefix.
#[test]
fn session_id_starts_with_prefix() {
    let id = gen_session_id();
    let has_prefix = id.starts_with("s-");
    assert!(has_prefix, "id should start with 's-': {id}");
}
1059
// Ten consecutively generated IDs must not collide.
#[test]
fn session_id_uniqueness() {
    let mut seen = std::collections::HashSet::new();
    for _ in 0..10 {
        seen.insert(gen_session_id());
    }
    assert_eq!(seen.len(), 10, "10 IDs should all be unique");
}
1066
1067    // ─── is_expired_impl tests ───
1068    //
1069    // Session::is_expired delegates to is_expired_impl. Testing the impl
1070    // directly avoids the need to construct a full Session (which requires
1071    // a real Lua VM + channels).
1072
// An instant taken just now is within any non-zero TTL.
#[test]
fn is_expired_impl_fresh_instant_not_expired() {
    let ttl = Duration::from_secs(1);
    let just_created = std::time::Instant::now();
    let expired = is_expired_impl(just_created, ttl);
    assert!(!expired);
}
1079
// An instant that has been idle for longer than the TTL must be reported as
// expired.
//
// Idle time is simulated by backdating `Instant::now()`. The offset is kept to
// a few seconds on purpose: `Instant::checked_sub` yields `None` when the
// result cannot be represented, so a multi-hour offset would make the
// `expect` below panic on a freshly booted machine wherever the monotonic
// clock counts from (roughly) boot time.
// NOTE(review): Windows' performance-counter-based `Instant` is believed to be
// such a target — confirm against the supported platform list.
#[test]
fn is_expired_impl_old_instant_expired() {
    // Simulate a session idle for 10 seconds by backdating last_active
    let ten_seconds_ago = std::time::Instant::now()
        .checked_sub(Duration::from_secs(10))
        .expect("checked_sub should succeed with sane duration");
    // TTL = 5 seconds: should be expired
    assert!(is_expired_impl(ten_seconds_ago, Duration::from_secs(5)));
}
1089
// An instant that has been idle for less than the TTL must not be reported as
// expired.
//
// Idle time is simulated by backdating `Instant::now()` by a single second
// while the TTL stays at three hours. Keeping the offset tiny avoids
// `Instant::checked_sub` returning `None` (and the `expect` panicking) on a
// freshly booted machine whose monotonic clock counts from roughly boot time;
// the large TTL keeps the "not yet expired" margin immune to scheduler stalls.
#[test]
fn is_expired_impl_not_yet_expired() {
    // Simulate a session idle for 1 second
    let one_second_ago = std::time::Instant::now()
        .checked_sub(Duration::from_secs(1))
        .expect("checked_sub should succeed with sane duration");
    // TTL = 3 hours: should NOT be expired yet
    assert!(!is_expired_impl(one_second_ago, Duration::from_secs(10800)));
}
1099
// Edge case: with a zero TTL even a brand-new instant counts as expired.
#[test]
fn is_expired_impl_zero_ttl_always_expired() {
    let just_created = std::time::Instant::now();
    let expired = is_expired_impl(just_created, Duration::ZERO);
    assert!(expired);
}
1106
1107    // ─── PendingFilter preset tests ───
1108
// The default filter projects nothing: every flag is off and the prompt
// projection is `Off`.
#[test]
fn pending_filter_default_is_all_off() {
    let filter = PendingFilter::default();
    let flags = [
        filter.query_id,
        filter.max_tokens,
        filter.system,
        filter.grounded,
        filter.underspecified,
    ];
    assert_eq!(flags, [false; 5]);
    assert!(matches!(filter.prompt, PromptProjection::Off));
}
1119
// The "meta" preset exposes only the query id and max_tokens; no content.
#[test]
fn pending_filter_preset_meta_flags() {
    let meta = PendingFilter::preset_meta();

    // Switched on.
    assert_eq!([meta.query_id, meta.max_tokens], [true; 2]);
    // Switched off.
    assert_eq!([meta.system, meta.grounded, meta.underspecified], [false; 3]);

    let prompt_is_off = matches!(meta.prompt, PromptProjection::Off);
    assert!(prompt_is_off, "meta preset must not project prompt content");
}
1133
// The "preview" preset truncates prompts at the crate-wide default length.
#[test]
fn pending_filter_preset_preview_uses_default_chars() {
    let preview = PendingFilter::preset_preview();
    assert!(preview.query_id);
    assert!(preview.max_tokens);

    let chars = match preview.prompt {
        PromptProjection::Preview { chars } => chars,
        other => panic!("expected Preview, got {other:?}"),
    };
    assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS);
}
1146
// `preset_preview_with` carries the caller-supplied character count through.
#[test]
fn pending_filter_preset_preview_with_custom_chars() {
    let preview = PendingFilter::preset_preview_with(42);
    let chars = match preview.prompt {
        PromptProjection::Preview { chars } => chars,
        other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
    };
    assert_eq!(chars, 42);
}
1155
// The "full" preset turns every flag on and projects the whole prompt.
#[test]
fn pending_filter_preset_full_flags_all_on() {
    let full = PendingFilter::preset_full();
    let flags = [
        full.query_id,
        full.max_tokens,
        full.system,
        full.grounded,
        full.underspecified,
    ];
    assert_eq!(flags, [true; 5]);
    assert!(matches!(full.prompt, PromptProjection::Full));
}
1166
// Each documented preset name resolves to a filter.
#[test]
fn pending_filter_from_preset_known_names() {
    for name in ["meta", "preview", "full"] {
        assert!(PendingFilter::from_preset(name).is_some());
    }
}
1173
// Typo protection: an unrecognised name (empty, wrong case, or made up) must
// yield `None` so the caller reports an error instead of silently falling
// back to some default projection.
#[test]
fn pending_filter_from_preset_unknown_returns_none() {
    for name in ["", "META", "bogus"] {
        assert!(PendingFilter::from_preset(name).is_none());
    }
}
1182
// Only the "preview" preset honours the per-call character count (which flows
// in from env or config); "meta" and "full" keep their fixed projections.
#[test]
fn pending_filter_from_preset_with_overrides_preview_chars() {
    let preview = PendingFilter::from_preset_with("preview", 73).unwrap();
    let chars = match preview.prompt {
        PromptProjection::Preview { chars } => chars,
        other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
    };
    assert_eq!(chars, 73);

    let meta = PendingFilter::from_preset_with("meta", 73).unwrap();
    assert!(matches!(meta.prompt, PromptProjection::Off));

    let full = PendingFilter::from_preset_with("full", 73).unwrap();
    assert!(matches!(full.prompt, PromptProjection::Full));
}
1199
1200    // ─── project_query tests ───
1201
// With every projection switched off the result is an empty JSON object.
#[test]
fn project_query_default_filter_produces_empty_object() {
    let query = make_query(0);
    let filter = PendingFilter::default();
    let projected = project_query(&query, &filter);
    let fields = projected.as_object().expect("object");
    assert!(fields.is_empty(), "default filter should project nothing");
}
1209
// The "meta" preset projects exactly two keys: `query_id` and `max_tokens`.
#[test]
fn project_query_meta_preset_has_id_and_max_tokens_only() {
    let query = make_query(0);
    let projected = project_query(&query, &PendingFilter::preset_meta());
    let fields = projected.as_object().expect("object");

    assert_eq!(fields.len(), 2);
    assert_eq!(projected["query_id"], "q-0");
    assert_eq!(projected["max_tokens"], 100);

    for absent in [
        "prompt",
        "prompt_preview",
        "system",
        "grounded",
        "underspecified",
    ] {
        assert!(fields.get(absent).is_none());
    }
}
1224
// The "full" preset projects every field and the untruncated prompt, never a
// preview.
#[test]
fn project_query_full_preset_has_all_fields() {
    let query = LlmQuery {
        id: QueryId::batch(0),
        prompt: String::from("hi"),
        system: Some("sys".into()),
        max_tokens: 100,
        grounded: true,
        underspecified: true,
    };
    let projected = project_query(&query, &PendingFilter::preset_full());

    assert_eq!(projected["query_id"], "q-0");
    assert_eq!(projected["max_tokens"], 100);
    assert_eq!(projected["system"], "sys");
    assert_eq!(projected["grounded"], true);
    assert_eq!(projected["underspecified"], true);
    assert_eq!(projected["prompt"], "hi");
    assert!(projected.get("prompt_preview").is_none());
}
1244
// A preview of 5 characters keeps the first five and omits the full prompt.
#[test]
fn project_query_preview_truncates_at_char_count() {
    let query = LlmQuery {
        id: QueryId::batch(0),
        prompt: String::from("abcdefghij"),
        system: None,
        max_tokens: 10,
        grounded: false,
        underspecified: false,
    };
    let filter = PendingFilter::preset_preview_with(5);
    let projected = project_query(&query, &filter);

    assert_eq!(projected["prompt_preview"], "abcde");
    assert!(projected.get("prompt").is_none());
}
1259
// Truncation counts characters, not bytes. Each Japanese character below is
// three bytes in UTF-8, so a 3-character preview of the 5-character prompt
// must contain exactly three whole characters and never split a codepoint.
#[test]
fn project_query_preview_utf8_multibyte_safe() {
    let query = LlmQuery {
        id: QueryId::batch(0),
        prompt: String::from("あいうえお"),
        system: None,
        max_tokens: 10,
        grounded: false,
        underspecified: false,
    };
    let filter = PendingFilter::preset_preview_with(3);
    let projected = project_query(&query, &filter);

    let preview = projected["prompt_preview"].as_str().expect("str");
    assert_eq!(preview, "あいう");
    assert_eq!(preview.chars().count(), 3);
}
1279
// A preview length beyond the prompt's length returns the prompt unchanged.
#[test]
fn project_query_preview_chars_over_length_returns_whole_prompt() {
    let query = LlmQuery {
        id: QueryId::batch(0),
        prompt: String::from("abc"),
        system: None,
        max_tokens: 10,
        grounded: false,
        underspecified: false,
    };
    let filter = PendingFilter::preset_preview_with(100);
    let projected = project_query(&query, &filter);
    assert_eq!(projected["prompt_preview"], "abc");
}
1293
// When `system` is requested but the query has none, the key is reported as
// JSON null.
#[test]
fn project_query_system_field_null_when_absent() {
    let system_only = PendingFilter {
        system: true,
        ..Default::default()
    };
    let query = LlmQuery {
        id: QueryId::batch(0),
        prompt: String::from("p"),
        system: None,
        max_tokens: 10,
        grounded: false,
        underspecified: false,
    };
    let projected = project_query(&query, &system_only);

    let system_is_null = projected["system"].is_null();
    assert!(system_is_null, "absent system must serialize as null");
}
1314
1315    // ─── PendingFilter deserialization (MCP custom object path) ───
1316
// MCP callers may send a raw JSON filter object instead of a preset name; the
// tagged `prompt` object selects the preview mode and its length.
#[test]
fn pending_filter_deserialize_custom_object_preview() {
    let raw = json!({
        "query_id": true,
        "prompt": { "mode": "preview", "chars": 50 }
    });
    let filter: PendingFilter = serde_json::from_value(raw).expect("deserialize");

    assert!(filter.query_id);
    let chars = match filter.prompt {
        PromptProjection::Preview { chars } => chars,
        other => panic!("expected Preview, got {other:?}"),
    };
    assert_eq!(chars, 50);
}
1331
// An empty `{}` object is a valid filter: missing fields fall back to their
// defaults, giving the same flags as `PendingFilter::default()`.
#[test]
fn pending_filter_deserialize_partial_object_uses_field_defaults() {
    let filter: PendingFilter = serde_json::from_value(json!({})).expect("deserialize");
    assert!(!filter.query_id);
    assert!(matches!(filter.prompt, PromptProjection::Off));
}
1341
// The `"mode": "full"` tag deserializes to `PromptProjection::Full`.
#[test]
fn pending_filter_deserialize_prompt_full_tag() {
    let raw = json!({ "prompt": { "mode": "full" } });
    let filter: PendingFilter = serde_json::from_value(raw).expect("deserialize");
    let is_full = matches!(filter.prompt, PromptProjection::Full);
    assert!(is_full);
}
1348
1349    // ─── Session snapshot v2 fields tests ───
1350    //
1351    // These tests use the Executor to create real sessions so that Session
1352    // struct fields (started_at_ms, last_activity_ms, phase) are exercised
1353    // end-to-end without requiring direct construction of AsyncTask/AsyncIsleDriver.
1354
1355    /// Helper: build a minimal temp directory pair for state/card stores.
1356    fn tmp_dirs() -> (
1357        std::sync::Arc<crate::state::JsonFileStore>,
1358        std::sync::Arc<crate::card::FileCardStore>,
1359        std::path::PathBuf,
1360    ) {
1361        let tmp = tempfile::tempdir().expect("test tempdir");
1362        let root = tmp.path().to_path_buf();
1363        std::mem::forget(tmp);
1364        (
1365            std::sync::Arc::new(crate::state::JsonFileStore::new(root.join("state"))),
1366            std::sync::Arc::new(crate::card::FileCardStore::new(root.join("cards"))),
1367            root.join("scenarios"),
1368        )
1369    }
1370
1371    // T1: Session snapshot contains phase, started_at, last_activity_at
1372    // A session that completes immediately should have these fields in its snapshot
1373    // while it is Running (before completion removes it from the registry).
1374    //
1375    // Strategy: start a session with a Lua script that calls alc.llm() to pause.
1376    // The session will be in Paused state in the registry, allowing snapshot().
1377    #[tokio::test]
1378    async fn snapshot_v2_contains_phase_and_timestamps() {
1379        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1380        let (state_store, card_store, scenarios_dir) = tmp_dirs();
1381
1382        // Lua: pause the session with a single alc.llm() call
1383        let code = r#"
1384            local response = alc.llm("what is 2+2?")
1385            return response
1386        "#
1387        .to_string();
1388
1389        let session = executor
1390            .start_session(
1391                code,
1392                serde_json::json!({}),
1393                vec![],
1394                vec![],
1395                state_store,
1396                card_store,
1397                scenarios_dir,
1398            )
1399            .await
1400            .unwrap();
1401
1402        // While in Running state (before first event), snapshot should have new fields.
1403        // Note: session.snapshot() is called before wait_event() so state is Running.
1404        let snap = session.snapshot(None, false);
1405
1406        // phase is present
1407        assert!(
1408            snap.get("phase").is_some(),
1409            "snapshot must have 'phase' field"
1410        );
1411        assert_eq!(snap["phase"], "running", "initial state must be running");
1412
1413        // state key retained for backward compatibility
1414        assert_eq!(snap["state"], "running");
1415
1416        // started_at is a positive i64 (unix ms)
1417        let started_at = snap["started_at"].as_i64().expect("started_at must be i64");
1418        assert!(started_at > 0, "started_at must be > 0 (unix ms)");
1419
1420        // last_activity_at starts equal to started_at
1421        let last_activity = snap["last_activity_at"]
1422            .as_i64()
1423            .expect("last_activity_at must be i64");
1424        assert_eq!(
1425            started_at, last_activity,
1426            "last_activity_at should equal started_at before any feed"
1427        );
1428    }
1429
// T1: documents the intended five-value `phase` vocabulary and how it
// collapses into the three-value `state` key ("running", "paused",
// "terminal").
//
// NOTE(review): nothing in this test calls snapshot(); the table is compared
// against itself and against a mapping defined locally, so it records the
// expected labels but cannot detect a regression in the production mapping.
#[test]
fn snapshot_phase_running_state_label() {
    const CASES: [(&str, &str); 5] = [
        ("running", "running"),
        ("paused", "paused"),
        ("completed", "completed"),
        ("failed", "failed"),
        ("cancelled", "cancelled"),
    ];

    for (state_str, expected_phase) in CASES {
        let is_live = matches!(state_str, "running" | "paused");
        // Three-value view: live states keep their name, the rest collapse.
        let three_value_state = if is_live { state_str } else { "terminal" };

        // Five-value view: phase is spelled exactly like the state.
        assert_eq!(
            expected_phase, state_str,
            "phase for {state_str} must be the same string"
        );
        if !is_live {
            assert_eq!(
                three_value_state, "terminal",
                "{state_str} must map to 'terminal' in 3-value state"
            );
        }
    }
}
1467
1468    // T1: snapshot(false) lacks conversation_history; snapshot(true) includes it
1469    #[tokio::test]
1470    async fn snapshot_conversation_history_opt_in() {
1471        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1472        let (state_store, card_store, scenarios_dir) = tmp_dirs();
1473
1474        let code = r#"
1475            local response = alc.llm("explain recursion")
1476            return response
1477        "#
1478        .to_string();
1479
1480        let session = executor
1481            .start_session(
1482                code,
1483                serde_json::json!({}),
1484                vec![],
1485                vec![],
1486                state_store,
1487                card_store,
1488                scenarios_dir,
1489            )
1490            .await
1491            .unwrap();
1492
1493        // Before any LLM interaction, conversation_history is absent in both modes.
1494        let snap_false = session.snapshot(None, false);
1495        assert!(
1496            snap_false
1497                .get("metrics")
1498                .and_then(|m| m.get("conversation_history"))
1499                .is_none(),
1500            "conversation_history must be absent with include_history=false"
1501        );
1502
1503        // include_history=true: conversation_history key must exist (empty array at start).
1504        let snap_true = session.snapshot(None, true);
1505        // metrics is present; conversation_history may be empty array (no LLM calls yet)
1506        // but the key must be present.
1507        if let Some(metrics) = snap_true.get("metrics") {
1508            // If there are no transcript entries yet, conversation_history may be
1509            // absent or empty — depending on metrics implementation.
1510            // Either way, no panic. The key's presence is tested in metrics tests.
1511            let _ = metrics.get("conversation_history");
1512        }
1513    }
1514
1515    // T2: last_activity_ms starts equal to started_at_ms (edge case: no feeds yet)
1516    #[tokio::test]
1517    async fn snapshot_last_activity_at_starts_equal_to_started_at() {
1518        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1519        let (state_store, card_store, scenarios_dir) = tmp_dirs();
1520
1521        let code = r#"
1522            local response = alc.llm("test query")
1523            return response
1524        "#
1525        .to_string();
1526
1527        let session = executor
1528            .start_session(
1529                code,
1530                serde_json::json!({}),
1531                vec![],
1532                vec![],
1533                state_store,
1534                card_store,
1535                scenarios_dir,
1536            )
1537            .await
1538            .unwrap();
1539
1540        let snap = session.snapshot(None, false);
1541        let started_at = snap["started_at"].as_i64().unwrap_or(-1);
1542        let last_activity = snap["last_activity_at"].as_i64().unwrap_or(-2);
1543
1544        assert_eq!(
1545            started_at, last_activity,
1546            "last_activity_at must equal started_at before any feed_one"
1547        );
1548        assert!(started_at > 0, "started_at must be positive unix ms");
1549    }
1550}