Skip to main content

algocline_engine/
session.rs

1//! Session-based Lua execution with pause/resume on alc.llm() calls.
2//!
3//! Runtime layer: ties Domain (ExecutionState) and Metrics (ExecutionMetrics)
4//! together with channel-based Lua pause/resume machinery.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use algocline_core::{
12    ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
13    TerminalState,
14};
15use mlua_isle::{AsyncIsleDriver, AsyncTask};
16use serde_json::json;
17use tokio::sync::Mutex;
18
19use crate::llm_bridge::LlmRequest;
20
21// ─── Error types (Runtime layer) ─────────────────────────────
22
/// Errors surfaced by session and registry operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// No session with the given ID is currently held in the registry.
    #[error("session '{0}' not found")]
    NotFound(String),
    /// The execution state machine rejected a fed response. Wraps the
    /// underlying [`algocline_core::FeedError`] transparently.
    #[error(transparent)]
    Feed(#[from] algocline_core::FeedError),
    /// A state transition was rejected, or a precondition on the pending
    /// queries was not met. The payload is the human-readable reason.
    #[error("invalid transition: {0}")]
    InvalidTransition(String),
}
32
33// ─── Result types (Runtime layer) ────────────────────────────
34
/// Session completion data: the terminal state paired with the metrics
/// that were collected while the session ran.
#[derive(serde::Serialize)]
pub struct ExecutionResult {
    /// Terminal state the execution ended in.
    pub state: TerminalState,
    /// Metrics taken out of the session when it finished.
    pub metrics: ExecutionMetrics,
}
41
/// Result of a session interaction (start or feed).
///
/// Use [`FeedResult::to_json`] to render the MCP tool response.
#[derive(serde::Serialize)]
pub enum FeedResult {
    /// Partial feed accepted, still waiting for more responses.
    /// `remaining` is the number of queries that are still unanswered.
    Accepted { remaining: usize },
    /// All queries answered, Lua re-paused with new queries.
    Paused { queries: Vec<LlmQuery> },
    /// Execution completed (success, failure, or cancellation).
    Finished(ExecutionResult),
}
52
53impl FeedResult {
54    /// Convert to JSON for MCP tool response.
55    pub fn to_json(&self, session_id: &str) -> serde_json::Value {
56        match self {
57            Self::Accepted { remaining } => json!({
58                "status": "accepted",
59                "remaining": remaining,
60            }),
61            Self::Paused { queries } => {
62                if queries.len() == 1 {
63                    let q = &queries[0];
64                    let mut obj = json!({
65                        "status": "needs_response",
66                        "session_id": session_id,
67                        "query_id": q.id.as_str(),
68                        "prompt": q.prompt,
69                        "system": q.system,
70                        "max_tokens": q.max_tokens,
71                    });
72                    if q.grounded {
73                        obj["grounded"] = json!(true);
74                    }
75                    if q.underspecified {
76                        obj["underspecified"] = json!(true);
77                    }
78                    obj
79                } else {
80                    let qs: Vec<_> = queries
81                        .iter()
82                        .map(|q| {
83                            let mut obj = json!({
84                                "id": q.id.as_str(),
85                                "prompt": q.prompt,
86                                "system": q.system,
87                                "max_tokens": q.max_tokens,
88                            });
89                            if q.grounded {
90                                obj["grounded"] = json!(true);
91                            }
92                            if q.underspecified {
93                                obj["underspecified"] = json!(true);
94                            }
95                            obj
96                        })
97                        .collect();
98                    json!({
99                        "status": "needs_response",
100                        "session_id": session_id,
101                        "queries": qs,
102                    })
103                }
104            }
105            Self::Finished(result) => match &result.state {
106                TerminalState::Completed { result: val } => json!({
107                    "status": "completed",
108                    "result": val,
109                    "stats": result.metrics.to_json(),
110                }),
111                TerminalState::Failed { error } => json!({
112                    "status": "error",
113                    "error": error,
114                }),
115                TerminalState::Cancelled => json!({
116                    "status": "cancelled",
117                    "stats": result.metrics.to_json(),
118                }),
119            },
120        }
121    }
122}
123
124// ─── PendingFilter (field-level filter for Session::snapshot) ────
125
/// Default preview length, in `char`s, used by
/// `PendingFilter::preset_preview()` when no explicit length is supplied.
/// The env var `ALC_PROMPT_PREVIEW_CHARS` (resolved in `AppConfig`)
/// overrides this.
pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
130
/// Per-field filter controlling which `LlmQuery` attributes are projected
/// into a Snapshot's `pending` array.
///
/// Adding a new field to `LlmQuery` only requires adding one matching
/// `bool` here — the shape stays stable so API surface does not grow
/// enum variants for every new attribute.
///
/// Every field is `#[serde(default)]`, so a partially specified filter
/// deserializes with the omitted fields switched off.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PendingFilter {
    /// Emit the query's ID under the `query_id` key.
    #[serde(default)]
    pub query_id: bool,
    /// Emit the query's `max_tokens` value.
    #[serde(default)]
    pub max_tokens: bool,
    /// Emit the `system` prompt (`null` when the query has none).
    #[serde(default)]
    pub system: bool,
    /// Emit the `grounded` flag.
    #[serde(default)]
    pub grounded: bool,
    /// Emit the `underspecified` flag.
    #[serde(default)]
    pub underspecified: bool,
    /// How, if at all, the prompt text is projected.
    #[serde(default)]
    pub prompt: PromptProjection,
}
152
/// Prompt projection mode — 3 states rather than a bool so that truncation
/// length can travel inside the filter object.
///
/// JSON tag is `mode`: `{"mode":"off"}` / `{"mode":"preview","chars":200}` /
/// `{"mode":"full"}`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum PromptProjection {
    /// Do not include the prompt at all (the default).
    #[default]
    Off,
    /// Include only the first `chars` characters, under `prompt_preview`.
    Preview {
        chars: usize,
    },
    /// Include the complete prompt, under `prompt`.
    Full,
}
168
169impl PendingFilter {
170    /// Preset: query identification only (`query_id` + `max_tokens`).
171    pub fn preset_meta() -> Self {
172        Self {
173            query_id: true,
174            max_tokens: true,
175            ..Self::default()
176        }
177    }
178
179    /// Preset: meta + first N chars of the prompt. Uses the hard default
180    /// for N (`DEFAULT_PROMPT_PREVIEW_CHARS`).
181    pub fn preset_preview() -> Self {
182        Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
183    }
184
185    /// Preset: meta + first `chars` chars of the prompt. Lets callers
186    /// flow a config-resolved length (e.g. env var) into the preset.
187    pub fn preset_preview_with(chars: usize) -> Self {
188        Self {
189            query_id: true,
190            max_tokens: true,
191            prompt: PromptProjection::Preview { chars },
192            ..Self::default()
193        }
194    }
195
196    /// Preset: every field including the full prompt (debug use).
197    pub fn preset_full() -> Self {
198        Self {
199            query_id: true,
200            max_tokens: true,
201            system: true,
202            grounded: true,
203            underspecified: true,
204            prompt: PromptProjection::Full,
205        }
206    }
207
208    /// Resolve a preset by name. Unknown names return `None` so that
209    /// callers can surface a typed error rather than silently falling
210    /// back to a default projection.
211    pub fn from_preset(name: &str) -> Option<Self> {
212        match name {
213            "meta" => Some(Self::preset_meta()),
214            "preview" => Some(Self::preset_preview()),
215            "full" => Some(Self::preset_full()),
216            _ => None,
217        }
218    }
219
220    /// Same as [`Self::from_preset`] but lets `"preview"` pick up a
221    /// caller-supplied char count (config / env override).
222    pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
223        match name {
224            "meta" => Some(Self::preset_meta()),
225            "preview" => Some(Self::preset_preview_with(preview_chars)),
226            "full" => Some(Self::preset_full()),
227            _ => None,
228        }
229    }
230}
231
232/// Project a single `LlmQuery` into the JSON object requested by `filter`.
233///
234/// UTF-8 safety: `PromptProjection::Preview { chars }` uses `chars().take(N)`
235/// so the cut never splits a multi-byte code point.
236fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
237    let mut obj = serde_json::Map::new();
238    if f.query_id {
239        obj.insert("query_id".into(), q.id.as_str().into());
240    }
241    if f.max_tokens {
242        obj.insert("max_tokens".into(), q.max_tokens.into());
243    }
244    if f.system {
245        obj.insert(
246            "system".into(),
247            match &q.system {
248                Some(s) => serde_json::Value::String(s.clone()),
249                None => serde_json::Value::Null,
250            },
251        );
252    }
253    if f.grounded {
254        obj.insert("grounded".into(), q.grounded.into());
255    }
256    if f.underspecified {
257        obj.insert("underspecified".into(), q.underspecified.into());
258    }
259    match &f.prompt {
260        PromptProjection::Off => {}
261        PromptProjection::Full => {
262            obj.insert("prompt".into(), q.prompt.clone().into());
263        }
264        PromptProjection::Preview { chars } => {
265            let preview: String = q.prompt.chars().take(*chars).collect();
266            obj.insert("prompt_preview".into(), preview.into());
267        }
268    }
269    serde_json::Value::Object(obj)
270}
271
272// ─── Session ─────────────────────────────────────────────────
273
/// A Lua execution session with domain state tracking.
///
/// Each session owns a dedicated Lua VM via `_vm_driver`. The VM's OS thread
/// stays alive as long as the driver is held, and exits cleanly when the
/// session is dropped (channel closes → Lua thread drains and exits).
pub struct Session {
    /// Domain state machine (running / paused / terminal).
    state: ExecutionState,
    /// Metrics accumulated for this session; handed out on completion.
    metrics: ExecutionMetrics,
    /// Observer created from `metrics`; notified on every lifecycle event.
    observer: MetricsObserver,
    /// Incoming LLM requests raised by the Lua side.
    llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
    /// Handle to the running Lua execution; resolves when the script ends.
    exec_task: AsyncTask,
    /// QueryId → resp_tx. Populated on Paused; each entry is removed when
    /// its response is fed.
    resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
    /// Per-session VM lifecycle driver. Keeps the Lua thread alive.
    /// Dropped when the session completes or is abandoned.
    _vm_driver: AsyncIsleDriver,
    /// Last activity timestamp (monotonic). Updated on creation and each feed_one().
    /// Used by GC to identify idle sessions for cleanup.
    last_active: std::time::Instant,
    /// Wall-clock Unix ms when the session was created (immutable after Session::new).
    started_at_ms: i64,
    /// Wall-clock Unix ms of the most recent activity (feed_one or session creation).
    /// Updated with `Relaxed` ordering — observability use only, no cross-thread invariant.
    last_activity_ms: Arc<AtomicI64>,
}
299
300impl Session {
301    /// Create a new session.
302    ///
303    /// # Arguments
304    ///
305    /// - `llm_rx` — Receiver for LLM requests from the Lua bridge.
306    /// - `exec_task` — The coroutine execution task handle.
307    /// - `metrics` — Session metrics (owns the LogSink ring buffer; the bridge
308    ///   reads its `log_sink_handle()` separately to wire `print()` / `alc.log()`
309    ///   into the same ring buffer that `metrics.snapshot()` exposes via
310    ///   `recent_logs` in `alc_status`).
311    /// - `vm_driver` — Keeps the Lua OS thread alive.
312    ///
313    /// # Returns
314    ///
315    /// A new `Session` in the `Running` state.
316    pub fn new(
317        llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
318        exec_task: AsyncTask,
319        metrics: ExecutionMetrics,
320        vm_driver: AsyncIsleDriver,
321    ) -> Self {
322        let observer = metrics.create_observer();
323        // Note: duration_since can only fail if the wall clock predates UNIX_EPOCH
324        // (broken system clock). Saturating to 0 is harmless for observability.
325        let started_at_ms = SystemTime::now()
326            .duration_since(UNIX_EPOCH)
327            .unwrap_or_default()
328            .as_millis() as i64;
329        Self {
330            state: ExecutionState::Running,
331            metrics,
332            observer,
333            llm_rx,
334            exec_task,
335            resp_txs: HashMap::new(),
336            _vm_driver: vm_driver,
337            last_active: std::time::Instant::now(),
338            started_at_ms,
339            last_activity_ms: Arc::new(AtomicI64::new(started_at_ms)),
340        }
341    }
342
343    /// Wait for the next event from Lua execution.
344    ///
345    /// Called after initial start or after feeding all responses.
346    /// State must be Running when called.
347    async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
348        tokio::select! {
349            result = &mut self.exec_task => {
350                match result {
351                    Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
352                        Ok(v) => {
353                            self.state.complete(v.clone()).map_err(|e| {
354                                SessionError::InvalidTransition(e.to_string())
355                            })?;
356                            self.observer.on_completed(&v);
357                            Ok(FeedResult::Finished(ExecutionResult {
358                                state: TerminalState::Completed { result: v },
359                                metrics: self.take_metrics(),
360                            }))
361                        }
362                        Err(e) => self.fail_with(format!("JSON parse: {e}")),
363                    },
364                    Err(e) => self.fail_with(e.to_string()),
365                }
366            }
367            Some(req) = self.llm_rx.recv() => {
368                let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
369                    id: qr.id.clone(),
370                    prompt: qr.prompt.clone(),
371                    system: qr.system.clone(),
372                    max_tokens: qr.max_tokens,
373                    grounded: qr.grounded,
374                    underspecified: qr.underspecified,
375                }).collect();
376
377                for qr in req.queries {
378                    self.resp_txs.insert(qr.id, qr.resp_tx);
379                }
380
381                self.state.pause(queries.clone()).map_err(|e| {
382                    SessionError::InvalidTransition(e.to_string())
383                })?;
384                self.observer.on_paused(&queries);
385                Ok(FeedResult::Paused { queries })
386            }
387        }
388    }
389
390    /// Feed one response by query_id.
391    ///
392    /// # Arguments
393    ///
394    /// - `query_id` — The query to respond to.
395    /// - `response` — The LLM response string.
396    /// - `usage` — Optional token usage from the host.
397    ///
398    /// # Returns
399    ///
400    /// `Ok(true)` if all queries are now complete; `Ok(false)` if more responses remain.
401    ///
402    /// # Errors
403    ///
404    /// Returns `SessionError::Feed` if the state machine rejects the feed.
405    fn feed_one(
406        &mut self,
407        query_id: &QueryId,
408        response: String,
409        usage: Option<&algocline_core::TokenUsage>,
410    ) -> Result<bool, SessionError> {
411        // Update both monotonic and wall-clock activity timestamps on each feed.
412        self.last_active = std::time::Instant::now();
413        // Note: duration_since can only fail if wall clock predates UNIX_EPOCH.
414        // Saturating to 0 is harmless for observability.
415        let now_ms = SystemTime::now()
416            .duration_since(UNIX_EPOCH)
417            .unwrap_or_default()
418            .as_millis() as i64;
419        self.last_activity_ms.store(now_ms, Ordering::Relaxed);
420
421        // Track response before ownership transfer.
422        self.observer.on_response_fed(query_id, &response, usage);
423
424        // Runtime: send response to Lua thread (unblocks resp_rx.recv())
425        if let Some(tx) = self.resp_txs.remove(query_id) {
426            let _ = tx.send(Ok(response.clone()));
427        }
428
429        // Domain: record in state machine
430        let complete = self
431            .state
432            .feed(query_id, response)
433            .map_err(SessionError::Feed)?;
434
435        if complete {
436            // Domain: transition Paused(complete) → Running
437            self.state
438                .take_responses()
439                .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
440            self.observer.on_resumed();
441        } else {
442            self.observer
443                .on_partial_feed(query_id, self.state.remaining());
444        }
445
446        Ok(complete)
447    }
448
449    fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
450        self.state
451            .fail(msg.clone())
452            .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
453        self.observer.on_failed(&msg);
454        Ok(FeedResult::Finished(ExecutionResult {
455            state: TerminalState::Failed { error: msg },
456            metrics: self.take_metrics(),
457        }))
458    }
459
460    fn take_metrics(&mut self) -> ExecutionMetrics {
461        std::mem::take(&mut self.metrics)
462    }
463
464    /// Lightweight snapshot for external observation (alc_status).
465    ///
466    /// Returns session state label and running metrics without consuming
467    /// or modifying the session.
468    ///
469    /// # Arguments
470    ///
471    /// - `pending_filter` — Opt-in projection for the currently pending LLM queries.
472    ///   `None` emits only `pending_queries: N` (integer count), preserving the v0.x
473    ///   wire shape for light-weight polling.  `Some(filter)` adds a `pending: [...]`
474    ///   array projected through the filter's field flags.
475    /// - `include_history` — When `true`, `conversation_history` (≤10 entries) is
476    ///   included in the metrics output.  When `false` (default), the key is absent.
477    ///   High-frequency polling callers should leave this `false` to avoid wire bloat.
478    ///
479    /// # Returns
480    ///
481    /// A `serde_json::Value` snapshot with the following additive fields beyond v0.x:
482    /// - `phase` — 5-value string derived from `ExecutionState`:
483    ///   `"running"`, `"paused"`, `"completed"`, `"failed"`, `"cancelled"`.
484    ///   The existing `state` key is retained for backward compatibility (3-value).
485    /// - `started_at` — Unix millisecond timestamp when the session was created.
486    /// - `last_activity_at` — Unix millisecond timestamp of the most recent feed_one.
487    ///   Note: `started_at` and `last_activity_at` are wall-clock values while
488    ///   expiry GC uses the monotonic `last_active` Instant; they may skew slightly
489    ///   on NTP adjustments (acceptable for observability use).
490    pub fn snapshot(
491        &self,
492        pending_filter: Option<&PendingFilter>,
493        include_history: bool,
494    ) -> serde_json::Value {
495        let state_label = match &self.state {
496            ExecutionState::Running => "running",
497            ExecutionState::Paused(_) => "paused",
498            _ => "terminal",
499        };
500
501        let phase = match &self.state {
502            ExecutionState::Running => "running",
503            ExecutionState::Paused(_) => "paused",
504            ExecutionState::Completed { .. } => "completed",
505            ExecutionState::Failed { .. } => "failed",
506            ExecutionState::Cancelled => "cancelled",
507        };
508
509        let mut json = serde_json::json!({
510            "state": state_label,
511            "phase": phase,
512            "started_at": self.started_at_ms,
513            "last_activity_at": self.last_activity_ms.load(Ordering::Relaxed),
514        });
515
516        let metrics = self.metrics.snapshot(include_history);
517        if !metrics.is_null() {
518            json["metrics"] = metrics;
519        }
520
521        // Pending query projection (additive; count is always present)
522        if let ExecutionState::Paused(pending) = &self.state {
523            json["pending_queries"] = pending.remaining().into();
524
525            if let Some(filter) = pending_filter {
526                let items: Vec<serde_json::Value> = pending
527                    .pending_queries()
528                    .iter()
529                    .map(|q| project_query(q, filter))
530                    .collect();
531                json["pending"] = serde_json::Value::Array(items);
532            }
533        }
534
535        json
536    }
537
538    /// Returns true if the session has been idle longer than `ttl`.
539    ///
540    /// Uses `saturating_duration_since` to avoid panics if the clock drifts
541    /// backwards (though this is extremely rare with monotonic clocks).
542    pub fn is_expired(&self, ttl: Duration) -> bool {
543        is_expired_impl(self.last_active, ttl)
544    }
545
546    /// Decompose the session into the parts needed by `driver_loop` (v2 path).
547    ///
548    /// Returns `(exec_task, llm_rx, vm_driver, metrics)`.  Observer/resp_txs/state
549    /// remain Session-internal and are not exposed.
550    ///
551    /// This accessor is `pub(crate)` so only the `execution` module (same crate)
552    /// can call it; the public API does not expose internal VM handles.
553    pub(crate) fn into_driver_parts(
554        self,
555    ) -> (
556        AsyncTask,
557        tokio::sync::mpsc::Receiver<LlmRequest>,
558        AsyncIsleDriver,
559        ExecutionMetrics,
560    ) {
561        (self.exec_task, self.llm_rx, self._vm_driver, self.metrics)
562    }
563}
564
/// Core expiry check, kept as a free function so it can be tested without
/// constructing a `Session`.
///
/// Returns `true` when at least `ttl` has elapsed since `last_active`.
/// An instant that lies in the future counts as zero idle time
/// (saturating subtraction), so it is only "expired" for a zero `ttl`.
fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
    let now = std::time::Instant::now();
    let idle = now.saturating_duration_since(last_active);
    idle >= ttl
}
569
570// ─── Registry ────────────────────────────────────────────────
571
/// Manages active sessions.
///
/// # Locking design (lock **C**)
///
/// Uses `tokio::sync::Mutex` because `feed_response` holds the lock
/// while calling `Session::feed_one()` (which itself acquires the
/// per-session `std::sync::Mutex<SessionStatus>`, lock **A**). The lock
/// ordering invariant is always **C → A** — no code path acquires A
/// then C, so deadlock is structurally impossible.
///
/// `tokio::sync::Mutex` is chosen here (rather than `std::sync::Mutex`)
/// because `feed_response` must take the session out of the map for
/// the async `wait_event()` call. The two-phase pattern (lock → remove
/// → unlock → await → lock → reinsert) requires an async-aware mutex
/// to avoid holding the lock across the `wait_event().await`.
///
/// ## Contention
///
/// `list_snapshots()` (from `alc_status`) holds lock C while iterating
/// all sessions. During this time, `feed_response` for any session is
/// blocked. Given that snapshot iteration is O(n) with n = active
/// sessions (typically 1–3) and each snapshot takes microseconds, this
/// is acceptable. If session count grows significantly, consider
/// switching to a concurrent map or per-session locks.
///
/// ## Interaction with lock A
///
/// `Session::snapshot()` (called under lock C in `list_snapshots`)
/// acquires lock A via `ExecutionMetrics::snapshot()`. This is safe:
/// - Lock order: C → A (consistent with `feed_response`)
/// - Lock A hold time: microseconds (JSON field reads)
/// - Lock A is per-session (no cross-session contention)
pub struct SessionRegistry {
    /// session_id → paused session. Wrapped in `Arc` so the background GC
    /// task spawned by `spawn_gc_task` can hold its own handle to the map.
    sessions: Arc<Mutex<HashMap<String, Session>>>,
}
607
608impl Default for SessionRegistry {
609    fn default() -> Self {
610        Self::new()
611    }
612}
613
614impl SessionRegistry {
615    pub fn new() -> Self {
616        Self {
617            sessions: Arc::new(Mutex::new(HashMap::new())),
618        }
619    }
620
621    /// Start execution and wait for first event (pause or completion).
622    pub async fn start_execution(
623        &self,
624        mut session: Session,
625    ) -> Result<(String, FeedResult), SessionError> {
626        let session_id = gen_session_id();
627        let result = session.wait_event().await?;
628
629        if matches!(result, FeedResult::Paused { .. }) {
630            self.sessions
631                .lock()
632                .await
633                .insert(session_id.clone(), session);
634        }
635
636        Ok((session_id, result))
637    }
638
639    /// Feed one response to a paused session by query_id.
640    ///
641    /// If this completes all pending queries, the session resumes and
642    /// returns the next event (Paused or Finished).
643    /// If queries remain, returns Accepted { remaining }.
644    pub async fn feed_response(
645        &self,
646        session_id: &str,
647        query_id: &QueryId,
648        response: String,
649        usage: Option<&algocline_core::TokenUsage>,
650    ) -> Result<FeedResult, SessionError> {
651        // 1. Feed under lock
652        let complete = {
653            let mut map = self.sessions.lock().await;
654            let session = map
655                .get_mut(session_id)
656                .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
657
658            let complete = session.feed_one(query_id, response, usage)?;
659
660            if !complete {
661                return Ok(FeedResult::Accepted {
662                    remaining: session.state.remaining(),
663                });
664            }
665
666            complete
667        };
668
669        // 2. All complete → take session out for async resume
670        debug_assert!(complete);
671        let mut session = {
672            let mut map = self.sessions.lock().await;
673            map.remove(session_id)
674                .ok_or_else(|| SessionError::NotFound(session_id.into()))?
675        };
676
677        let result = session.wait_event().await?;
678
679        if matches!(result, FeedResult::Paused { .. }) {
680            self.sessions
681                .lock()
682                .await
683                .insert(session_id.into(), session);
684        }
685
686        Ok(result)
687    }
688
689    /// Resolve the sole pending query ID for a session.
690    ///
691    /// When `alc_continue` is called without an explicit `query_id`, this
692    /// method checks if exactly one query is pending and returns its ID.
693    /// Returns an error if zero or multiple queries are pending.
694    pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
695        let map = self.sessions.lock().await;
696        let session = map
697            .get(session_id)
698            .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
699        let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
700        match keys.len() {
701            0 => Err(SessionError::InvalidTransition("no pending queries".into())),
702            1 => keys
703                .into_iter()
704                .next()
705                .ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
706            n => Err(SessionError::InvalidTransition(format!(
707                "{n} queries pending; specify query_id explicitly"
708            ))),
709        }
710    }
711
712    /// Snapshot all active sessions for external observation (alc_status).
713    ///
714    /// Returns a map of session_id → snapshot JSON. Only includes sessions
715    /// currently held in the registry (i.e. paused, awaiting responses).
716    /// Sessions that have completed are already removed from the registry.
717    ///
718    /// # Arguments
719    ///
720    /// - `pending_filter` — Forwarded verbatim to each session's [`Session::snapshot`].
721    /// - `include_history` — When `true`, each snapshot includes `conversation_history`
722    ///   (≤10 entries).  Pass `false` for high-frequency polling to avoid wire bloat.
723    ///
724    /// # Returns
725    ///
726    /// A `HashMap` mapping session IDs to their JSON snapshots.
727    pub async fn list_snapshots(
728        &self,
729        pending_filter: Option<&PendingFilter>,
730        include_history: bool,
731    ) -> HashMap<String, serde_json::Value> {
732        let map = self.sessions.lock().await;
733        map.iter()
734            .map(|(id, session)| {
735                (
736                    id.clone(),
737                    session.snapshot(pending_filter, include_history),
738                )
739            })
740            .collect()
741    }
742
743    /// Spawn a background GC task that reaps sessions idle longer than `ttl`.
744    ///
745    /// The task runs every 60 seconds. When the process exits, the task is
746    /// naturally terminated. No `JoinHandle` is retained — process exit is
747    /// sufficient for cleanup in MCP server deployments.
748    pub fn spawn_gc_task(&self, ttl: Duration) {
749        let sessions = Arc::clone(&self.sessions);
750        tokio::spawn(async move {
751            let mut interval = tokio::time::interval(Duration::from_secs(60));
752            loop {
753                interval.tick().await;
754                let mut map = sessions.lock().await;
755                let expired: Vec<String> = map
756                    .iter()
757                    .filter(|(_, s)| s.is_expired(ttl))
758                    .map(|(id, _)| id.clone())
759                    .collect();
760                for id in &expired {
761                    tracing::info!(session_id = %id, "GC: reaping expired session");
762                    map.remove(id);
763                }
764            }
765        });
766    }
767}
768
/// Generate a non-deterministic session ID of the form
/// `s-<timestamp hex>-<16 hex digits>`.
///
/// MCP spec requires "secure, non-deterministic session IDs" to prevent
/// session hijacking. The ID combines the current Unix time in nanoseconds
/// with a 64-bit value obtained by hashing that timestamp through a freshly
/// created `RandomState` hasher.
///
/// NOTE(review): the unpredictability of the suffix rests on how std seeds
/// `RandomState`; std does not document it as a cryptographic RNG — confirm
/// this is acceptable for the deployment's threat model.
///
/// # Clock failure handling
///
/// `SystemTime::now().duration_since(UNIX_EPOCH)` fails if the system clock
/// is set before 1970-01-01 (e.g. misconfigured VM). Panicking is not an
/// option in library code (project policy), so the timestamp falls back to
/// `0`. That is acceptable because the timestamp is only a convenience
/// prefix; the hashed suffix is what distinguishes IDs.
fn gen_session_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    let ts: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };

    // 8 bytes from a randomly keyed hasher → 16 hex chars.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(ts);
    let random: u64 = hasher.finish();

    format!("s-{ts:x}-{random:016x}")
}
804
#[cfg(test)]
mod tests {
    use super::*;
    use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
    use serde_json::json;

    // ─── Shared fixtures ───

    /// Query with the given id and prompt, a token budget of 100, no system
    /// prompt, and both `grounded` / `underspecified` cleared. Tests tweak
    /// individual fields afterwards when they need something else.
    fn unflagged_query(id: QueryId, prompt: &str) -> LlmQuery {
        LlmQuery {
            id,
            prompt: prompt.to_string(),
            system: None,
            max_tokens: 100,
            grounded: false,
            underspecified: false,
        }
    }

    /// The `index`-th batch query, with prompt `prompt-<index>`.
    fn make_query(index: usize) -> LlmQuery {
        unflagged_query(QueryId::batch(index), &format!("prompt-{index}"))
    }

    /// MCP JSON for a `Paused` result carrying `queries`.
    fn paused_json(queries: Vec<LlmQuery>, session_id: &str) -> serde_json::Value {
        FeedResult::Paused { queries }.to_json(session_id)
    }

    /// MCP JSON for a `Finished` result in `state` with freshly created metrics.
    fn finished_json(state: TerminalState, session_id: &str) -> serde_json::Value {
        let finished = FeedResult::Finished(ExecutionResult {
            state,
            metrics: ExecutionMetrics::new(),
        });
        finished.to_json(session_id)
    }

    // ─── FeedResult::to_json ───

    #[test]
    fn to_json_accepted() {
        let rendered = FeedResult::Accepted { remaining: 3 }.to_json("s-123");
        assert_eq!(rendered["status"], "accepted");
        assert_eq!(rendered["remaining"], 3);
    }

    #[test]
    fn to_json_paused_single_query() {
        let mut calc = unflagged_query(QueryId::single(), "What is 2+2?");
        calc.system = Some("You are a calculator.".into());
        calc.max_tokens = 50;
        let rendered = paused_json(vec![calc], "s-abc");

        assert_eq!(rendered["status"], "needs_response");
        assert_eq!(rendered["session_id"], "s-abc");
        assert_eq!(rendered["prompt"], "What is 2+2?");
        assert_eq!(rendered["system"], "You are a calculator.");
        assert_eq!(rendered["max_tokens"], 50);
        // A lone query is reported through top-level keys; there is no
        // "queries" array in that mode.
        assert!(rendered.get("queries").is_none());
        // Flags that are false are left out rather than serialized as false.
        assert!(
            rendered.get("grounded").is_none(),
            "grounded key must be absent when false"
        );
        assert!(
            rendered.get("underspecified").is_none(),
            "underspecified key must be absent when false"
        );
    }

    #[test]
    fn to_json_paused_single_query_grounded() {
        let mut claim = unflagged_query(QueryId::single(), "verify this claim");
        claim.max_tokens = 200;
        claim.grounded = true;
        let rendered = paused_json(vec![claim], "s-grounded");

        assert_eq!(rendered["status"], "needs_response");
        assert_eq!(
            rendered["grounded"], true,
            "grounded must appear in single-query MCP JSON"
        );
    }

    #[test]
    fn to_json_paused_single_query_underspecified() {
        let mut vague = unflagged_query(QueryId::single(), "what output format do you need?");
        vague.max_tokens = 200;
        vague.underspecified = true;
        let rendered = paused_json(vec![vague], "s-underspec");

        assert_eq!(rendered["status"], "needs_response");
        assert_eq!(
            rendered["underspecified"], true,
            "underspecified must appear in single-query MCP JSON"
        );
        assert!(
            rendered.get("grounded").is_none(),
            "grounded must be absent when false"
        );
    }

    #[test]
    fn to_json_paused_multiple_queries_mixed_grounded() {
        let mut flagged = unflagged_query(QueryId::batch(0), "verify");
        flagged.grounded = true;
        let plain = unflagged_query(QueryId::batch(1), "generate");
        let rendered = paused_json(vec![flagged, plain], "s-batch");

        let entries = rendered["queries"]
            .as_array()
            .expect("queries should be array");
        assert_eq!(
            entries[0]["grounded"], true,
            "grounded query must have grounded=true"
        );
        assert!(
            entries[1].get("grounded").is_none(),
            "non-grounded query must omit grounded key"
        );
    }

    #[test]
    fn to_json_paused_multiple_queries_mixed_underspecified() {
        let mut flagged = unflagged_query(QueryId::batch(0), "clarify intent");
        flagged.underspecified = true;
        let plain = unflagged_query(QueryId::batch(1), "generate");
        let rendered = paused_json(vec![flagged, plain], "s-batch-us");

        let entries = rendered["queries"]
            .as_array()
            .expect("queries should be array");
        assert_eq!(
            entries[0]["underspecified"], true,
            "underspecified query must have underspecified=true"
        );
        assert!(
            entries[1].get("underspecified").is_none(),
            "non-underspecified query must omit underspecified key"
        );
    }

    #[test]
    fn to_json_paused_single_query_no_system() {
        let mut greeting = unflagged_query(QueryId::single(), "hello");
        greeting.max_tokens = 1024;
        let rendered = paused_json(vec![greeting], "s-x");

        assert_eq!(rendered["status"], "needs_response");
        assert!(rendered["system"].is_null());
    }

    #[test]
    fn to_json_paused_multiple_queries() {
        let batch: Vec<LlmQuery> = (0..3).map(make_query).collect();
        let rendered = paused_json(batch, "s-multi");

        assert_eq!(rendered["status"], "needs_response");
        assert_eq!(rendered["session_id"], "s-multi");

        let entries = rendered["queries"]
            .as_array()
            .expect("queries should be array");
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0]["id"], "q-0");
        assert_eq!(entries[0]["prompt"], "prompt-0");
        assert_eq!(entries[1]["id"], "q-1");
        assert_eq!(entries[2]["id"], "q-2");
    }

    #[test]
    fn to_json_finished_completed() {
        let state = TerminalState::Completed {
            result: json!({"answer": 42}),
        };
        let rendered = finished_json(state, "s-done");

        assert_eq!(rendered["status"], "completed");
        assert_eq!(rendered["result"]["answer"], 42);
        assert!(rendered.get("stats").is_some());
    }

    #[test]
    fn to_json_finished_failed() {
        let state = TerminalState::Failed {
            error: "lua error: bad argument".into(),
        };
        let rendered = finished_json(state, "s-err");

        assert_eq!(rendered["status"], "error");
        assert_eq!(rendered["error"], "lua error: bad argument");
    }

    #[test]
    fn to_json_finished_cancelled() {
        let rendered = finished_json(TerminalState::Cancelled, "s-cancel");

        assert_eq!(rendered["status"], "cancelled");
        assert!(rendered.get("stats").is_some());
    }

    // ─── gen_session_id ───

    #[test]
    fn session_id_starts_with_prefix() {
        let id = gen_session_id();
        let has_prefix = id.starts_with("s-");
        assert!(has_prefix, "id should start with 's-': {id}");
    }

    #[test]
    fn session_id_uniqueness() {
        // Duplicates collapse inside the set, so its size counts distinct IDs.
        let mut distinct = std::collections::HashSet::new();
        for _ in 0..10 {
            distinct.insert(gen_session_id());
        }
        assert_eq!(distinct.len(), 10, "10 IDs should all be unique");
    }

    // ─── is_expired_impl ───
    //
    // The free function is exercised directly: it only needs an `Instant`
    // and a TTL, so no Session (with its Lua VM and channels) has to be built.

    #[test]
    fn is_expired_impl_fresh_instant_not_expired() {
        // An instant taken just now is well inside a one-second TTL.
        let just_now = std::time::Instant::now();
        let expired = is_expired_impl(just_now, Duration::from_secs(1));
        assert!(!expired);
    }

    #[test]
    fn is_expired_impl_old_instant_expired() {
        // Backdate the last-activity instant by two hours ...
        let idle_for = Duration::from_secs(7200);
        let last_active = std::time::Instant::now()
            .checked_sub(idle_for)
            .expect("checked_sub should succeed with sane duration");
        // ... which exceeds a one-hour TTL.
        let ttl = Duration::from_secs(3600);
        assert!(is_expired_impl(last_active, ttl));
    }

    #[test]
    fn is_expired_impl_not_yet_expired() {
        // Backdate the last-activity instant by one hour ...
        let idle_for = Duration::from_secs(3600);
        let last_active = std::time::Instant::now()
            .checked_sub(idle_for)
            .expect("checked_sub should succeed with sane duration");
        // ... which is still within a three-hour TTL.
        let ttl = Duration::from_secs(10800);
        assert!(!is_expired_impl(last_active, ttl));
    }

    #[test]
    fn is_expired_impl_zero_ttl_always_expired() {
        // Edge case: with a zero TTL even a brand-new instant counts as expired.
        let just_now = std::time::Instant::now();
        assert!(is_expired_impl(just_now, Duration::ZERO));
    }

    // ─── PendingFilter presets ───

    #[test]
    fn pending_filter_default_is_all_off() {
        let filter = PendingFilter::default();
        assert!(!filter.query_id);
        assert!(!filter.max_tokens);
        assert!(!filter.system);
        assert!(!filter.grounded);
        assert!(!filter.underspecified);
        assert!(matches!(filter.prompt, PromptProjection::Off));
    }

    #[test]
    fn pending_filter_preset_meta_flags() {
        let filter = PendingFilter::preset_meta();
        assert!(filter.query_id);
        assert!(filter.max_tokens);
        assert!(!filter.system);
        assert!(!filter.grounded);
        assert!(!filter.underspecified);
        let prompt_is_off = matches!(filter.prompt, PromptProjection::Off);
        assert!(prompt_is_off, "meta preset must not project prompt content");
    }

    #[test]
    fn pending_filter_preset_preview_uses_default_chars() {
        let filter = PendingFilter::preset_preview();
        assert!(filter.query_id);
        assert!(filter.max_tokens);
        match filter.prompt {
            PromptProjection::Preview { chars } => {
                assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS)
            }
            other => panic!("expected Preview, got {other:?}"),
        }
    }

    #[test]
    fn pending_filter_preset_preview_with_custom_chars() {
        let filter = PendingFilter::preset_preview_with(42);
        match filter.prompt {
            PromptProjection::Preview { chars } => {
                assert_eq!(chars, 42);
            }
            other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
        }
    }

    #[test]
    fn pending_filter_preset_full_flags_all_on() {
        let filter = PendingFilter::preset_full();
        assert!(filter.query_id);
        assert!(filter.max_tokens);
        assert!(filter.system);
        assert!(filter.grounded);
        assert!(filter.underspecified);
        assert!(matches!(filter.prompt, PromptProjection::Full));
    }

    #[test]
    fn pending_filter_from_preset_known_names() {
        for name in ["meta", "preview", "full"] {
            assert!(PendingFilter::from_preset(name).is_some());
        }
    }

    #[test]
    fn pending_filter_from_preset_unknown_returns_none() {
        // Unknown names (empty, wrong case, nonsense) must come back as None
        // so the caller has to report an error instead of quietly getting a
        // default projection.
        for name in ["", "META", "bogus"] {
            assert!(PendingFilter::from_preset(name).is_none());
        }
    }

    #[test]
    fn pending_filter_from_preset_with_overrides_preview_chars() {
        // Only "preview" picks up the supplied character count ...
        let preview = PendingFilter::from_preset_with("preview", 73).unwrap();
        match preview.prompt {
            PromptProjection::Preview { chars } => {
                assert_eq!(chars, 73);
            }
            other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
        }

        // ... while "meta" and "full" keep their own prompt projection.
        let meta = PendingFilter::from_preset_with("meta", 73).unwrap();
        assert!(matches!(meta.prompt, PromptProjection::Off));

        let full = PendingFilter::from_preset_with("full", 73).unwrap();
        assert!(matches!(full.prompt, PromptProjection::Full));
    }

    // ─── project_query ───

    #[test]
    fn project_query_default_filter_produces_empty_object() {
        let query = make_query(0);
        let projected = project_query(&query, &PendingFilter::default());
        let fields = projected.as_object().expect("object");
        assert!(fields.is_empty(), "default filter should project nothing");
    }

    #[test]
    fn project_query_meta_preset_has_id_and_max_tokens_only() {
        let query = make_query(0);
        let projected = project_query(&query, &PendingFilter::preset_meta());
        let fields = projected.as_object().expect("object");
        assert_eq!(fields.len(), 2);
        assert_eq!(projected["query_id"], "q-0");
        assert_eq!(projected["max_tokens"], 100);
        let must_be_absent = [
            "prompt",
            "prompt_preview",
            "system",
            "grounded",
            "underspecified",
        ];
        for key in must_be_absent {
            assert!(fields.get(key).is_none());
        }
    }

    #[test]
    fn project_query_full_preset_has_all_fields() {
        let mut query = unflagged_query(QueryId::batch(0), "hi");
        query.system = Some("sys".into());
        query.grounded = true;
        query.underspecified = true;

        let projected = project_query(&query, &PendingFilter::preset_full());
        assert_eq!(projected["query_id"], "q-0");
        assert_eq!(projected["max_tokens"], 100);
        assert_eq!(projected["system"], "sys");
        assert_eq!(projected["grounded"], true);
        assert_eq!(projected["underspecified"], true);
        assert_eq!(projected["prompt"], "hi");
        assert!(projected.get("prompt_preview").is_none());
    }

    #[test]
    fn project_query_preview_truncates_at_char_count() {
        let mut query = unflagged_query(QueryId::batch(0), "abcdefghij");
        query.max_tokens = 10;

        let filter = PendingFilter::preset_preview_with(5);
        let projected = project_query(&query, &filter);
        assert_eq!(projected["prompt_preview"], "abcde");
        assert!(projected.get("prompt").is_none());
    }

    #[test]
    fn project_query_preview_utf8_multibyte_safe() {
        // Each of these Japanese characters occupies three bytes in UTF-8.
        // A preview of 3 out of the 5 characters must be counted in
        // characters, not bytes, and must never cut a codepoint in half.
        let mut query = unflagged_query(QueryId::batch(0), "あいうえお");
        query.max_tokens = 10;

        let filter = PendingFilter::preset_preview_with(3);
        let projected = project_query(&query, &filter);
        let preview = projected["prompt_preview"].as_str().expect("str");
        assert_eq!(preview, "あいう");
        assert_eq!(preview.chars().count(), 3);
    }

    #[test]
    fn project_query_preview_chars_over_length_returns_whole_prompt() {
        let mut query = unflagged_query(QueryId::batch(0), "abc");
        query.max_tokens = 10;

        let filter = PendingFilter::preset_preview_with(100);
        let projected = project_query(&query, &filter);
        assert_eq!(projected["prompt_preview"], "abc");
    }

    #[test]
    fn project_query_system_field_null_when_absent() {
        let mut query = unflagged_query(QueryId::batch(0), "p");
        query.max_tokens = 10;

        let system_only = PendingFilter {
            system: true,
            ..Default::default()
        };
        let projected = project_query(&query, &system_only);
        let system_is_null = projected["system"].is_null();
        assert!(system_is_null, "absent system must serialize as null");
    }

    // ─── PendingFilter deserialization (MCP custom object path) ───

    #[test]
    fn pending_filter_deserialize_custom_object_preview() {
        // Instead of naming a preset, an MCP caller can send the filter as a
        // raw JSON object.
        let payload = json!({
            "query_id": true,
            "prompt": { "mode": "preview", "chars": 50 }
        });
        let filter = serde_json::from_value::<PendingFilter>(payload).expect("deserialize");
        assert!(filter.query_id);
        match filter.prompt {
            PromptProjection::Preview { chars } => {
                assert_eq!(chars, 50);
            }
            other => panic!("expected Preview, got {other:?}"),
        }
    }

    #[test]
    fn pending_filter_deserialize_partial_object_uses_field_defaults() {
        // An empty `{}` object must deserialize, with missing fields taking
        // their default values.
        let filter = serde_json::from_value::<PendingFilter>(json!({})).expect("deserialize");
        assert!(!filter.query_id);
        assert!(matches!(filter.prompt, PromptProjection::Off));
    }

    #[test]
    fn pending_filter_deserialize_prompt_full_tag() {
        let payload = json!({ "prompt": { "mode": "full" } });
        let filter = serde_json::from_value::<PendingFilter>(payload).expect("deserialize");
        assert!(matches!(filter.prompt, PromptProjection::Full));
    }

    // ─── Session snapshot v2 fields ───
    //
    // These tests obtain real sessions from the Executor and inspect the
    // JSON returned by `snapshot()` (phase, started_at, last_activity_at),
    // rather than constructing a Session by hand.

    /// Create state/card stores rooted in a new temporary directory and
    /// return them together with a `scenarios` path under the same root.
    fn tmp_dirs() -> (
        std::sync::Arc<crate::state::JsonFileStore>,
        std::sync::Arc<crate::card::FileCardStore>,
        std::path::PathBuf,
    ) {
        let guard = tempfile::tempdir().expect("test tempdir");
        let base = guard.path().to_path_buf();
        // Forgetting the guard skips its destructor, so the directory is
        // left in place for as long as the test uses the returned paths.
        std::mem::forget(guard);

        let states = crate::state::JsonFileStore::new(base.join("state"));
        let cards = crate::card::FileCardStore::new(base.join("cards"));
        (
            std::sync::Arc::new(states),
            std::sync::Arc::new(cards),
            base.join("scenarios"),
        )
    }

    // T1: the snapshot exposes phase, started_at and last_activity_at.
    //
    // The Lua script calls alc.llm() once so the session does not simply run
    // to completion; the snapshot is taken right after start_session, before
    // any event has been awaited.
    #[tokio::test]
    async fn snapshot_v2_contains_phase_and_timestamps() {
        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
        let (state_store, card_store, scenarios_dir) = tmp_dirs();

        // Lua script with a single alc.llm() call.
        let script = String::from(
            r#"
            local response = alc.llm("what is 2+2?")
            return response
        "#,
        );

        let session = executor
            .start_session(
                script,
                json!({}),
                Vec::new(),
                Vec::new(),
                state_store,
                card_store,
                scenarios_dir,
            )
            .await
            .unwrap();

        // No wait_event() has happened yet, so the running phase is expected.
        let snapshot = session.snapshot(None, false);

        // The new `phase` key exists and reports "running".
        let has_phase = snapshot.get("phase").is_some();
        assert!(has_phase, "snapshot must have 'phase' field");
        assert_eq!(
            snapshot["phase"], "running",
            "initial state must be running"
        );

        // The older `state` key is still emitted for backward compatibility.
        assert_eq!(snapshot["state"], "running");

        // `started_at` is a positive integer (unix milliseconds).
        let started_at = snapshot["started_at"]
            .as_i64()
            .expect("started_at must be i64");
        assert!(started_at > 0, "started_at must be > 0 (unix ms)");

        // With no feed so far, the last-activity stamp matches the start stamp.
        let last_activity = snapshot["last_activity_at"]
            .as_i64()
            .expect("last_activity_at must be i64");
        assert_eq!(
            started_at, last_activity,
            "last_activity_at should equal started_at before any feed"
        );
    }

    // T1: documents the intended five-value `phase` labels next to the
    // three-value `state` labels. The table below is self-contained; it does
    // not call into Session, whose running case is covered by the snapshot
    // test above.
    #[test]
    fn snapshot_phase_running_state_label() {
        const CASES: [(&str, &str); 5] = [
            ("running", "running"),
            ("paused", "paused"),
            ("completed", "completed"),
            ("failed", "failed"),
            ("cancelled", "cancelled"),
        ];
        for (state_str, expected_phase) in CASES {
            // In the three-value scheme everything except running/paused
            // collapses into "terminal".
            let is_terminal = !matches!(state_str, "running" | "paused");
            let three_value_state = if is_terminal { "terminal" } else { state_str };

            // In the five-value scheme the phase is the state label itself.
            assert_eq!(
                expected_phase, state_str,
                "phase for {state_str} must be the same string"
            );
            if is_terminal {
                assert_eq!(
                    three_value_state, "terminal",
                    "{state_str} must map to 'terminal' in 3-value state"
                );
            }
        }
    }

    // T1: conversation_history is opt-in: absent from snapshot(.., false),
    // and snapshot(.., true) must at least not panic.
    #[tokio::test]
    async fn snapshot_conversation_history_opt_in() {
        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
        let (state_store, card_store, scenarios_dir) = tmp_dirs();

        let script = String::from(
            r#"
            local response = alc.llm("explain recursion")
            return response
        "#,
        );

        let session = executor
            .start_session(
                script,
                json!({}),
                Vec::new(),
                Vec::new(),
                state_store,
                card_store,
                scenarios_dir,
            )
            .await
            .unwrap();

        // Without the opt-in flag the history must not show up under metrics.
        let without_history = session.snapshot(None, false);
        let history = without_history
            .get("metrics")
            .and_then(|metrics| metrics.get("conversation_history"));
        assert!(
            history.is_none(),
            "conversation_history must be absent with include_history=false"
        );

        // With the flag set, this test only checks that taking the snapshot
        // and looking the key up does not panic. Whether the key is present
        // (possibly as an empty array) before any LLM exchange depends on the
        // metrics implementation and is left to the metrics tests.
        let with_history = session.snapshot(None, true);
        let _ = with_history
            .get("metrics")
            .and_then(|metrics| metrics.get("conversation_history"));
    }

    // T2: before any feed, last_activity_at equals started_at.
    #[tokio::test]
    async fn snapshot_last_activity_at_starts_equal_to_started_at() {
        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
        let (state_store, card_store, scenarios_dir) = tmp_dirs();

        let script = String::from(
            r#"
            local response = alc.llm("test query")
            return response
        "#,
        );

        let session = executor
            .start_session(
                script,
                json!({}),
                Vec::new(),
                Vec::new(),
                state_store,
                card_store,
                scenarios_dir,
            )
            .await
            .unwrap();

        let snapshot = session.snapshot(None, false);
        // Distinct negative fallbacks make sure two missing fields can never
        // compare equal by accident.
        let started_at = snapshot["started_at"].as_i64().unwrap_or(-1);
        let last_activity = snapshot["last_activity_at"].as_i64().unwrap_or(-2);

        assert_eq!(
            started_at, last_activity,
            "last_activity_at must equal started_at before any feed_one"
        );
        assert!(started_at > 0, "started_at must be positive unix ms");
    }
}