Skip to main content

lex_vcs/
op_log.rs

1//! Persistence + DAG queries for the operation log.
2//!
3//! Layout: `<root>/ops/<op_id>.json` — one canonical-JSON file
4//! per [`OperationRecord`]. Atomic writes via tempfile + rename.
5//! Idempotent: writing an existing op_id is a no-op (content
6//! addressing guarantees the bytes match).
7//!
8//! # Packfiles (#261 slice 1)
9//!
10//! Loose-file storage is fine to ~10k ops; past that the
11//! filesystem starts to thrash. [`OpLog::repack`] consolidates
12//! loose files into deterministic, content-addressed packfiles:
13//!
14//! - `<dir>/pack-<hash>.pack`: each record framed as `[8-byte BE
15//!   length][canonical JSON]`, ops sorted by op_id within the pack.
16//! - `<dir>/pack-<hash>.idx`: JSON map of `op_id` → byte offset
17//!   into the `.pack` (offset of the length header).
18//!
19//! Pack name is the SHA-256 of the sorted op_ids, newline-joined,
20//! so the same input set always produces the same pack hash —
21//! a re-run of `lex op repack` is a no-op.
22//!
23//! [`OpLog::get`] tries loose first, falls back to scanning all
24//! `.idx` files in the directory. The write path
25//! ([`OpLog::put`]) only ever writes loose; ops migrate into
26//! packs via the explicit [`OpLog::repack`] call.
27
28use crate::canonical::hash_bytes;
29use crate::operation::{OpId, OperationRecord};
30use std::collections::{BTreeMap, BTreeSet, VecDeque};
31use std::fs;
32use std::io::{self, Read, Seek, SeekFrom, Write};
33use std::path::{Path, PathBuf};
34
/// Handle to an on-disk operation log rooted at `<root>/ops`.
/// Cheap to construct; all state lives on the filesystem, so
/// multiple handles over the same root see the same log.
pub struct OpLog {
    // Directory holding loose `<op_id>.json` records and
    // `pack-<hash>.{pack,idx}` packfile pairs.
    dir: PathBuf,
}
38
39impl OpLog {
40    pub fn open(root: &Path) -> io::Result<Self> {
41        let dir = root.join("ops");
42        fs::create_dir_all(&dir)?;
43        Ok(Self { dir })
44    }
45
46    fn path(&self, op_id: &OpId) -> PathBuf {
47        self.dir.join(format!("{op_id}.json"))
48    }
49
50    /// Persist a record. Idempotent on existing op_ids (the bytes
51    /// must match by content addressing).
52    ///
53    /// Crash safety: the tempfile's data is fsync'd before rename,
54    /// so a successful return implies a durable file at the final
55    /// path. The containing directory is not fsync'd; on a crash
56    /// between rename and the directory's metadata flush, the file
57    /// can be lost. For a content-addressed log this is acceptable
58    /// — a lost record can be re-derived from the same source — but
59    /// callers that *also* persist references to the op_id (e.g.
60    /// branch heads) should fsync those refs after `put` returns.
61    pub fn put(&self, rec: &OperationRecord) -> io::Result<()> {
62        let path = self.path(&rec.op_id);
63        if path.exists() {
64            return Ok(());
65        }
66        let bytes = serde_json::to_vec(rec)
67            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
68        let tmp = path.with_extension("json.tmp");
69        let mut f = fs::File::create(&tmp)?;
70        f.write_all(&bytes)?;
71        f.sync_all()?;
72        fs::rename(&tmp, &path)?;
73        Ok(())
74    }
75
76    pub fn get(&self, op_id: &OpId) -> io::Result<Option<OperationRecord>> {
77        let path = self.path(op_id);
78        if path.exists() {
79            let bytes = fs::read(&path)?;
80            let rec: OperationRecord = serde_json::from_slice(&bytes)
81                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
82            return Ok(Some(rec));
83        }
84        // Loose miss — scan packfiles. Each `.idx` is a tiny JSON
85        // map; small constant cost per pack. For larger stores we
86        // could maintain an in-memory cache keyed off pack mtimes,
87        // but slice 1 keeps it simple — measure before optimizing.
88        for pack_idx in self.list_pack_indices()? {
89            let idx = PackIndex::load(&pack_idx)?;
90            if let Some(&offset) = idx.ops.get(op_id) {
91                let pack_path = pack_idx.with_extension("pack");
92                return read_packed_op(&pack_path, offset).map(Some);
93            }
94        }
95        Ok(None)
96    }
97
98    /// Walk the directory for `pack-*.idx` files. Order is whatever
99    /// the filesystem gives us — `get` doesn't depend on it (op_ids
100    /// are unique by content addressing, so the right pack wins).
101    fn list_pack_indices(&self) -> io::Result<Vec<PathBuf>> {
102        let mut out = Vec::new();
103        for entry in fs::read_dir(&self.dir)? {
104            let entry = entry?;
105            let name = match entry.file_name().into_string() {
106                Ok(s) => s,
107                Err(_) => continue,
108            };
109            if name.starts_with("pack-") && name.ends_with(".idx") {
110                out.push(entry.path());
111            }
112        }
113        Ok(out)
114    }
115
116    /// Consolidate loose op records into a deterministic, content-
117    /// addressed packfile (#261 slice 1). Returns the number of
118    /// ops moved into the new pack.
119    ///
120    /// `threshold` is the minimum number of loose ops required to
121    /// trigger a repack — under that, returns `0` and leaves the
122    /// log alone. The idea: small stores stay loose; only repack
123    /// when the file count starts to matter.
124    ///
125    /// Determinism: the pack name is the SHA-256 of the sorted
126    /// op_ids (newline-joined), so two independent runs against the
127    /// same set of loose ops produce a byte-identical pack.
128    /// Re-running on an empty loose directory is a no-op.
129    ///
130    /// Crash safety: the `.pack.tmp` and `.idx.tmp` files are
131    /// fsync'd before rename; loose files are deleted only after
132    /// both renames succeed. A crash mid-repack leaves both loose
133    /// and partial-pack files; a subsequent `get` finds the loose
134    /// version, and a subsequent `repack` cleans up.
135    pub fn repack(&self, threshold: usize) -> io::Result<usize> {
136        let loose: Vec<(OpId, PathBuf)> = self.list_loose_files()?;
137        if loose.len() < threshold {
138            return Ok(0);
139        }
140        // Sort ops deterministically by op_id (lex order). The pack
141        // hash is the SHA-256 of those op_ids joined by newlines —
142        // same input → same name.
143        let mut ops: Vec<(OpId, Vec<u8>)> = Vec::with_capacity(loose.len());
144        for (op_id, path) in &loose {
145            let bytes = fs::read(path)?;
146            ops.push((op_id.clone(), bytes));
147        }
148        ops.sort_by(|a, b| a.0.cmp(&b.0));
149        let mut name_input = Vec::new();
150        for (id, _) in &ops {
151            name_input.extend_from_slice(id.as_bytes());
152            name_input.push(b'\n');
153        }
154        let pack_hash = hash_bytes(&name_input);
155        let pack_path = self.dir.join(format!("pack-{pack_hash}.pack"));
156        let idx_path = self.dir.join(format!("pack-{pack_hash}.idx"));
157        if pack_path.exists() && idx_path.exists() {
158            // Same input set — pack already exists. Just clean up
159            // the loose duplicates.
160            let count = ops.len();
161            for (_, path) in &loose {
162                let _ = fs::remove_file(path);
163            }
164            return Ok(count);
165        }
166
167        // Write `<pack>.pack.tmp` framed as [8-byte BE length][JSON]
168        // for each record; record offsets for the index.
169        let pack_tmp = pack_path.with_extension("pack.tmp");
170        let idx_tmp = idx_path.with_extension("idx.tmp");
171        let mut offsets: BTreeMap<OpId, u64> = BTreeMap::new();
172        {
173            let mut f = fs::File::create(&pack_tmp)?;
174            let mut cursor: u64 = 0;
175            for (op_id, bytes) in &ops {
176                offsets.insert(op_id.clone(), cursor);
177                let len = bytes.len() as u64;
178                f.write_all(&len.to_be_bytes())?;
179                f.write_all(bytes)?;
180                cursor += 8 + len;
181            }
182            f.sync_all()?;
183        }
184        // Write the index. JSON for inspectability and
185        // forward-compat (we can add fields without breaking
186        // readers).
187        let idx = PackIndex { version: 1, ops: offsets };
188        idx.save(&idx_tmp)?;
189
190        fs::rename(&pack_tmp, &pack_path)?;
191        fs::rename(&idx_tmp, &idx_path)?;
192
193        // Now safe to delete the loose files — pack is durable.
194        let count = ops.len();
195        for (_, path) in &loose {
196            let _ = fs::remove_file(path);
197        }
198        Ok(count)
199    }
200
201    /// Remove every op_id in `victims` from the log, across both
202    /// loose files and packfiles (#261 slice 2). Used by
203    /// `lex op gc` after a retention plan identifies which ops to
204    /// drop. Idempotent — calling twice with the same set is a
205    /// no-op on the second pass.
206    ///
207    /// Pack handling: any pack containing one or more victims is
208    /// rewritten to a new content-addressed pack with only the
209    /// surviving ops; the old pack and its index file are deleted.
210    /// A pack whose every op is a victim is deleted outright.
211    ///
212    /// Returns the count of ops actually removed (loose files
213    /// deleted + packed ops dropped). Pre-existing absences don't
214    /// contribute.
215    pub fn evict(&self, victims: &BTreeSet<OpId>) -> io::Result<usize> {
216        if victims.is_empty() {
217            return Ok(0);
218        }
219        let mut removed = 0;
220        // Loose files: just delete the matching `<op_id>.json`.
221        for (op_id, path) in self.list_loose_files()? {
222            if victims.contains(&op_id) {
223                match fs::remove_file(&path) {
224                    Ok(()) => removed += 1,
225                    Err(e) if e.kind() == io::ErrorKind::NotFound => {}
226                    Err(e) => return Err(e),
227                }
228            }
229        }
230        // Packs: rewrite each affected pack with only surviving ops.
231        for pack_idx in self.list_pack_indices()? {
232            let idx = PackIndex::load(&pack_idx)?;
233            let pack_path = pack_idx.with_extension("pack");
234            let touched = idx.ops.keys().any(|op_id| victims.contains(op_id));
235            if !touched {
236                continue;
237            }
238            // Read every surviving op out, then drop the old pack.
239            let mut survivors: Vec<(OpId, Vec<u8>)> = Vec::new();
240            for (op_id, &offset) in &idx.ops {
241                if victims.contains(op_id) {
242                    removed += 1;
243                    continue;
244                }
245                let rec = read_packed_op(&pack_path, offset)?;
246                let bytes = serde_json::to_vec(&rec)
247                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
248                survivors.push((op_id.clone(), bytes));
249            }
250            // Delete old pack + idx first; we re-emit a fresh
251            // (different-hash) pack from the survivors below.
252            let _ = fs::remove_file(&pack_path);
253            let _ = fs::remove_file(&pack_idx);
254            if survivors.is_empty() {
255                continue;
256            }
257            self.write_pack_from_survivors(survivors)?;
258        }
259        Ok(removed)
260    }
261
    /// Helper: write a new content-addressed pack from
    /// already-serialized op bytes. Same shape as
    /// [`Self::repack`]'s output path; factored out so
    /// [`Self::evict`] can reuse it.
    ///
    /// Determinism: ops are sorted by op_id and the pack name is
    /// the SHA-256 of the sorted ids (newline-joined), so the same
    /// input set always lands in the same `pack-<hash>` pair; if
    /// both target files already exist this is a no-op.
    ///
    /// Crash safety: data goes to `.pack.tmp` / `.idx.tmp`, is
    /// fsync'd, then renamed into place — pack first, then index,
    /// so a visible index always points at a complete pack.
    fn write_pack_from_survivors(
        &self,
        mut ops: Vec<(OpId, Vec<u8>)>,
    ) -> io::Result<()> {
        // Deterministic order: lexicographic by op_id.
        ops.sort_by(|a, b| a.0.cmp(&b.0));
        let mut name_input = Vec::new();
        for (id, _) in &ops {
            name_input.extend_from_slice(id.as_bytes());
            name_input.push(b'\n');
        }
        let pack_hash = hash_bytes(&name_input);
        let pack_path = self.dir.join(format!("pack-{pack_hash}.pack"));
        let idx_path = self.dir.join(format!("pack-{pack_hash}.idx"));
        if pack_path.exists() && idx_path.exists() {
            // Identical input set already packed — nothing to do.
            return Ok(());
        }
        let pack_tmp = pack_path.with_extension("pack.tmp");
        let idx_tmp = idx_path.with_extension("idx.tmp");
        let mut offsets: BTreeMap<OpId, u64> = BTreeMap::new();
        {
            let mut f = fs::File::create(&pack_tmp)?;
            let mut cursor: u64 = 0;
            // Frame each record as [8-byte BE length][JSON bytes];
            // the index records the offset of the length header.
            for (op_id, bytes) in &ops {
                offsets.insert(op_id.clone(), cursor);
                let len = bytes.len() as u64;
                f.write_all(&len.to_be_bytes())?;
                f.write_all(bytes)?;
                cursor += 8 + len;
            }
            // Durable before rename — see module docs on crash safety.
            f.sync_all()?;
        }
        let idx = PackIndex { version: 1, ops: offsets };
        idx.save(&idx_tmp)?;
        fs::rename(&pack_tmp, &pack_path)?;
        fs::rename(&idx_tmp, &idx_path)?;
        Ok(())
    }
303
304    /// Enumerate every loose `<op_id>.json` in the ops directory.
305    /// Used by [`Self::repack`] and [`Self::list_all`].
306    fn list_loose_files(&self) -> io::Result<Vec<(OpId, PathBuf)>> {
307        let mut out = Vec::new();
308        for entry in fs::read_dir(&self.dir)? {
309            let entry = entry?;
310            let name = match entry.file_name().into_string() {
311                Ok(s) => s,
312                Err(_) => continue,
313            };
314            if let Some(id) = name.strip_suffix(".json") {
315                if !id.starts_with("pack-") {
316                    out.push((id.to_string(), entry.path()));
317                }
318            }
319        }
320        Ok(out)
321    }
322
323    /// Remove a record from the log. Used by [`crate::migrate`] to
324    /// delete the old `<op_id>.json` files after a format migration
325    /// has written their replacements. Idempotent on missing files.
326    ///
327    /// **Not** part of the day-to-day op-log API — the log is
328    /// append-only by design (#129). The only legitimate caller is
329    /// the migration tool, which is supervising a destructive,
330    /// `--confirm`-gated batch.
331    pub fn delete(&self, op_id: &OpId) -> io::Result<()> {
332        let path = self.path(op_id);
333        match fs::remove_file(&path) {
334            Ok(()) => Ok(()),
335            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
336            Err(e) => Err(e),
337        }
338    }
339
340    /// Walk parents transitively. Newest-first, BFS, dedup'd by op_id.
341    /// Stops at parentless ops or after `limit` records.
342    pub fn walk_back(
343        &self,
344        head: &OpId,
345        limit: Option<usize>,
346    ) -> io::Result<Vec<OperationRecord>> {
347        let mut out = Vec::new();
348        let mut seen = BTreeSet::new();
349        let mut frontier: VecDeque<OpId> = VecDeque::from([head.clone()]);
350        while let Some(id) = frontier.pop_back() {
351            if !seen.insert(id.clone()) {
352                continue;
353            }
354            if let Some(rec) = self.get(&id)? {
355                // Push parents before recording so traversal order is
356                // a stable BFS-by-discovery: children-first, then their
357                // parents, parents of those, etc.
358                for p in &rec.op.parents {
359                    if !seen.contains(p) {
360                        frontier.push_front(p.clone());
361                    }
362                }
363                out.push(rec);
364                if let Some(n) = limit {
365                    if out.len() >= n {
366                        break;
367                    }
368                }
369            }
370        }
371        Ok(out)
372    }
373
374    /// Same set as walk_back but oldest-first. Used by branch_head
375    /// for left-to-right transition replay.
376    pub fn walk_forward(
377        &self,
378        head: &OpId,
379        limit: Option<usize>,
380    ) -> io::Result<Vec<OperationRecord>> {
381        let mut all = self.walk_back(head, None)?;
382        all.reverse();
383        if let Some(n) = limit {
384            all.truncate(n);
385        }
386        Ok(all)
387    }
388
389    /// Common ancestor of two op_ids in the DAG.
390    ///
391    /// On tree-shaped histories and chain merges this is the
392    /// **lowest** common ancestor — the closest shared op. On
393    /// criss-cross merges (two ops each with two parents from
394    /// independent histories) there can be multiple
395    /// incomparable common ancestors; this picks one
396    /// deterministically (the first hit when traversing `b`'s
397    /// ancestors newest-first), but not via a recursive merge.
398    /// `None` if no shared ancestor exists.
399    ///
400    /// Tier-1 merge in #129 covers linear and tree-shaped
401    /// histories; criss-cross resolution is deferred to a
402    /// future tier (Git's `recursive` strategy is the reference).
403    pub fn lca(&self, a: &OpId, b: &OpId) -> io::Result<Option<OpId>> {
404        let a_anc: BTreeSet<OpId> = self
405            .walk_back(a, None)?
406            .into_iter()
407            .map(|r| r.op_id)
408            .collect();
409        // Walk b's ancestors newest-first; first hit is the deepest
410        // common ancestor on tree-shaped histories. In criss-cross
411        // DAGs this picks deterministically but not via recursive
412        // resolution — see the doc comment above.
413        for rec in self.walk_back(b, None)? {
414            if a_anc.contains(&rec.op_id) {
415                return Ok(Some(rec.op_id));
416            }
417        }
418        Ok(None)
419    }
420
421    /// Every record in the log. Order is whatever the directory
422    /// listing produces — undefined and not stable. Used by the
423    /// [`crate::predicate`] evaluator when no narrower candidate
424    /// set is available.
425    pub fn list_all(&self) -> io::Result<Vec<OperationRecord>> {
426        let mut out = Vec::new();
427        let mut seen: BTreeSet<OpId> = BTreeSet::new();
428        // Loose first so dedup wins for them on collision (loose
429        // and pack should never both exist for the same op_id post-
430        // repack, but during an interrupted repack both can be
431        // present transiently).
432        for (id, _) in self.list_loose_files()? {
433            if let Some(rec) = self.get(&id)? {
434                if seen.insert(rec.op_id.clone()) {
435                    out.push(rec);
436                }
437            }
438        }
439        for pack_idx in self.list_pack_indices()? {
440            let idx = PackIndex::load(&pack_idx)?;
441            let pack_path = pack_idx.with_extension("pack");
442            for (op_id, &offset) in &idx.ops {
443                if seen.insert(op_id.clone()) {
444                    out.push(read_packed_op(&pack_path, offset)?);
445                }
446            }
447        }
448        Ok(out)
449    }
450
451    /// Ops in `head`'s history that are not in `base`'s history.
452    /// `base = None` means "include all of head's history" (used for
453    /// independent-histories case where the LCA is None).
454    pub fn ops_since(
455        &self,
456        head: &OpId,
457        base: Option<&OpId>,
458    ) -> io::Result<Vec<OperationRecord>> {
459        let exclude: BTreeSet<OpId> = match base {
460            Some(b) => self
461                .walk_back(b, None)?
462                .into_iter()
463                .map(|r| r.op_id)
464                .collect(),
465            None => BTreeSet::new(),
466        };
467        Ok(self
468            .walk_back(head, None)?
469            .into_iter()
470            .filter(|r| !exclude.contains(&r.op_id))
471            .collect())
472    }
473}
474
/// Sidecar index for a packfile. Maps `op_id` to the byte offset
/// of the record's length header inside the `.pack`. JSON for
/// inspectability and forward-compat.
#[derive(serde::Serialize, serde::Deserialize)]
struct PackIndex {
    // Format version; currently always written as 1. Kept so
    // future layout changes can be detected without sniffing.
    version: u32,
    // op_id → offset of the `[8-byte BE length]` header in `.pack`.
    ops: BTreeMap<OpId, u64>,
}
483
484impl PackIndex {
485    fn load(path: &Path) -> io::Result<Self> {
486        let bytes = fs::read(path)?;
487        serde_json::from_slice(&bytes)
488            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
489    }
490
491    fn save(&self, path: &Path) -> io::Result<()> {
492        let bytes = serde_json::to_vec(self)
493            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
494        let mut f = fs::File::create(path)?;
495        f.write_all(&bytes)?;
496        f.sync_all()?;
497        Ok(())
498    }
499}
500
501/// Read one record from a packfile at `offset`. The record is
502/// framed as `[8-byte BE length][canonical JSON]`.
503fn read_packed_op(pack_path: &Path, offset: u64) -> io::Result<OperationRecord> {
504    let mut f = fs::File::open(pack_path)?;
505    f.seek(SeekFrom::Start(offset))?;
506    let mut len_buf = [0u8; 8];
507    f.read_exact(&mut len_buf)?;
508    let len = u64::from_be_bytes(len_buf) as usize;
509    let mut buf = vec![0u8; len];
510    f.read_exact(&mut buf)?;
511    serde_json::from_slice(&buf)
512        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operation::{Operation, OperationKind, StageTransition};
    use std::collections::{BTreeMap, BTreeSet};

    // Fixture: a parentless AddFunction op — the root of most DAGs
    // built below. Deterministic, so repeated calls produce the
    // same op_id (content addressing).
    fn add_op() -> OperationRecord {
        let op = Operation::new(
            OperationKind::AddFunction {
                sig_id: "fac::Int->Int".into(),
                stage_id: "abc123".into(),
                effects: BTreeSet::new(),
                budget_cost: None,
            },
            [],
        );
        OperationRecord::new(
            op,
            StageTransition::Create {
                sig_id: "fac::Int->Int".into(),
                stage_id: "abc123".into(),
            },
        )
    }

    // Fixture: a single-parent ModifyBody op chained onto `parent`,
    // used to build linear histories and branch arms.
    fn modify_op(parent: &OpId, sig: &str, from: &str, to: &str) -> OperationRecord {
        let op = Operation::new(
            OperationKind::ModifyBody {
                sig_id: sig.into(),
                from_stage_id: from.into(),
                to_stage_id: to.into(),
                from_budget: None,
                to_budget: None,
            },
            [parent.clone()],
        );
        OperationRecord::new(
            op,
            StageTransition::Replace {
                sig_id: sig.into(),
                from: from.into(),
                to: to.into(),
            },
        )
    }

    #[test]
    fn put_then_get_round_trips() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let rec = add_op();
        log.put(&rec).unwrap();
        let back = log.get(&rec.op_id).unwrap().unwrap();
        assert_eq!(back, rec);
    }

    #[test]
    fn put_is_idempotent() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let rec = add_op();
        log.put(&rec).unwrap();
        log.put(&rec).unwrap(); // second write is a no-op
        assert!(log.get(&rec.op_id).unwrap().is_some());
    }

    #[test]
    fn get_missing_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        assert!(log.get(&"deadbeef".to_string()).unwrap().is_none());
    }

    #[test]
    fn walk_back_returns_newest_first() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let c = modify_op(&b.op_id, "fac::Int->Int", "def456", "789aaa");
        log.put(&c).unwrap();

        let walked = log.walk_back(&c.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(
            ids,
            vec![c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]
        );
    }

    #[test]
    fn walk_forward_returns_oldest_first() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();

        let walked = log.walk_forward(&b.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(ids, vec![a.op_id.as_str(), b.op_id.as_str()]);
    }

    #[test]
    fn lca_finds_common_ancestor() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let root = add_op();
        log.put(&root).unwrap();
        let left = modify_op(&root.op_id, "fac::Int->Int", "abc123", "left1");
        log.put(&left).unwrap();
        let right = modify_op(&root.op_id, "fac::Int->Int", "abc123", "right1");
        log.put(&right).unwrap();

        let lca = log.lca(&left.op_id, &right.op_id).unwrap();
        assert_eq!(lca, Some(root.op_id));
    }

    #[test]
    fn lca_none_for_independent_histories() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        // A second parentless op (different sig, so different op_id).
        let b = OperationRecord::new(
            Operation::new(
                OperationKind::AddFunction {
                    sig_id: "double::Int->Int".into(),
                    stage_id: "ddd111".into(),
                    effects: BTreeSet::new(),
                    budget_cost: None,
                },
                [],
            ),
            StageTransition::Create {
                sig_id: "double::Int->Int".into(),
                stage_id: "ddd111".into(),
            },
        );
        log.put(&b).unwrap();

        assert_eq!(log.lca(&a.op_id, &b.op_id).unwrap(), None);
    }

    #[test]
    fn ops_since_excludes_base_history() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let c = modify_op(&b.op_id, "fac::Int->Int", "def456", "789aaa");
        log.put(&c).unwrap();

        let since: Vec<_> = log
            .ops_since(&c.op_id, Some(&a.op_id))
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        assert_eq!(since.len(), 2);
        assert!(since.contains(&b.op_id));
        assert!(since.contains(&c.op_id));
        assert!(!since.contains(&a.op_id));
    }

    #[test]
    fn repack_consolidates_loose_files_into_a_pack() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();

        let n = log.repack(0).unwrap();  // threshold 0 = always
        assert_eq!(n, 2);
        let ops_dir = tmp.path().join("ops");
        let loose: Vec<_> = fs::read_dir(&ops_dir).unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|x| x == "json"))
            .filter(|e| !e.file_name().to_string_lossy().starts_with("pack-"))
            .collect();
        assert!(loose.is_empty(), "loose .json files should be deleted");
        let packs: Vec<_> = fs::read_dir(&ops_dir).unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|x| x == "pack"))
            .collect();
        assert_eq!(packs.len(), 1);

        // After repack, get() must still return both ops via the pack.
        assert_eq!(log.get(&a.op_id).unwrap().unwrap(), a);
        assert_eq!(log.get(&b.op_id).unwrap().unwrap(), b);
    }

    #[test]
    fn repack_below_threshold_is_a_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        log.put(&add_op()).unwrap();
        let n = log.repack(10).unwrap();
        assert_eq!(n, 0);
    }

    #[test]
    fn repack_is_deterministic_on_same_input() {
        // Two stores with the same loose ops repack to the same
        // pack hash — content addressing all the way down.
        let make_log = || {
            let tmp = tempfile::tempdir().unwrap();
            let log = OpLog::open(tmp.path()).unwrap();
            let a = add_op();
            log.put(&a).unwrap();
            let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
            log.put(&b).unwrap();
            log.repack(0).unwrap();
            (tmp, log)
        };
        let (tmp1, _log1) = make_log();
        let (tmp2, _log2) = make_log();
        let pack_name = |dir: &std::path::Path| -> String {
            fs::read_dir(dir.join("ops")).unwrap()
                .filter_map(|e| e.ok())
                .find(|e| e.path().extension().is_some_and(|x| x == "pack"))
                .unwrap()
                .file_name().into_string().unwrap()
        };
        assert_eq!(pack_name(tmp1.path()), pack_name(tmp2.path()));
    }

    #[test]
    fn walk_back_works_across_loose_and_packed_ops() {
        // Pack the older history, leave newer ops loose. walk_back
        // must traverse seamlessly.
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "b1");
        log.put(&b).unwrap();
        log.repack(0).unwrap();
        // Now add a newer op as a loose file.
        let c = modify_op(&b.op_id, "fac::Int->Int", "b1", "c1");
        log.put(&c).unwrap();

        let walked = log.walk_back(&c.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(ids, vec![c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]);
    }

    #[test]
    fn list_all_dedups_across_loose_and_pack() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        log.repack(0).unwrap();
        // Re-put the same op as a loose file (simulate an
        // interrupted repack). list_all should still report
        // exactly one record per op_id.
        log.put(&a).unwrap();

        let all = log.list_all().unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].op_id, a.op_id);
    }

    #[test]
    fn walk_back_orders_ancestors_after_descendants() {
        // Build a small DAG with a merge:
        //
        //     a
        //    / \
        //   b   c
        //    \ /
        //     m  (merge with parents [b, c])
        //
        // The merge engine relies on the property that any ancestor of
        // X appears strictly after X in the walk_back output. Pin it.
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "b1");
        log.put(&b).unwrap();
        let c = OperationRecord::new(
            Operation::new(
                OperationKind::ModifyBody {
                    sig_id: "double::Int->Int".into(),
                    from_stage_id: "ddd000".into(),
                    to_stage_id: "c1".into(),
                    from_budget: None,
                    to_budget: None,
                },
                [a.op_id.clone()],
            ),
            StageTransition::Replace {
                sig_id: "double::Int->Int".into(),
                from: "ddd000".into(),
                to: "c1".into(),
            },
        );
        log.put(&c).unwrap();
        let m = OperationRecord::new(
            Operation::new(
                OperationKind::Merge { resolved: 0 },
                [b.op_id.clone(), c.op_id.clone()],
            ),
            StageTransition::Merge { entries: BTreeMap::new() },
        );
        log.put(&m).unwrap();

        let walked = log.walk_back(&m.op_id, None).unwrap();
        let pos = |id: &str| walked.iter().position(|r| r.op_id == id).unwrap();
        let (m_pos, b_pos, c_pos, a_pos) =
            (pos(&m.op_id), pos(&b.op_id), pos(&c.op_id), pos(&a.op_id));
        // Each ancestor must appear strictly after its descendants.
        assert!(m_pos < b_pos, "merge before its parent b");
        assert!(m_pos < c_pos, "merge before its parent c");
        assert!(b_pos < a_pos, "b before its parent a");
        assert!(c_pos < a_pos, "c before its parent a");
    }
}