Skip to main content

zero_tui/app/
session.rs

1//! Session glue — adapts [`zero_session::Store`] to the TUI.
2//!
3//! The TUI does not care whether persistence is on. If the user
4//! asked for `--no-persist`, or the DB failed to open, we fall
5//! back to a no-op sink so the render path is unchanged.
6//!
7//! This module hosts two adaptor surfaces:
8//! - [`SessionSink`] — the write side. Every [`LogEntry`]
9//!   flowing through `AppState::push` is mirrored here.
10//! - [`SessionAdapter`] — the read side plus the fork/save
11//!   hooks. It implements [`zero_commands::SessionSource`] so
12//!   `/sessions`, `/resume`, `/fork`, `/save` all reach the store
13//!   without `zero-commands` taking a hard dep on
14//!   `zero-session`.
15//!
16//! Both share an `Arc<Mutex<ActiveSession>>` so a `/fork` command
17//! can atomically swap the sink's target under the dispatcher's
18//! feet without a round-trip through `apply_dispatch` — keeping
19//! the "every persisted line lands in the current session" rule
20//! enforceable without ceremony.
21
22use std::sync::{Arc, Mutex};
23
24use chrono::{DateTime, Utc};
25use zero_commands::{
26    ReplayEvent, ReplayKind, SessionError as CmdSessionError, SessionSource, SessionSummary,
27};
28use zero_session::{EventKind as SessionKind, SessionError, SessionRow, Store, StoredEvent};
29
30use crate::app::log::{EntryKind, LogEntry};
31
32/// The `(session_id, ulid)` pair currently receiving writes.
33/// `None` is reached only after [`SessionAdapter::end_current`]
34/// (which we do not call yet) — the field exists today for the
35/// `/fork` swap.
36#[derive(Debug, Default, Clone)]
37struct ActiveSession {
38    row_id: Option<i64>,
39    ulid: Option<String>,
40}
41
42/// A write sink for session persistence. `None` means persistence
43/// is disabled; callers treat it as an append-only log.
44#[derive(Clone)]
45pub struct SessionSink {
46    store: Arc<Store>,
47    active: Arc<Mutex<ActiveSession>>,
48}
49
50impl std::fmt::Debug for SessionSink {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        let active = self.active.lock().unwrap();
53        f.debug_struct("SessionSink")
54            .field("row_id", &active.row_id)
55            .field("ulid", &active.ulid)
56            .finish_non_exhaustive()
57    }
58}
59
60impl SessionSink {
61    #[must_use]
62    pub fn new(store: Arc<Store>, session_id: i64, ulid: String) -> Self {
63        Self {
64            store,
65            active: Arc::new(Mutex::new(ActiveSession {
66                row_id: Some(session_id),
67                ulid: Some(ulid),
68            })),
69        }
70    }
71
72    /// Clone the shared handle so a [`SessionAdapter`] sees the
73    /// same active ulid/row-id whenever `/fork` swaps it.
74    #[must_use]
75    pub fn adapter(&self) -> SessionAdapter {
76        SessionAdapter {
77            store: Arc::clone(&self.store),
78            active: Arc::clone(&self.active),
79        }
80    }
81
82    /// Record one log entry. Errors are logged but do not propagate
83    /// — a DB hiccup must not deny the operator a visible render.
84    pub fn record(&self, entry: &LogEntry) {
85        let Some(session_id) = self.active.lock().unwrap().row_id else {
86            return;
87        };
88        let kind = to_session_kind(entry.kind);
89        if let Err(e) = self.store.append(session_id, kind, &entry.text) {
90            tracing::warn!(err = %e, "session append failed");
91        }
92    }
93
94    /// Close the *originally-opened* session row. Called during
95    /// shutdown. Forks open their own rows but do not close on
96    /// exit — a child session whose parent is still marked live
97    /// is the honest representation of a crash-exit.
98    pub fn end(&self) {
99        if let Some(session_id) = self.active.lock().unwrap().row_id
100            && let Err(e) = self.store.end_session(session_id)
101        {
102            tracing::warn!(err = %e, "session end failed");
103        }
104    }
105
106    /// The store this sink writes to. Exposed so a post-run
107    /// caller (daily-wrap generator, milestone writer) can
108    /// reach the same DB handle the sink has been using
109    /// without the caller having to carry a separate `Arc`.
110    ///
111    /// Returned as a shared reference — the caller must not
112    /// mutate the Arc; the write path remains the sink itself.
113    #[must_use]
114    pub fn store(&self) -> &Store {
115        &self.store
116    }
117
118    /// Snapshot of the originally-opened row's id. Returns
119    /// `None` if the session was forked away and never forked
120    /// back — today `end()` is still keyed on this same id,
121    /// so `None` is only reachable via a `/fork` without a
122    /// return, which the fork command never performs in M1.
123    #[must_use]
124    pub fn session_id(&self) -> Option<i64> {
125        self.active.lock().unwrap().row_id
126    }
127
128    /// Snapshot of the current active ULID. Same caveat as
129    /// [`Self::session_id`].
130    #[must_use]
131    pub fn ulid(&self) -> Option<String> {
132        self.active.lock().unwrap().ulid.clone()
133    }
134}
135
136/// Read + fork/save adaptor over a [`Store`], implementing
137/// [`SessionSource`] so the dispatcher can reach the on-disk
138/// history. Carries the same `Arc<Mutex<ActiveSession>>` as
139/// [`SessionSink`] so `/fork` atomically swaps the write target.
140#[derive(Clone)]
141pub struct SessionAdapter {
142    store: Arc<Store>,
143    active: Arc<Mutex<ActiveSession>>,
144}
145
146impl std::fmt::Debug for SessionAdapter {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        f.debug_struct("SessionAdapter")
149            .field("active_ulid", &self.active.lock().unwrap().ulid)
150            .finish_non_exhaustive()
151    }
152}
153
154impl SessionAdapter {
155    /// Resolve a `needle` (ulid prefix or saved label) to a full
156    /// session row. Prefix match requires ≥ 6 chars so
157    /// cross-session collisions are vanishingly unlikely (ulids
158    /// are 26 base-32 chars; the first 10 encode time). Label
159    /// lookup runs only when the prefix path misses. Returns
160    /// `Ok(None)` for a clean "no such session" so callers can
161    /// translate to [`CmdSessionError::NotFound`] without
162    /// depending on `rusqlite`.
163    fn resolve_needle(&self, needle: &str) -> Result<Option<SessionRow>, SessionError> {
164        if let Some(row) = self.store.get_session_by_ulid(needle)? {
165            return Ok(Some(row));
166        }
167        if needle.len() >= 6 {
168            let rows = self.store.list_sessions(1000)?;
169            if let Some(hit) = rows.into_iter().find(|r| r.ulid.starts_with(needle)) {
170                return Ok(Some(hit));
171            }
172        }
173        let key = label_key(needle);
174        if let Some(ulid) = self.store.get_milestone(&key)?
175            && let Some(row) = self.store.get_session_by_ulid(&ulid)?
176        {
177            return Ok(Some(row));
178        }
179        Ok(None)
180    }
181}
182
183impl SessionSource for SessionAdapter {
184    fn current_ulid(&self) -> Option<String> {
185        self.active.lock().unwrap().ulid.clone()
186    }
187
188    fn list(&self, limit: u32) -> Result<Vec<SessionSummary>, CmdSessionError> {
189        let rows = self.store.list_sessions(limit).map_err(io_err)?;
190        let mut out = Vec::with_capacity(rows.len());
191        for row in rows {
192            // count_events is cheap (indexed COUNT) so we pay it
193            // per row. If the table grows beyond toy sizes we can
194            // switch to a single JOIN query; for now clarity wins.
195            let n_events = self.store.count_events(row.id).map_err(io_err)?;
196            out.push(row_to_summary(row, n_events));
197        }
198        Ok(out)
199    }
200
201    fn find(&self, needle: &str) -> Result<SessionSummary, CmdSessionError> {
202        let row = self
203            .resolve_needle(needle)
204            .map_err(io_err)?
205            .ok_or(CmdSessionError::NotFound)?;
206        let n_events = self.store.count_events(row.id).map_err(io_err)?;
207        Ok(row_to_summary(row, n_events))
208    }
209
210    fn list_events(&self, ulid: &str, limit: u32) -> Result<Vec<ReplayEvent>, CmdSessionError> {
211        let row = self
212            .store
213            .get_session_by_ulid(ulid)
214            .map_err(io_err)?
215            .ok_or(CmdSessionError::NotFound)?;
216        let events = self.store.list_events(row.id, limit).map_err(io_err)?;
217        Ok(events.into_iter().map(stored_to_replay).collect())
218    }
219
220    fn save_label(&self, ulid: &str, label: &str) -> Result<(), CmdSessionError> {
221        // Guard against empty / whitespace-only labels — a bare
222        // `/save  ` would otherwise overwrite the sentinel key.
223        let trimmed = label.trim();
224        if trimmed.is_empty() {
225            return Err(CmdSessionError::Io("empty label".into()));
226        }
227        self.store
228            .set_milestone(&label_key(trimmed), ulid)
229            .map_err(io_err)
230    }
231
232    fn fork_from_current(&self) -> Result<Option<String>, CmdSessionError> {
233        let parent = self.active.lock().unwrap().ulid.clone();
234        let Some(parent_ulid) = parent else {
235            return Ok(None);
236        };
237        // We don't know the engine_base_url / cli_version here —
238        // those were captured at initial startup. The store keeps
239        // them on the parent row; the child row inherits them
240        // only via `parent_ulid` so `/sessions` shows the link.
241        // That is honest: a fork happens mid-session and the
242        // engine URL could have changed via `/connect` (future),
243        // so re-using the original value would be a subtle lie.
244        let new_ulid = new_ulid();
245        let new_row_id = self
246            .store
247            .start_session(
248                &new_ulid,
249                None,
250                env!("CARGO_PKG_VERSION"),
251                Some(&parent_ulid),
252            )
253            .map_err(io_err)?;
254        let mut g = self.active.lock().unwrap();
255        g.row_id = Some(new_row_id);
256        g.ulid = Some(new_ulid.clone());
257        Ok(Some(new_ulid))
258    }
259}
260
261/// Translate the persisted kind back into a TUI entry kind. The
262/// schema's `mode_change` rows are folded into `System` on replay
263/// — the mode switch already happened; the row is a breadcrumb.
264#[must_use]
265pub fn to_entry_kind(k: SessionKind) -> EntryKind {
266    match k {
267        SessionKind::Prompt => EntryKind::Prompt,
268        SessionKind::System | SessionKind::ModeChange => EntryKind::System,
269        SessionKind::Command => EntryKind::Command,
270        SessionKind::Warn => EntryKind::Warn,
271        SessionKind::Alert => EntryKind::Alert,
272    }
273}
274
275fn to_session_kind(k: EntryKind) -> SessionKind {
276    match k {
277        EntryKind::Prompt => SessionKind::Prompt,
278        EntryKind::System => SessionKind::System,
279        EntryKind::Command => SessionKind::Command,
280        EntryKind::Warn => SessionKind::Warn,
281        EntryKind::Alert => SessionKind::Alert,
282    }
283}
284
285/// Rehydrate stored events into log entries, preserving the
286/// original timestamps so rendered "age" reads stay truthful.
287#[must_use]
288pub fn replay(events: &[StoredEvent]) -> Vec<LogEntry> {
289    events
290        .iter()
291        .map(|e| LogEntry::new(to_entry_kind(e.kind), &e.text).at(e.at))
292        .collect()
293}
294
295/// Heuristic summary of a prior session for the resume banner.
296#[must_use]
297pub fn summarize(row: &SessionRow, n_events: usize) -> String {
298    let ts = row.started_at.format("%Y-%m-%d %H:%M UTC");
299    let status = if row.ended_at.is_some() {
300        "ended"
301    } else {
302        "interrupted"
303    };
304    format!("resuming: {ts} · {status} · {n_events} prior event(s)")
305}
306
307fn row_to_summary(row: SessionRow, n_events: i64) -> SessionSummary {
308    SessionSummary {
309        ulid: row.ulid,
310        started_at_ms: row.started_at.timestamp_millis(),
311        ended_at_ms: row.ended_at.map(|dt| dt.timestamp_millis()),
312        engine_base_url: row.engine_base_url,
313        cli_version: row.cli_version,
314        parent_ulid: row.parent_ulid,
315        n_events,
316    }
317}
318
319fn stored_to_replay(e: StoredEvent) -> ReplayEvent {
320    ReplayEvent {
321        kind: stored_kind_to_replay(e.kind),
322        at_ms: e.at.timestamp_millis(),
323        text: e.text,
324    }
325}
326
327fn stored_kind_to_replay(k: SessionKind) -> ReplayKind {
328    match k {
329        SessionKind::Prompt => ReplayKind::Prompt,
330        SessionKind::System | SessionKind::ModeChange => ReplayKind::System,
331        SessionKind::Command => ReplayKind::Command,
332        SessionKind::Warn => ReplayKind::Warn,
333        SessionKind::Alert => ReplayKind::Alert,
334    }
335}
336
337// Passed by value because the caller's error is already about to
338// be discarded — there is no use for a borrowed reference, and
339// `.map_err(io_err)` is the idiomatic short form. Clippy's
340// needless-pass-by-value fires here despite that.
341#[allow(clippy::needless_pass_by_value)]
342fn io_err(e: SessionError) -> CmdSessionError {
343    CmdSessionError::Io(e.to_string())
344}
345
346fn label_key(label: &str) -> String {
347    // Namespace labels under a fixed prefix so they cannot collide
348    // with the in-tree milestone constants (`welcome_shown`, etc.).
349    format!("session.label.{label}")
350}
351
352/// Minimal ULID-ish id — see the matching function in
353/// `zero/src/main.rs`. Copied (rather than exposed as a public
354/// helper) because session-sensitive crates should not fork the
355/// `ulid` crate transitively through `zero-tui`.
356fn new_ulid() -> String {
357    use std::time::{SystemTime, UNIX_EPOCH};
358    let ms = SystemTime::now()
359        .duration_since(UNIX_EPOCH)
360        .map(|d| d.as_millis())
361        .unwrap_or(0);
362    let rand = fastrand_hex(6);
363    format!("{ms:013x}{rand}")
364}
365
366fn fastrand_hex(n: usize) -> String {
367    use std::time::{SystemTime, UNIX_EPOCH};
368    let mut state: u64 = SystemTime::now()
369        .duration_since(UNIX_EPOCH)
370        .map_or(0x9E37_79B9_7F4A_7C15, |d| {
371            u64::try_from(d.as_nanos()).unwrap_or(0x9E37_79B9_7F4A_7C15)
372        });
373    (0..n)
374        .map(|_| {
375            state = state
376                .wrapping_mul(6_364_136_223_846_793_005)
377                .wrapping_add(1_442_695_040_888_963_407);
378            char::from_digit(u32::try_from((state >> 60) & 0xF).unwrap_or(0), 16).unwrap_or('0')
379        })
380        .collect()
381}
382
383// Unused — keeps the dead-code lint quiet since the rfc3339
384// parser helper previously exported here is no longer referenced
385// after the adapter refactor.
386#[allow(dead_code)]
387fn _nudge(_: DateTime<Utc>) {}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392
393    #[test]
394    fn adapter_current_ulid_tracks_active_session() {
395        let store = Arc::new(Store::open_in_memory().unwrap());
396        let id = store.start_session("01HX", None, "0.3.0", None).unwrap();
397        let sink = SessionSink::new(Arc::clone(&store), id, "01HX".into());
398        let adapter = sink.adapter();
399        assert_eq!(adapter.current_ulid().as_deref(), Some("01HX"));
400    }
401
402    #[test]
403    fn adapter_fork_swaps_active_ulid_and_links_parent() {
404        let store = Arc::new(Store::open_in_memory().unwrap());
405        let id = store
406            .start_session("01HPARENT", None, "0.3.0", None)
407            .unwrap();
408        let sink = SessionSink::new(Arc::clone(&store), id, "01HPARENT".into());
409        let adapter = sink.adapter();
410
411        let child_ulid = adapter
412            .fork_from_current()
413            .unwrap()
414            .expect("fork produced ulid");
415        // Active ulid must have swapped from both views.
416        assert_eq!(adapter.current_ulid(), Some(child_ulid.clone()));
417        assert_eq!(
418            sink.active.lock().unwrap().ulid.as_deref(),
419            Some(child_ulid.as_str()),
420            "sink must see the fork under it",
421        );
422
423        // Child row in DB should carry parent_ulid.
424        let child = store.get_session_by_ulid(&child_ulid).unwrap().unwrap();
425        assert_eq!(child.parent_ulid.as_deref(), Some("01HPARENT"));
426    }
427
428    #[test]
429    fn adapter_save_label_then_find_by_label() {
430        let store = Arc::new(Store::open_in_memory().unwrap());
431        let id = store.start_session("01HLBL", None, "0.3.0", None).unwrap();
432        let sink = SessionSink::new(Arc::clone(&store), id, "01HLBL".into());
433        let adapter = sink.adapter();
434
435        adapter.save_label("01HLBL", "pre-cpi").unwrap();
436        let hit = adapter.find("pre-cpi").unwrap();
437        assert_eq!(hit.ulid, "01HLBL");
438    }
439
440    #[test]
441    fn adapter_find_missing_returns_not_found() {
442        let store = Arc::new(Store::open_in_memory().unwrap());
443        let id = store.start_session("01HX", None, "0.3.0", None).unwrap();
444        let sink = SessionSink::new(store, id, "01HX".into());
445        let adapter = sink.adapter();
446        assert!(matches!(
447            adapter.find("nope").unwrap_err(),
448            CmdSessionError::NotFound
449        ));
450    }
451
452    #[test]
453    fn adapter_save_rejects_empty_label() {
454        let store = Arc::new(Store::open_in_memory().unwrap());
455        let id = store.start_session("01HE", None, "0.3.0", None).unwrap();
456        let sink = SessionSink::new(store, id, "01HE".into());
457        let adapter = sink.adapter();
458        assert!(matches!(
459            adapter.save_label("01HE", "   ").unwrap_err(),
460            CmdSessionError::Io(_)
461        ));
462    }
463}