Skip to main content

lex_vcs/
op_log.rs

//! Persistence + DAG queries for the operation log.
//!
//! Layout: `<root>/ops/<op_id>.json` — one canonical-JSON file
//! per [`OperationRecord`]. Writes are atomic: data goes to a
//! temporary file that is fsync'd and then renamed into place.
//! Idempotent: writing an existing op_id is a no-op (content
//! addressing guarantees the bytes match).

use crate::operation::{OpId, OperationRecord};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Handle to an on-disk operation log stored under `<root>/ops`.
///
/// Holds only the directory path; every method reads or writes the
/// filesystem directly, so there is no in-memory state to keep in sync.
#[derive(Debug)]
pub struct OpLog {
    /// Directory holding one `<op_id>.json` file per record.
    dir: PathBuf,
}

18impl OpLog {
19    pub fn open(root: &Path) -> io::Result<Self> {
20        let dir = root.join("ops");
21        fs::create_dir_all(&dir)?;
22        Ok(Self { dir })
23    }
24
25    fn path(&self, op_id: &OpId) -> PathBuf {
26        self.dir.join(format!("{op_id}.json"))
27    }
28
29    /// Persist a record. Idempotent on existing op_ids (the bytes
30    /// must match by content addressing).
31    ///
32    /// Crash safety: the tempfile's data is fsync'd before rename,
33    /// so a successful return implies a durable file at the final
34    /// path. The containing directory is not fsync'd; on a crash
35    /// between rename and the directory's metadata flush, the file
36    /// can be lost. For a content-addressed log this is acceptable
37    /// — a lost record can be re-derived from the same source — but
38    /// callers that *also* persist references to the op_id (e.g.
39    /// branch heads) should fsync those refs after `put` returns.
40    pub fn put(&self, rec: &OperationRecord) -> io::Result<()> {
41        let path = self.path(&rec.op_id);
42        if path.exists() {
43            return Ok(());
44        }
45        let bytes = serde_json::to_vec(rec)
46            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
47        let tmp = path.with_extension("json.tmp");
48        let mut f = fs::File::create(&tmp)?;
49        f.write_all(&bytes)?;
50        f.sync_all()?;
51        fs::rename(&tmp, &path)?;
52        Ok(())
53    }
54
55    pub fn get(&self, op_id: &OpId) -> io::Result<Option<OperationRecord>> {
56        let path = self.path(op_id);
57        if !path.exists() {
58            return Ok(None);
59        }
60        let bytes = fs::read(&path)?;
61        let rec: OperationRecord = serde_json::from_slice(&bytes)
62            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
63        Ok(Some(rec))
64    }
65
66    /// Walk parents transitively. Newest-first, BFS, dedup'd by op_id.
67    /// Stops at parentless ops or after `limit` records.
68    pub fn walk_back(
69        &self,
70        head: &OpId,
71        limit: Option<usize>,
72    ) -> io::Result<Vec<OperationRecord>> {
73        let mut out = Vec::new();
74        let mut seen = BTreeSet::new();
75        let mut frontier: VecDeque<OpId> = VecDeque::from([head.clone()]);
76        while let Some(id) = frontier.pop_back() {
77            if !seen.insert(id.clone()) {
78                continue;
79            }
80            if let Some(rec) = self.get(&id)? {
81                // Push parents before recording so traversal order is
82                // a stable BFS-by-discovery: children-first, then their
83                // parents, parents of those, etc.
84                for p in &rec.op.parents {
85                    if !seen.contains(p) {
86                        frontier.push_front(p.clone());
87                    }
88                }
89                out.push(rec);
90                if let Some(n) = limit {
91                    if out.len() >= n {
92                        break;
93                    }
94                }
95            }
96        }
97        Ok(out)
98    }
99
100    /// Same set as walk_back but oldest-first. Used by branch_head
101    /// for left-to-right transition replay.
102    pub fn walk_forward(
103        &self,
104        head: &OpId,
105        limit: Option<usize>,
106    ) -> io::Result<Vec<OperationRecord>> {
107        let mut all = self.walk_back(head, None)?;
108        all.reverse();
109        if let Some(n) = limit {
110            all.truncate(n);
111        }
112        Ok(all)
113    }
114
115    /// Common ancestor of two op_ids in the DAG.
116    ///
117    /// On tree-shaped histories and chain merges this is the
118    /// **lowest** common ancestor — the closest shared op. On
119    /// criss-cross merges (two ops each with two parents from
120    /// independent histories) there can be multiple
121    /// incomparable common ancestors; this picks one
122    /// deterministically (the first hit when traversing `b`'s
123    /// ancestors newest-first), but not via a recursive merge.
124    /// `None` if no shared ancestor exists.
125    ///
126    /// Tier-1 merge in #129 covers linear and tree-shaped
127    /// histories; criss-cross resolution is deferred to a
128    /// future tier (Git's `recursive` strategy is the reference).
129    pub fn lca(&self, a: &OpId, b: &OpId) -> io::Result<Option<OpId>> {
130        let a_anc: BTreeSet<OpId> = self
131            .walk_back(a, None)?
132            .into_iter()
133            .map(|r| r.op_id)
134            .collect();
135        // Walk b's ancestors newest-first; first hit is the deepest
136        // common ancestor on tree-shaped histories. In criss-cross
137        // DAGs this picks deterministically but not via recursive
138        // resolution — see the doc comment above.
139        for rec in self.walk_back(b, None)? {
140            if a_anc.contains(&rec.op_id) {
141                return Ok(Some(rec.op_id));
142            }
143        }
144        Ok(None)
145    }
146
147    /// Every record in the log. Order is whatever the directory
148    /// listing produces — undefined and not stable. Used by the
149    /// [`crate::predicate`] evaluator when no narrower candidate
150    /// set is available.
151    pub fn list_all(&self) -> io::Result<Vec<OperationRecord>> {
152        let mut out = Vec::new();
153        for entry in fs::read_dir(&self.dir)? {
154            let entry = entry?;
155            let name = match entry.file_name().into_string() {
156                Ok(s) => s,
157                Err(_) => continue,
158            };
159            if let Some(id) = name.strip_suffix(".json") {
160                if let Some(rec) = self.get(&id.to_string())? {
161                    out.push(rec);
162                }
163            }
164        }
165        Ok(out)
166    }
167
168    /// Ops in `head`'s history that are not in `base`'s history.
169    /// `base = None` means "include all of head's history" (used for
170    /// independent-histories case where the LCA is None).
171    pub fn ops_since(
172        &self,
173        head: &OpId,
174        base: Option<&OpId>,
175    ) -> io::Result<Vec<OperationRecord>> {
176        let exclude: BTreeSet<OpId> = match base {
177            Some(b) => self
178                .walk_back(b, None)?
179                .into_iter()
180                .map(|r| r.op_id)
181                .collect(),
182            None => BTreeSet::new(),
183        };
184        Ok(self
185            .walk_back(head, None)?
186            .into_iter()
187            .filter(|r| !exclude.contains(&r.op_id))
188            .collect())
189    }
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operation::{Operation, OperationKind, StageTransition};
    use std::collections::{BTreeMap, BTreeSet};

    /// Signature shared by most fixtures below.
    const FAC: &str = "fac::Int->Int";

    /// Open a log in a fresh temporary directory. The `TempDir` guard is
    /// returned alongside so the directory lives as long as the caller
    /// keeps it bound.
    fn fresh_log() -> (tempfile::TempDir, OpLog) {
        let dir = tempfile::tempdir().unwrap();
        let log = OpLog::open(dir.path()).unwrap();
        (dir, log)
    }

    /// A parentless `AddFunction` op that creates `sig` at `stage`.
    fn create_op(sig: &str, stage: &str) -> OperationRecord {
        let kind = OperationKind::AddFunction {
            sig_id: sig.into(),
            stage_id: stage.into(),
            effects: BTreeSet::new(),
        };
        let transition = StageTransition::Create {
            sig_id: sig.into(),
            stage_id: stage.into(),
        };
        OperationRecord::new(Operation::new(kind, []), transition)
    }

    /// The root `fac` op used by most tests.
    fn add_op() -> OperationRecord {
        create_op(FAC, "abc123")
    }

    /// A single-parent `ModifyBody` op moving `sig` from stage `from` to `to`.
    fn modify_body(parent: &OpId, sig: &str, from: &str, to: &str) -> OperationRecord {
        let kind = OperationKind::ModifyBody {
            sig_id: sig.into(),
            from_stage_id: from.into(),
            to_stage_id: to.into(),
        };
        let transition = StageTransition::Replace {
            sig_id: sig.into(),
            from: from.into(),
            to: to.into(),
        };
        OperationRecord::new(Operation::new(kind, [parent.clone()]), transition)
    }

    /// The op_ids of `recs`, in order.
    fn ids(recs: &[OperationRecord]) -> Vec<&str> {
        recs.iter().map(|r| r.op_id.as_str()).collect()
    }

    #[test]
    fn put_then_get_round_trips() {
        let (_dir, log) = fresh_log();
        let original = add_op();
        log.put(&original).unwrap();
        let loaded = log.get(&original.op_id).unwrap();
        assert_eq!(loaded, Some(original));
    }

    #[test]
    fn put_is_idempotent() {
        let (_dir, log) = fresh_log();
        let rec = add_op();
        // The second iteration re-puts an existing op_id: a no-op.
        for _ in 0..2 {
            log.put(&rec).unwrap();
        }
        assert!(log.get(&rec.op_id).unwrap().is_some());
    }

    #[test]
    fn get_missing_returns_none() {
        let (_dir, log) = fresh_log();
        let missing: OpId = "deadbeef".to_string();
        assert_eq!(log.get(&missing).unwrap(), None);
    }

    #[test]
    fn walk_back_returns_newest_first() {
        let (_dir, log) = fresh_log();
        let a = add_op();
        let b = modify_body(&a.op_id, FAC, "abc123", "def456");
        let c = modify_body(&b.op_id, FAC, "def456", "789aaa");
        for rec in [&a, &b, &c] {
            log.put(rec).unwrap();
        }

        let walked = log.walk_back(&c.op_id, None).unwrap();
        assert_eq!(
            ids(&walked),
            [c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]
        );
    }

    #[test]
    fn walk_forward_returns_oldest_first() {
        let (_dir, log) = fresh_log();
        let a = add_op();
        let b = modify_body(&a.op_id, FAC, "abc123", "def456");
        for rec in [&a, &b] {
            log.put(rec).unwrap();
        }

        let walked = log.walk_forward(&b.op_id, None).unwrap();
        assert_eq!(ids(&walked), [a.op_id.as_str(), b.op_id.as_str()]);
    }

    #[test]
    fn lca_finds_common_ancestor() {
        let (_dir, log) = fresh_log();
        let root = add_op();
        let left = modify_body(&root.op_id, FAC, "abc123", "left1");
        let right = modify_body(&root.op_id, FAC, "abc123", "right1");
        for rec in [&root, &left, &right] {
            log.put(rec).unwrap();
        }

        let found = log.lca(&left.op_id, &right.op_id).unwrap();
        assert_eq!(found, Some(root.op_id));
    }

    #[test]
    fn lca_none_for_independent_histories() {
        let (_dir, log) = fresh_log();
        let fac = add_op();
        // A second parentless op (different sig, so different op_id).
        let double = create_op("double::Int->Int", "ddd111");
        for rec in [&fac, &double] {
            log.put(rec).unwrap();
        }

        assert_eq!(log.lca(&fac.op_id, &double.op_id).unwrap(), None);
    }

    #[test]
    fn ops_since_excludes_base_history() {
        let (_dir, log) = fresh_log();
        let a = add_op();
        let b = modify_body(&a.op_id, FAC, "abc123", "def456");
        let c = modify_body(&b.op_id, FAC, "def456", "789aaa");
        for rec in [&a, &b, &c] {
            log.put(rec).unwrap();
        }

        let mut since: Vec<OpId> = log
            .ops_since(&c.op_id, Some(&a.op_id))
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        since.sort();
        let mut expected = vec![b.op_id.clone(), c.op_id.clone()];
        expected.sort();
        // Exactly {b, c}: the base `a` is excluded, nothing is duplicated.
        assert_eq!(since, expected);
        assert!(!since.contains(&a.op_id));
    }

    #[test]
    fn walk_back_orders_ancestors_after_descendants() {
        // Build a small DAG with a merge:
        //
        //     a
        //    / \
        //   b   c
        //    \ /
        //     m  (merge with parents [b, c])
        //
        // The merge engine relies on the property that any ancestor of
        // X appears strictly after X in the walk_back output. Pin it.
        let (_dir, log) = fresh_log();
        let a = add_op();
        let b = modify_body(&a.op_id, FAC, "abc123", "b1");
        let c = modify_body(&a.op_id, "double::Int->Int", "ddd000", "c1");
        let m = OperationRecord::new(
            Operation::new(
                OperationKind::Merge { resolved: 0 },
                [b.op_id.clone(), c.op_id.clone()],
            ),
            StageTransition::Merge { entries: BTreeMap::new() },
        );
        for rec in [&a, &b, &c, &m] {
            log.put(rec).unwrap();
        }

        let walked = log.walk_back(&m.op_id, None).unwrap();
        let pos = |id: &OpId| walked.iter().position(|r| r.op_id == *id).unwrap();
        let [m_pos, b_pos, c_pos, a_pos] = [&m, &b, &c, &a].map(|r| pos(&r.op_id));
        // Each ancestor must appear strictly after its descendants.
        for (earlier, later, msg) in [
            (m_pos, b_pos, "merge before its parent b"),
            (m_pos, c_pos, "merge before its parent c"),
            (b_pos, a_pos, "b before its parent a"),
            (c_pos, a_pos, "c before its parent a"),
        ] {
            assert!(earlier < later, "{msg}");
        }
    }
}