Skip to main content

zero_commands/
session.rs

1//! Session-store abstraction for the dispatcher.
2//!
3//! The session cohort commands (`/sessions`, `/resume`, `/fork`,
4//! `/save`) need read/write access to the on-disk session store.
5//! We reach it through a trait (rather than a hard dependency on
6//! `zero-session`) for the same reason dispatch reaches
7//! operator-state through [`crate::StateSource`]: keep
8//! `zero-commands` pluggable, testable without SQLite, and free of
9//! migrations noise.
10//!
11//! Data crossing the trait is plain Rust — `String` ulids, epoch
12//! milliseconds, and the crate-local [`ReplayKind`] enum. Callers
13//! translate to / from their durable types (`zero_session::EventKind`,
14//! `chrono::DateTime<Utc>`) at the boundary.
15//!
16//! Error policy mirrors `StateSource`: the trait returns `Result`
17//! so SQLite-backed impls can surface IO failures, but the
18//! dispatcher wraps every call in a function that downgrades the
19//! error to an [`OutputLine::Alert`]. A DB hiccup must never take
20//! down the TUI — the operator still needs to read the engine.
21
22use std::fmt;
23
24/// One session row surfaced to the dispatcher. Minimal by design:
25/// everything beyond what `/sessions` renders is looked up on
26/// demand through [`SessionSource::list_events`].
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct SessionSummary {
29    /// Stable public identifier. What `/resume <arg>` takes as
30    /// input and `/sessions` shows first.
31    pub ulid: String,
32    /// Epoch milliseconds of `started_at`. Callers format for
33    /// display; keeping it as a raw integer means the trait is
34    /// `chrono`-free.
35    pub started_at_ms: i64,
36    /// Epoch milliseconds of `ended_at`, or `None` if the session
37    /// was interrupted (crash, kill -9). A missing `ended_at` is
38    /// load-bearing — it tells the operator a session did not
39    /// wrap cleanly, which is information they need when deciding
40    /// whether to `/resume` or `/fork`.
41    pub ended_at_ms: Option<i64>,
42    /// Engine base URL at session start. Surfaced so the operator
43    /// can spot cross-environment mismatches ("why is this
44    /// resuming from a paper-trading URL?") before replaying.
45    pub engine_base_url: Option<String>,
46    /// CLI version string at session start. Same rationale as
47    /// `engine_base_url`: out-of-date sessions produce subtle
48    /// replay surprises and the operator deserves a heads-up.
49    pub cli_version: String,
50    /// Parent ulid when this session was created via `/fork`.
51    /// Rendered as a `parent:<ulid>` tag in the `/sessions` list.
52    pub parent_ulid: Option<String>,
53    /// Number of events in the session. `-1` means the impl
54    /// could not count cheaply; renderers should omit the count
55    /// rather than show a lying zero.
56    pub n_events: i64,
57}
58
59/// Categorisation of a replayed event, stripped of the concrete
60/// `EventKind` enum in `zero-session`. The dispatcher hands these
61/// to the TUI which renders them through its own
62/// [`EntryKind`-equivalent] palette.
63///
64/// Adding a variant here is a two-site change (this enum + the
65/// two translation sites in the runtime impl); leaving the
66/// surface narrow is worth the minor duplication.
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub enum ReplayKind {
69    Prompt,
70    System,
71    Command,
72    Warn,
73    Alert,
74}
75
76/// One replayed event. `at_ms` is the original wall-clock
77/// timestamp, so "resuming: X prior events, most recent 14m ago"
78/// reads from a truthful clock source rather than a rendered
79/// approximation.
80#[derive(Debug, Clone, PartialEq, Eq)]
81pub struct ReplayEvent {
82    pub kind: ReplayKind,
83    pub at_ms: i64,
84    pub text: String,
85}
86
87/// Errors a `SessionSource` can surface.
88///
89/// Deliberately tiny: the dispatcher only needs to know whether a
90/// requested id was missing (for a friendly "no such session"
91/// line) vs. something else went wrong (alert the operator).
92#[derive(Debug, Clone)]
93pub enum SessionError {
94    NotFound,
95    Io(String),
96}
97
98impl fmt::Display for SessionError {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        match self {
101            Self::NotFound => write!(f, "session not found"),
102            Self::Io(s) => write!(f, "session store error: {s}"),
103        }
104    }
105}
106
107impl std::error::Error for SessionError {}
108
109/// The dispatcher's view of the session store. Implemented by the
110/// TUI (wrapping a `zero_session::Store` + a label index on top
111/// of milestones) and by test scaffolding that keeps every call
112/// in-memory.
113///
114/// All methods take `&self` — impls are expected to handle
115/// interior mutability. The SQLite-backed impl already does, via
116/// the store's own `Mutex<Connection>`.
117pub trait SessionSource: Send + Sync + 'static {
118    /// Ulid of the session currently being recorded into, if any.
119    /// `None` when persistence is disabled (`--no-persist`) — the
120    /// dispatcher surfaces a clear "persistence disabled" line
121    /// rather than pretending the save went through.
122    fn current_ulid(&self) -> Option<String>;
123
124    /// Newest-first list of up to `limit` sessions.
125    ///
126    /// # Errors
127    /// Propagates backend IO errors; a listing that errors cannot
128    /// be partially rendered without inviting mismatched counts.
129    fn list(&self, limit: u32) -> Result<Vec<SessionSummary>, SessionError>;
130
131    /// Look up a single session by ulid **or** a human-assigned
132    /// label (see [`Self::save_label`]). Impls should match ulid
133    /// first (exact prefix match on at least 6 chars is fine —
134    /// ulids are time-sortable so a prefix rarely collides) and
135    /// fall through to label resolution.
136    ///
137    /// # Errors
138    /// Returns [`SessionError::NotFound`] when no row matches the
139    /// argument. Any other error variant is a backend failure.
140    fn find(&self, needle: &str) -> Result<SessionSummary, SessionError>;
141
142    /// All events for a session in seq order (oldest first), up
143    /// to `limit`. Mirrors `zero_session::Store::list_events`'s
144    /// semantics but returns this crate's [`ReplayEvent`].
145    ///
146    /// # Errors
147    /// Propagates backend IO errors.
148    fn list_events(&self, ulid: &str, limit: u32) -> Result<Vec<ReplayEvent>, SessionError>;
149
150    /// Associate a short human label with a session ulid. Labels
151    /// are stored as milestones (`session.label.<label>` →
152    /// `<ulid>`) so they survive CLI restarts and are queryable
153    /// without schema changes. Overwriting a label is fine — the
154    /// old one is silently reassigned.
155    ///
156    /// # Errors
157    /// Propagates backend IO errors.
158    fn save_label(&self, ulid: &str, label: &str) -> Result<(), SessionError>;
159
160    /// Start a new session whose `parent_ulid` is the current
161    /// one. The impl becomes the authority for the new session's
162    /// ulid; returns it so the dispatcher can echo the fork line.
163    /// `None` is returned when persistence is disabled.
164    ///
165    /// # Errors
166    /// Propagates backend IO errors.
167    fn fork_from_current(&self) -> Result<Option<String>, SessionError>;
168}
169
170#[cfg(test)]
171pub(crate) mod test_support {
172    //! In-memory `SessionSource` for dispatcher tests. Concrete
173    //! enough to exercise argument parsing + every error path —
174    //! no SQLite, no async, no tempdir. Mirror the runtime impl
175    //! close enough that parity between the two is easy to eyeball.
176
177    use super::*;
178    use std::sync::Mutex;
179
180    #[derive(Default, Debug)]
181    pub struct MockSessions {
182        pub inner: Mutex<MockInner>,
183    }
184
185    #[derive(Default, Debug)]
186    pub struct MockInner {
187        pub sessions: Vec<SessionSummary>,
188        pub events: std::collections::HashMap<String, Vec<ReplayEvent>>,
189        pub labels: std::collections::HashMap<String, String>,
190        pub current: Option<String>,
191        /// When set, every call returns this error instead of the
192        /// normal result. Used to prove the dispatcher's error
193        /// paths surface alerts without crashing.
194        pub fail_with: Option<SessionError>,
195    }
196
197    impl MockSessions {
198        pub fn with_current(ulid: &str) -> Self {
199            Self {
200                inner: Mutex::new(MockInner {
201                    current: Some(ulid.to_string()),
202                    ..MockInner::default()
203                }),
204            }
205        }
206
207        pub fn insert(&self, summary: SessionSummary, events: Vec<ReplayEvent>) {
208            let mut g = self.inner.lock().unwrap();
209            g.events.insert(summary.ulid.clone(), events);
210            g.sessions.push(summary);
211            // Keep list_sessions' newest-first invariant.
212            g.sessions
213                .sort_by(|a, b| b.started_at_ms.cmp(&a.started_at_ms));
214        }
215    }
216
217    impl SessionSource for MockSessions {
218        fn current_ulid(&self) -> Option<String> {
219            self.inner.lock().unwrap().current.clone()
220        }
221
222        fn list(&self, limit: u32) -> Result<Vec<SessionSummary>, SessionError> {
223            let g = self.inner.lock().unwrap();
224            if let Some(e) = g.fail_with.clone() {
225                return Err(e);
226            }
227            Ok(g.sessions
228                .iter()
229                .take(usize::try_from(limit).unwrap_or(usize::MAX))
230                .cloned()
231                .collect())
232        }
233
234        fn find(&self, needle: &str) -> Result<SessionSummary, SessionError> {
235            let g = self.inner.lock().unwrap();
236            if let Some(e) = g.fail_with.clone() {
237                return Err(e);
238            }
239            // Label first so the mock matches the "exact ulid wins,
240            // label resolves otherwise" rule we want at runtime.
241            let ulid = g
242                .labels
243                .get(needle)
244                .cloned()
245                .or_else(|| {
246                    g.sessions
247                        .iter()
248                        .find(|s| s.ulid == needle || s.ulid.starts_with(needle))
249                        .map(|s| s.ulid.clone())
250                })
251                .ok_or(SessionError::NotFound)?;
252            g.sessions
253                .iter()
254                .find(|s| s.ulid == ulid)
255                .cloned()
256                .ok_or(SessionError::NotFound)
257        }
258
259        fn list_events(&self, ulid: &str, limit: u32) -> Result<Vec<ReplayEvent>, SessionError> {
260            let g = self.inner.lock().unwrap();
261            if let Some(e) = g.fail_with.clone() {
262                return Err(e);
263            }
264            Ok(g.events
265                .get(ulid)
266                .cloned()
267                .unwrap_or_default()
268                .into_iter()
269                .take(usize::try_from(limit).unwrap_or(usize::MAX))
270                .collect())
271        }
272
273        fn save_label(&self, ulid: &str, label: &str) -> Result<(), SessionError> {
274            let mut g = self.inner.lock().unwrap();
275            if let Some(e) = g.fail_with.clone() {
276                return Err(e);
277            }
278            g.labels.insert(label.to_string(), ulid.to_string());
279            Ok(())
280        }
281
282        fn fork_from_current(&self) -> Result<Option<String>, SessionError> {
283            let mut g = self.inner.lock().unwrap();
284            if let Some(e) = g.fail_with.clone() {
285                return Err(e);
286            }
287            let Some(parent) = g.current.clone() else {
288                return Ok(None);
289            };
290            let child = format!("{parent}-fork");
291            let next_ts = g.sessions.first().map_or(0, |s| s.started_at_ms + 1);
292            g.sessions.insert(
293                0,
294                SessionSummary {
295                    ulid: child.clone(),
296                    started_at_ms: next_ts,
297                    ended_at_ms: None,
298                    engine_base_url: None,
299                    cli_version: "test".into(),
300                    parent_ulid: Some(parent),
301                    n_events: 0,
302                },
303            );
304            g.current = Some(child.clone());
305            Ok(Some(child))
306        }
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::test_support::MockSessions;
313    use super::*;
314
315    #[test]
316    fn mock_list_returns_newest_first_and_respects_limit() {
317        let m = MockSessions::with_current("01HA");
318        m.insert(
319            SessionSummary {
320                ulid: "01HA".into(),
321                started_at_ms: 1,
322                ended_at_ms: None,
323                engine_base_url: None,
324                cli_version: "0.3.0".into(),
325                parent_ulid: None,
326                n_events: 0,
327            },
328            vec![],
329        );
330        m.insert(
331            SessionSummary {
332                ulid: "01HB".into(),
333                started_at_ms: 2,
334                ended_at_ms: Some(3),
335                engine_base_url: None,
336                cli_version: "0.3.0".into(),
337                parent_ulid: None,
338                n_events: 2,
339            },
340            vec![],
341        );
342
343        let rows = m.list(10).unwrap();
344        assert_eq!(
345            rows.iter().map(|s| s.ulid.as_str()).collect::<Vec<_>>(),
346            vec!["01HB", "01HA"]
347        );
348        assert_eq!(m.list(1).unwrap().len(), 1);
349    }
350
351    #[test]
352    fn mock_find_resolves_ulid_and_label() {
353        let m = MockSessions::with_current("01HA");
354        m.insert(
355            SessionSummary {
356                ulid: "01HA".into(),
357                started_at_ms: 1,
358                ended_at_ms: None,
359                engine_base_url: None,
360                cli_version: "0.3.0".into(),
361                parent_ulid: None,
362                n_events: 0,
363            },
364            vec![],
365        );
366        m.save_label("01HA", "scratch").unwrap();
367        assert_eq!(m.find("01HA").unwrap().ulid, "01HA");
368        assert_eq!(m.find("scratch").unwrap().ulid, "01HA");
369        assert!(matches!(
370            m.find("nope").unwrap_err(),
371            SessionError::NotFound
372        ));
373    }
374
375    #[test]
376    fn mock_fork_returns_none_when_no_current_session() {
377        let m = MockSessions {
378            inner: std::sync::Mutex::new(super::test_support::MockInner::default()),
379        };
380        assert_eq!(m.fork_from_current().unwrap(), None);
381    }
382}