// lex_vcs/op_log.rs

//! Persistence + DAG queries for the operation log.
//!
//! Layout: `<root>/ops/<op_id>.json` — one canonical-JSON file
//! per [`OperationRecord`]. Atomic writes via tempfile + rename.
//! Idempotent: writing an existing op_id is a no-op (content
//! addressing guarantees the bytes match).

use crate::operation::{OpId, OperationRecord};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Handle to an on-disk operation log: a directory holding one
/// `<op_id>.json` file per record.
///
/// Construct with [`OpLog::open`].
#[derive(Debug)]
pub struct OpLog {
    /// The `<root>/ops` directory that holds the record files.
    dir: PathBuf,
}

18impl OpLog {
19    pub fn open(root: &Path) -> io::Result<Self> {
20        let dir = root.join("ops");
21        fs::create_dir_all(&dir)?;
22        Ok(Self { dir })
23    }
24
25    fn path(&self, op_id: &OpId) -> PathBuf {
26        self.dir.join(format!("{op_id}.json"))
27    }
28
29    /// Persist a record. Idempotent on existing op_ids (the bytes
30    /// must match by content addressing).
31    ///
32    /// Crash safety: the tempfile's data is fsync'd before rename,
33    /// so a successful return implies a durable file at the final
34    /// path. The containing directory is not fsync'd; on a crash
35    /// between rename and the directory's metadata flush, the file
36    /// can be lost. For a content-addressed log this is acceptable
37    /// — a lost record can be re-derived from the same source — but
38    /// callers that *also* persist references to the op_id (e.g.
39    /// branch heads) should fsync those refs after `put` returns.
40    pub fn put(&self, rec: &OperationRecord) -> io::Result<()> {
41        let path = self.path(&rec.op_id);
42        if path.exists() {
43            return Ok(());
44        }
45        let bytes = serde_json::to_vec(rec)
46            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
47        let tmp = path.with_extension("json.tmp");
48        let mut f = fs::File::create(&tmp)?;
49        f.write_all(&bytes)?;
50        f.sync_all()?;
51        fs::rename(&tmp, &path)?;
52        Ok(())
53    }
54
55    pub fn get(&self, op_id: &OpId) -> io::Result<Option<OperationRecord>> {
56        let path = self.path(op_id);
57        if !path.exists() {
58            return Ok(None);
59        }
60        let bytes = fs::read(&path)?;
61        let rec: OperationRecord = serde_json::from_slice(&bytes)
62            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
63        Ok(Some(rec))
64    }
65
66    /// Remove a record from the log. Used by [`crate::migrate`] to
67    /// delete the old `<op_id>.json` files after a format migration
68    /// has written their replacements. Idempotent on missing files.
69    ///
70    /// **Not** part of the day-to-day op-log API — the log is
71    /// append-only by design (#129). The only legitimate caller is
72    /// the migration tool, which is supervising a destructive,
73    /// `--confirm`-gated batch.
74    pub fn delete(&self, op_id: &OpId) -> io::Result<()> {
75        let path = self.path(op_id);
76        match fs::remove_file(&path) {
77            Ok(()) => Ok(()),
78            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
79            Err(e) => Err(e),
80        }
81    }
82
83    /// Walk parents transitively. Newest-first, BFS, dedup'd by op_id.
84    /// Stops at parentless ops or after `limit` records.
85    pub fn walk_back(
86        &self,
87        head: &OpId,
88        limit: Option<usize>,
89    ) -> io::Result<Vec<OperationRecord>> {
90        let mut out = Vec::new();
91        let mut seen = BTreeSet::new();
92        let mut frontier: VecDeque<OpId> = VecDeque::from([head.clone()]);
93        while let Some(id) = frontier.pop_back() {
94            if !seen.insert(id.clone()) {
95                continue;
96            }
97            if let Some(rec) = self.get(&id)? {
98                // Push parents before recording so traversal order is
99                // a stable BFS-by-discovery: children-first, then their
100                // parents, parents of those, etc.
101                for p in &rec.op.parents {
102                    if !seen.contains(p) {
103                        frontier.push_front(p.clone());
104                    }
105                }
106                out.push(rec);
107                if let Some(n) = limit {
108                    if out.len() >= n {
109                        break;
110                    }
111                }
112            }
113        }
114        Ok(out)
115    }
116
117    /// Same set as walk_back but oldest-first. Used by branch_head
118    /// for left-to-right transition replay.
119    pub fn walk_forward(
120        &self,
121        head: &OpId,
122        limit: Option<usize>,
123    ) -> io::Result<Vec<OperationRecord>> {
124        let mut all = self.walk_back(head, None)?;
125        all.reverse();
126        if let Some(n) = limit {
127            all.truncate(n);
128        }
129        Ok(all)
130    }
131
132    /// Common ancestor of two op_ids in the DAG.
133    ///
134    /// On tree-shaped histories and chain merges this is the
135    /// **lowest** common ancestor — the closest shared op. On
136    /// criss-cross merges (two ops each with two parents from
137    /// independent histories) there can be multiple
138    /// incomparable common ancestors; this picks one
139    /// deterministically (the first hit when traversing `b`'s
140    /// ancestors newest-first), but not via a recursive merge.
141    /// `None` if no shared ancestor exists.
142    ///
143    /// Tier-1 merge in #129 covers linear and tree-shaped
144    /// histories; criss-cross resolution is deferred to a
145    /// future tier (Git's `recursive` strategy is the reference).
146    pub fn lca(&self, a: &OpId, b: &OpId) -> io::Result<Option<OpId>> {
147        let a_anc: BTreeSet<OpId> = self
148            .walk_back(a, None)?
149            .into_iter()
150            .map(|r| r.op_id)
151            .collect();
152        // Walk b's ancestors newest-first; first hit is the deepest
153        // common ancestor on tree-shaped histories. In criss-cross
154        // DAGs this picks deterministically but not via recursive
155        // resolution — see the doc comment above.
156        for rec in self.walk_back(b, None)? {
157            if a_anc.contains(&rec.op_id) {
158                return Ok(Some(rec.op_id));
159            }
160        }
161        Ok(None)
162    }
163
164    /// Every record in the log. Order is whatever the directory
165    /// listing produces — undefined and not stable. Used by the
166    /// [`crate::predicate`] evaluator when no narrower candidate
167    /// set is available.
168    pub fn list_all(&self) -> io::Result<Vec<OperationRecord>> {
169        let mut out = Vec::new();
170        for entry in fs::read_dir(&self.dir)? {
171            let entry = entry?;
172            let name = match entry.file_name().into_string() {
173                Ok(s) => s,
174                Err(_) => continue,
175            };
176            if let Some(id) = name.strip_suffix(".json") {
177                if let Some(rec) = self.get(&id.to_string())? {
178                    out.push(rec);
179                }
180            }
181        }
182        Ok(out)
183    }
184
185    /// Ops in `head`'s history that are not in `base`'s history.
186    /// `base = None` means "include all of head's history" (used for
187    /// independent-histories case where the LCA is None).
188    pub fn ops_since(
189        &self,
190        head: &OpId,
191        base: Option<&OpId>,
192    ) -> io::Result<Vec<OperationRecord>> {
193        let exclude: BTreeSet<OpId> = match base {
194            Some(b) => self
195                .walk_back(b, None)?
196                .into_iter()
197                .map(|r| r.op_id)
198                .collect(),
199            None => BTreeSet::new(),
200        };
201        Ok(self
202            .walk_back(head, None)?
203            .into_iter()
204            .filter(|r| !exclude.contains(&r.op_id))
205            .collect())
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operation::{Operation, OperationKind, StageTransition};
    use std::collections::{BTreeMap, BTreeSet};

    /// Opens a log in a fresh temp dir. Keep the returned guard alive
    /// for as long as the log is used.
    fn fresh_log() -> (tempfile::TempDir, OpLog) {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        (tmp, log)
    }

    /// Parentless `AddFunction` op that creates `sig` at `stage`.
    fn add_fn_op(sig: &str, stage: &str) -> OperationRecord {
        let op = Operation::new(
            OperationKind::AddFunction {
                sig_id: sig.into(),
                stage_id: stage.into(),
                effects: BTreeSet::new(),
                budget_cost: None,
            },
            [],
        );
        OperationRecord::new(
            op,
            StageTransition::Create {
                sig_id: sig.into(),
                stage_id: stage.into(),
            },
        )
    }

    /// The root op used by most tests: adds `fac::Int->Int` at `abc123`.
    fn add_op() -> OperationRecord {
        add_fn_op("fac::Int->Int", "abc123")
    }

    fn modify_op(parent: &OpId, sig: &str, from: &str, to: &str) -> OperationRecord {
        let op = Operation::new(
            OperationKind::ModifyBody {
                sig_id: sig.into(),
                from_stage_id: from.into(),
                to_stage_id: to.into(),
                from_budget: None,
                to_budget: None,
            },
            [parent.clone()],
        );
        OperationRecord::new(
            op,
            StageTransition::Replace {
                sig_id: sig.into(),
                from: from.into(),
                to: to.into(),
            },
        )
    }

    /// Persists the linear history `a <- b <- c` (each op modifying
    /// `fac::Int->Int`) and returns the records oldest-first.
    fn put_chain(log: &OpLog) -> (OperationRecord, OperationRecord, OperationRecord) {
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let c = modify_op(&b.op_id, "fac::Int->Int", "def456", "789aaa");
        log.put(&c).unwrap();
        (a, b, c)
    }

    #[test]
    fn put_then_get_round_trips() {
        let (_tmp, log) = fresh_log();
        let rec = add_op();
        log.put(&rec).unwrap();
        let back = log.get(&rec.op_id).unwrap().unwrap();
        assert_eq!(back, rec);
    }

    #[test]
    fn put_is_idempotent() {
        let (_tmp, log) = fresh_log();
        let rec = add_op();
        log.put(&rec).unwrap();
        log.put(&rec).unwrap(); // second write is a no-op
        assert!(log.get(&rec.op_id).unwrap().is_some());
    }

    #[test]
    fn get_missing_returns_none() {
        let (_tmp, log) = fresh_log();
        assert!(log.get(&"deadbeef".to_string()).unwrap().is_none());
    }

    #[test]
    fn delete_removes_record_and_is_idempotent() {
        let (_tmp, log) = fresh_log();
        let rec = add_op();
        log.put(&rec).unwrap();
        log.delete(&rec.op_id).unwrap();
        assert!(log.get(&rec.op_id).unwrap().is_none());
        log.delete(&rec.op_id).unwrap(); // a missing file is not an error
    }

    #[test]
    fn list_all_returns_every_record_and_skips_non_json() {
        let (tmp, log) = fresh_log();
        let (a, b, c) = put_chain(&log);
        std::fs::write(tmp.path().join("ops").join("stray.json.tmp"), b"garbage").unwrap();

        let ids: BTreeSet<_> = log.list_all().unwrap().into_iter().map(|r| r.op_id).collect();
        assert_eq!(ids, BTreeSet::from([a.op_id, b.op_id, c.op_id]));
    }

    #[test]
    fn walk_back_returns_newest_first() {
        let (_tmp, log) = fresh_log();
        let (a, b, c) = put_chain(&log);

        let walked = log.walk_back(&c.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(
            ids,
            vec![c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]
        );
    }

    #[test]
    fn walk_forward_returns_oldest_first() {
        let (_tmp, log) = fresh_log();
        let (a, b, _c) = put_chain(&log);

        let walked = log.walk_forward(&b.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(ids, vec![a.op_id.as_str(), b.op_id.as_str()]);
    }

    #[test]
    fn walks_respect_limit() {
        let (_tmp, log) = fresh_log();
        let (a, b, c) = put_chain(&log);

        let back: Vec<_> = log
            .walk_back(&c.op_id, Some(2))
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        assert_eq!(back, vec![c.op_id.clone(), b.op_id.clone()]);

        let forward: Vec<_> = log
            .walk_forward(&c.op_id, Some(2))
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        assert_eq!(forward, vec![a.op_id.clone(), b.op_id.clone()]);
    }

    #[test]
    fn lca_finds_common_ancestor() {
        let (_tmp, log) = fresh_log();
        let root = add_op();
        log.put(&root).unwrap();
        let left = modify_op(&root.op_id, "fac::Int->Int", "abc123", "left1");
        log.put(&left).unwrap();
        let right = modify_op(&root.op_id, "fac::Int->Int", "abc123", "right1");
        log.put(&right).unwrap();

        let lca = log.lca(&left.op_id, &right.op_id).unwrap();
        assert_eq!(lca, Some(root.op_id));
    }

    #[test]
    fn lca_of_ancestor_and_descendant_is_the_ancestor() {
        let (_tmp, log) = fresh_log();
        let (_a, b, c) = put_chain(&log);

        assert_eq!(log.lca(&b.op_id, &c.op_id).unwrap(), Some(b.op_id.clone()));
        assert_eq!(log.lca(&c.op_id, &b.op_id).unwrap(), Some(b.op_id.clone()));
    }

    #[test]
    fn lca_none_for_independent_histories() {
        let (_tmp, log) = fresh_log();
        let a = add_op();
        log.put(&a).unwrap();
        // A second parentless op (different sig, so different op_id).
        let b = add_fn_op("double::Int->Int", "ddd111");
        log.put(&b).unwrap();

        assert_eq!(log.lca(&a.op_id, &b.op_id).unwrap(), None);
    }

    #[test]
    fn ops_since_excludes_base_history() {
        let (_tmp, log) = fresh_log();
        let (a, b, c) = put_chain(&log);

        let since: Vec<_> = log
            .ops_since(&c.op_id, Some(&a.op_id))
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        assert_eq!(since.len(), 2);
        assert!(since.contains(&b.op_id));
        assert!(since.contains(&c.op_id));
        assert!(!since.contains(&a.op_id));
    }

    #[test]
    fn ops_since_without_base_returns_whole_history() {
        let (_tmp, log) = fresh_log();
        let (a, b, c) = put_chain(&log);

        let since: BTreeSet<_> = log
            .ops_since(&c.op_id, None)
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        assert_eq!(since, BTreeSet::from([a.op_id, b.op_id, c.op_id]));
    }

    #[test]
    fn walk_back_orders_ancestors_after_descendants() {
        // Build a small DAG with a merge:
        //
        //     a
        //    / \
        //   b   c
        //    \ /
        //     m  (merge with parents [b, c])
        //
        // The merge engine relies on the property that any ancestor of
        // X appears strictly after X in the walk_back output. Pin it.
        // Note: both branches here have length 1, so this does not
        // exercise merges of branches with different lengths.
        let (_tmp, log) = fresh_log();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "b1");
        log.put(&b).unwrap();
        let c = OperationRecord::new(
            Operation::new(
                OperationKind::ModifyBody {
                    sig_id: "double::Int->Int".into(),
                    from_stage_id: "ddd000".into(),
                    to_stage_id: "c1".into(),
                    from_budget: None,
                    to_budget: None,
                },
                [a.op_id.clone()],
            ),
            StageTransition::Replace {
                sig_id: "double::Int->Int".into(),
                from: "ddd000".into(),
                to: "c1".into(),
            },
        );
        log.put(&c).unwrap();
        let m = OperationRecord::new(
            Operation::new(
                OperationKind::Merge { resolved: 0 },
                [b.op_id.clone(), c.op_id.clone()],
            ),
            StageTransition::Merge { entries: BTreeMap::new() },
        );
        log.put(&m).unwrap();

        let walked = log.walk_back(&m.op_id, None).unwrap();
        let pos = |id: &str| walked.iter().position(|r| r.op_id == id).unwrap();
        let (m_pos, b_pos, c_pos, a_pos) =
            (pos(&m.op_id), pos(&b.op_id), pos(&c.op_id), pos(&a.op_id));
        // Each ancestor must appear strictly after its descendants.
        assert!(m_pos < b_pos, "merge before its parent b");
        assert!(m_pos < c_pos, "merge before its parent c");
        assert!(b_pos < a_pos, "b before its parent a");
        assert!(c_pos < a_pos, "c before its parent a");
    }
}