Skip to main content

zenith_session/
tier2.rs

1//! Tier-2 durable version history: a bounded, flat list of state snapshots in
2//! `versions.jsonl` (the "light git per file" — a Google-Docs-style version
3//! panel, NOT a full VCS: no branches/merge). Each version is a full content
4//! snapshot (content-addressed); named versions carry a label. Restore returns
5//! a past version's content for the caller to write back to the `.zen`.
6
7use std::time::UNIX_EPOCH;
8
9use crate::adapter::{Clock, Fs};
10use crate::error::SessionError;
11use crate::layout::StorePaths;
12use crate::manifest::{CheckpointMeta, HistoryRecord, append_record, read_records};
13use crate::revspec::resolve_revspec;
14use crate::session::find_record;
15use crate::store::{get_object, object_hash, put_object_with_hash};
16
17// ── Outcome ───────────────────────────────────────────────────────────────────
18
/// Result of a [`record_version`] call: either a new history record was
/// appended, or the call was a no-op.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VersionOutcome {
    /// Nothing was appended: the content hash matched the latest version's
    /// snapshot (only unnamed versions are deduplicated this way).
    Unchanged,
    /// A new version record was appended; `id` is its version id (`v<seq>`).
    Recorded { id: String },
}
27
28// ── Metadata ──────────────────────────────────────────────────────────────────
29
30/// Metadata recorded alongside a durable version: an optional human label, an
31/// optional op-kind tag, and optional agent-checkpoint metadata.
32#[derive(Debug, Clone, Copy, Default)]
33pub struct VersionMeta<'a> {
34    pub label: Option<&'a str>,
35    pub op_kind: Option<&'a str>,
36    pub checkpoint: Option<&'a CheckpointMeta>,
37}
38
39// ── Public API ────────────────────────────────────────────────────────────────
40
41/// List all durable versions for `doc_id`, oldest first (append order).
42pub fn list_versions(
43    fs: &impl Fs,
44    paths: &StorePaths,
45    doc_id: &str,
46) -> Result<Vec<HistoryRecord>, SessionError> {
47    read_records(fs, &paths.versions_file(doc_id))
48}
49
50/// Record `content` as a new durable version. `meta.label` names it (a named
51/// version is retained forever by the retention pass). `meta.op_kind` is an
52/// optional category label. UNNAMED (auto) versions deduplicate against the
53/// LATEST version: if `content` is byte-identical to it, the call returns
54/// `Unchanged` and appends nothing. A NAMED version (`meta.label.is_some()`) is
55/// an explicit checkpoint and is ALWAYS recorded, even when its content matches
56/// the latest version.
57pub fn record_version(
58    fs: &impl Fs,
59    paths: &StorePaths,
60    clock: &impl Clock,
61    doc_id: &str,
62    content: &[u8],
63    meta: VersionMeta<'_>,
64) -> Result<VersionOutcome, SessionError> {
65    let vpath = paths.versions_file(doc_id);
66    let versions = read_records(fs, &vpath)?;
67    let new_hash = object_hash(content);
68
69    // Dedup auto (unnamed) versions against the latest version (highest seq =
70    // last appended). A NAMED version is an explicit checkpoint and is always
71    // recorded, even when its content matches the latest version — the object
72    // store still dedups the bytes, so only a lightweight record is added.
73    if meta.label.is_none()
74        && let Some(last) = versions.last()
75        && last.snapshot == new_hash
76    {
77        return Ok(VersionOutcome::Unchanged);
78    }
79
80    // Store at the address we already computed for the dedup check above.
81    put_object_with_hash(fs, paths, doc_id, content, &new_hash)?;
82    let seq = u64::try_from(versions.len())
83        .map_err(|_| SessionError::new("version count exceeds u64"))?;
84    let id = format!("v{seq}");
85    let parent = versions.last().map(|r| r.id.clone());
86    let mut rec = HistoryRecord::new(id.clone(), seq, parent, new_hash);
87    rec.label = meta.label.map(str::to_owned);
88    rec.op_kind = meta.op_kind.map(str::to_owned);
89    rec.timestamp_ms = clock
90        .now()
91        .duration_since(UNIX_EPOCH)
92        .ok()
93        .map(|d| d.as_millis());
94    if let Some(cm) = meta.checkpoint {
95        rec.action_id = cm.action_id.clone();
96        rec.action_version = cm.action_version.clone();
97        rec.preview_hash = cm.preview_hash.clone();
98        rec.replay_eligible = cm.replay_eligible;
99    }
100    append_record(fs, &vpath, &rec)?;
101    Ok(VersionOutcome::Recorded { id })
102}
103
104/// The content of the version with the given id.
105pub fn version_content(
106    fs: &impl Fs,
107    paths: &StorePaths,
108    doc_id: &str,
109    version_id: &str,
110) -> Result<Vec<u8>, SessionError> {
111    let versions = read_records(fs, &paths.versions_file(doc_id))?;
112    let rec = find_record(&versions, version_id)
113        .ok_or_else(|| SessionError::new(format!("no version with id {version_id}")))?;
114    get_object(fs, paths, doc_id, &rec.snapshot)
115}
116
117/// Resolve a revspec against the version list (HEAD = the latest version) to a
118/// version id. Supports the same forms as the session resolver (`@head`,
119/// `@head~N`, seq, `@latest:<label>`, id/prefix, `@time:`).
120pub fn resolve_version(
121    fs: &impl Fs,
122    paths: &StorePaths,
123    doc_id: &str,
124    spec: &str,
125) -> Result<String, SessionError> {
126    let versions = read_records(fs, &paths.versions_file(doc_id))?;
127    let head = versions.last().map(|r| r.id.as_str());
128    resolve_revspec(&versions, head, spec)
129}
130
131/// Restore: resolve `spec` to a version and return its content (the caller writes
132/// it back to the `.zen`). Does NOT itself mutate the working file or record a new
133/// version — that is the caller's responsibility so the write-through stays in one
134/// place.
135pub fn restore_content(
136    fs: &impl Fs,
137    paths: &StorePaths,
138    doc_id: &str,
139    spec: &str,
140) -> Result<Vec<u8>, SessionError> {
141    let id = resolve_version(fs, paths, doc_id, spec)?;
142    version_content(fs, paths, doc_id, &id)
143}
144
145// ── Tests ─────────────────────────────────────────────────────────────────────
146
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::adapter::{FakeClock, MemFs};
    use crate::layout::StorePaths;

    /// Document id shared by every test.
    const DOC: &str = "doc1";

    /// Fresh in-memory filesystem plus store paths rooted at `/data`.
    fn setup() -> (MemFs, StorePaths) {
        let fs = MemFs::new();
        let paths = StorePaths::new("/data");
        (fs, paths)
    }

    /// A fake clock frozen `ms` milliseconds after the Unix epoch.
    fn clock_at(ms: u64) -> FakeClock {
        FakeClock(UNIX_EPOCH + Duration::from_millis(ms))
    }

    /// Record an unnamed version of `DOC` at time `ms`.
    fn record_auto(fs: &MemFs, paths: &StorePaths, ms: u64, content: &[u8]) -> VersionOutcome {
        record_version(
            fs,
            paths,
            &clock_at(ms),
            DOC,
            content,
            VersionMeta::default(),
        )
        .unwrap()
    }

    /// Record a version of `DOC` at time `ms` carrying `label`.
    fn record_named(
        fs: &MemFs,
        paths: &StorePaths,
        ms: u64,
        content: &[u8],
        label: &str,
    ) -> VersionOutcome {
        let meta = VersionMeta {
            label: Some(label),
            ..Default::default()
        };
        record_version(fs, paths, &clock_at(ms), DOC, content, meta).unwrap()
    }

    /// Shorthand for the `Recorded` outcome with the given id.
    fn recorded(id: &str) -> VersionOutcome {
        VersionOutcome::Recorded { id: id.to_owned() }
    }

    #[test]
    fn first_version_recorded() {
        let (fs, paths) = setup();
        let outcome = record_auto(&fs, &paths, 100, b"v1");
        assert_eq!(outcome, recorded("v0"));
        let versions = list_versions(&fs, &paths, DOC).unwrap();
        assert_eq!(versions.len(), 1);
        assert_eq!(version_content(&fs, &paths, DOC, "v0").unwrap(), b"v1");
    }

    #[test]
    fn dedup_latest() {
        let (fs, paths) = setup();
        record_auto(&fs, &paths, 100, b"v1");
        let second = record_auto(&fs, &paths, 100, b"v1");
        assert_eq!(second, VersionOutcome::Unchanged);
        let versions = list_versions(&fs, &paths, DOC).unwrap();
        assert_eq!(versions.len(), 1);
    }

    #[test]
    fn named_version_not_deduped_when_content_matches() {
        // A labelled checkpoint must always produce a record, even when its
        // bytes equal the latest auto-version (otherwise the label is lost).
        let (fs, paths) = setup();
        record_auto(&fs, &paths, 100, b"v1");
        let named = record_named(&fs, &paths, 100, b"v1", "release-1");
        assert_eq!(named, recorded("v1"));
        let versions = list_versions(&fs, &paths, DOC).unwrap();
        assert_eq!(
            versions.len(),
            2,
            "named checkpoint must append a new record"
        );
        assert_eq!(versions[1].label, Some("release-1".to_owned()));
        // Both records point at the same snapshot hash.
        assert_eq!(versions[0].snapshot, versions[1].snapshot);
    }

    #[test]
    fn second_version_chains_parent() {
        let (fs, paths) = setup();
        record_auto(&fs, &paths, 100, b"v1");
        record_auto(&fs, &paths, 100, b"v2");
        let versions = list_versions(&fs, &paths, DOC).unwrap();
        assert_eq!(versions.len(), 2);
        assert_eq!(versions[1].parent, Some("v0".to_owned()));
        assert_eq!(version_content(&fs, &paths, DOC, "v1").unwrap(), b"v2");
    }

    #[test]
    fn named_version_stores_label() {
        let (fs, paths) = setup();
        record_named(&fs, &paths, 100, b"v1", "release-1.0");
        let versions = list_versions(&fs, &paths, DOC).unwrap();
        assert_eq!(versions[0].label, Some("release-1.0".to_owned()));
    }

    #[test]
    fn resolve_version_forms() {
        let (fs, paths) = setup();
        // v0 at 100ms, v1 at 200ms (label "rc1"), v2 at 300ms
        record_auto(&fs, &paths, 100, b"content-0");
        record_named(&fs, &paths, 200, b"content-1", "rc1");
        record_auto(&fs, &paths, 300, b"content-2");

        let cases = [
            ("@head", "v2"),
            ("@head~1", "v1"),
            ("1", "v1"),
            ("@latest:rc1", "v1"),
        ];
        for (spec, expected) in cases {
            assert_eq!(resolve_version(&fs, &paths, DOC, spec).unwrap(), expected);
        }
    }

    #[test]
    fn restore_content_returns_past_bytes() {
        let (fs, paths) = setup();
        record_auto(&fs, &paths, 100, b"A");
        record_auto(&fs, &paths, 100, b"B");
        assert_eq!(restore_content(&fs, &paths, DOC, "@head~1").unwrap(), b"A");
        assert_eq!(restore_content(&fs, &paths, DOC, "v1").unwrap(), b"B");
    }

    #[test]
    fn restore_unknown_errors() {
        let (fs, paths) = setup();
        record_auto(&fs, &paths, 100, b"A");
        assert!(restore_content(&fs, &paths, DOC, "v99").is_err());
    }

    #[test]
    fn list_empty() {
        let (fs, paths) = setup();
        assert!(list_versions(&fs, &paths, DOC).unwrap().is_empty());
    }

    #[test]
    fn checkpoint_metadata_is_persisted() {
        let (fs, paths) = setup();
        let cm = CheckpointMeta {
            action_id: Some("act-99".to_string()),
            action_version: Some("rev-2".to_string()),
            preview_hash: Some("abc123".to_string()),
            replay_eligible: true,
        };
        let meta = VersionMeta {
            checkpoint: Some(&cm),
            ..Default::default()
        };
        record_version(&fs, &paths, &clock_at(100), DOC, b"content", meta).unwrap();

        let versions = list_versions(&fs, &paths, DOC).unwrap();
        assert_eq!(versions.len(), 1);
        let stored = &versions[0];
        assert_eq!(stored.action_id, Some("act-99".to_string()));
        assert_eq!(stored.action_version, Some("rev-2".to_string()));
        assert_eq!(stored.preview_hash, Some("abc123".to_string()));
        assert!(stored.replay_eligible);
    }

    #[test]
    fn no_checkpoint_leaves_fields_unset() {
        let (fs, paths) = setup();
        record_auto(&fs, &paths, 100, b"content");

        let versions = list_versions(&fs, &paths, DOC).unwrap();
        assert_eq!(versions.len(), 1);
        let stored = &versions[0];
        assert_eq!(stored.action_id, None);
        assert_eq!(stored.action_version, None);
        assert_eq!(stored.preview_hash, None);
        assert!(!stored.replay_eligible);
    }
}