1use crate::canonical::hash_bytes;
29use crate::operation::{OpId, OperationRecord};
30use std::collections::{BTreeMap, BTreeSet, VecDeque};
31use std::fs;
32use std::io::{self, Read, Seek, SeekFrom, Write};
33use std::path::{Path, PathBuf};
34
/// Append-only, content-addressed store of operation records.
///
/// Records start life as loose `<op_id>.json` files inside the `ops/`
/// directory; `repack` later consolidates them into
/// `pack-<hash>.pack` / `pack-<hash>.idx` pairs.
pub struct OpLog {
    // Directory holding both loose records and pack files (`<root>/ops`).
    dir: PathBuf,
}
38
39impl OpLog {
40 pub fn open(root: &Path) -> io::Result<Self> {
41 let dir = root.join("ops");
42 fs::create_dir_all(&dir)?;
43 Ok(Self { dir })
44 }
45
46 fn path(&self, op_id: &OpId) -> PathBuf {
47 self.dir.join(format!("{op_id}.json"))
48 }
49
50 pub fn put(&self, rec: &OperationRecord) -> io::Result<()> {
62 let path = self.path(&rec.op_id);
63 if path.exists() {
64 return Ok(());
65 }
66 let bytes = serde_json::to_vec(rec)
67 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
68 let tmp = path.with_extension("json.tmp");
69 let mut f = fs::File::create(&tmp)?;
70 f.write_all(&bytes)?;
71 f.sync_all()?;
72 fs::rename(&tmp, &path)?;
73 Ok(())
74 }
75
76 pub fn get(&self, op_id: &OpId) -> io::Result<Option<OperationRecord>> {
77 let path = self.path(op_id);
78 if path.exists() {
79 let bytes = fs::read(&path)?;
80 let rec: OperationRecord = serde_json::from_slice(&bytes)
81 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
82 return Ok(Some(rec));
83 }
84 for pack_idx in self.list_pack_indices()? {
89 let idx = PackIndex::load(&pack_idx)?;
90 if let Some(&offset) = idx.ops.get(op_id) {
91 let pack_path = pack_idx.with_extension("pack");
92 return read_packed_op(&pack_path, offset).map(Some);
93 }
94 }
95 Ok(None)
96 }
97
98 fn list_pack_indices(&self) -> io::Result<Vec<PathBuf>> {
102 let mut out = Vec::new();
103 for entry in fs::read_dir(&self.dir)? {
104 let entry = entry?;
105 let name = match entry.file_name().into_string() {
106 Ok(s) => s,
107 Err(_) => continue,
108 };
109 if name.starts_with("pack-") && name.ends_with(".idx") {
110 out.push(entry.path());
111 }
112 }
113 Ok(out)
114 }
115
116 pub fn repack(&self, threshold: usize) -> io::Result<usize> {
136 let loose: Vec<(OpId, PathBuf)> = self.list_loose_files()?;
137 if loose.len() < threshold {
138 return Ok(0);
139 }
140 let mut ops: Vec<(OpId, Vec<u8>)> = Vec::with_capacity(loose.len());
144 for (op_id, path) in &loose {
145 let bytes = fs::read(path)?;
146 ops.push((op_id.clone(), bytes));
147 }
148 ops.sort_by(|a, b| a.0.cmp(&b.0));
149 let mut name_input = Vec::new();
150 for (id, _) in &ops {
151 name_input.extend_from_slice(id.as_bytes());
152 name_input.push(b'\n');
153 }
154 let pack_hash = hash_bytes(&name_input);
155 let pack_path = self.dir.join(format!("pack-{pack_hash}.pack"));
156 let idx_path = self.dir.join(format!("pack-{pack_hash}.idx"));
157 if pack_path.exists() && idx_path.exists() {
158 let count = ops.len();
161 for (_, path) in &loose {
162 let _ = fs::remove_file(path);
163 }
164 return Ok(count);
165 }
166
167 let pack_tmp = pack_path.with_extension("pack.tmp");
170 let idx_tmp = idx_path.with_extension("idx.tmp");
171 let mut offsets: BTreeMap<OpId, u64> = BTreeMap::new();
172 {
173 let mut f = fs::File::create(&pack_tmp)?;
174 let mut cursor: u64 = 0;
175 for (op_id, bytes) in &ops {
176 offsets.insert(op_id.clone(), cursor);
177 let len = bytes.len() as u64;
178 f.write_all(&len.to_be_bytes())?;
179 f.write_all(bytes)?;
180 cursor += 8 + len;
181 }
182 f.sync_all()?;
183 }
184 let idx = PackIndex { version: 1, ops: offsets };
188 idx.save(&idx_tmp)?;
189
190 fs::rename(&pack_tmp, &pack_path)?;
191 fs::rename(&idx_tmp, &idx_path)?;
192
193 let count = ops.len();
195 for (_, path) in &loose {
196 let _ = fs::remove_file(path);
197 }
198 Ok(count)
199 }
200
201 pub fn evict(&self, victims: &BTreeSet<OpId>) -> io::Result<usize> {
216 if victims.is_empty() {
217 return Ok(0);
218 }
219 let mut removed = 0;
220 for (op_id, path) in self.list_loose_files()? {
222 if victims.contains(&op_id) {
223 match fs::remove_file(&path) {
224 Ok(()) => removed += 1,
225 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
226 Err(e) => return Err(e),
227 }
228 }
229 }
230 for pack_idx in self.list_pack_indices()? {
232 let idx = PackIndex::load(&pack_idx)?;
233 let pack_path = pack_idx.with_extension("pack");
234 let touched = idx.ops.keys().any(|op_id| victims.contains(op_id));
235 if !touched {
236 continue;
237 }
238 let mut survivors: Vec<(OpId, Vec<u8>)> = Vec::new();
240 for (op_id, &offset) in &idx.ops {
241 if victims.contains(op_id) {
242 removed += 1;
243 continue;
244 }
245 let rec = read_packed_op(&pack_path, offset)?;
246 let bytes = serde_json::to_vec(&rec)
247 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
248 survivors.push((op_id.clone(), bytes));
249 }
250 let _ = fs::remove_file(&pack_path);
253 let _ = fs::remove_file(&pack_idx);
254 if survivors.is_empty() {
255 continue;
256 }
257 self.write_pack_from_survivors(survivors)?;
258 }
259 Ok(removed)
260 }
261
262 fn write_pack_from_survivors(
267 &self,
268 mut ops: Vec<(OpId, Vec<u8>)>,
269 ) -> io::Result<()> {
270 ops.sort_by(|a, b| a.0.cmp(&b.0));
271 let mut name_input = Vec::new();
272 for (id, _) in &ops {
273 name_input.extend_from_slice(id.as_bytes());
274 name_input.push(b'\n');
275 }
276 let pack_hash = hash_bytes(&name_input);
277 let pack_path = self.dir.join(format!("pack-{pack_hash}.pack"));
278 let idx_path = self.dir.join(format!("pack-{pack_hash}.idx"));
279 if pack_path.exists() && idx_path.exists() {
280 return Ok(());
281 }
282 let pack_tmp = pack_path.with_extension("pack.tmp");
283 let idx_tmp = idx_path.with_extension("idx.tmp");
284 let mut offsets: BTreeMap<OpId, u64> = BTreeMap::new();
285 {
286 let mut f = fs::File::create(&pack_tmp)?;
287 let mut cursor: u64 = 0;
288 for (op_id, bytes) in &ops {
289 offsets.insert(op_id.clone(), cursor);
290 let len = bytes.len() as u64;
291 f.write_all(&len.to_be_bytes())?;
292 f.write_all(bytes)?;
293 cursor += 8 + len;
294 }
295 f.sync_all()?;
296 }
297 let idx = PackIndex { version: 1, ops: offsets };
298 idx.save(&idx_tmp)?;
299 fs::rename(&pack_tmp, &pack_path)?;
300 fs::rename(&idx_tmp, &idx_path)?;
301 Ok(())
302 }
303
304 fn list_loose_files(&self) -> io::Result<Vec<(OpId, PathBuf)>> {
307 let mut out = Vec::new();
308 for entry in fs::read_dir(&self.dir)? {
309 let entry = entry?;
310 let name = match entry.file_name().into_string() {
311 Ok(s) => s,
312 Err(_) => continue,
313 };
314 if let Some(id) = name.strip_suffix(".json") {
315 if !id.starts_with("pack-") {
316 out.push((id.to_string(), entry.path()));
317 }
318 }
319 }
320 Ok(out)
321 }
322
323 pub fn delete(&self, op_id: &OpId) -> io::Result<()> {
332 let path = self.path(op_id);
333 match fs::remove_file(&path) {
334 Ok(()) => Ok(()),
335 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
336 Err(e) => Err(e),
337 }
338 }
339
340 pub fn walk_back(
343 &self,
344 head: &OpId,
345 limit: Option<usize>,
346 ) -> io::Result<Vec<OperationRecord>> {
347 let mut out = Vec::new();
348 let mut seen = BTreeSet::new();
349 let mut frontier: VecDeque<OpId> = VecDeque::from([head.clone()]);
350 while let Some(id) = frontier.pop_back() {
351 if !seen.insert(id.clone()) {
352 continue;
353 }
354 if let Some(rec) = self.get(&id)? {
355 for p in &rec.op.parents {
359 if !seen.contains(p) {
360 frontier.push_front(p.clone());
361 }
362 }
363 out.push(rec);
364 if let Some(n) = limit {
365 if out.len() >= n {
366 break;
367 }
368 }
369 }
370 }
371 Ok(out)
372 }
373
374 pub fn walk_forward(
377 &self,
378 head: &OpId,
379 limit: Option<usize>,
380 ) -> io::Result<Vec<OperationRecord>> {
381 let mut all = self.walk_back(head, None)?;
382 all.reverse();
383 if let Some(n) = limit {
384 all.truncate(n);
385 }
386 Ok(all)
387 }
388
389 pub fn lca(&self, a: &OpId, b: &OpId) -> io::Result<Option<OpId>> {
404 let a_anc: BTreeSet<OpId> = self
405 .walk_back(a, None)?
406 .into_iter()
407 .map(|r| r.op_id)
408 .collect();
409 for rec in self.walk_back(b, None)? {
414 if a_anc.contains(&rec.op_id) {
415 return Ok(Some(rec.op_id));
416 }
417 }
418 Ok(None)
419 }
420
421 pub fn list_all(&self) -> io::Result<Vec<OperationRecord>> {
426 let mut out = Vec::new();
427 let mut seen: BTreeSet<OpId> = BTreeSet::new();
428 for (id, _) in self.list_loose_files()? {
433 if let Some(rec) = self.get(&id)? {
434 if seen.insert(rec.op_id.clone()) {
435 out.push(rec);
436 }
437 }
438 }
439 for pack_idx in self.list_pack_indices()? {
440 let idx = PackIndex::load(&pack_idx)?;
441 let pack_path = pack_idx.with_extension("pack");
442 for (op_id, &offset) in &idx.ops {
443 if seen.insert(op_id.clone()) {
444 out.push(read_packed_op(&pack_path, offset)?);
445 }
446 }
447 }
448 Ok(out)
449 }
450
451 pub fn ops_since(
455 &self,
456 head: &OpId,
457 base: Option<&OpId>,
458 ) -> io::Result<Vec<OperationRecord>> {
459 let exclude: BTreeSet<OpId> = match base {
460 Some(b) => self
461 .walk_back(b, None)?
462 .into_iter()
463 .map(|r| r.op_id)
464 .collect(),
465 None => BTreeSet::new(),
466 };
467 Ok(self
468 .walk_back(head, None)?
469 .into_iter()
470 .filter(|r| !exclude.contains(&r.op_id))
471 .collect())
472 }
473}
474
/// On-disk index for one pack file: maps each op id to the byte offset of
/// its length-prefixed record inside the sibling `.pack` file.
#[derive(serde::Serialize, serde::Deserialize)]
struct PackIndex {
    // Format version of the index file (currently always 1).
    version: u32,
    // op id -> byte offset of the record's 8-byte length prefix.
    ops: BTreeMap<OpId, u64>,
}
483
484impl PackIndex {
485 fn load(path: &Path) -> io::Result<Self> {
486 let bytes = fs::read(path)?;
487 serde_json::from_slice(&bytes)
488 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
489 }
490
491 fn save(&self, path: &Path) -> io::Result<()> {
492 let bytes = serde_json::to_vec(self)
493 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
494 let mut f = fs::File::create(path)?;
495 f.write_all(&bytes)?;
496 f.sync_all()?;
497 Ok(())
498 }
499}
500
501fn read_packed_op(pack_path: &Path, offset: u64) -> io::Result<OperationRecord> {
504 let mut f = fs::File::open(pack_path)?;
505 f.seek(SeekFrom::Start(offset))?;
506 let mut len_buf = [0u8; 8];
507 f.read_exact(&mut len_buf)?;
508 let len = u64::from_be_bytes(len_buf) as usize;
509 let mut buf = vec![0u8; len];
510 f.read_exact(&mut buf)?;
511 serde_json::from_slice(&buf)
512 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use crate::operation::{Operation, OperationKind, StageTransition};
    use std::collections::{BTreeMap, BTreeSet};

    /// Root fixture: an `AddFunction` record with no parents.
    fn add_op() -> OperationRecord {
        let op = Operation::new(
            OperationKind::AddFunction {
                sig_id: "fac::Int->Int".into(),
                stage_id: "abc123".into(),
                effects: BTreeSet::new(),
                budget_cost: None,
            },
            [],
        );
        OperationRecord::new(
            op,
            StageTransition::Create {
                sig_id: "fac::Int->Int".into(),
                stage_id: "abc123".into(),
            },
        )
    }

    /// Fixture: a `ModifyBody` record with a single `parent` edge.
    fn modify_op(parent: &OpId, sig: &str, from: &str, to: &str) -> OperationRecord {
        let op = Operation::new(
            OperationKind::ModifyBody {
                sig_id: sig.into(),
                from_stage_id: from.into(),
                to_stage_id: to.into(),
                from_budget: None,
                to_budget: None,
            },
            [parent.clone()],
        );
        OperationRecord::new(
            op,
            StageTransition::Replace {
                sig_id: sig.into(),
                from: from.into(),
                to: to.into(),
            },
        )
    }

    #[test]
    fn put_then_get_round_trips() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let rec = add_op();
        log.put(&rec).unwrap();
        let back = log.get(&rec.op_id).unwrap().unwrap();
        assert_eq!(back, rec);
    }

    #[test]
    fn put_is_idempotent() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let rec = add_op();
        log.put(&rec).unwrap();
        // A second put of the same record must be a no-op, not an error.
        log.put(&rec).unwrap();
        assert!(log.get(&rec.op_id).unwrap().is_some());
    }

    #[test]
    fn get_missing_returns_none() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        assert!(log.get(&"deadbeef".to_string()).unwrap().is_none());
    }

    #[test]
    fn walk_back_returns_newest_first() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let c = modify_op(&b.op_id, "fac::Int->Int", "def456", "789aaa");
        log.put(&c).unwrap();

        let walked = log.walk_back(&c.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(
            ids,
            vec![c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]
        );
    }

    #[test]
    fn walk_forward_returns_oldest_first() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();

        let walked = log.walk_forward(&b.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(ids, vec![a.op_id.as_str(), b.op_id.as_str()]);
    }

    #[test]
    fn lca_finds_common_ancestor() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let root = add_op();
        log.put(&root).unwrap();
        // Two divergent children of the same root.
        let left = modify_op(&root.op_id, "fac::Int->Int", "abc123", "left1");
        log.put(&left).unwrap();
        let right = modify_op(&root.op_id, "fac::Int->Int", "abc123", "right1");
        log.put(&right).unwrap();

        let lca = log.lca(&left.op_id, &right.op_id).unwrap();
        assert_eq!(lca, Some(root.op_id));
    }

    #[test]
    fn lca_none_for_independent_histories() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        // A second root with no shared ancestry.
        let b = OperationRecord::new(
            Operation::new(
                OperationKind::AddFunction {
                    sig_id: "double::Int->Int".into(),
                    stage_id: "ddd111".into(),
                    effects: BTreeSet::new(),
                    budget_cost: None,
                },
                [],
            ),
            StageTransition::Create {
                sig_id: "double::Int->Int".into(),
                stage_id: "ddd111".into(),
            },
        );
        log.put(&b).unwrap();

        assert_eq!(log.lca(&a.op_id, &b.op_id).unwrap(), None);
    }

    #[test]
    fn ops_since_excludes_base_history() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();
        let c = modify_op(&b.op_id, "fac::Int->Int", "def456", "789aaa");
        log.put(&c).unwrap();

        let since: Vec<_> = log
            .ops_since(&c.op_id, Some(&a.op_id))
            .unwrap()
            .into_iter()
            .map(|r| r.op_id)
            .collect();
        assert_eq!(since.len(), 2);
        assert!(since.contains(&b.op_id));
        assert!(since.contains(&c.op_id));
        assert!(!since.contains(&a.op_id));
    }

    #[test]
    fn repack_consolidates_loose_files_into_a_pack() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
        log.put(&b).unwrap();

        let n = log.repack(0).unwrap();
        assert_eq!(n, 2);
        let ops_dir = tmp.path().join("ops");
        let loose: Vec<_> = fs::read_dir(&ops_dir).unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|x| x == "json"))
            .filter(|e| !e.file_name().to_string_lossy().starts_with("pack-"))
            .collect();
        assert!(loose.is_empty(), "loose .json files should be deleted");
        let packs: Vec<_> = fs::read_dir(&ops_dir).unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().is_some_and(|x| x == "pack"))
            .collect();
        assert_eq!(packs.len(), 1);

        // Records must remain readable through the pack path.
        assert_eq!(log.get(&a.op_id).unwrap().unwrap(), a);
        assert_eq!(log.get(&b.op_id).unwrap().unwrap(), b);
    }

    #[test]
    fn repack_below_threshold_is_a_noop() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        log.put(&add_op()).unwrap();
        let n = log.repack(10).unwrap();
        assert_eq!(n, 0);
    }

    #[test]
    fn repack_is_deterministic_on_same_input() {
        // Two independent logs with the same ops must produce
        // identically-named packs.
        let make_log = || {
            let tmp = tempfile::tempdir().unwrap();
            let log = OpLog::open(tmp.path()).unwrap();
            let a = add_op();
            log.put(&a).unwrap();
            let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "def456");
            log.put(&b).unwrap();
            log.repack(0).unwrap();
            (tmp, log)
        };
        let (tmp1, _log1) = make_log();
        let (tmp2, _log2) = make_log();
        let pack_name = |dir: &std::path::Path| -> String {
            fs::read_dir(dir.join("ops")).unwrap()
                .filter_map(|e| e.ok())
                .find(|e| e.path().extension().is_some_and(|x| x == "pack"))
                .unwrap()
                .file_name().into_string().unwrap()
        };
        assert_eq!(pack_name(tmp1.path()), pack_name(tmp2.path()));
    }

    #[test]
    fn walk_back_works_across_loose_and_packed_ops() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "b1");
        log.put(&b).unwrap();
        log.repack(0).unwrap();
        // `c` stays loose while `a` and `b` live in the pack.
        let c = modify_op(&b.op_id, "fac::Int->Int", "b1", "c1");
        log.put(&c).unwrap();

        let walked = log.walk_back(&c.op_id, None).unwrap();
        let ids: Vec<_> = walked.iter().map(|r| r.op_id.as_str()).collect();
        assert_eq!(ids, vec![c.op_id.as_str(), b.op_id.as_str(), a.op_id.as_str()]);
    }

    #[test]
    fn list_all_dedups_across_loose_and_pack() {
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        log.repack(0).unwrap();
        // Re-put after repack: the record now exists both loose and packed.
        log.put(&a).unwrap();

        let all = log.list_all().unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].op_id, a.op_id);
    }

    #[test]
    fn walk_back_orders_ancestors_after_descendants() {
        // Diamond: a <- b, a <- c, {b, c} <- m.
        let tmp = tempfile::tempdir().unwrap();
        let log = OpLog::open(tmp.path()).unwrap();
        let a = add_op();
        log.put(&a).unwrap();
        let b = modify_op(&a.op_id, "fac::Int->Int", "abc123", "b1");
        log.put(&b).unwrap();
        let c = OperationRecord::new(
            Operation::new(
                OperationKind::ModifyBody {
                    sig_id: "double::Int->Int".into(),
                    from_stage_id: "ddd000".into(),
                    to_stage_id: "c1".into(),
                    from_budget: None,
                    to_budget: None,
                },
                [a.op_id.clone()],
            ),
            StageTransition::Replace {
                sig_id: "double::Int->Int".into(),
                from: "ddd000".into(),
                to: "c1".into(),
            },
        );
        log.put(&c).unwrap();
        let m = OperationRecord::new(
            Operation::new(
                OperationKind::Merge { resolved: 0 },
                [b.op_id.clone(), c.op_id.clone()],
            ),
            StageTransition::Merge { entries: BTreeMap::new() },
        );
        log.put(&m).unwrap();

        let walked = log.walk_back(&m.op_id, None).unwrap();
        let pos = |id: &str| walked.iter().position(|r| r.op_id == id).unwrap();
        let (m_pos, b_pos, c_pos, a_pos) =
            (pos(&m.op_id), pos(&b.op_id), pos(&c.op_id), pos(&a.op_id));
        assert!(m_pos < b_pos, "merge before its parent b");
        assert!(m_pos < c_pos, "merge before its parent c");
        assert!(b_pos < a_pos, "b before its parent a");
        assert!(c_pos < a_pos, "c before its parent a");
    }
}