1use std::collections::HashSet;
28use std::fs::{self, File, OpenOptions};
29use std::io::{Read, Seek, SeekFrom, Write};
30use std::path::{Path, PathBuf};
31use std::sync::Mutex;
32
33use anyhow::{bail, Context, Result};
34use blake2::{Blake2b512, Digest};
35use dashmap::DashMap;
36
/// Size cap for a segment file (256 MiB): once appending a record would push
/// the active segment past this, it is sealed and a new one is started.
const DEFAULT_MAX_SEGMENT_BYTES: u64 = 256 << 20;

/// Magic prefix identifying a segment index (`.idx`) file.
const IDX_MAGIC: &[u8; 4] = b"NIX1";
/// One index entry: raw 32-byte hash, u64 LE content offset, u32 LE content length.
const IDX_ENTRY_BYTES: usize = 32 /* hash */ + 8 /* offset */ + 4 /* len */;
/// Index header: the 4-byte magic followed by a u64 LE entry count.
const IDX_HEADER_BYTES: usize = 4 /* magic */ + 8 /* count */;
/// Trailing checksum: first 32 bytes of BLAKE2b-512 over header + entries.
const IDX_CHECKSUM_BYTES: usize = 32;
47
/// Where one stored object lives on disk.
#[derive(Debug, Clone, Copy)]
struct SegmentLocation {
    /// Numeric id of the `seg-NNNNNN.dat` file holding the record.
    segment_id: u32,
    /// Byte offset of the content, i.e. just past the record's 4-byte length prefix.
    offset: u64,
    /// Content length in bytes (the length prefix itself is not included).
    len: u32,
}
56
/// Summary of one `SegmentStore::compact` run.
#[derive(Debug, Default, Clone, Copy)]
pub struct CompactStats {
    /// Indexed objects named in the live set, copied into the new segments.
    pub live_objects: usize,
    /// Objects that were indexed before compaction but were not copied.
    pub dropped_objects: usize,
    /// Combined size of the pre-compaction `.dat` files that were removed
    /// (gross: the bytes rewritten into the new segments are not subtracted).
    pub bytes_reclaimed: u64,
    /// Number of segment files written by the compaction.
    pub segments_after: usize,
}
69
70fn blake2b_raw(data: &[u8]) -> [u8; 32] {
72 let mut h = Blake2b512::new();
73 h.update(data);
74 let out = h.finalize();
75 let mut a = [0u8; 32];
76 a.copy_from_slice(&out[..32]);
77 a
78}
79
80fn blake2b(data: &[u8]) -> String {
83 hex::encode(blake2b_raw(data))
84}
85
/// The segment currently receiving appends (held inside `SegmentStore::active`).
struct Active {
    /// Id of the segment file being appended to.
    id: u32,
    /// Read/write handle; its cursor is expected to sit at `offset`.
    file: File,
    /// Length of the valid data written so far; the next record starts here.
    offset: u64,
}
93
94pub struct SegmentStore {
96 dir: PathBuf,
97 index: DashMap<String, SegmentLocation>,
98 active: Mutex<Active>,
99 max_segment_bytes: u64,
100}
101
102impl SegmentStore {
103 fn seg_path(dir: &Path, id: u32) -> PathBuf {
104 dir.join(format!("seg-{:06}.dat", id))
105 }
106 fn idx_path(dir: &Path, id: u32) -> PathBuf {
107 dir.join(format!("seg-{:06}.idx", id))
108 }
109
110 pub fn open(objects_root: &Path) -> Result<Self> {
112 Self::open_with_max(objects_root, DEFAULT_MAX_SEGMENT_BYTES)
113 }
114
115 pub fn open_with_max(objects_root: &Path, max_segment_bytes: u64) -> Result<Self> {
117 let dir = objects_root.join("segments");
118 fs::create_dir_all(&dir).context("create objects/segments dir")?;
119
120 let mut ids: Vec<u32> = Vec::new();
122 for entry in fs::read_dir(&dir).context("read segments dir")? {
123 let entry = entry?;
124 let name = entry.file_name().to_string_lossy().to_string();
125 if let Some(rest) = name.strip_prefix("seg-") {
126 if let Some(num) = rest.strip_suffix(".dat") {
127 if let Ok(id) = num.parse::<u32>() {
128 ids.push(id);
129 }
130 }
131 }
132 }
133 ids.sort_unstable();
134
135 let index: DashMap<String, SegmentLocation> = DashMap::new();
136 let mut active_id: u32 = 0;
137 let mut active_end: u64 = 0;
138
139 for (pos, &id) in ids.iter().enumerate() {
140 let is_last = pos + 1 == ids.len();
141 if is_last {
142 let (valid_end, entries) = Self::scan_segment(&dir, id)?;
145 for (h, o, l) in entries {
146 index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
147 }
148 let path = Self::seg_path(&dir, id);
149 let file_len = fs::metadata(&path)?.len();
150 if valid_end < file_len {
151 let f = OpenOptions::new().write(true).open(&path)?;
152 f.set_len(valid_end)?;
153 }
154 active_id = id;
155 active_end = valid_end;
156 } else {
157 match Self::load_idx(&dir, id) {
160 Ok(Some(entries)) => {
161 for (h, o, l) in entries {
162 index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
163 }
164 }
165 _ => {
166 let (_ve, entries) = Self::scan_segment(&dir, id)?;
167 for (h, o, l) in &entries {
168 index.insert(h.clone(), SegmentLocation { segment_id: id, offset: *o, len: *l });
169 }
170 let _ = Self::write_idx(&dir, id, &entries); }
172 }
173 }
174 }
175
176 let active_path = Self::seg_path(&dir, active_id);
178 let mut file = OpenOptions::new()
179 .create(true)
180 .read(true)
181 .write(true)
182 .open(&active_path)
183 .with_context(|| format!("open active segment {:?}", active_path))?;
184 file.seek(SeekFrom::Start(active_end))?;
185
186 Ok(Self {
187 dir,
188 index,
189 active: Mutex::new(Active { id: active_id, file, offset: active_end }),
190 max_segment_bytes,
191 })
192 }
193
194 fn scan_segment(dir: &Path, id: u32) -> Result<(u64, Vec<(String, u64, u32)>)> {
197 let path = Self::seg_path(dir, id);
198 let mut f = match File::open(&path) {
199 Ok(f) => f,
200 Err(_) => return Ok((0, Vec::new())),
201 };
202 let file_len = f.metadata()?.len();
203 let mut pos: u64 = 0;
204 let mut entries: Vec<(String, u64, u32)> = Vec::new();
205 loop {
206 if pos + 4 > file_len {
207 break; }
209 f.seek(SeekFrom::Start(pos))?;
210 let mut len_buf = [0u8; 4];
211 if f.read_exact(&mut len_buf).is_err() {
212 break;
213 }
214 let len = u32::from_le_bytes(len_buf);
215 let content_off = pos + 4;
216 if content_off + (len as u64) > file_len {
217 break; }
219 let mut content = vec![0u8; len as usize];
220 if f.read_exact(&mut content).is_err() {
221 break;
222 }
223 entries.push((blake2b(&content), content_off, len));
224 pos = content_off + len as u64;
225 }
226 Ok((pos, entries))
227 }
228
229 fn read_content(dir: &Path, loc: &SegmentLocation, expect_hash: &str) -> Result<Vec<u8>> {
231 let path = Self::seg_path(dir, loc.segment_id);
232 let mut f = File::open(&path).with_context(|| format!("open segment {:?}", path))?;
233 f.seek(SeekFrom::Start(loc.offset))?;
234 let mut content = vec![0u8; loc.len as usize];
235 f.read_exact(&mut content)
236 .with_context(|| format!("read record from segment {}", loc.segment_id))?;
237 let actual = blake2b(&content);
238 if actual != expect_hash {
239 bail!("segment object {} tampered: recomputed {}", expect_hash, actual);
240 }
241 Ok(content)
242 }
243
244 fn write_idx(dir: &Path, id: u32, entries: &[(String, u64, u32)]) -> Result<()> {
249 let mut body: Vec<u8> = Vec::with_capacity(IDX_HEADER_BYTES + entries.len() * IDX_ENTRY_BYTES);
250 body.extend_from_slice(IDX_MAGIC);
251 body.extend_from_slice(&(entries.len() as u64).to_le_bytes());
252 for (hash, off, len) in entries {
253 let raw = hex::decode(hash).map_err(|_| anyhow::anyhow!("bad hash hex in idx write"))?;
254 if raw.len() != 32 {
255 bail!("idx write: hash not 32 bytes");
256 }
257 body.extend_from_slice(&raw);
258 body.extend_from_slice(&off.to_le_bytes());
259 body.extend_from_slice(&len.to_le_bytes());
260 }
261 let checksum = blake2b_raw(&body);
262 body.extend_from_slice(&checksum);
263
264 let path = Self::idx_path(dir, id);
265 let tmp = path.with_extension("idx.tmp");
266 fs::write(&tmp, &body)?;
267 fs::rename(&tmp, &path)?;
268 Ok(())
269 }
270
271 fn load_idx(dir: &Path, id: u32) -> Result<Option<Vec<(String, u64, u32)>>> {
274 let path = Self::idx_path(dir, id);
275 let data = match fs::read(&path) {
276 Ok(d) => d,
277 Err(_) => return Ok(None),
278 };
279 if data.len() < IDX_HEADER_BYTES + IDX_CHECKSUM_BYTES {
280 return Ok(None);
281 }
282 if &data[0..4] != IDX_MAGIC {
283 return Ok(None);
284 }
285 let count = u64::from_le_bytes(data[4..12].try_into().unwrap()) as usize;
286 let expected = IDX_HEADER_BYTES + count * IDX_ENTRY_BYTES + IDX_CHECKSUM_BYTES;
287 if data.len() != expected {
288 return Ok(None);
289 }
290 let body = &data[..data.len() - IDX_CHECKSUM_BYTES];
291 let stored: [u8; 32] = match data[data.len() - IDX_CHECKSUM_BYTES..].try_into() {
292 Ok(a) => a,
293 Err(_) => return Ok(None),
294 };
295 if blake2b_raw(body) != stored {
296 return Ok(None); }
298 let mut entries = Vec::with_capacity(count);
299 let mut p = IDX_HEADER_BYTES;
300 for _ in 0..count {
301 let hash = hex::encode(&data[p..p + 32]);
302 let off = u64::from_le_bytes(data[p + 32..p + 40].try_into().unwrap());
303 let len = u32::from_le_bytes(data[p + 40..p + 44].try_into().unwrap());
304 entries.push((hash, off, len));
305 p += IDX_ENTRY_BYTES;
306 }
307 Ok(Some(entries))
308 }
309
310 fn entries_for_segment(&self, id: u32) -> Vec<(String, u64, u32)> {
312 self.index
313 .iter()
314 .filter(|e| e.value().segment_id == id)
315 .map(|e| (e.key().clone(), e.value().offset, e.value().len))
316 .collect()
317 }
318
319 pub fn contains(&self, hash: &str) -> bool {
323 self.index.contains_key(hash)
324 }
325
326 pub fn put(&self, hash: &str, content: &[u8]) -> Result<()> {
329 if self.index.contains_key(hash) {
330 return Ok(());
331 }
332 let len = content.len() as u32;
333 let record_size = 4u64 + content.len() as u64;
334
335 let mut active = self.active.lock().unwrap();
336 if self.index.contains_key(hash) {
337 return Ok(());
338 }
339
340 if active.offset > 0 && active.offset + record_size > self.max_segment_bytes {
342 let _ = active.file.flush();
343 let _ = active.file.sync_all();
344 let sealed_id = active.id;
346 let entries = self.entries_for_segment(sealed_id);
347 let _ = Self::write_idx(&self.dir, sealed_id, &entries);
348 let next_id = sealed_id + 1;
349 let path = Self::seg_path(&self.dir, next_id);
350 let file = OpenOptions::new()
351 .create(true)
352 .read(true)
353 .write(true)
354 .open(&path)
355 .with_context(|| format!("open new segment {:?}", path))?;
356 *active = Active { id: next_id, file, offset: 0 };
357 }
358
359 let content_off = active.offset + 4;
360 let mut rec = Vec::with_capacity(4 + content.len());
361 rec.extend_from_slice(&len.to_le_bytes());
362 rec.extend_from_slice(content);
363 active.file.write_all(&rec)?;
364
365 let seg_id = active.id;
366 active.offset += record_size;
367 self.index.insert(
368 hash.to_string(),
369 SegmentLocation { segment_id: seg_id, offset: content_off, len },
370 );
371 Ok(())
372 }
373
374 pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
377 let loc = match self.index.get(hash) {
378 Some(entry) => *entry.value(),
379 None => return Ok(None),
380 };
381 Ok(Some(Self::read_content(&self.dir, &loc, hash)?))
382 }
383
384 pub fn all_hashes(&self) -> Vec<String> {
386 self.index.iter().map(|e| e.key().clone()).collect()
387 }
388
389 pub fn sync(&self) -> Result<()> {
391 let mut active = self.active.lock().unwrap();
392 let _ = active.file.flush();
393 active.file.sync_all().context("fsync active segment")?;
394 Ok(())
395 }
396
397 pub fn compact(&self, live: &HashSet<String>) -> Result<CompactStats> {
416 let mut active = self.active.lock().unwrap();
417
418 let total_before = self.index.len();
419 let old_max = active.id;
420 let new_base = old_max + 1;
421
422 let to_copy: Vec<(String, SegmentLocation)> = self
424 .index
425 .iter()
426 .filter(|e| live.contains(e.key()))
427 .map(|e| (e.key().clone(), *e.value()))
428 .collect();
429
430 let new_index: DashMap<String, SegmentLocation> = DashMap::new();
432 let mut cur_id = new_base;
433 let mut cur_path = Self::seg_path(&self.dir, cur_id);
434 let mut cur_file = OpenOptions::new()
435 .create(true)
436 .truncate(true)
437 .read(true)
438 .write(true)
439 .open(&cur_path)
440 .with_context(|| format!("open compaction segment {:?}", cur_path))?;
441 let mut cur_off: u64 = 0;
442
443 for (hash, loc) in &to_copy {
444 let content = Self::read_content(&self.dir, loc, hash)?;
445 let len = content.len() as u32;
446 let record_size = 4u64 + content.len() as u64;
447
448 if cur_off > 0 && cur_off + record_size > self.max_segment_bytes {
449 let _ = cur_file.flush();
450 cur_file.sync_all().context("fsync sealed compaction segment")?;
451 let entries: Vec<(String, u64, u32)> = new_index
452 .iter()
453 .filter(|e| e.value().segment_id == cur_id)
454 .map(|e| (e.key().clone(), e.value().offset, e.value().len))
455 .collect();
456 let _ = Self::write_idx(&self.dir, cur_id, &entries);
457 cur_id += 1;
458 cur_path = Self::seg_path(&self.dir, cur_id);
459 cur_file = OpenOptions::new()
460 .create(true)
461 .truncate(true)
462 .read(true)
463 .write(true)
464 .open(&cur_path)
465 .with_context(|| format!("open compaction segment {:?}", cur_path))?;
466 cur_off = 0;
467 }
468
469 let content_off = cur_off + 4;
470 let mut rec = Vec::with_capacity(4 + content.len());
471 rec.extend_from_slice(&len.to_le_bytes());
472 rec.extend_from_slice(&content);
473 cur_file.write_all(&rec)?;
474 new_index.insert(hash.clone(), SegmentLocation { segment_id: cur_id, offset: content_off, len });
475 cur_off += record_size;
476 }
477 let _ = cur_file.flush();
478 cur_file.sync_all().context("fsync active compaction segment")?;
479
480 let live_objects = to_copy.len();
482
483 self.index.clear();
485 for e in new_index.iter() {
486 self.index.insert(e.key().clone(), *e.value());
487 }
488 *active = Active { id: cur_id, file: cur_file, offset: cur_off };
489
490 let mut bytes_reclaimed: u64 = 0;
493 if let Ok(rd) = fs::read_dir(&self.dir) {
494 for entry in rd.flatten() {
495 let name = entry.file_name().to_string_lossy().to_string();
496 let id_of = name
497 .strip_prefix("seg-")
498 .and_then(|r| r.strip_suffix(".dat").or_else(|| r.strip_suffix(".idx")))
499 .and_then(|n| n.parse::<u32>().ok());
500 if let Some(id) = id_of {
501 if id < new_base {
502 if name.ends_with(".dat") {
503 if let Ok(m) = entry.metadata() {
504 bytes_reclaimed += m.len();
505 }
506 }
507 let _ = fs::remove_file(entry.path());
508 }
509 }
510 }
511 }
512
513 let segments_after = (cur_id - new_base + 1) as usize;
514 Ok(CompactStats {
515 live_objects,
516 dropped_objects: total_before.saturating_sub(live_objects),
517 bytes_reclaimed,
518 segments_after,
519 })
520 }
521}
522
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Stores `content` under its own BLAKE2b key and returns the key.
    fn store_blob(store: &SegmentStore, content: &[u8]) -> String {
        let key = blake2b(content);
        store.put(&key, content).unwrap();
        key
    }

    /// Returns the paths inside `<root>/segments` whose file name ends with `suffix`.
    fn segment_files(root: &Path, suffix: &str) -> Vec<PathBuf> {
        fs::read_dir(root.join("segments"))
            .unwrap()
            .flatten()
            .filter(|entry| entry.file_name().to_string_lossy().ends_with(suffix))
            .map(|entry| entry.path())
            .collect()
    }

    /// Returns the path of the very first data segment.
    fn first_segment(root: &Path) -> PathBuf {
        root.join("segments").join("seg-000000.dat")
    }

    #[test]
    fn put_get_roundtrip() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open(tmp.path()).unwrap();
        let payload: &[u8] = b"hello nedb v3";
        let key = store_blob(&store, payload);

        assert_eq!(store.get(&key).unwrap().as_deref(), Some(payload));
        assert!(store.contains(&key));
        let missing = "0".repeat(64);
        assert!(store.get(&missing).unwrap().is_none());
    }

    #[test]
    fn idempotent_put() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open(tmp.path()).unwrap();
        let (first, second) = (store_blob(&store, b"dup"), store_blob(&store, b"dup"));
        assert_eq!(first, second);
        assert_eq!(store.all_hashes().len(), 1);
    }

    #[test]
    fn index_rebuilt_on_reopen() {
        let tmp = tempdir().unwrap();
        let key = {
            let store = SegmentStore::open(tmp.path()).unwrap();
            let key = store_blob(&store, b"persisted");
            store.sync().unwrap();
            key
        };
        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert_eq!(reopened.get(&key).unwrap().as_deref(), Some(&b"persisted"[..]));
    }

    #[test]
    fn rollover_writes_idx_and_reopen_uses_it() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open_with_max(tmp.path(), 32).unwrap();
        let keys: Vec<String> = (0..8u32)
            .map(|i| store_blob(&store, format!("record-{}", i).as_bytes()))
            .collect();
        store.sync().unwrap();

        assert!(
            !segment_files(tmp.path(), ".idx").is_empty(),
            "expected at least one sealed .idx"
        );

        let reopened = SegmentStore::open(tmp.path()).unwrap();
        for key in &keys {
            assert!(reopened.get(key).unwrap().is_some());
        }
    }

    #[test]
    fn corrupt_idx_falls_back_to_scan() {
        let tmp = tempdir().unwrap();
        let keys: Vec<String> = {
            let store = SegmentStore::open_with_max(tmp.path(), 32).unwrap();
            let keys: Vec<String> = (0..6u32)
                .map(|i| store_blob(&store, format!("rec-{}", i).as_bytes()))
                .collect();
            store.sync().unwrap();
            keys
        };
        for idx in segment_files(tmp.path(), ".idx") {
            fs::write(idx, b"garbage").unwrap();
        }

        let reopened = SegmentStore::open(tmp.path()).unwrap();
        for key in &keys {
            assert!(
                reopened.get(key).unwrap().is_some(),
                "scan fallback must recover the object"
            );
        }
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let tmp = tempdir().unwrap();
        let good = {
            let store = SegmentStore::open(tmp.path()).unwrap();
            let key = store_blob(&store, b"good record");
            store.sync().unwrap();
            key
        };

        // Simulate a crash mid-append: a length prefix promising far more
        // bytes than actually follow it.
        let mut seg = OpenOptions::new()
            .append(true)
            .open(first_segment(tmp.path()))
            .unwrap();
        seg.write_all(&9999u32.to_le_bytes()).unwrap();
        seg.write_all(b"short").unwrap();
        drop(seg);

        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert_eq!(reopened.get(&good).unwrap().as_deref(), Some(&b"good record"[..]));
        let fresh = store_blob(&reopened, b"after recovery");
        assert!(reopened.get(&fresh).unwrap().is_some());
    }

    #[test]
    fn tamper_detected_on_read() {
        let tmp = tempdir().unwrap();
        let key = {
            let store = SegmentStore::open(tmp.path()).unwrap();
            let key = store_blob(&store, b"authentic");
            store.sync().unwrap();
            key
        };

        let seg = first_segment(tmp.path());
        let mut bytes = fs::read(&seg).unwrap();
        *bytes.last_mut().unwrap() ^= 0xff;
        fs::write(&seg, bytes).unwrap();

        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert!(
            !matches!(reopened.get(&key), Ok(Some(_))),
            "tampered content must not verify under original hash"
        );
    }

    #[test]
    fn compaction_keeps_live_drops_dead() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open(tmp.path()).unwrap();
        let keep = store_blob(&store, b"keep me");
        let dead = [store_blob(&store, b"drop me 1"), store_blob(&store, b"drop me 2")];
        store.sync().unwrap();
        assert_eq!(store.all_hashes().len(), 3);

        let live: HashSet<String> = std::iter::once(keep.clone()).collect();
        let stats = store.compact(&live).unwrap();
        assert_eq!(stats.live_objects, 1);
        assert_eq!(stats.dropped_objects, 2);

        assert_eq!(store.get(&keep).unwrap().as_deref(), Some(&b"keep me"[..]));
        assert_eq!(store.all_hashes().len(), 1);

        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert_eq!(reopened.get(&keep).unwrap().as_deref(), Some(&b"keep me"[..]));
        assert!(reopened.get(&dead[0]).unwrap().is_none());

        let after = store_blob(&store, b"post-compaction");
        assert!(store.get(&after).unwrap().is_some());
    }

    #[test]
    fn compaction_reclaims_and_writes_still_read() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open_with_max(tmp.path(), 64).unwrap();
        let all: Vec<String> = (0..20u32)
            .map(|i| store_blob(&store, format!("obj-{:03}", i).as_bytes()))
            .collect();
        store.sync().unwrap();

        // Keep every even-indexed object.
        let live: HashSet<String> = all.iter().step_by(2).cloned().collect();
        let stats = store.compact(&live).unwrap();
        assert_eq!(stats.live_objects, 10);
        assert_eq!(stats.dropped_objects, 10);

        for (i, key) in all.iter().enumerate() {
            let present = store.get(key).unwrap().is_some();
            if i % 2 == 0 {
                assert!(present, "live object {} must survive", i);
            } else {
                assert!(!present, "dead object {} must be pruned", i);
            }
        }
    }
}