1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
//! Persistence + DAG queries for the operation log.
//!
//! Layout: `<root>/ops/<op_id>.json` — one canonical-JSON file
//! per [`OperationRecord`]. Atomic writes via tempfile + rename.
//! Idempotent: writing an existing op_id is a no-op (content
//! addressing guarantees the bytes match).
//!
//! # Packfiles (#261 slice 1)
//!
//! Loose-file storage is fine to ~10k ops; past that the
//! filesystem starts to thrash. [`OpLog::repack`] consolidates
//! loose files into deterministic, content-addressed packfiles:
//!
//! - `<dir>/pack-<hash>.pack`: each record framed as `[8-byte BE
//! length][canonical JSON]`, ops sorted by op_id within the pack.
//! - `<dir>/pack-<hash>.idx`: JSON map of `op_id` → byte offset
//! into the `.pack` (offset of the length header).
//!
//! Pack name is the SHA-256 of the sorted op_ids, newline-joined,
//! so the same input set always produces the same pack hash —
//! a re-run of `lex op repack` is a no-op.
//!
//! [`OpLog::get`] tries loose first, falls back to scanning all
//! `.idx` files in the directory. The write path
//! ([`OpLog::put`]) only ever writes loose; ops migrate into
//! packs via the explicit [`OpLog::repack`] call.
use crate::canonical::hash_bytes;
use crate::operation::{OpId, OperationRecord};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
/// Handle to the on-disk operation log rooted at `<root>/ops`.
/// See the module docs for the loose-file and packfile layout.
pub struct OpLog {
    // Absolute path of the `ops` directory (created by [`OpLog::open`]).
    dir: PathBuf,
}
impl OpLog {
pub fn open(root: &Path) -> io::Result<Self> {
let dir = root.join("ops");
fs::create_dir_all(&dir)?;
Ok(Self { dir })
}
fn path(&self, op_id: &OpId) -> PathBuf {
self.dir.join(format!("{op_id}.json"))
}
/// Persist a record. Idempotent on existing op_ids (the bytes
/// must match by content addressing).
///
/// Crash safety: the tempfile's data is fsync'd before rename,
/// so a successful return implies a durable file at the final
/// path. The containing directory is not fsync'd; on a crash
/// between rename and the directory's metadata flush, the file
/// can be lost. For a content-addressed log this is acceptable
/// — a lost record can be re-derived from the same source — but
/// callers that *also* persist references to the op_id (e.g.
/// branch heads) should fsync those refs after `put` returns.
pub fn put(&self, rec: &OperationRecord) -> io::Result<()> {
let path = self.path(&rec.op_id);
if path.exists() {
return Ok(());
}
let bytes = serde_json::to_vec(rec)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let tmp = path.with_extension("json.tmp");
let mut f = fs::File::create(&tmp)?;
f.write_all(&bytes)?;
f.sync_all()?;
fs::rename(&tmp, &path)?;
Ok(())
}
pub fn get(&self, op_id: &OpId) -> io::Result<Option<OperationRecord>> {
let path = self.path(op_id);
if path.exists() {
let bytes = fs::read(&path)?;
let rec: OperationRecord = serde_json::from_slice(&bytes)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
return Ok(Some(rec));
}
// Loose miss — scan packfiles. Each `.idx` is a tiny JSON
// map; small constant cost per pack. For larger stores we
// could maintain an in-memory cache keyed off pack mtimes,
// but slice 1 keeps it simple — measure before optimizing.
for pack_idx in self.list_pack_indices()? {
let idx = PackIndex::load(&pack_idx)?;
if let Some(&offset) = idx.ops.get(op_id) {
let pack_path = pack_idx.with_extension("pack");
return read_packed_op(&pack_path, offset).map(Some);
}
}
Ok(None)
}
/// Walk the directory for `pack-*.idx` files. Order is whatever
/// the filesystem gives us — `get` doesn't depend on it (op_ids
/// are unique by content addressing, so the right pack wins).
fn list_pack_indices(&self) -> io::Result<Vec<PathBuf>> {
let mut out = Vec::new();
for entry in fs::read_dir(&self.dir)? {
let entry = entry?;
let name = match entry.file_name().into_string() {
Ok(s) => s,
Err(_) => continue,
};
if name.starts_with("pack-") && name.ends_with(".idx") {
out.push(entry.path());
}
}
Ok(out)
}
/// Consolidate loose op records into a deterministic, content-
/// addressed packfile (#261 slice 1). Returns the number of
/// ops moved into the new pack.
///
/// `threshold` is the minimum number of loose ops required to
/// trigger a repack — under that, returns `0` and leaves the
/// log alone. The idea: small stores stay loose; only repack
/// when the file count starts to matter.
///
/// Determinism: the pack name is the SHA-256 of the sorted
/// op_ids (newline-joined), so two independent runs against the
/// same set of loose ops produce a byte-identical pack.
/// Re-running on an empty loose directory is a no-op.
///
/// Crash safety: the `.pack.tmp` and `.idx.tmp` files are
/// fsync'd before rename; loose files are deleted only after
/// both renames succeed. A crash mid-repack leaves both loose
/// and partial-pack files; a subsequent `get` finds the loose
/// version, and a subsequent `repack` cleans up.
pub fn repack(&self, threshold: usize) -> io::Result<usize> {
let loose: Vec<(OpId, PathBuf)> = self.list_loose_files()?;
if loose.len() < threshold {
return Ok(0);
}
// Sort ops deterministically by op_id (lex order). The pack
// hash is the SHA-256 of those op_ids joined by newlines —
// same input → same name.
let mut ops: Vec<(OpId, Vec<u8>)> = Vec::with_capacity(loose.len());
for (op_id, path) in &loose {
let bytes = fs::read(path)?;
ops.push((op_id.clone(), bytes));
}
ops.sort_by(|a, b| a.0.cmp(&b.0));
let mut name_input = Vec::new();
for (id, _) in &ops {
name_input.extend_from_slice(id.as_bytes());
name_input.push(b'\n');
}
let pack_hash = hash_bytes(&name_input);
let pack_path = self.dir.join(format!("pack-{pack_hash}.pack"));
let idx_path = self.dir.join(format!("pack-{pack_hash}.idx"));
if pack_path.exists() && idx_path.exists() {
// Same input set — pack already exists. Just clean up
// the loose duplicates.
let count = ops.len();
for (_, path) in &loose {
let _ = fs::remove_file(path);
}
return Ok(count);
}
// Write `<pack>.pack.tmp` framed as [8-byte BE length][JSON]
// for each record; record offsets for the index.
let pack_tmp = pack_path.with_extension("pack.tmp");
let idx_tmp = idx_path.with_extension("idx.tmp");
let mut offsets: BTreeMap<OpId, u64> = BTreeMap::new();
{
let mut f = fs::File::create(&pack_tmp)?;
let mut cursor: u64 = 0;
for (op_id, bytes) in &ops {
offsets.insert(op_id.clone(), cursor);
let len = bytes.len() as u64;
f.write_all(&len.to_be_bytes())?;
f.write_all(bytes)?;
cursor += 8 + len;
}
f.sync_all()?;
}
// Write the index. JSON for inspectability and
// forward-compat (we can add fields without breaking
// readers).
let idx = PackIndex { version: 1, ops: offsets };
idx.save(&idx_tmp)?;
fs::rename(&pack_tmp, &pack_path)?;
fs::rename(&idx_tmp, &idx_path)?;
// Now safe to delete the loose files — pack is durable.
let count = ops.len();
for (_, path) in &loose {
let _ = fs::remove_file(path);
}
Ok(count)
}
/// Remove every op_id in `victims` from the log, across both
/// loose files and packfiles (#261 slice 2). Used by
/// `lex op gc` after a retention plan identifies which ops to
/// drop. Idempotent — calling twice with the same set is a
/// no-op on the second pass.
///
/// Pack handling: any pack containing one or more victims is
/// rewritten to a new content-addressed pack with only the
/// surviving ops; the old pack and its index file are deleted.
/// A pack whose every op is a victim is deleted outright.
///
/// Returns the count of ops actually removed (loose files
/// deleted + packed ops dropped). Pre-existing absences don't
/// contribute.
pub fn evict(&self, victims: &BTreeSet<OpId>) -> io::Result<usize> {
if victims.is_empty() {
return Ok(0);
}
let mut removed = 0;
// Loose files: just delete the matching `<op_id>.json`.
for (op_id, path) in self.list_loose_files()? {
if victims.contains(&op_id) {
match fs::remove_file(&path) {
Ok(()) => removed += 1,
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) => return Err(e),
}
}
}
// Packs: rewrite each affected pack with only surviving ops.
for pack_idx in self.list_pack_indices()? {
let idx = PackIndex::load(&pack_idx)?;
let pack_path = pack_idx.with_extension("pack");
let touched = idx.ops.keys().any(|op_id| victims.contains(op_id));
if !touched {
continue;
}
// Read every surviving op out, then drop the old pack.
let mut survivors: Vec<(OpId, Vec<u8>)> = Vec::new();
for (op_id, &offset) in &idx.ops {
if victims.contains(op_id) {
removed += 1;
continue;
}
let rec = read_packed_op(&pack_path, offset)?;
let bytes = serde_json::to_vec(&rec)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
survivors.push((op_id.clone(), bytes));
}
// Delete old pack + idx first; we re-emit a fresh
// (different-hash) pack from the survivors below.
let _ = fs::remove_file(&pack_path);
let _ = fs::remove_file(&pack_idx);
if survivors.is_empty() {
continue;
}
self.write_pack_from_survivors(survivors)?;
}
Ok(removed)
}
/// Helper: write a new content-addressed pack from
/// already-serialized op bytes. Same shape as
/// [`Self::repack`]'s output path; factored out so
/// [`Self::evict`] can reuse it.
fn write_pack_from_survivors(
&self,
mut ops: Vec<(OpId, Vec<u8>)>,
) -> io::Result<()> {
ops.sort_by(|a, b| a.0.cmp(&b.0));
let mut name_input = Vec::new();
for (id, _) in &ops {
name_input.extend_from_slice(id.as_bytes());
name_input.push(b'\n');
}
let pack_hash = hash_bytes(&name_input);
let pack_path = self.dir.join(format!("pack-{pack_hash}.pack"));
let idx_path = self.dir.join(format!("pack-{pack_hash}.idx"));
if pack_path.exists() && idx_path.exists() {
return Ok(());
}
let pack_tmp = pack_path.with_extension("pack.tmp");
let idx_tmp = idx_path.with_extension("idx.tmp");
let mut offsets: BTreeMap<OpId, u64> = BTreeMap::new();
{
let mut f = fs::File::create(&pack_tmp)?;
let mut cursor: u64 = 0;
for (op_id, bytes) in &ops {
offsets.insert(op_id.clone(), cursor);
let len = bytes.len() as u64;
f.write_all(&len.to_be_bytes())?;
f.write_all(bytes)?;
cursor += 8 + len;
}
f.sync_all()?;
}
let idx = PackIndex { version: 1, ops: offsets };
idx.save(&idx_tmp)?;
fs::rename(&pack_tmp, &pack_path)?;
fs::rename(&idx_tmp, &idx_path)?;
Ok(())
}
/// Enumerate every loose `<op_id>.json` in the ops directory.
/// Used by [`Self::repack`] and [`Self::list_all`].
fn list_loose_files(&self) -> io::Result<Vec<(OpId, PathBuf)>> {
let mut out = Vec::new();
for entry in fs::read_dir(&self.dir)? {
let entry = entry?;
let name = match entry.file_name().into_string() {
Ok(s) => s,
Err(_) => continue,
};
if let Some(id) = name.strip_suffix(".json") {
if !id.starts_with("pack-") {
out.push((id.to_string(), entry.path()));
}
}
}
Ok(out)
}
/// Remove a record from the log. Used by [`crate::migrate`] to
/// delete the old `<op_id>.json` files after a format migration
/// has written their replacements. Idempotent on missing files.
///
/// **Not** part of the day-to-day op-log API — the log is
/// append-only by design (#129). The only legitimate caller is
/// the migration tool, which is supervising a destructive,
/// `--confirm`-gated batch.
pub fn delete(&self, op_id: &OpId) -> io::Result<()> {
let path = self.path(op_id);
match fs::remove_file(&path) {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(e),
}
}
    /// Walk parents transitively. Newest-first, BFS, dedup'd by op_id.
    /// Stops at parentless ops or after `limit` records. Op ids whose
    /// records are missing from the log (e.g. evicted ancestors) are
    /// skipped silently — they contribute nothing to the output.
    ///
    /// NOTE(review): BFS-by-discovery is not a full topological sort.
    /// If an op ever lists both another op and that op's own ancestor
    /// as parents, the ancestor can be discovered (and emitted) before
    /// its descendant — confirm the merge engine never produces such
    /// parent lists; the diamond-shape test only pins the simple case.
    pub fn walk_back(
        &self,
        head: &OpId,
        limit: Option<usize>,
    ) -> io::Result<Vec<OperationRecord>> {
        let mut out = Vec::new();
        let mut seen = BTreeSet::new();
        // FIFO queue: parents are pushed at the front and popped from
        // the back, giving breadth-first discovery order from `head`.
        let mut frontier: VecDeque<OpId> = VecDeque::from([head.clone()]);
        while let Some(id) = frontier.pop_back() {
            // Dedup on pop, not push: the same id can be enqueued
            // twice via two children before either copy is processed.
            if !seen.insert(id.clone()) {
                continue;
            }
            if let Some(rec) = self.get(&id)? {
                // Push parents before recording so traversal order is
                // a stable BFS-by-discovery: children-first, then their
                // parents, parents of those, etc.
                for p in &rec.op.parents {
                    if !seen.contains(p) {
                        frontier.push_front(p.clone());
                    }
                }
                out.push(rec);
                // `limit` caps emitted records, not visited ids.
                if let Some(n) = limit {
                    if out.len() >= n {
                        break;
                    }
                }
            }
        }
        Ok(out)
    }
/// Same set as walk_back but oldest-first. Used by branch_head
/// for left-to-right transition replay.
pub fn walk_forward(
&self,
head: &OpId,
limit: Option<usize>,
) -> io::Result<Vec<OperationRecord>> {
let mut all = self.walk_back(head, None)?;
all.reverse();
if let Some(n) = limit {
all.truncate(n);
}
Ok(all)
}
/// Common ancestor of two op_ids in the DAG.
///
/// On tree-shaped histories and chain merges this is the
/// **lowest** common ancestor — the closest shared op. On
/// criss-cross merges (two ops each with two parents from
/// independent histories) there can be multiple
/// incomparable common ancestors; this picks one
/// deterministically (the first hit when traversing `b`'s
/// ancestors newest-first), but not via a recursive merge.
/// `None` if no shared ancestor exists.
///
/// Tier-1 merge in #129 covers linear and tree-shaped
/// histories; criss-cross resolution is deferred to a
/// future tier (Git's `recursive` strategy is the reference).
pub fn lca(&self, a: &OpId, b: &OpId) -> io::Result<Option<OpId>> {
let a_anc: BTreeSet<OpId> = self
.walk_back(a, None)?
.into_iter()
.map(|r| r.op_id)
.collect();
// Walk b's ancestors newest-first; first hit is the deepest
// common ancestor on tree-shaped histories. In criss-cross
// DAGs this picks deterministically but not via recursive
// resolution — see the doc comment above.
for rec in self.walk_back(b, None)? {
if a_anc.contains(&rec.op_id) {
return Ok(Some(rec.op_id));
}
}
Ok(None)
}
/// Every record in the log. Order is whatever the directory
/// listing produces — undefined and not stable. Used by the
/// [`crate::predicate`] evaluator when no narrower candidate
/// set is available.
pub fn list_all(&self) -> io::Result<Vec<OperationRecord>> {
let mut out = Vec::new();
let mut seen: BTreeSet<OpId> = BTreeSet::new();
// Loose first so dedup wins for them on collision (loose
// and pack should never both exist for the same op_id post-
// repack, but during an interrupted repack both can be
// present transiently).
for (id, _) in self.list_loose_files()? {
if let Some(rec) = self.get(&id)? {
if seen.insert(rec.op_id.clone()) {
out.push(rec);
}
}
}
for pack_idx in self.list_pack_indices()? {
let idx = PackIndex::load(&pack_idx)?;
let pack_path = pack_idx.with_extension("pack");
for (op_id, &offset) in &idx.ops {
if seen.insert(op_id.clone()) {
out.push(read_packed_op(&pack_path, offset)?);
}
}
}
Ok(out)
}
/// Ops in `head`'s history that are not in `base`'s history.
/// `base = None` means "include all of head's history" (used for
/// independent-histories case where the LCA is None).
pub fn ops_since(
&self,
head: &OpId,
base: Option<&OpId>,
) -> io::Result<Vec<OperationRecord>> {
let exclude: BTreeSet<OpId> = match base {
Some(b) => self
.walk_back(b, None)?
.into_iter()
.map(|r| r.op_id)
.collect(),
None => BTreeSet::new(),
};
Ok(self
.walk_back(head, None)?
.into_iter()
.filter(|r| !exclude.contains(&r.op_id))
.collect())
}
}
/// Sidecar index for a packfile. Maps `op_id` to the byte offset
/// of the record's length header inside the `.pack`. JSON for
/// inspectability and forward-compat.
#[derive(serde::Serialize, serde::Deserialize)]
struct PackIndex {
    // Index format version; currently always written as 1.
    version: u32,
    // op_id → offset of the record's 8-byte BE length header.
    ops: BTreeMap<OpId, u64>,
}
impl PackIndex {
    /// Deserialize an index from `path`. Malformed JSON surfaces as
    /// `InvalidData`.
    fn load(path: &Path) -> io::Result<Self> {
        let raw = fs::read(path)?;
        serde_json::from_slice(&raw)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }
    /// Serialize to `path` and fsync the data. Callers write to a
    /// `.tmp` sibling and rename into place for atomicity.
    fn save(&self, path: &Path) -> io::Result<()> {
        let encoded = serde_json::to_vec(self)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let mut file = fs::File::create(path)?;
        file.write_all(&encoded)?;
        file.sync_all()
    }
}
/// Read one record from a packfile at `offset`. The record is
/// framed as `[8-byte BE length][canonical JSON]`.
///
/// Returns `InvalidData` when the length header claims more bytes
/// than the file holds past the header (corrupt pack or stale index
/// offset), instead of attempting an arbitrarily large allocation.
fn read_packed_op(pack_path: &Path, offset: u64) -> io::Result<OperationRecord> {
    let mut f = fs::File::open(pack_path)?;
    let file_len = f.metadata()?.len();
    f.seek(SeekFrom::Start(offset))?;
    let mut len_buf = [0u8; 8];
    f.read_exact(&mut len_buf)?;
    let len = u64::from_be_bytes(len_buf);
    // Bound the allocation by what the file can actually supply past
    // the header; a corrupt header would otherwise drive the
    // vec![0u8; len] below into a huge allocation before read_exact
    // had a chance to fail.
    let available = file_len.saturating_sub(offset.saturating_add(8));
    if len > available {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("pack record length {len} exceeds {available} available bytes"),
        ));
    }
    // Checked narrowing: u64 → usize can truncate on 32-bit targets.
    let len = usize::try_from(len)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    let mut buf = vec![0u8; len];
    f.read_exact(&mut buf)?;
    serde_json::from_slice(&buf)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operation::{Operation, OperationKind, StageTransition};
    use std::collections::{BTreeMap, BTreeSet};
    /// Fixture: a parentless `AddFunction` op for `fac::Int->Int`.
    fn add_op() -> OperationRecord {
        let op = Operation::new(
            OperationKind::AddFunction {
                sig_id: "fac::Int->Int".into(),
                stage_id: "abc123".into(),
                effects: BTreeSet::new(),
                budget_cost: None,
            },
            [],
        );
        OperationRecord::new(
            op,
            StageTransition::Create {
                sig_id: "fac::Int->Int".into(),
                stage_id: "abc123".into(),
            },
        )
    }
    /// Fixture: a `ModifyBody` op with a single `parent`, replacing
    /// stage `from` with `to` on signature `sig`.
    fn modify_op(parent: &OpId, sig: &str, from: &str, to: &str) -> OperationRecord {
        let op = Operation::new(
            OperationKind::ModifyBody {
                sig_id: sig.into(),
                from_stage_id: from.into(),
                to_stage_id: to.into(),
                from_budget: None,
                to_budget: None,
            },
            [parent.clone()],
        );
        OperationRecord::new(
            op,
            StageTransition::Replace {
                sig_id: sig.into(),
                from: from.into(),
                to: to.into(),
            },
        )
    }
    #[test]
    fn put_then_get_round_trips() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let rec = add_op();
        log.put(&rec).unwrap();
        let back = log.get(&rec.op_id).unwrap().unwrap();
        assert_eq!(back, rec);
    }
    #[test]
    fn put_is_idempotent() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let rec = add_op();
        log.put(&rec).unwrap();
        log.put(&rec).unwrap(); // second write is a no-op
        assert!(log.get(&rec.op_id).unwrap().is_some());
    }
    #[test]
    fn get_missing_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        assert!(log.get(&"deadbeef".to_string()).unwrap().is_none());
    }
    #[test]
    fn walk_back_returns_newest_first() {
        // Linear chain a -> b -> c; walking back from c must yield
        // [c, b, a].
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let c = modify_op(&b.op_id, "fac::Int->Int", "def456", "789aaa");
        log.put(&c).unwrap();
        let walked = log.walk_back(&c.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(
            ids,
            vec![c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]
        );
    }
    #[test]
    fn walk_forward_returns_oldest_first() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let walked = log.walk_forward(&b.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(ids, vec![a.op_id.as_str(), b.op_id.as_str()]);
    }
    #[test]
    fn lca_finds_common_ancestor() {
        // Fork: root with two children; their LCA is root itself.
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let root = add_op();
        log.put(&root).unwrap();
        let left = modify_op(&root.op_id, "fac::Int->Int", "abc123", "left1");
        log.put(&left).unwrap();
        let right = modify_op(&root.op_id, "fac::Int->Int", "abc123", "right1");
        log.put(&right).unwrap();
        let lca = log.lca(&left.op_id, &right.op_id).unwrap();
        assert_eq!(lca, Some(root.op_id));
    }
    #[test]
    fn lca_none_for_independent_histories() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        // A second parentless op (different sig, so different op_id).
        let b = OperationRecord::new(
            Operation::new(
                OperationKind::AddFunction {
                    sig_id: "double::Int->Int".into(),
                    stage_id: "ddd111".into(),
                    effects: BTreeSet::new(),
                    budget_cost: None,
                },
                [],
            ),
            StageTransition::Create {
                sig_id: "double::Int->Int".into(),
                stage_id: "ddd111".into(),
            },
        );
        log.put(&b).unwrap();
        assert_eq!(log.lca(&a.op_id, &b.op_id).unwrap(), None);
    }
    #[test]
    fn ops_since_excludes_base_history() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let c = modify_op(&b.op_id, "fac::Int->Int", "def456", "789aaa");
        log.put(&c).unwrap();
        let since: Vec<_> = log
            .ops_since(&c.op_id, Some(&a.op_id))
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        assert_eq!(since.len(), 2);
        assert!(since.contains(&b.op_id));
        assert!(since.contains(&c.op_id));
        assert!(!since.contains(&a.op_id));
    }
    #[test]
    fn repack_consolidates_loose_files_into_a_pack() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let n = log.repack(0).unwrap(); // threshold 0 = always
        assert_eq!(n, 2);
        let ops_dir = tmp.path().join("ops");
        // No `<op_id>.json` should remain loose after the repack.
        let loose: Vec<_> = fs::read_dir(&ops_dir).unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|x| x == "json"))
            .filter(|e| !e.file_name().to_string_lossy().starts_with("pack-"))
            .collect();
        assert!(loose.is_empty(), "loose .json files should be deleted");
        let packs: Vec<_> = fs::read_dir(&ops_dir).unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|x| x == "pack"))
            .collect();
        assert_eq!(packs.len(), 1);
        // After repack, get() must still return both ops via the pack.
        assert_eq!(log.get(&a.op_id).unwrap().unwrap(), a);
        assert_eq!(log.get(&b.op_id).unwrap().unwrap(), b);
    }
    #[test]
    fn repack_below_threshold_is_a_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        log.put(&add_op()).unwrap();
        let n = log.repack(10).unwrap();
        assert_eq!(n, 0);
    }
    #[test]
    fn repack_is_deterministic_on_same_input() {
        // Two stores with the same loose ops repack to the same
        // pack hash — content addressing all the way down.
        let make_log = || {
            let tmp = tempfile::tempdir().unwrap();
            let log = OpLog::open(tmp.path()).unwrap();
            let a = add_op();
            log.put(&a).unwrap();
            let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
            log.put(&b).unwrap();
            log.repack(0).unwrap();
            (tmp, log)
        };
        let (tmp1, _log1) = make_log();
        let (tmp2, _log2) = make_log();
        // Helper: name of the single `.pack` file in `<dir>/ops`.
        let pack_name = |dir: &std::path::Path| -> String {
            fs::read_dir(dir.join("ops")).unwrap()
                .filter_map(|e| e.ok())
                .find(|e| e.path().extension().is_some_and(|x| x == "pack"))
                .unwrap()
                .file_name().into_string().unwrap()
        };
        assert_eq!(pack_name(tmp1.path()), pack_name(tmp2.path()));
    }
    #[test]
    fn walk_back_works_across_loose_and_packed_ops() {
        // Pack the older history, leave newer ops loose. walk_back
        // must traverse seamlessly.
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "b1");
        log.put(&b).unwrap();
        log.repack(0).unwrap();
        // Now add a newer op as a loose file.
        let c = modify_op(&b.op_id, "fac::Int->Int", "b1", "c1");
        log.put(&c).unwrap();
        let walked = log.walk_back(&c.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(ids, vec![c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]);
    }
    #[test]
    fn list_all_dedups_across_loose_and_pack() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        log.repack(0).unwrap();
        // Re-put the same op as a loose file (simulate an
        // interrupted repack). list_all should still report
        // exactly one record per op_id.
        log.put(&a).unwrap();
        let all = log.list_all().unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].op_id, a.op_id);
    }
    #[test]
    fn walk_back_orders_ancestors_after_descendants() {
        // Build a small DAG with a merge:
        //
        //      a
        //     / \
        //    b   c
        //     \ /
        //      m   (merge with parents [b, c])
        //
        // The merge engine relies on the property that any ancestor of
        // X appears strictly after X in the walk_back output. Pin it.
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "b1");
        log.put(&b).unwrap();
        // `c` modifies a different signature so its op_id differs
        // from `b`'s even though both share parent `a`.
        let c = OperationRecord::new(
            Operation::new(
                OperationKind::ModifyBody {
                    sig_id: "double::Int->Int".into(),
                    from_stage_id: "ddd000".into(),
                    to_stage_id: "c1".into(),
                    from_budget: None,
                    to_budget: None,
                },
                [a.op_id.clone()],
            ),
            StageTransition::Replace {
                sig_id: "double::Int->Int".into(),
                from: "ddd000".into(),
                to: "c1".into(),
            },
        );
        log.put(&c).unwrap();
        let m = OperationRecord::new(
            Operation::new(
                OperationKind::Merge { resolved: 0 },
                [b.op_id.clone(), c.op_id.clone()],
            ),
            StageTransition::Merge { entries: BTreeMap::new() },
        );
        log.put(&m).unwrap();
        let walked = log.walk_back(&m.op_id, None).unwrap();
        let pos = |id: &str| walked.iter().position(|r| r.op_id == id).unwrap();
        let (m_pos, b_pos, c_pos, a_pos) =
            (pos(&m.op_id), pos(&b.op_id), pos(&c.op_id), pos(&a.op_id));
        // Each ancestor must appear strictly after its descendants.
        assert!(m_pos < b_pos, "merge before its parent b");
        assert!(m_pos < c_pos, "merge before its parent c");
        assert!(b_pos < a_pos, "b before its parent a");
        assert!(c_pos < a_pos, "c before its parent a");
    }
}