Skip to main content

zenith_session/
session.rs

1//! Tier-1 ephemeral session: a content-addressed snapshot DAG with a HEAD
2//! pointer and a redo stack, persisted under `session_dir(doc_id)`.
3//!
4//! Records are STATE snapshots (full content), never operation logs — so an
5//! external change (git checkout, hand-edit) is captured as a normal snapshot
6//! and is therefore undoable. Undo/redo (next unit) are pure pointer moves over
7//! this DAG; this unit provides the state file plus state-recording and
8//! current-content readback.
9
10use serde::{Deserialize, Serialize};
11use std::path::PathBuf;
12use std::time::UNIX_EPOCH;
13
14use crate::adapter::{Clock, Fs, Rng};
15use crate::error::SessionError;
16use crate::layout::StorePaths;
17use crate::manifest::{HistoryRecord, append_record, read_records};
18use crate::store::{get_object, object_hash, put_object_with_hash};
19
20// ── Persisted pointer state ────────────────────────────────────────────────────
21
22/// The mutable pointer state for a session: HEAD and the redo stack.
23/// Persisted to `state.json` inside `session_dir(doc_id)`.
24#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
25pub struct SessionState {
26    /// Current record id (HEAD). None = empty session (no states yet).
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    pub head: Option<String>,
29    /// Record ids available for redo, most-recently-undone LAST (stack top = end).
30    #[serde(default, skip_serializing_if = "Vec::is_empty")]
31    pub redo: Vec<String>,
32}
33
34// ── Outcome ────────────────────────────────────────────────────────────────────
35
36/// The outcome of a [`record_state`] call.
37#[derive(Debug, Clone, PartialEq)]
38pub enum RecordOutcome {
39    /// Content was byte-identical to HEAD's snapshot; no new record was created.
40    Unchanged,
41    /// A new state was recorded; HEAD advanced to this record id.
42    Recorded { id: String },
43}
44
45// ── Private path helpers ───────────────────────────────────────────────────────
46
47fn state_path(paths: &StorePaths, doc_id: &str) -> PathBuf {
48    paths.session_dir(doc_id).join("state.json")
49}
50
51pub(crate) fn journal_path(paths: &StorePaths, doc_id: &str) -> PathBuf {
52    paths.session_dir(doc_id).join("journal.jsonl")
53}
54
55// ── State load / save ──────────────────────────────────────────────────────────
56
57pub(crate) fn load_state(
58    fs: &impl Fs,
59    paths: &StorePaths,
60    doc_id: &str,
61) -> Result<SessionState, SessionError> {
62    let p = state_path(paths, doc_id);
63    if !fs.exists(&p) {
64        return Ok(SessionState::default());
65    }
66    let bytes = fs.read(&p)?;
67    serde_json::from_slice(&bytes)
68        .map_err(|e| SessionError::new(format!("parse session state: {e}")))
69}
70
71pub(crate) fn save_state(
72    fs: &impl Fs,
73    paths: &StorePaths,
74    doc_id: &str,
75    state: &SessionState,
76) -> Result<(), SessionError> {
77    let p = state_path(paths, doc_id);
78    fs.create_dir_all(&paths.session_dir(doc_id))?;
79    let bytes = serde_json::to_vec_pretty(state)
80        .map_err(|e| SessionError::new(format!("serialize session state: {e}")))?;
81    fs.write(&p, &bytes)
82}
83
84// ── Record lookup helper ───────────────────────────────────────────────────────
85
86/// Linear search for a record by id. Returns `None` if not found.
87pub(crate) fn find_record<'a>(records: &'a [HistoryRecord], id: &str) -> Option<&'a HistoryRecord> {
88    records.iter().find(|r| r.id == id)
89}
90
91// ── Public API ─────────────────────────────────────────────────────────────────
92
93/// Record the current `content` as a new state snapshot for `doc_id`.
94///
95/// `op_kind` is an optional label (e.g. "edit", "external") stored on the
96/// record; it does NOT affect undo/redo. Returns `Unchanged` if `content` is
97/// byte-identical to the current HEAD snapshot (dedup), else `Recorded { id }`
98/// with HEAD advanced and the redo stack cleared.
99pub fn record_state(
100    fs: &impl Fs,
101    paths: &StorePaths,
102    clock: &impl Clock,
103    _rng: &impl Rng,
104    doc_id: &str,
105    content: &[u8],
106    op_kind: Option<&str>,
107) -> Result<RecordOutcome, SessionError> {
108    let mut state = load_state(fs, paths, doc_id)?;
109    let jpath = journal_path(paths, doc_id);
110    let records = read_records(fs, &jpath)?;
111
112    // Dedup against the CURRENT head snapshot.
113    let new_hash = object_hash(content);
114    if let Some(head_id) = &state.head
115        && let Some(head_rec) = find_record(&records, head_id)
116        && head_rec.snapshot == new_hash
117    {
118        return Ok(RecordOutcome::Unchanged);
119    }
120
121    // Store the object (dedups bytes) and append a new record. The address is
122    // the hash we already computed for the dedup check above.
123    put_object_with_hash(fs, paths, doc_id, content, &new_hash)?;
124    let seq = u64::try_from(records.len())
125        .map_err(|_| SessionError::new("session record count exceeds u64"))?;
126    let id = format!("r{seq}");
127    let mut rec = HistoryRecord::new(id.clone(), seq, state.head.clone(), new_hash);
128    rec.op_kind = op_kind.map(str::to_owned);
129    rec.timestamp_ms = clock
130        .now()
131        .duration_since(UNIX_EPOCH)
132        .ok()
133        .map(|d| d.as_millis());
134    append_record(fs, &jpath, &rec)?;
135
136    state.head = Some(id.clone());
137    state.redo.clear();
138    save_state(fs, paths, doc_id, &state)?;
139    Ok(RecordOutcome::Recorded { id })
140}
141
142/// The full content of the current HEAD snapshot, or `None` if the session is
143/// empty (no states recorded yet).
144pub fn current_content(
145    fs: &impl Fs,
146    paths: &StorePaths,
147    doc_id: &str,
148) -> Result<Option<Vec<u8>>, SessionError> {
149    let state = load_state(fs, paths, doc_id)?;
150    let head_id = match state.head {
151        Some(h) => h,
152        None => return Ok(None),
153    };
154    let records = read_records(fs, &journal_path(paths, doc_id))?;
155    let rec = find_record(&records, &head_id).ok_or_else(|| {
156        SessionError::new(format!("session HEAD points to unknown record: {head_id}"))
157    })?;
158    let content = get_object(fs, paths, doc_id, &rec.snapshot)?;
159    Ok(Some(content))
160}
161
162/// Undo: move HEAD to its parent snapshot. Returns the content now at HEAD, or
163/// `None` if the session is empty or already at the root (nothing to undo). The
164/// record we left is pushed onto the redo stack so [`redo`] can return to it.
165pub fn undo(
166    fs: &impl Fs,
167    paths: &StorePaths,
168    doc_id: &str,
169) -> Result<Option<Vec<u8>>, SessionError> {
170    let mut state = load_state(fs, paths, doc_id)?;
171    let head_id = match state.head.as_deref() {
172        Some(h) => h,
173        None => return Ok(None),
174    };
175    let records = read_records(fs, &journal_path(paths, doc_id))?;
176    let rec = find_record(&records, head_id).ok_or_else(|| {
177        SessionError::new(format!("session HEAD points to unknown record: {head_id}"))
178    })?;
179    let parent = match rec.parent.clone() {
180        Some(p) => p,
181        None => return Ok(None), // at root; nothing to undo, HEAD unchanged
182    };
183    state.redo.push(head_id.to_owned());
184    state.head = Some(parent);
185    save_state(fs, paths, doc_id, &state)?;
186    current_content(fs, paths, doc_id)
187}
188
189/// Redo: move HEAD forward to the most-recently-undone snapshot. Returns the
190/// content now at HEAD, or `None` if the redo stack is empty.
191pub fn redo(
192    fs: &impl Fs,
193    paths: &StorePaths,
194    doc_id: &str,
195) -> Result<Option<Vec<u8>>, SessionError> {
196    let mut state = load_state(fs, paths, doc_id)?;
197    let target = match state.redo.pop() {
198        Some(t) => t,
199        None => return Ok(None),
200    };
201    state.head = Some(target);
202    save_state(fs, paths, doc_id, &state)?;
203    current_content(fs, paths, doc_id)
204}
205
206/// Discard all Tier-1 session state for `doc_id` (called on workspace close).
207/// Idempotent: a no-op if no session exists. Durable Tier-2 history is unaffected.
208pub fn clear_session(fs: &impl Fs, paths: &StorePaths, doc_id: &str) -> Result<(), SessionError> {
209    let dir = paths.session_dir(doc_id);
210    if fs.exists(&dir) {
211        fs.remove(&dir)?;
212    }
213    Ok(())
214}
215
216// ── Tests ──────────────────────────────────────────────────────────────────────
217
218#[cfg(test)]
219mod tests {
220    use super::*;
221    use crate::adapter::{FakeClock, FakeRng, MemFs};
222
223    fn setup() -> (MemFs, StorePaths, FakeClock, FakeRng) {
224        (
225            MemFs::new(),
226            StorePaths::new("/data"),
227            FakeClock(std::time::SystemTime::UNIX_EPOCH),
228            FakeRng(0),
229        )
230    }
231
232    #[test]
233    fn first_record_sets_head() {
234        let (fs, paths, clock, rng) = setup();
235        let outcome = record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap();
236        assert_eq!(
237            outcome,
238            RecordOutcome::Recorded {
239                id: "r0".to_string()
240            }
241        );
242        let content = current_content(&fs, &paths, "doc1").unwrap();
243        assert_eq!(content, Some(b"v1".to_vec()));
244        let state = load_state(&fs, &paths, "doc1").unwrap();
245        assert_eq!(state.head, Some("r0".to_string()));
246        assert!(state.redo.is_empty());
247    }
248
249    #[test]
250    fn dedup_identical_head() {
251        let (fs, paths, clock, rng) = setup();
252        let first = record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap();
253        assert_eq!(
254            first,
255            RecordOutcome::Recorded {
256                id: "r0".to_string()
257            }
258        );
259        let second = record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap();
260        assert_eq!(second, RecordOutcome::Unchanged);
261        let jpath = journal_path(&paths, "doc1");
262        let records = read_records(&fs, &jpath).unwrap();
263        assert_eq!(records.len(), 1);
264    }
265
266    #[test]
267    fn second_distinct_record_advances_head_and_chains_parent() {
268        let (fs, paths, clock, rng) = setup();
269        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap();
270        let outcome = record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap();
271        assert_eq!(
272            outcome,
273            RecordOutcome::Recorded {
274                id: "r1".to_string()
275            }
276        );
277        let content = current_content(&fs, &paths, "doc1").unwrap();
278        assert_eq!(content, Some(b"v2".to_vec()));
279        let jpath = journal_path(&paths, "doc1");
280        let records = read_records(&fs, &jpath).unwrap();
281        assert_eq!(records.len(), 2);
282        let r1 = find_record(&records, "r1").unwrap();
283        assert_eq!(r1.parent, Some("r0".to_string()));
284    }
285
286    #[test]
287    fn op_kind_is_stored() {
288        let (fs, paths, clock, rng) = setup();
289        record_state(&fs, &paths, &clock, &rng, "doc1", b"data", Some("external")).unwrap();
290        let jpath = journal_path(&paths, "doc1");
291        let records = read_records(&fs, &jpath).unwrap();
292        assert_eq!(records.len(), 1);
293        assert_eq!(records[0].op_kind, Some("external".to_string()));
294    }
295
296    #[test]
297    fn current_content_empty_session() {
298        let (fs, paths, _clock, _rng) = setup();
299        let result = current_content(&fs, &paths, "doc1").unwrap();
300        assert_eq!(result, None);
301    }
302
303    #[test]
304    fn new_record_clears_redo() {
305        let (fs, paths, clock, rng) = setup();
306        // First record so HEAD is set (required for redo to mean anything).
307        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap();
308        // Hand-craft a state with a non-empty redo stack.
309        let mut state = load_state(&fs, &paths, "doc1").unwrap();
310        state.redo = vec!["rX".to_string()];
311        save_state(&fs, &paths, "doc1", &state).unwrap();
312        // Record a new distinct state — must clear redo.
313        record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap();
314        let reloaded = load_state(&fs, &paths, "doc1").unwrap();
315        assert!(reloaded.redo.is_empty());
316    }
317
318    #[test]
319    fn recording_returns_same_object_for_identical_content_across_branches() {
320        let (fs, paths, clock, rng) = setup();
321        // r0: content A
322        record_state(&fs, &paths, &clock, &rng, "doc1", b"A", None).unwrap();
323        // r1: content B
324        record_state(&fs, &paths, &clock, &rng, "doc1", b"B", None).unwrap();
325        // r2: content A again — distinct from HEAD (B), so a new record is created.
326        let outcome = record_state(&fs, &paths, &clock, &rng, "doc1", b"A", None).unwrap();
327        assert_eq!(
328            outcome,
329            RecordOutcome::Recorded {
330                id: "r2".to_string()
331            }
332        );
333        let jpath = journal_path(&paths, "doc1");
334        let records = read_records(&fs, &jpath).unwrap();
335        assert_eq!(records.len(), 3);
336        // r0 and r2 both point to the same object hash (content dedup).
337        let r0 = find_record(&records, "r0").unwrap();
338        let r2 = find_record(&records, "r2").unwrap();
339        assert_eq!(r0.snapshot, r2.snapshot);
340        // But they are distinct records with distinct ids.
341        assert_ne!(r0.id, r2.id);
342    }
343
344    #[test]
345    fn undo_moves_to_parent() {
346        let (fs, paths, clock, rng) = setup();
347        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap(); // r0
348        record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap(); // r1
349        let content = undo(&fs, &paths, "doc1").unwrap();
350        assert_eq!(content, Some(b"v1".to_vec()));
351        let state = load_state(&fs, &paths, "doc1").unwrap();
352        assert_eq!(state.head, Some("r0".to_string()));
353        assert_eq!(state.redo, vec!["r1".to_string()]);
354        assert_eq!(
355            current_content(&fs, &paths, "doc1").unwrap(),
356            Some(b"v1".to_vec())
357        );
358    }
359
360    #[test]
361    fn undo_at_root_returns_none_and_keeps_head() {
362        let (fs, paths, clock, rng) = setup();
363        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap(); // r0
364        let result = undo(&fs, &paths, "doc1").unwrap();
365        assert_eq!(result, None);
366        let state = load_state(&fs, &paths, "doc1").unwrap();
367        assert_eq!(state.head, Some("r0".to_string()));
368        assert!(state.redo.is_empty());
369    }
370
371    #[test]
372    fn undo_empty_session_is_none() {
373        let (fs, paths, _clock, _rng) = setup();
374        let result = undo(&fs, &paths, "doc1").unwrap();
375        assert_eq!(result, None);
376    }
377
378    #[test]
379    fn redo_returns_to_undone_state() {
380        let (fs, paths, clock, rng) = setup();
381        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap(); // r0
382        record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap(); // r1
383        undo(&fs, &paths, "doc1").unwrap(); // back to r0
384        let content = redo(&fs, &paths, "doc1").unwrap();
385        assert_eq!(content, Some(b"v2".to_vec()));
386        let state = load_state(&fs, &paths, "doc1").unwrap();
387        assert_eq!(state.head, Some("r1".to_string()));
388        assert!(state.redo.is_empty());
389    }
390
391    #[test]
392    fn redo_without_undo_is_none() {
393        let (fs, paths, clock, rng) = setup();
394        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap(); // r0
395        let result = redo(&fs, &paths, "doc1").unwrap();
396        assert_eq!(result, None);
397    }
398
399    #[test]
400    fn undo_undo_undo_redo_undo_sequence() {
401        let (fs, paths, clock, rng) = setup();
402        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap(); // r0
403        record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap(); // r1
404        record_state(&fs, &paths, &clock, &rng, "doc1", b"v3", None).unwrap(); // r2
405        record_state(&fs, &paths, &clock, &rng, "doc1", b"v4", None).unwrap(); // r3
406        // undo x3 → r0 (v1)
407        undo(&fs, &paths, "doc1").unwrap(); // r3 → r2
408        undo(&fs, &paths, "doc1").unwrap(); // r2 → r1
409        let after_third_undo = undo(&fs, &paths, "doc1").unwrap(); // r1 → r0
410        assert_eq!(after_third_undo, Some(b"v1".to_vec()));
411        // redo → r1 (v2)
412        let after_redo = redo(&fs, &paths, "doc1").unwrap();
413        assert_eq!(after_redo, Some(b"v2".to_vec()));
414        // undo → r0 (v1)
415        let after_final_undo = undo(&fs, &paths, "doc1").unwrap();
416        assert_eq!(after_final_undo, Some(b"v1".to_vec()));
417    }
418
419    #[test]
420    fn new_edit_after_undo_clears_redo_and_branches() {
421        let (fs, paths, clock, rng) = setup();
422        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap(); // r0
423        record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap(); // r1
424        undo(&fs, &paths, "doc1").unwrap(); // back to r0; redo = [r1]
425        let outcome = record_state(&fs, &paths, &clock, &rng, "doc1", b"v3", None).unwrap(); // r2
426        assert_eq!(
427            outcome,
428            RecordOutcome::Recorded {
429                id: "r2".to_string()
430            }
431        );
432        assert_eq!(
433            current_content(&fs, &paths, "doc1").unwrap(),
434            Some(b"v3".to_vec())
435        );
436        // redo stack must be empty after a new record
437        let redo_result = redo(&fs, &paths, "doc1").unwrap();
438        assert_eq!(redo_result, None);
439        let state = load_state(&fs, &paths, "doc1").unwrap();
440        assert!(state.redo.is_empty());
441    }
442
443    #[test]
444    fn round_trip_external_change_is_undoable() {
445        let (fs, paths, clock, rng) = setup();
446        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap(); // r0
447        record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap(); // r1
448        // Simulate an external revert: record the same bytes as r0 with op_kind "external".
449        let outcome =
450            record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", Some("external")).unwrap(); // r2
451        let r2_id = match outcome {
452            RecordOutcome::Recorded { ref id } => id.clone(),
453            RecordOutcome::Unchanged => panic!("expected Recorded"),
454        };
455        // Verify op_kind and object dedup.
456        let jpath = journal_path(&paths, "doc1");
457        let records = read_records(&fs, &jpath).unwrap();
458        let r0 = find_record(&records, "r0").unwrap();
459        let r2 = find_record(&records, &r2_id).unwrap();
460        assert_eq!(r2.op_kind, Some("external".to_string()));
461        assert_eq!(r2.snapshot, r0.snapshot); // same bytes → same object hash
462        assert_eq!(
463            current_content(&fs, &paths, "doc1").unwrap(),
464            Some(b"v1".to_vec())
465        );
466        // undo r2 → r1 (v2)
467        let after_first_undo = undo(&fs, &paths, "doc1").unwrap();
468        assert_eq!(after_first_undo, Some(b"v2".to_vec()));
469        // undo r1 → r0 (v1)
470        let after_second_undo = undo(&fs, &paths, "doc1").unwrap();
471        assert_eq!(after_second_undo, Some(b"v1".to_vec()));
472    }
473
474    #[test]
475    fn clear_session_removes_all_state() {
476        let (fs, paths, clock, rng) = setup();
477        record_state(&fs, &paths, &clock, &rng, "doc1", b"v1", None).unwrap();
478        record_state(&fs, &paths, &clock, &rng, "doc1", b"v2", None).unwrap();
479        clear_session(&fs, &paths, "doc1").unwrap();
480        let state = load_state(&fs, &paths, "doc1").unwrap();
481        assert_eq!(state, SessionState::default());
482        assert_eq!(current_content(&fs, &paths, "doc1").unwrap(), None);
483        // Idempotent: second call is also Ok.
484        clear_session(&fs, &paths, "doc1").unwrap();
485    }
486}