Skip to main content

algocline_engine/
session.rs

1//! Session-based Lua execution with pause/resume on alc.llm() calls.
2//!
3//! Runtime layer: ties Domain (ExecutionState) and Metrics (ExecutionMetrics)
4//! together with channel-based Lua pause/resume machinery.
5
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicI64, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use algocline_core::{
12    ExecutionMetrics, ExecutionObserver, ExecutionState, LlmQuery, MetricsObserver, QueryId,
13    TerminalState,
14};
15use mlua_isle::{AsyncIsleDriver, AsyncTask};
16use serde_json::json;
17use tokio::sync::Mutex;
18
19use crate::llm_bridge::LlmRequest;
20
21// ─── Error types (Runtime layer) ─────────────────────────────
22
/// Errors returned by session and registry operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// No session is registered under the given session ID.
    #[error("session '{0}' not found")]
    NotFound(String),
    /// The state machine rejected a fed response; the underlying
    /// `FeedError` is forwarded unchanged (`transparent`).
    #[error(transparent)]
    Feed(#[from] algocline_core::FeedError),
    /// A state transition failed or a precondition did not hold
    /// (e.g. no pending query to resolve). Carries the stringified cause.
    #[error("invalid transition: {0}")]
    InvalidTransition(String),
}
32
33// ─── Result types (Runtime layer) ────────────────────────────
34
/// Session completion data: terminal state + metrics.
///
/// Built when execution ends; at that point the metrics are moved out of
/// the session and handed to the caller inside this value.
#[derive(serde::Serialize)]
pub struct ExecutionResult {
    /// How the execution ended (completed, failed, or cancelled).
    pub state: TerminalState,
    /// Metrics accumulated over the lifetime of the session.
    pub metrics: ExecutionMetrics,
}
41
/// Result of a session interaction (start or feed).
///
/// Converted to the MCP wire shape by `FeedResult::to_json`.
#[derive(serde::Serialize)]
pub enum FeedResult {
    /// Partial feed accepted, still waiting for more responses.
    /// `remaining` is the number of queries still unanswered.
    Accepted { remaining: usize },
    /// Lua is paused on one or more LLM queries; every entry in `queries`
    /// must be answered before execution resumes.
    Paused { queries: Vec<LlmQuery> },
    /// Execution completed (success, failure, or cancellation).
    Finished(ExecutionResult),
}
52
53impl FeedResult {
54    /// Convert to JSON for MCP tool response.
55    pub fn to_json(&self, session_id: &str) -> serde_json::Value {
56        match self {
57            Self::Accepted { remaining } => json!({
58                "status": "accepted",
59                "remaining": remaining,
60            }),
61            Self::Paused { queries } => {
62                if queries.len() == 1 {
63                    let q = &queries[0];
64                    let mut obj = json!({
65                        "status": "needs_response",
66                        "session_id": session_id,
67                        "query_id": q.id.as_str(),
68                        "prompt": q.prompt,
69                        "system": q.system,
70                        "max_tokens": q.max_tokens,
71                    });
72                    if q.grounded {
73                        obj["grounded"] = json!(true);
74                    }
75                    if q.underspecified {
76                        obj["underspecified"] = json!(true);
77                    }
78                    obj
79                } else {
80                    let qs: Vec<_> = queries
81                        .iter()
82                        .map(|q| {
83                            let mut obj = json!({
84                                "id": q.id.as_str(),
85                                "prompt": q.prompt,
86                                "system": q.system,
87                                "max_tokens": q.max_tokens,
88                            });
89                            if q.grounded {
90                                obj["grounded"] = json!(true);
91                            }
92                            if q.underspecified {
93                                obj["underspecified"] = json!(true);
94                            }
95                            obj
96                        })
97                        .collect();
98                    json!({
99                        "status": "needs_response",
100                        "session_id": session_id,
101                        "queries": qs,
102                    })
103                }
104            }
105            Self::Finished(result) => match &result.state {
106                TerminalState::Completed { result: val } => json!({
107                    "status": "completed",
108                    "result": val,
109                    "stats": result.metrics.to_json(),
110                }),
111                TerminalState::Failed { error } => json!({
112                    "status": "error",
113                    "error": error,
114                }),
115                TerminalState::Cancelled => json!({
116                    "status": "cancelled",
117                    "stats": result.metrics.to_json(),
118                }),
119            },
120        }
121    }
122}
123
124// ─── PendingFilter (field-level filter for Session::snapshot) ────
125
/// Default preview length (chars) used when `PendingFilter::preset_preview()`
/// is constructed without an explicit length. Env var
/// `ALC_PROMPT_PREVIEW_CHARS` (resolved in `AppConfig`) overrides this.
///
/// The unit is Rust `char`s (Unicode scalar values), not bytes: the preview
/// is cut with `chars().take(N)`.
pub const DEFAULT_PROMPT_PREVIEW_CHARS: usize = 200;
130
/// Per-field filter controlling which `LlmQuery` attributes are projected
/// into a Snapshot's `pending` array.
///
/// Adding a new field to `LlmQuery` only requires adding one matching
/// `bool` here — the shape stays stable so API surface does not grow
/// enum variants for every new attribute.
///
/// Every field is `#[serde(default)]`, so a partial JSON object
/// deserializes with the omitted fields switched off.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PendingFilter {
    /// Emit the `query_id` key.
    #[serde(default)]
    pub query_id: bool,
    /// Emit the `max_tokens` key.
    #[serde(default)]
    pub max_tokens: bool,
    /// Emit the `system` key (`null` when the query has no system prompt).
    #[serde(default)]
    pub system: bool,
    /// Emit the `grounded` key.
    #[serde(default)]
    pub grounded: bool,
    /// Emit the `underspecified` key.
    #[serde(default)]
    pub underspecified: bool,
    /// How (and whether) the prompt text is projected.
    #[serde(default)]
    pub prompt: PromptProjection,
}
152
/// Prompt projection mode — 3 states rather than a bool so that truncation
/// length can travel inside the filter object.
///
/// JSON tag is `mode`: `{"mode":"off"}` / `{"mode":"preview","chars":200}` /
/// `{"mode":"full"}`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum PromptProjection {
    /// Do not include the prompt at all (the default).
    #[default]
    Off,
    /// Include the first `chars` characters under the key `prompt_preview`.
    Preview {
        /// Maximum number of `char`s to keep.
        chars: usize,
    },
    /// Include the whole prompt under the key `prompt`.
    Full,
}
168
169impl PendingFilter {
170    /// Preset: query identification only (`query_id` + `max_tokens`).
171    pub fn preset_meta() -> Self {
172        Self {
173            query_id: true,
174            max_tokens: true,
175            ..Self::default()
176        }
177    }
178
179    /// Preset: meta + first N chars of the prompt. Uses the hard default
180    /// for N (`DEFAULT_PROMPT_PREVIEW_CHARS`).
181    pub fn preset_preview() -> Self {
182        Self::preset_preview_with(DEFAULT_PROMPT_PREVIEW_CHARS)
183    }
184
185    /// Preset: meta + first `chars` chars of the prompt. Lets callers
186    /// flow a config-resolved length (e.g. env var) into the preset.
187    pub fn preset_preview_with(chars: usize) -> Self {
188        Self {
189            query_id: true,
190            max_tokens: true,
191            prompt: PromptProjection::Preview { chars },
192            ..Self::default()
193        }
194    }
195
196    /// Preset: every field including the full prompt (debug use).
197    pub fn preset_full() -> Self {
198        Self {
199            query_id: true,
200            max_tokens: true,
201            system: true,
202            grounded: true,
203            underspecified: true,
204            prompt: PromptProjection::Full,
205        }
206    }
207
208    /// Resolve a preset by name. Unknown names return `None` so that
209    /// callers can surface a typed error rather than silently falling
210    /// back to a default projection.
211    pub fn from_preset(name: &str) -> Option<Self> {
212        match name {
213            "meta" => Some(Self::preset_meta()),
214            "preview" => Some(Self::preset_preview()),
215            "full" => Some(Self::preset_full()),
216            _ => None,
217        }
218    }
219
220    /// Same as [`Self::from_preset`] but lets `"preview"` pick up a
221    /// caller-supplied char count (config / env override).
222    pub fn from_preset_with(name: &str, preview_chars: usize) -> Option<Self> {
223        match name {
224            "meta" => Some(Self::preset_meta()),
225            "preview" => Some(Self::preset_preview_with(preview_chars)),
226            "full" => Some(Self::preset_full()),
227            _ => None,
228        }
229    }
230}
231
232/// Project a single `LlmQuery` into the JSON object requested by `filter`.
233///
234/// UTF-8 safety: `PromptProjection::Preview { chars }` uses `chars().take(N)`
235/// so the cut never splits a multi-byte code point.
236fn project_query(q: &LlmQuery, f: &PendingFilter) -> serde_json::Value {
237    let mut obj = serde_json::Map::new();
238    if f.query_id {
239        obj.insert("query_id".into(), q.id.as_str().into());
240    }
241    if f.max_tokens {
242        obj.insert("max_tokens".into(), q.max_tokens.into());
243    }
244    if f.system {
245        obj.insert(
246            "system".into(),
247            match &q.system {
248                Some(s) => serde_json::Value::String(s.clone()),
249                None => serde_json::Value::Null,
250            },
251        );
252    }
253    if f.grounded {
254        obj.insert("grounded".into(), q.grounded.into());
255    }
256    if f.underspecified {
257        obj.insert("underspecified".into(), q.underspecified.into());
258    }
259    match &f.prompt {
260        PromptProjection::Off => {}
261        PromptProjection::Full => {
262            obj.insert("prompt".into(), q.prompt.clone().into());
263        }
264        PromptProjection::Preview { chars } => {
265            let preview: String = q.prompt.chars().take(*chars).collect();
266            obj.insert("prompt_preview".into(), preview.into());
267        }
268    }
269    serde_json::Value::Object(obj)
270}
271
272// ─── Session ─────────────────────────────────────────────────
273
/// A Lua execution session with domain state tracking.
///
/// Each session owns a dedicated Lua VM via `_vm_driver`. The VM's OS thread
/// stays alive as long as the driver is held, and exits cleanly when the
/// session is dropped (channel closes → Lua thread drains and exits).
pub struct Session {
    /// Domain state machine (starts as `Running`).
    state: ExecutionState,
    /// Metrics for this session; moved out into `ExecutionResult` when the
    /// session finishes.
    metrics: ExecutionMetrics,
    /// Observer created from `metrics`; notified on pause, feed, resume,
    /// completion and failure.
    observer: MetricsObserver,
    /// Incoming LLM requests from the Lua bridge.
    llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
    /// Handle to the running Lua task; resolves to the result JSON string
    /// or an error.
    exec_task: AsyncTask,
    /// QueryId → resp_tx. Populated on Paused, cleared on resume.
    resp_txs: HashMap<QueryId, tokio::sync::oneshot::Sender<Result<String, String>>>,
    /// Per-session VM lifecycle driver. Keeps the Lua thread alive.
    /// Dropped when the session completes or is abandoned.
    _vm_driver: AsyncIsleDriver,
    /// Last activity timestamp (monotonic). Updated on creation and each feed_one().
    /// Used by GC to identify idle sessions for cleanup.
    last_active: std::time::Instant,
    /// Wall-clock Unix ms when the session was created (immutable after Session::new).
    started_at_ms: i64,
    /// Wall-clock Unix ms of the most recent activity (feed_one or session creation).
    /// Updated with `Relaxed` ordering — observability use only, no cross-thread invariant.
    last_activity_ms: Arc<AtomicI64>,
}
299
300impl Session {
301    /// Create a new session.
302    ///
303    /// # Arguments
304    ///
305    /// - `llm_rx` — Receiver for LLM requests from the Lua bridge.
306    /// - `exec_task` — The coroutine execution task handle.
307    /// - `metrics` — Session metrics (owns the LogSink ring buffer; the bridge
308    ///   reads its `log_sink_handle()` separately to wire `print()` / `alc.log()`
309    ///   into the same ring buffer that `metrics.snapshot()` exposes via
310    ///   `recent_logs` in `alc_status`).
311    /// - `vm_driver` — Keeps the Lua OS thread alive.
312    ///
313    /// # Returns
314    ///
315    /// A new `Session` in the `Running` state.
316    pub fn new(
317        llm_rx: tokio::sync::mpsc::Receiver<LlmRequest>,
318        exec_task: AsyncTask,
319        metrics: ExecutionMetrics,
320        vm_driver: AsyncIsleDriver,
321    ) -> Self {
322        let observer = metrics.create_observer();
323        // Note: duration_since can only fail if the wall clock predates UNIX_EPOCH
324        // (broken system clock). Saturating to 0 is harmless for observability.
325        let started_at_ms = SystemTime::now()
326            .duration_since(UNIX_EPOCH)
327            .unwrap_or_default()
328            .as_millis() as i64;
329        Self {
330            state: ExecutionState::Running,
331            metrics,
332            observer,
333            llm_rx,
334            exec_task,
335            resp_txs: HashMap::new(),
336            _vm_driver: vm_driver,
337            last_active: std::time::Instant::now(),
338            started_at_ms,
339            last_activity_ms: Arc::new(AtomicI64::new(started_at_ms)),
340        }
341    }
342
    /// Wait for the next event from Lua execution.
    ///
    /// Called after initial start or after feeding all responses.
    /// State must be Running when called.
    ///
    /// Races two futures and handles whichever is ready first:
    /// - the execution task finishing → `FeedResult::Finished`
    ///   (completed if the task returned parseable JSON, failed otherwise);
    /// - a new LLM request arriving from the bridge → `FeedResult::Paused`.
    ///
    /// # Errors
    ///
    /// Returns `SessionError::InvalidTransition` if the state machine
    /// rejects the `complete`, `fail` or `pause` transition.
    async fn wait_event(&mut self) -> Result<FeedResult, SessionError> {
        tokio::select! {
            // Branch 1: the Lua task ended (successfully or not).
            result = &mut self.exec_task => {
                match result {
                    // The task yields its result as a JSON string; parse it
                    // before recording completion.
                    Ok(json_str) => match serde_json::from_str::<serde_json::Value>(&json_str) {
                        Ok(v) => {
                            self.state.complete(v.clone()).map_err(|e| {
                                SessionError::InvalidTransition(e.to_string())
                            })?;
                            self.observer.on_completed(&v);
                            Ok(FeedResult::Finished(ExecutionResult {
                                state: TerminalState::Completed { result: v },
                                metrics: self.take_metrics(),
                            }))
                        }
                        // Unparseable output is reported as a failed run.
                        Err(e) => self.fail_with(format!("JSON parse: {e}")),
                    },
                    Err(e) => self.fail_with(e.to_string()),
                }
            }
            // Branch 2: Lua asked for one or more LLM responses.
            // NOTE(review): per tokio::select! pattern semantics, a `None`
            // from `recv()` (channel closed) should disable this branch
            // rather than match it — confirm against the tokio docs.
            Some(req) = self.llm_rx.recv() => {
                // Domain-level copy of the queries (without the reply channels).
                let queries: Vec<LlmQuery> = req.queries.iter().map(|qr| LlmQuery {
                    id: qr.id.clone(),
                    prompt: qr.prompt.clone(),
                    system: qr.system.clone(),
                    max_tokens: qr.max_tokens,
                    grounded: qr.grounded,
                    underspecified: qr.underspecified,
                }).collect();

                // Keep each reply channel so feed_one() can answer by query ID.
                for qr in req.queries {
                    self.resp_txs.insert(qr.id, qr.resp_tx);
                }

                self.state.pause(queries.clone()).map_err(|e| {
                    SessionError::InvalidTransition(e.to_string())
                })?;
                self.observer.on_paused(&queries);
                Ok(FeedResult::Paused { queries })
            }
        }
    }
389
390    /// Feed one response by query_id.
391    ///
392    /// # Arguments
393    ///
394    /// - `query_id` — The query to respond to.
395    /// - `response` — The LLM response string.
396    /// - `usage` — Optional token usage from the host.
397    ///
398    /// # Returns
399    ///
400    /// `Ok(true)` if all queries are now complete; `Ok(false)` if more responses remain.
401    ///
402    /// # Errors
403    ///
404    /// Returns `SessionError::Feed` if the state machine rejects the feed.
405    fn feed_one(
406        &mut self,
407        query_id: &QueryId,
408        response: String,
409        usage: Option<&algocline_core::TokenUsage>,
410    ) -> Result<bool, SessionError> {
411        // Update both monotonic and wall-clock activity timestamps on each feed.
412        self.last_active = std::time::Instant::now();
413        // Note: duration_since can only fail if wall clock predates UNIX_EPOCH.
414        // Saturating to 0 is harmless for observability.
415        let now_ms = SystemTime::now()
416            .duration_since(UNIX_EPOCH)
417            .unwrap_or_default()
418            .as_millis() as i64;
419        self.last_activity_ms.store(now_ms, Ordering::Relaxed);
420
421        // Track response before ownership transfer.
422        self.observer.on_response_fed(query_id, &response, usage);
423
424        // Runtime: send response to Lua thread (unblocks resp_rx.recv())
425        if let Some(tx) = self.resp_txs.remove(query_id) {
426            let _ = tx.send(Ok(response.clone()));
427        }
428
429        // Domain: record in state machine
430        let complete = self
431            .state
432            .feed(query_id, response)
433            .map_err(SessionError::Feed)?;
434
435        if complete {
436            // Domain: transition Paused(complete) → Running
437            self.state
438                .take_responses()
439                .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
440            self.observer.on_resumed();
441        } else {
442            self.observer
443                .on_partial_feed(query_id, self.state.remaining());
444        }
445
446        Ok(complete)
447    }
448
449    fn fail_with(&mut self, msg: String) -> Result<FeedResult, SessionError> {
450        self.state
451            .fail(msg.clone())
452            .map_err(|e| SessionError::InvalidTransition(e.to_string()))?;
453        self.observer.on_failed(&msg);
454        Ok(FeedResult::Finished(ExecutionResult {
455            state: TerminalState::Failed { error: msg },
456            metrics: self.take_metrics(),
457        }))
458    }
459
    /// Move the metrics out of the session, leaving a `Default` value in
    /// their place. Used when building the final `ExecutionResult`.
    fn take_metrics(&mut self) -> ExecutionMetrics {
        std::mem::take(&mut self.metrics)
    }
463
464    /// Lightweight snapshot for external observation (alc_status).
465    ///
466    /// Returns session state label and running metrics without consuming
467    /// or modifying the session.
468    ///
469    /// # Arguments
470    ///
471    /// - `pending_filter` — Opt-in projection for the currently pending LLM queries.
472    ///   `None` emits only `pending_queries: N` (integer count), preserving the v0.x
473    ///   wire shape for light-weight polling.  `Some(filter)` adds a `pending: [...]`
474    ///   array projected through the filter's field flags.
475    /// - `include_history` — When `true`, `conversation_history` (≤10 entries) is
476    ///   included in the metrics output.  When `false` (default), the key is absent.
477    ///   High-frequency polling callers should leave this `false` to avoid wire bloat.
478    ///
479    /// # Returns
480    ///
481    /// A `serde_json::Value` snapshot with the following additive fields beyond v0.x:
482    /// - `phase` — 5-value string derived from `ExecutionState`:
483    ///   `"running"`, `"paused"`, `"completed"`, `"failed"`, `"cancelled"`.
484    ///   The existing `state` key is retained for backward compatibility (3-value).
485    /// - `started_at` — Unix millisecond timestamp when the session was created.
486    /// - `last_activity_at` — Unix millisecond timestamp of the most recent feed_one.
487    ///   Note: `started_at` and `last_activity_at` are wall-clock values while
488    ///   expiry GC uses the monotonic `last_active` Instant; they may skew slightly
489    ///   on NTP adjustments (acceptable for observability use).
490    pub fn snapshot(
491        &self,
492        pending_filter: Option<&PendingFilter>,
493        include_history: bool,
494    ) -> serde_json::Value {
495        let state_label = match &self.state {
496            ExecutionState::Running => "running",
497            ExecutionState::Paused(_) => "paused",
498            _ => "terminal",
499        };
500
501        let phase = match &self.state {
502            ExecutionState::Running => "running",
503            ExecutionState::Paused(_) => "paused",
504            ExecutionState::Completed { .. } => "completed",
505            ExecutionState::Failed { .. } => "failed",
506            ExecutionState::Cancelled => "cancelled",
507        };
508
509        let mut json = serde_json::json!({
510            "state": state_label,
511            "phase": phase,
512            "started_at": self.started_at_ms,
513            "last_activity_at": self.last_activity_ms.load(Ordering::Relaxed),
514        });
515
516        let metrics = self.metrics.snapshot(include_history);
517        if !metrics.is_null() {
518            json["metrics"] = metrics;
519        }
520
521        // Pending query projection (additive; count is always present)
522        if let ExecutionState::Paused(pending) = &self.state {
523            json["pending_queries"] = pending.remaining().into();
524
525            if let Some(filter) = pending_filter {
526                let items: Vec<serde_json::Value> = pending
527                    .pending_queries()
528                    .iter()
529                    .map(|q| project_query(q, filter))
530                    .collect();
531                json["pending"] = serde_json::Value::Array(items);
532            }
533        }
534
535        json
536    }
537
538    /// Returns true if the session has been idle longer than `ttl`.
539    ///
540    /// Uses `saturating_duration_since` to avoid panics if the clock drifts
541    /// backwards (though this is extremely rare with monotonic clocks).
542    pub fn is_expired(&self, ttl: Duration) -> bool {
543        is_expired_impl(self.last_active, ttl)
544    }
545}
546
/// Core expiry check, kept as a free function so it can be tested without
/// constructing a `Session`.
///
/// Returns `true` when the time elapsed since `last_active` is at least
/// `ttl`. A `last_active` in the future counts as zero elapsed time.
fn is_expired_impl(last_active: std::time::Instant, ttl: Duration) -> bool {
    let now = std::time::Instant::now();
    let idle = now.saturating_duration_since(last_active);
    idle >= ttl
}
551
552// ─── Registry ────────────────────────────────────────────────
553
/// Manages active sessions.
///
/// Only sessions that are currently paused (awaiting responses) live in the
/// map: a session is inserted when it pauses and taken out while it is
/// being resumed or once it finishes.
///
/// # Locking design (lock **C**)
///
/// Uses `tokio::sync::Mutex` because `feed_response` holds the lock
/// while calling `Session::feed_one()` (which itself acquires the
/// per-session `std::sync::Mutex<SessionStatus>`, lock **A**). The lock
/// ordering invariant is always **C → A** — no code path acquires A
/// then C, so deadlock is structurally impossible.
///
/// `tokio::sync::Mutex` is chosen here (rather than `std::sync::Mutex`)
/// because `feed_response` must take the session out of the map for
/// the async `wait_event()` call. The two-phase pattern (lock → remove
/// → unlock → await → lock → reinsert) ensures the lock is never held
/// across the `wait_event().await`.
///
/// ## Contention
///
/// `list_snapshots()` (from `alc_status`) holds lock C while iterating
/// all sessions. During this time, `feed_response` for any session is
/// blocked. Given that snapshot iteration is O(n) with n = active
/// sessions (typically 1–3) and each snapshot takes microseconds, this
/// is acceptable. If session count grows significantly, consider
/// switching to a concurrent map or per-session locks.
///
/// ## Interaction with lock A
///
/// `Session::snapshot()` (called under lock C in `list_snapshots`)
/// acquires lock A via `ExecutionMetrics::snapshot()`. This is safe:
/// - Lock order: C → A (consistent with `feed_response`)
/// - Lock A hold time: microseconds (JSON field reads)
/// - Lock A is per-session (no cross-session contention)
pub struct SessionRegistry {
    /// session_id → paused session. Wrapped in `Arc` so the background GC
    /// task can share the same map.
    sessions: Arc<Mutex<HashMap<String, Session>>>,
}
589
/// `Default` is an empty registry, identical to [`SessionRegistry::new`].
impl Default for SessionRegistry {
    fn default() -> Self {
        Self::new()
    }
}
595
596impl SessionRegistry {
597    pub fn new() -> Self {
598        Self {
599            sessions: Arc::new(Mutex::new(HashMap::new())),
600        }
601    }
602
603    /// Start execution and wait for first event (pause or completion).
604    pub async fn start_execution(
605        &self,
606        mut session: Session,
607    ) -> Result<(String, FeedResult), SessionError> {
608        let session_id = gen_session_id();
609        let result = session.wait_event().await?;
610
611        if matches!(result, FeedResult::Paused { .. }) {
612            self.sessions
613                .lock()
614                .await
615                .insert(session_id.clone(), session);
616        }
617
618        Ok((session_id, result))
619    }
620
621    /// Feed one response to a paused session by query_id.
622    ///
623    /// If this completes all pending queries, the session resumes and
624    /// returns the next event (Paused or Finished).
625    /// If queries remain, returns Accepted { remaining }.
626    pub async fn feed_response(
627        &self,
628        session_id: &str,
629        query_id: &QueryId,
630        response: String,
631        usage: Option<&algocline_core::TokenUsage>,
632    ) -> Result<FeedResult, SessionError> {
633        // 1. Feed under lock
634        let complete = {
635            let mut map = self.sessions.lock().await;
636            let session = map
637                .get_mut(session_id)
638                .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
639
640            let complete = session.feed_one(query_id, response, usage)?;
641
642            if !complete {
643                return Ok(FeedResult::Accepted {
644                    remaining: session.state.remaining(),
645                });
646            }
647
648            complete
649        };
650
651        // 2. All complete → take session out for async resume
652        debug_assert!(complete);
653        let mut session = {
654            let mut map = self.sessions.lock().await;
655            map.remove(session_id)
656                .ok_or_else(|| SessionError::NotFound(session_id.into()))?
657        };
658
659        let result = session.wait_event().await?;
660
661        if matches!(result, FeedResult::Paused { .. }) {
662            self.sessions
663                .lock()
664                .await
665                .insert(session_id.into(), session);
666        }
667
668        Ok(result)
669    }
670
671    /// Resolve the sole pending query ID for a session.
672    ///
673    /// When `alc_continue` is called without an explicit `query_id`, this
674    /// method checks if exactly one query is pending and returns its ID.
675    /// Returns an error if zero or multiple queries are pending.
676    pub async fn resolve_sole_pending_id(&self, session_id: &str) -> Result<QueryId, SessionError> {
677        let map = self.sessions.lock().await;
678        let session = map
679            .get(session_id)
680            .ok_or_else(|| SessionError::NotFound(session_id.into()))?;
681        let keys: Vec<QueryId> = session.resp_txs.keys().cloned().collect();
682        match keys.len() {
683            0 => Err(SessionError::InvalidTransition("no pending queries".into())),
684            1 => keys
685                .into_iter()
686                .next()
687                .ok_or_else(|| SessionError::InvalidTransition("unexpected empty keys".into())),
688            n => Err(SessionError::InvalidTransition(format!(
689                "{n} queries pending; specify query_id explicitly"
690            ))),
691        }
692    }
693
694    /// Snapshot all active sessions for external observation (alc_status).
695    ///
696    /// Returns a map of session_id → snapshot JSON. Only includes sessions
697    /// currently held in the registry (i.e. paused, awaiting responses).
698    /// Sessions that have completed are already removed from the registry.
699    ///
700    /// # Arguments
701    ///
702    /// - `pending_filter` — Forwarded verbatim to each session's [`Session::snapshot`].
703    /// - `include_history` — When `true`, each snapshot includes `conversation_history`
704    ///   (≤10 entries).  Pass `false` for high-frequency polling to avoid wire bloat.
705    ///
706    /// # Returns
707    ///
708    /// A `HashMap` mapping session IDs to their JSON snapshots.
709    pub async fn list_snapshots(
710        &self,
711        pending_filter: Option<&PendingFilter>,
712        include_history: bool,
713    ) -> HashMap<String, serde_json::Value> {
714        let map = self.sessions.lock().await;
715        map.iter()
716            .map(|(id, session)| {
717                (
718                    id.clone(),
719                    session.snapshot(pending_filter, include_history),
720                )
721            })
722            .collect()
723    }
724
725    /// Spawn a background GC task that reaps sessions idle longer than `ttl`.
726    ///
727    /// The task runs every 60 seconds. When the process exits, the task is
728    /// naturally terminated. No `JoinHandle` is retained — process exit is
729    /// sufficient for cleanup in MCP server deployments.
730    pub fn spawn_gc_task(&self, ttl: Duration) {
731        let sessions = Arc::clone(&self.sessions);
732        tokio::spawn(async move {
733            let mut interval = tokio::time::interval(Duration::from_secs(60));
734            loop {
735                interval.tick().await;
736                let mut map = sessions.lock().await;
737                let expired: Vec<String> = map
738                    .iter()
739                    .filter(|(_, s)| s.is_expired(ttl))
740                    .map(|(id, _)| id.clone())
741                    .collect();
742                for id in &expired {
743                    tracing::info!(session_id = %id, "GC: reaping expired session");
744                    map.remove(id);
745                }
746            }
747        });
748    }
749}
750
/// Generate a non-deterministic session ID of the form
/// `s-<timestamp hex>-<16 hex digits>`.
///
/// MCP spec requires "secure, non-deterministic session IDs" to prevent
/// session hijacking. The ID combines the current time in nanoseconds
/// since the Unix epoch with a 64-bit suffix obtained by hashing that
/// timestamp with a freshly created `RandomState` hasher.
///
/// NOTE(review): `RandomState` is a randomly keyed hasher, not a
/// cryptographic random number generator — confirm that this satisfies the
/// "secure" requirement for the deployment.
///
/// # Clock failure
///
/// `SystemTime::now().duration_since(UNIX_EPOCH)` fails if the system clock
/// is set before 1970-01-01 (e.g. a misconfigured VM). Panicking is
/// prohibited in library code by project policy, so a failure is mapped to
/// timestamp `0` instead. That is acceptable here: the timestamp is a
/// convenience prefix, and the hashed suffix still differs between calls.
fn gen_session_id() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos());

    // 64-bit suffix → 16 hex chars.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(ts);
    let random = hasher.finish();

    format!("s-{ts:x}-{random:016x}")
}
786
787#[cfg(test)]
788mod tests {
789    use super::*;
790    use algocline_core::{ExecutionMetrics, LlmQuery, QueryId};
791    use serde_json::json;
792
793    fn make_query(index: usize) -> LlmQuery {
794        LlmQuery {
795            id: QueryId::batch(index),
796            prompt: format!("prompt-{index}"),
797            system: None,
798            max_tokens: 100,
799            grounded: false,
800            underspecified: false,
801        }
802    }
803
804    // ─── FeedResult::to_json tests ───
805
806    #[test]
807    fn to_json_accepted() {
808        let result = FeedResult::Accepted { remaining: 3 };
809        let json = result.to_json("s-123");
810        assert_eq!(json["status"], "accepted");
811        assert_eq!(json["remaining"], 3);
812    }
813
814    #[test]
815    fn to_json_paused_single_query() {
816        let query = LlmQuery {
817            id: QueryId::single(),
818            prompt: "What is 2+2?".into(),
819            system: Some("You are a calculator.".into()),
820            max_tokens: 50,
821            grounded: false,
822            underspecified: false,
823        };
824        let result = FeedResult::Paused {
825            queries: vec![query],
826        };
827        let json = result.to_json("s-abc");
828
829        assert_eq!(json["status"], "needs_response");
830        assert_eq!(json["session_id"], "s-abc");
831        assert_eq!(json["prompt"], "What is 2+2?");
832        assert_eq!(json["system"], "You are a calculator.");
833        assert_eq!(json["max_tokens"], 50);
834        // single query mode: no "queries" array
835        assert!(json.get("queries").is_none());
836        // grounded=false must be absent
837        assert!(
838            json.get("grounded").is_none(),
839            "grounded key must be absent when false"
840        );
841        // underspecified=false must be absent
842        assert!(
843            json.get("underspecified").is_none(),
844            "underspecified key must be absent when false"
845        );
846    }
847
848    #[test]
849    fn to_json_paused_single_query_grounded() {
850        let query = LlmQuery {
851            id: QueryId::single(),
852            prompt: "verify this claim".into(),
853            system: None,
854            max_tokens: 200,
855            grounded: true,
856            underspecified: false,
857        };
858        let result = FeedResult::Paused {
859            queries: vec![query],
860        };
861        let json = result.to_json("s-grounded");
862
863        assert_eq!(json["status"], "needs_response");
864        assert_eq!(
865            json["grounded"], true,
866            "grounded must appear in single-query MCP JSON"
867        );
868    }
869
870    #[test]
871    fn to_json_paused_single_query_underspecified() {
872        let query = LlmQuery {
873            id: QueryId::single(),
874            prompt: "what output format do you need?".into(),
875            system: None,
876            max_tokens: 200,
877            grounded: false,
878            underspecified: true,
879        };
880        let result = FeedResult::Paused {
881            queries: vec![query],
882        };
883        let json = result.to_json("s-underspec");
884
885        assert_eq!(json["status"], "needs_response");
886        assert_eq!(
887            json["underspecified"], true,
888            "underspecified must appear in single-query MCP JSON"
889        );
890        assert!(
891            json.get("grounded").is_none(),
892            "grounded must be absent when false"
893        );
894    }
895
896    #[test]
897    fn to_json_paused_multiple_queries_mixed_grounded() {
898        let grounded_query = LlmQuery {
899            id: QueryId::batch(0),
900            prompt: "verify".into(),
901            system: None,
902            max_tokens: 100,
903            grounded: true,
904            underspecified: false,
905        };
906        let normal_query = LlmQuery {
907            id: QueryId::batch(1),
908            prompt: "generate".into(),
909            system: None,
910            max_tokens: 100,
911            grounded: false,
912            underspecified: false,
913        };
914        let result = FeedResult::Paused {
915            queries: vec![grounded_query, normal_query],
916        };
917        let json = result.to_json("s-batch");
918
919        let qs = json["queries"].as_array().expect("queries should be array");
920        assert_eq!(
921            qs[0]["grounded"], true,
922            "grounded query must have grounded=true"
923        );
924        assert!(
925            qs[1].get("grounded").is_none(),
926            "non-grounded query must omit grounded key"
927        );
928    }
929
930    #[test]
931    fn to_json_paused_multiple_queries_mixed_underspecified() {
932        let underspec_query = LlmQuery {
933            id: QueryId::batch(0),
934            prompt: "clarify intent".into(),
935            system: None,
936            max_tokens: 100,
937            grounded: false,
938            underspecified: true,
939        };
940        let normal_query = LlmQuery {
941            id: QueryId::batch(1),
942            prompt: "generate".into(),
943            system: None,
944            max_tokens: 100,
945            grounded: false,
946            underspecified: false,
947        };
948        let result = FeedResult::Paused {
949            queries: vec![underspec_query, normal_query],
950        };
951        let json = result.to_json("s-batch-us");
952
953        let qs = json["queries"].as_array().expect("queries should be array");
954        assert_eq!(
955            qs[0]["underspecified"], true,
956            "underspecified query must have underspecified=true"
957        );
958        assert!(
959            qs[1].get("underspecified").is_none(),
960            "non-underspecified query must omit underspecified key"
961        );
962    }
963
964    #[test]
965    fn to_json_paused_single_query_no_system() {
966        let query = LlmQuery {
967            id: QueryId::single(),
968            prompt: "hello".into(),
969            system: None,
970            max_tokens: 1024,
971            grounded: false,
972            underspecified: false,
973        };
974        let result = FeedResult::Paused {
975            queries: vec![query],
976        };
977        let json = result.to_json("s-x");
978
979        assert_eq!(json["status"], "needs_response");
980        assert!(json["system"].is_null());
981    }
982
983    #[test]
984    fn to_json_paused_multiple_queries() {
985        let queries = vec![make_query(0), make_query(1), make_query(2)];
986        let result = FeedResult::Paused { queries };
987        let json = result.to_json("s-multi");
988
989        assert_eq!(json["status"], "needs_response");
990        assert_eq!(json["session_id"], "s-multi");
991
992        let qs = json["queries"].as_array().expect("queries should be array");
993        assert_eq!(qs.len(), 3);
994        assert_eq!(qs[0]["id"], "q-0");
995        assert_eq!(qs[0]["prompt"], "prompt-0");
996        assert_eq!(qs[1]["id"], "q-1");
997        assert_eq!(qs[2]["id"], "q-2");
998    }
999
1000    #[test]
1001    fn to_json_finished_completed() {
1002        let result = FeedResult::Finished(ExecutionResult {
1003            state: TerminalState::Completed {
1004                result: json!({"answer": 42}),
1005            },
1006            metrics: ExecutionMetrics::new(),
1007        });
1008        let json = result.to_json("s-done");
1009
1010        assert_eq!(json["status"], "completed");
1011        assert_eq!(json["result"]["answer"], 42);
1012        assert!(json.get("stats").is_some());
1013    }
1014
1015    #[test]
1016    fn to_json_finished_failed() {
1017        let result = FeedResult::Finished(ExecutionResult {
1018            state: TerminalState::Failed {
1019                error: "lua error: bad argument".into(),
1020            },
1021            metrics: ExecutionMetrics::new(),
1022        });
1023        let json = result.to_json("s-err");
1024
1025        assert_eq!(json["status"], "error");
1026        assert_eq!(json["error"], "lua error: bad argument");
1027    }
1028
1029    #[test]
1030    fn to_json_finished_cancelled() {
1031        let result = FeedResult::Finished(ExecutionResult {
1032            state: TerminalState::Cancelled,
1033            metrics: ExecutionMetrics::new(),
1034        });
1035        let json = result.to_json("s-cancel");
1036
1037        assert_eq!(json["status"], "cancelled");
1038        assert!(json.get("stats").is_some());
1039    }
1040
1041    // ─── gen_session_id tests ───
1042
1043    #[test]
1044    fn session_id_starts_with_prefix() {
1045        let id = gen_session_id();
1046        assert!(id.starts_with("s-"), "id should start with 's-': {id}");
1047    }
1048
1049    #[test]
1050    fn session_id_uniqueness() {
1051        let ids: Vec<String> = (0..10).map(|_| gen_session_id()).collect();
1052        let set: std::collections::HashSet<&String> = ids.iter().collect();
1053        assert_eq!(set.len(), 10, "10 IDs should all be unique");
1054    }
1055
1056    // ─── is_expired_impl tests ───
1057    //
1058    // Session::is_expired delegates to is_expired_impl. Testing the impl
1059    // directly avoids the need to construct a full Session (which requires
1060    // a real Lua VM + channels).
1061
1062    #[test]
1063    fn is_expired_impl_fresh_instant_not_expired() {
1064        // A just-created instant should not be expired with a non-zero TTL
1065        let now = std::time::Instant::now();
1066        assert!(!is_expired_impl(now, Duration::from_secs(1)));
1067    }
1068
1069    #[test]
1070    fn is_expired_impl_old_instant_expired() {
1071        // Simulate a session idle for 2 hours by backdating last_active
1072        let two_hours_ago = std::time::Instant::now()
1073            .checked_sub(Duration::from_secs(7200))
1074            .expect("checked_sub should succeed with sane duration");
1075        // TTL = 1 hour: should be expired
1076        assert!(is_expired_impl(two_hours_ago, Duration::from_secs(3600)));
1077    }
1078
1079    #[test]
1080    fn is_expired_impl_not_yet_expired() {
1081        // Simulate a session idle for 1 hour
1082        let one_hour_ago = std::time::Instant::now()
1083            .checked_sub(Duration::from_secs(3600))
1084            .expect("checked_sub should succeed with sane duration");
1085        // TTL = 3 hours: should NOT be expired yet
1086        assert!(!is_expired_impl(one_hour_ago, Duration::from_secs(10800)));
1087    }
1088
1089    #[test]
1090    fn is_expired_impl_zero_ttl_always_expired() {
1091        // TTL = 0: any instant is immediately expired (edge case)
1092        let now = std::time::Instant::now();
1093        assert!(is_expired_impl(now, Duration::ZERO));
1094    }
1095
1096    // ─── PendingFilter preset tests ───
1097
1098    #[test]
1099    fn pending_filter_default_is_all_off() {
1100        let f = PendingFilter::default();
1101        assert!(!f.query_id);
1102        assert!(!f.max_tokens);
1103        assert!(!f.system);
1104        assert!(!f.grounded);
1105        assert!(!f.underspecified);
1106        assert!(matches!(f.prompt, PromptProjection::Off));
1107    }
1108
1109    #[test]
1110    fn pending_filter_preset_meta_flags() {
1111        let f = PendingFilter::preset_meta();
1112        assert!(f.query_id);
1113        assert!(f.max_tokens);
1114        assert!(!f.system);
1115        assert!(!f.grounded);
1116        assert!(!f.underspecified);
1117        assert!(
1118            matches!(f.prompt, PromptProjection::Off),
1119            "meta preset must not project prompt content"
1120        );
1121    }
1122
1123    #[test]
1124    fn pending_filter_preset_preview_uses_default_chars() {
1125        let f = PendingFilter::preset_preview();
1126        assert!(f.query_id);
1127        assert!(f.max_tokens);
1128        match f.prompt {
1129            PromptProjection::Preview { chars } => {
1130                assert_eq!(chars, DEFAULT_PROMPT_PREVIEW_CHARS);
1131            }
1132            other => panic!("expected Preview, got {other:?}"),
1133        }
1134    }
1135
1136    #[test]
1137    fn pending_filter_preset_preview_with_custom_chars() {
1138        let f = PendingFilter::preset_preview_with(42);
1139        match f.prompt {
1140            PromptProjection::Preview { chars } => assert_eq!(chars, 42),
1141            other => panic!("expected Preview {{chars: 42}}, got {other:?}"),
1142        }
1143    }
1144
1145    #[test]
1146    fn pending_filter_preset_full_flags_all_on() {
1147        let f = PendingFilter::preset_full();
1148        assert!(f.query_id);
1149        assert!(f.max_tokens);
1150        assert!(f.system);
1151        assert!(f.grounded);
1152        assert!(f.underspecified);
1153        assert!(matches!(f.prompt, PromptProjection::Full));
1154    }
1155
1156    #[test]
1157    fn pending_filter_from_preset_known_names() {
1158        assert!(PendingFilter::from_preset("meta").is_some());
1159        assert!(PendingFilter::from_preset("preview").is_some());
1160        assert!(PendingFilter::from_preset("full").is_some());
1161    }
1162
1163    #[test]
1164    fn pending_filter_from_preset_unknown_returns_none() {
1165        // Typo-protection invariant: caller must surface an error, not
1166        // silently fall back to a default projection.
1167        assert!(PendingFilter::from_preset("").is_none());
1168        assert!(PendingFilter::from_preset("META").is_none());
1169        assert!(PendingFilter::from_preset("bogus").is_none());
1170    }
1171
1172    #[test]
1173    fn pending_filter_from_preset_with_overrides_preview_chars() {
1174        // "preview" respects the per-call chars count (flowed in from env
1175        // or config); other presets ignore it.
1176        let f = PendingFilter::from_preset_with("preview", 73).unwrap();
1177        match f.prompt {
1178            PromptProjection::Preview { chars } => assert_eq!(chars, 73),
1179            other => panic!("expected Preview {{chars: 73}}, got {other:?}"),
1180        }
1181
1182        let f_meta = PendingFilter::from_preset_with("meta", 73).unwrap();
1183        assert!(matches!(f_meta.prompt, PromptProjection::Off));
1184
1185        let f_full = PendingFilter::from_preset_with("full", 73).unwrap();
1186        assert!(matches!(f_full.prompt, PromptProjection::Full));
1187    }
1188
1189    // ─── project_query tests ───
1190
1191    #[test]
1192    fn project_query_default_filter_produces_empty_object() {
1193        let q = make_query(0);
1194        let v = project_query(&q, &PendingFilter::default());
1195        let obj = v.as_object().expect("object");
1196        assert!(obj.is_empty(), "default filter should project nothing");
1197    }
1198
1199    #[test]
1200    fn project_query_meta_preset_has_id_and_max_tokens_only() {
1201        let q = make_query(0);
1202        let v = project_query(&q, &PendingFilter::preset_meta());
1203        let obj = v.as_object().expect("object");
1204        assert_eq!(obj.len(), 2);
1205        assert_eq!(v["query_id"], "q-0");
1206        assert_eq!(v["max_tokens"], 100);
1207        assert!(obj.get("prompt").is_none());
1208        assert!(obj.get("prompt_preview").is_none());
1209        assert!(obj.get("system").is_none());
1210        assert!(obj.get("grounded").is_none());
1211        assert!(obj.get("underspecified").is_none());
1212    }
1213
1214    #[test]
1215    fn project_query_full_preset_has_all_fields() {
1216        let q = LlmQuery {
1217            id: QueryId::batch(0),
1218            prompt: "hi".into(),
1219            system: Some("sys".into()),
1220            max_tokens: 100,
1221            grounded: true,
1222            underspecified: true,
1223        };
1224        let v = project_query(&q, &PendingFilter::preset_full());
1225        assert_eq!(v["query_id"], "q-0");
1226        assert_eq!(v["max_tokens"], 100);
1227        assert_eq!(v["system"], "sys");
1228        assert_eq!(v["grounded"], true);
1229        assert_eq!(v["underspecified"], true);
1230        assert_eq!(v["prompt"], "hi");
1231        assert!(v.get("prompt_preview").is_none());
1232    }
1233
1234    #[test]
1235    fn project_query_preview_truncates_at_char_count() {
1236        let q = LlmQuery {
1237            id: QueryId::batch(0),
1238            prompt: "abcdefghij".into(),
1239            system: None,
1240            max_tokens: 10,
1241            grounded: false,
1242            underspecified: false,
1243        };
1244        let v = project_query(&q, &PendingFilter::preset_preview_with(5));
1245        assert_eq!(v["prompt_preview"], "abcde");
1246        assert!(v.get("prompt").is_none());
1247    }
1248
1249    #[test]
1250    fn project_query_preview_utf8_multibyte_safe() {
1251        // Japanese characters are 3-byte UTF-8 each; chars().take(N) must
1252        // never split a codepoint. Taking 3 chars from 5 must yield exactly
1253        // 3 chars (not bytes), and the String must be valid UTF-8.
1254        let prompt = "あいうえお";
1255        let q = LlmQuery {
1256            id: QueryId::batch(0),
1257            prompt: prompt.to_string(),
1258            system: None,
1259            max_tokens: 10,
1260            grounded: false,
1261            underspecified: false,
1262        };
1263        let v = project_query(&q, &PendingFilter::preset_preview_with(3));
1264        let preview = v["prompt_preview"].as_str().expect("str");
1265        assert_eq!(preview, "あいう");
1266        assert_eq!(preview.chars().count(), 3);
1267    }
1268
1269    #[test]
1270    fn project_query_preview_chars_over_length_returns_whole_prompt() {
1271        let q = LlmQuery {
1272            id: QueryId::batch(0),
1273            prompt: "abc".into(),
1274            system: None,
1275            max_tokens: 10,
1276            grounded: false,
1277            underspecified: false,
1278        };
1279        let v = project_query(&q, &PendingFilter::preset_preview_with(100));
1280        assert_eq!(v["prompt_preview"], "abc");
1281    }
1282
1283    #[test]
1284    fn project_query_system_field_null_when_absent() {
1285        let q = LlmQuery {
1286            id: QueryId::batch(0),
1287            prompt: "p".into(),
1288            system: None,
1289            max_tokens: 10,
1290            grounded: false,
1291            underspecified: false,
1292        };
1293        let filter = PendingFilter {
1294            system: true,
1295            ..Default::default()
1296        };
1297        let v = project_query(&q, &filter);
1298        assert!(
1299            v["system"].is_null(),
1300            "absent system must serialize as null"
1301        );
1302    }
1303
1304    // ─── PendingFilter deserialization (MCP custom object path) ───
1305
1306    #[test]
1307    fn pending_filter_deserialize_custom_object_preview() {
1308        // MCP callers may pass a raw JSON filter rather than a preset name.
1309        let raw = serde_json::json!({
1310            "query_id": true,
1311            "prompt": { "mode": "preview", "chars": 50 }
1312        });
1313        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1314        assert!(f.query_id);
1315        match f.prompt {
1316            PromptProjection::Preview { chars } => assert_eq!(chars, 50),
1317            other => panic!("expected Preview, got {other:?}"),
1318        }
1319    }
1320
1321    #[test]
1322    fn pending_filter_deserialize_partial_object_uses_field_defaults() {
1323        // serde(default) on every field means a `{}` object is valid and
1324        // equivalent to PendingFilter::default().
1325        let raw = serde_json::json!({});
1326        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1327        assert!(!f.query_id);
1328        assert!(matches!(f.prompt, PromptProjection::Off));
1329    }
1330
1331    #[test]
1332    fn pending_filter_deserialize_prompt_full_tag() {
1333        let raw = serde_json::json!({ "prompt": { "mode": "full" } });
1334        let f: PendingFilter = serde_json::from_value(raw).expect("deserialize");
1335        assert!(matches!(f.prompt, PromptProjection::Full));
1336    }
1337
1338    // ─── Session snapshot v2 fields tests ───
1339    //
1340    // These tests use the Executor to create real sessions so that Session
1341    // struct fields (started_at_ms, last_activity_ms, phase) are exercised
1342    // end-to-end without requiring direct construction of AsyncTask/AsyncIsleDriver.
1343
1344    /// Helper: build a minimal temp directory pair for state/card stores.
1345    fn tmp_dirs() -> (
1346        std::sync::Arc<crate::state::JsonFileStore>,
1347        std::sync::Arc<crate::card::FileCardStore>,
1348        std::path::PathBuf,
1349    ) {
1350        let tmp = tempfile::tempdir().expect("test tempdir");
1351        let root = tmp.path().to_path_buf();
1352        std::mem::forget(tmp);
1353        (
1354            std::sync::Arc::new(crate::state::JsonFileStore::new(root.join("state"))),
1355            std::sync::Arc::new(crate::card::FileCardStore::new(root.join("cards"))),
1356            root.join("scenarios"),
1357        )
1358    }
1359
1360    // T1: Session snapshot contains phase, started_at, last_activity_at
1361    // A session that completes immediately should have these fields in its snapshot
1362    // while it is Running (before completion removes it from the registry).
1363    //
1364    // Strategy: start a session with a Lua script that calls alc.llm() to pause.
1365    // The session will be in Paused state in the registry, allowing snapshot().
1366    #[tokio::test]
1367    async fn snapshot_v2_contains_phase_and_timestamps() {
1368        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1369        let (state_store, card_store, scenarios_dir) = tmp_dirs();
1370
1371        // Lua: pause the session with a single alc.llm() call
1372        let code = r#"
1373            local response = alc.llm("what is 2+2?")
1374            return response
1375        "#
1376        .to_string();
1377
1378        let session = executor
1379            .start_session(
1380                code,
1381                serde_json::json!({}),
1382                vec![],
1383                vec![],
1384                state_store,
1385                card_store,
1386                scenarios_dir,
1387            )
1388            .await
1389            .unwrap();
1390
1391        // While in Running state (before first event), snapshot should have new fields.
1392        // Note: session.snapshot() is called before wait_event() so state is Running.
1393        let snap = session.snapshot(None, false);
1394
1395        // phase is present
1396        assert!(
1397            snap.get("phase").is_some(),
1398            "snapshot must have 'phase' field"
1399        );
1400        assert_eq!(snap["phase"], "running", "initial state must be running");
1401
1402        // state key retained for backward compatibility
1403        assert_eq!(snap["state"], "running");
1404
1405        // started_at is a positive i64 (unix ms)
1406        let started_at = snap["started_at"].as_i64().expect("started_at must be i64");
1407        assert!(started_at > 0, "started_at must be > 0 (unix ms)");
1408
1409        // last_activity_at starts equal to started_at
1410        let last_activity = snap["last_activity_at"]
1411            .as_i64()
1412            .expect("last_activity_at must be i64");
1413        assert_eq!(
1414            started_at, last_activity,
1415            "last_activity_at should equal started_at before any feed"
1416        );
1417    }
1418
1419    // T1: phase correctly maps 5 ExecutionState variants
1420    // We test Running via snapshot before wait_event (already done above).
1421    // Here we verify the phase string matches the ExecutionState literal.
1422    #[test]
1423    fn snapshot_phase_running_state_label() {
1424        // We can't construct Session directly in tests (AsyncTask is crate-private).
1425        // Instead verify the phase mapping logic through the match expression.
1426        // This test documents the expected mapping:
1427        let cases: &[(&str, &str)] = &[
1428            ("running", "running"),
1429            ("paused", "paused"),
1430            ("completed", "completed"),
1431            ("failed", "failed"),
1432            ("cancelled", "cancelled"),
1433        ];
1434        for (state_str, expected_phase) in cases {
1435            // The phase mapping is identical to state_str in the 5-value case,
1436            // and the 3-value state uses "terminal" for completed/failed/cancelled.
1437            // Verify that the 3-value state mapping is consistent with expectations.
1438            let three_value_state = match *state_str {
1439                "running" => "running",
1440                "paused" => "paused",
1441                _ => "terminal",
1442            };
1443            // phase must equal state_str (5-value) while state uses 3-value.
1444            assert_eq!(
1445                *expected_phase, *state_str,
1446                "phase for {state_str} must be the same string"
1447            );
1448            if *state_str != "running" && *state_str != "paused" {
1449                assert_eq!(
1450                    three_value_state, "terminal",
1451                    "{state_str} must map to 'terminal' in 3-value state"
1452                );
1453            }
1454        }
1455    }
1456
1457    // T1: snapshot(false) lacks conversation_history; snapshot(true) includes it
1458    #[tokio::test]
1459    async fn snapshot_conversation_history_opt_in() {
1460        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1461        let (state_store, card_store, scenarios_dir) = tmp_dirs();
1462
1463        let code = r#"
1464            local response = alc.llm("explain recursion")
1465            return response
1466        "#
1467        .to_string();
1468
1469        let session = executor
1470            .start_session(
1471                code,
1472                serde_json::json!({}),
1473                vec![],
1474                vec![],
1475                state_store,
1476                card_store,
1477                scenarios_dir,
1478            )
1479            .await
1480            .unwrap();
1481
1482        // Before any LLM interaction, conversation_history is absent in both modes.
1483        let snap_false = session.snapshot(None, false);
1484        assert!(
1485            snap_false
1486                .get("metrics")
1487                .and_then(|m| m.get("conversation_history"))
1488                .is_none(),
1489            "conversation_history must be absent with include_history=false"
1490        );
1491
1492        // include_history=true: conversation_history key must exist (empty array at start).
1493        let snap_true = session.snapshot(None, true);
1494        // metrics is present; conversation_history may be empty array (no LLM calls yet)
1495        // but the key must be present.
1496        if let Some(metrics) = snap_true.get("metrics") {
1497            // If there are no transcript entries yet, conversation_history may be
1498            // absent or empty — depending on metrics implementation.
1499            // Either way, no panic. The key's presence is tested in metrics tests.
1500            let _ = metrics.get("conversation_history");
1501        }
1502    }
1503
1504    // T2: last_activity_ms starts equal to started_at_ms (edge case: no feeds yet)
1505    #[tokio::test]
1506    async fn snapshot_last_activity_at_starts_equal_to_started_at() {
1507        let executor = crate::executor::Executor::new(vec![]).await.unwrap();
1508        let (state_store, card_store, scenarios_dir) = tmp_dirs();
1509
1510        let code = r#"
1511            local response = alc.llm("test query")
1512            return response
1513        "#
1514        .to_string();
1515
1516        let session = executor
1517            .start_session(
1518                code,
1519                serde_json::json!({}),
1520                vec![],
1521                vec![],
1522                state_store,
1523                card_store,
1524                scenarios_dir,
1525            )
1526            .await
1527            .unwrap();
1528
1529        let snap = session.snapshot(None, false);
1530        let started_at = snap["started_at"].as_i64().unwrap_or(-1);
1531        let last_activity = snap["last_activity_at"].as_i64().unwrap_or(-2);
1532
1533        assert_eq!(
1534            started_at, last_activity,
1535            "last_activity_at must equal started_at before any feed_one"
1536        );
1537        assert!(started_at > 0, "started_at must be positive unix ms");
1538    }
1539}