1use std::collections::HashSet;
28use std::fs::{self, File, OpenOptions};
29use std::io::{Read, Seek, SeekFrom, Write};
30use std::path::{Path, PathBuf};
31use std::sync::Mutex;
32
33use anyhow::{bail, Context, Result};
34use blake2::{Blake2b512, Digest};
35use dashmap::DashMap;
36
/// Default size (256 MiB) at which the active segment is sealed and a new
/// one is started.
const DEFAULT_MAX_SEGMENT_BYTES: u64 = 1 << 28;

/// Magic bytes opening every `.idx` sidecar file.
const IDX_MAGIC: &[u8; 4] = b"NIX1";
/// One `.idx` entry: raw 32-byte hash, little-endian u64 offset, little-endian u32 length.
const IDX_ENTRY_BYTES: usize = 32 + std::mem::size_of::<u64>() + std::mem::size_of::<u32>();
/// `.idx` header: the magic followed by a little-endian u64 entry count.
const IDX_HEADER_BYTES: usize = IDX_MAGIC.len() + std::mem::size_of::<u64>();
/// Trailing `.idx` checksum (`blake2b_raw` of header + entries).
const IDX_CHECKSUM_BYTES: usize = 32;
47
/// Where one stored object's content lives on disk.
#[derive(Clone, Copy, Debug)]
struct SegmentLocation {
    /// Id of the segment data file (`seg-NNNNNN.dat`) holding the record.
    segment_id: u32,
    /// Byte offset of the content itself, i.e. just past the record's
    /// 4-byte little-endian length prefix.
    offset: u64,
    /// Content length in bytes (the value stored in the length prefix).
    len: u32,
}
56
/// Summary of one `SegmentStore::compact` run.
#[derive(Clone, Copy, Debug, Default)]
pub struct CompactStats {
    /// Number of objects that were kept and copied into the new segments.
    pub live_objects: usize,
    /// Number of previously indexed objects that were not kept.
    pub dropped_objects: usize,
    /// Disk bytes reclaimed by deleting the pre-compaction segment files;
    /// see `SegmentStore::compact` for exactly how it is computed.
    pub bytes_reclaimed: u64,
    /// Number of segment data files written by the compaction.
    pub segments_after: usize,
}
69
70fn blake2b_raw(data: &[u8]) -> [u8; 32] {
72 let mut h = Blake2b512::new();
73 h.update(data);
74 let out = h.finalize();
75 let mut a = [0u8; 32];
76 a.copy_from_slice(&out[..32]);
77 a
78}
79
80fn blake2b(data: &[u8]) -> String {
83 hex::encode(blake2b_raw(data))
84}
85
/// The segment currently open for appends.
struct Active {
    /// Segment id (the `NNNNNN` in `seg-NNNNNN.dat`).
    id: u32,
    /// Read/write handle to the segment's data file.
    file: File,
    /// Length of the valid data, i.e. where the next record is written.
    offset: u64,
}
93
/// Append-only, content-addressed object store kept in numbered segment
/// files under `<objects_root>/segments`.
pub struct SegmentStore {
    /// The `segments` directory holding the `seg-NNNNNN.dat` / `.idx` files.
    dir: PathBuf,
    /// In-memory map from hex object hash to the record's location.
    index: DashMap<String, SegmentLocation>,
    /// The segment being appended to. Its lock also serializes `put`, `sync`
    /// and `compact`; `get`/`contains` only consult `index`.
    active: Mutex<Active>,
    /// Size threshold that triggers rolling over to a new segment.
    max_segment_bytes: u64,
    /// Whether `durable_sync` may take its fast path (from `NEDB_FAST_FSYNC`).
    fast_fsync: bool,
}
105
/// Forces `file`'s written data to stable storage.
///
/// Normally this is `File::sync_all`. On macOS only, `fast == true` issues a
/// plain `fsync(2)` instead. On Apple platforms the standard library's
/// `sync_all` uses `F_FULLFSYNC`, which is slower but also flushes the
/// drive's write cache. On every other platform `fast` has no effect.
///
/// # Errors
/// Returns the OS error reported by the underlying sync call.
fn durable_sync(file: &File, fast: bool) -> std::io::Result<()> {
    #[cfg(target_os = "macos")]
    {
        if fast {
            use std::os::unix::io::AsRawFd;
            // SAFETY: the descriptor comes from `file`, which stays borrowed
            // for the whole call, so it is an open, valid fd; `fsync` neither
            // closes nor retains it.
            let status = unsafe { libc::fsync(file.as_raw_fd()) };
            return match status {
                0 => Ok(()),
                _ => Err(std::io::Error::last_os_error()),
            };
        }
    }
    #[cfg(not(target_os = "macos"))]
    {
        // No cheaper alternative exists here; mark the flag as used.
        let _ = fast;
    }
    file.sync_all()
}
132
133impl SegmentStore {
134 fn seg_path(dir: &Path, id: u32) -> PathBuf {
135 dir.join(format!("seg-{:06}.dat", id))
136 }
137 fn idx_path(dir: &Path, id: u32) -> PathBuf {
138 dir.join(format!("seg-{:06}.idx", id))
139 }
140
141 pub fn open(objects_root: &Path) -> Result<Self> {
143 Self::open_with_max(objects_root, DEFAULT_MAX_SEGMENT_BYTES)
144 }
145
146 pub fn open_with_max(objects_root: &Path, max_segment_bytes: u64) -> Result<Self> {
148 let dir = objects_root.join("segments");
149 fs::create_dir_all(&dir).context("create objects/segments dir")?;
150
151 let mut ids: Vec<u32> = Vec::new();
153 for entry in fs::read_dir(&dir).context("read segments dir")? {
154 let entry = entry?;
155 let name = entry.file_name().to_string_lossy().to_string();
156 if let Some(rest) = name.strip_prefix("seg-") {
157 if let Some(num) = rest.strip_suffix(".dat") {
158 if let Ok(id) = num.parse::<u32>() {
159 ids.push(id);
160 }
161 }
162 }
163 }
164 ids.sort_unstable();
165
166 let index: DashMap<String, SegmentLocation> = DashMap::new();
167 let mut active_id: u32 = 0;
168 let mut active_end: u64 = 0;
169
170 for (pos, &id) in ids.iter().enumerate() {
171 let is_last = pos + 1 == ids.len();
172 if is_last {
173 let (valid_end, entries) = Self::scan_segment(&dir, id)?;
176 for (h, o, l) in entries {
177 index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
178 }
179 let path = Self::seg_path(&dir, id);
180 let file_len = fs::metadata(&path)?.len();
181 if valid_end < file_len {
182 let f = OpenOptions::new().write(true).open(&path)?;
183 f.set_len(valid_end)?;
184 }
185 active_id = id;
186 active_end = valid_end;
187 } else {
188 match Self::load_idx(&dir, id) {
191 Ok(Some(entries)) => {
192 for (h, o, l) in entries {
193 index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
194 }
195 }
196 _ => {
197 let (_ve, entries) = Self::scan_segment(&dir, id)?;
198 for (h, o, l) in &entries {
199 index.insert(h.clone(), SegmentLocation { segment_id: id, offset: *o, len: *l });
200 }
201 let _ = Self::write_idx(&dir, id, &entries); }
203 }
204 }
205 }
206
207 let active_path = Self::seg_path(&dir, active_id);
209 let mut file = OpenOptions::new()
210 .create(true)
211 .read(true)
212 .write(true)
213 .open(&active_path)
214 .with_context(|| format!("open active segment {:?}", active_path))?;
215 file.seek(SeekFrom::Start(active_end))?;
216
217 let fast_fsync = std::env::var("NEDB_FAST_FSYNC")
218 .map(|v| {
219 let v = v.trim();
220 v == "1" || v.eq_ignore_ascii_case("true")
221 || v.eq_ignore_ascii_case("on")
222 || v.eq_ignore_ascii_case("yes")
223 })
224 .unwrap_or(false);
225
226 Ok(Self {
227 dir,
228 index,
229 active: Mutex::new(Active { id: active_id, file, offset: active_end }),
230 max_segment_bytes,
231 fast_fsync,
232 })
233 }
234
235 fn scan_segment(dir: &Path, id: u32) -> Result<(u64, Vec<(String, u64, u32)>)> {
238 let path = Self::seg_path(dir, id);
239 let mut f = match File::open(&path) {
240 Ok(f) => f,
241 Err(_) => return Ok((0, Vec::new())),
242 };
243 let file_len = f.metadata()?.len();
244 let mut pos: u64 = 0;
245 let mut entries: Vec<(String, u64, u32)> = Vec::new();
246 loop {
247 if pos + 4 > file_len {
248 break; }
250 f.seek(SeekFrom::Start(pos))?;
251 let mut len_buf = [0u8; 4];
252 if f.read_exact(&mut len_buf).is_err() {
253 break;
254 }
255 let len = u32::from_le_bytes(len_buf);
256 let content_off = pos + 4;
257 if content_off + (len as u64) > file_len {
258 break; }
260 let mut content = vec![0u8; len as usize];
261 if f.read_exact(&mut content).is_err() {
262 break;
263 }
264 entries.push((blake2b(&content), content_off, len));
265 pos = content_off + len as u64;
266 }
267 Ok((pos, entries))
268 }
269
270 fn read_content(dir: &Path, loc: &SegmentLocation, expect_hash: &str) -> Result<Vec<u8>> {
272 let path = Self::seg_path(dir, loc.segment_id);
273 let mut f = File::open(&path).with_context(|| format!("open segment {:?}", path))?;
274 f.seek(SeekFrom::Start(loc.offset))?;
275 let mut content = vec![0u8; loc.len as usize];
276 f.read_exact(&mut content)
277 .with_context(|| format!("read record from segment {}", loc.segment_id))?;
278 let actual = blake2b(&content);
279 if actual != expect_hash {
280 bail!("segment object {} tampered: recomputed {}", expect_hash, actual);
281 }
282 Ok(content)
283 }
284
285 fn write_idx(dir: &Path, id: u32, entries: &[(String, u64, u32)]) -> Result<()> {
290 let mut body: Vec<u8> = Vec::with_capacity(IDX_HEADER_BYTES + entries.len() * IDX_ENTRY_BYTES);
291 body.extend_from_slice(IDX_MAGIC);
292 body.extend_from_slice(&(entries.len() as u64).to_le_bytes());
293 for (hash, off, len) in entries {
294 let raw = hex::decode(hash).map_err(|_| anyhow::anyhow!("bad hash hex in idx write"))?;
295 if raw.len() != 32 {
296 bail!("idx write: hash not 32 bytes");
297 }
298 body.extend_from_slice(&raw);
299 body.extend_from_slice(&off.to_le_bytes());
300 body.extend_from_slice(&len.to_le_bytes());
301 }
302 let checksum = blake2b_raw(&body);
303 body.extend_from_slice(&checksum);
304
305 let path = Self::idx_path(dir, id);
306 let tmp = path.with_extension("idx.tmp");
307 fs::write(&tmp, &body)?;
308 fs::rename(&tmp, &path)?;
309 Ok(())
310 }
311
312 fn load_idx(dir: &Path, id: u32) -> Result<Option<Vec<(String, u64, u32)>>> {
315 let path = Self::idx_path(dir, id);
316 let data = match fs::read(&path) {
317 Ok(d) => d,
318 Err(_) => return Ok(None),
319 };
320 if data.len() < IDX_HEADER_BYTES + IDX_CHECKSUM_BYTES {
321 return Ok(None);
322 }
323 if &data[0..4] != IDX_MAGIC {
324 return Ok(None);
325 }
326 let count = u64::from_le_bytes(data[4..12].try_into().unwrap()) as usize;
327 let expected = IDX_HEADER_BYTES + count * IDX_ENTRY_BYTES + IDX_CHECKSUM_BYTES;
328 if data.len() != expected {
329 return Ok(None);
330 }
331 let body = &data[..data.len() - IDX_CHECKSUM_BYTES];
332 let stored: [u8; 32] = match data[data.len() - IDX_CHECKSUM_BYTES..].try_into() {
333 Ok(a) => a,
334 Err(_) => return Ok(None),
335 };
336 if blake2b_raw(body) != stored {
337 return Ok(None); }
339 let mut entries = Vec::with_capacity(count);
340 let mut p = IDX_HEADER_BYTES;
341 for _ in 0..count {
342 let hash = hex::encode(&data[p..p + 32]);
343 let off = u64::from_le_bytes(data[p + 32..p + 40].try_into().unwrap());
344 let len = u32::from_le_bytes(data[p + 40..p + 44].try_into().unwrap());
345 entries.push((hash, off, len));
346 p += IDX_ENTRY_BYTES;
347 }
348 Ok(Some(entries))
349 }
350
351 fn entries_for_segment(&self, id: u32) -> Vec<(String, u64, u32)> {
353 self.index
354 .iter()
355 .filter(|e| e.value().segment_id == id)
356 .map(|e| (e.key().clone(), e.value().offset, e.value().len))
357 .collect()
358 }
359
360 pub fn contains(&self, hash: &str) -> bool {
364 self.index.contains_key(hash)
365 }
366
367 pub fn put(&self, hash: &str, content: &[u8]) -> Result<()> {
370 if self.index.contains_key(hash) {
371 return Ok(());
372 }
373 let len = content.len() as u32;
374 let record_size = 4u64 + content.len() as u64;
375
376 let mut active = self.active.lock().unwrap();
377 if self.index.contains_key(hash) {
378 return Ok(());
379 }
380
381 if active.offset > 0 && active.offset + record_size > self.max_segment_bytes {
383 let _ = active.file.flush();
384 let _ = durable_sync(&active.file, self.fast_fsync);
385 let sealed_id = active.id;
387 let entries = self.entries_for_segment(sealed_id);
388 let _ = Self::write_idx(&self.dir, sealed_id, &entries);
389 let next_id = sealed_id + 1;
390 let path = Self::seg_path(&self.dir, next_id);
391 let file = OpenOptions::new()
392 .create(true)
393 .read(true)
394 .write(true)
395 .open(&path)
396 .with_context(|| format!("open new segment {:?}", path))?;
397 *active = Active { id: next_id, file, offset: 0 };
398 }
399
400 let content_off = active.offset + 4;
401 let mut rec = Vec::with_capacity(4 + content.len());
402 rec.extend_from_slice(&len.to_le_bytes());
403 rec.extend_from_slice(content);
404 active.file.write_all(&rec)?;
405
406 let seg_id = active.id;
407 active.offset += record_size;
408 self.index.insert(
409 hash.to_string(),
410 SegmentLocation { segment_id: seg_id, offset: content_off, len },
411 );
412 Ok(())
413 }
414
415 pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
418 let loc = match self.index.get(hash) {
419 Some(entry) => *entry.value(),
420 None => return Ok(None),
421 };
422 Ok(Some(Self::read_content(&self.dir, &loc, hash)?))
423 }
424
425 pub fn all_hashes(&self) -> Vec<String> {
427 self.index.iter().map(|e| e.key().clone()).collect()
428 }
429
430 pub fn sync(&self) -> Result<()> {
432 let mut active = self.active.lock().unwrap();
433 let _ = active.file.flush();
434 durable_sync(&active.file, self.fast_fsync).context("fsync active segment")?;
435 Ok(())
436 }
437
438 pub fn compact(&self, live: &HashSet<String>) -> Result<CompactStats> {
457 let mut active = self.active.lock().unwrap();
458
459 let total_before = self.index.len();
460 let old_max = active.id;
461 let new_base = old_max + 1;
462
463 let to_copy: Vec<(String, SegmentLocation)> = self
465 .index
466 .iter()
467 .filter(|e| live.contains(e.key()))
468 .map(|e| (e.key().clone(), *e.value()))
469 .collect();
470
471 let new_index: DashMap<String, SegmentLocation> = DashMap::new();
473 let mut cur_id = new_base;
474 let mut cur_path = Self::seg_path(&self.dir, cur_id);
475 let mut cur_file = OpenOptions::new()
476 .create(true)
477 .truncate(true)
478 .read(true)
479 .write(true)
480 .open(&cur_path)
481 .with_context(|| format!("open compaction segment {:?}", cur_path))?;
482 let mut cur_off: u64 = 0;
483
484 for (hash, loc) in &to_copy {
485 let content = Self::read_content(&self.dir, loc, hash)?;
486 let len = content.len() as u32;
487 let record_size = 4u64 + content.len() as u64;
488
489 if cur_off > 0 && cur_off + record_size > self.max_segment_bytes {
490 let _ = cur_file.flush();
491 durable_sync(&cur_file, self.fast_fsync).context("fsync sealed compaction segment")?;
492 let entries: Vec<(String, u64, u32)> = new_index
493 .iter()
494 .filter(|e| e.value().segment_id == cur_id)
495 .map(|e| (e.key().clone(), e.value().offset, e.value().len))
496 .collect();
497 let _ = Self::write_idx(&self.dir, cur_id, &entries);
498 cur_id += 1;
499 cur_path = Self::seg_path(&self.dir, cur_id);
500 cur_file = OpenOptions::new()
501 .create(true)
502 .truncate(true)
503 .read(true)
504 .write(true)
505 .open(&cur_path)
506 .with_context(|| format!("open compaction segment {:?}", cur_path))?;
507 cur_off = 0;
508 }
509
510 let content_off = cur_off + 4;
511 let mut rec = Vec::with_capacity(4 + content.len());
512 rec.extend_from_slice(&len.to_le_bytes());
513 rec.extend_from_slice(&content);
514 cur_file.write_all(&rec)?;
515 new_index.insert(hash.clone(), SegmentLocation { segment_id: cur_id, offset: content_off, len });
516 cur_off += record_size;
517 }
518 let _ = cur_file.flush();
519 durable_sync(&cur_file, self.fast_fsync).context("fsync active compaction segment")?;
520
521 let live_objects = to_copy.len();
523
524 self.index.clear();
526 for e in new_index.iter() {
527 self.index.insert(e.key().clone(), *e.value());
528 }
529 *active = Active { id: cur_id, file: cur_file, offset: cur_off };
530
531 let mut bytes_reclaimed: u64 = 0;
534 if let Ok(rd) = fs::read_dir(&self.dir) {
535 for entry in rd.flatten() {
536 let name = entry.file_name().to_string_lossy().to_string();
537 let id_of = name
538 .strip_prefix("seg-")
539 .and_then(|r| r.strip_suffix(".dat").or_else(|| r.strip_suffix(".idx")))
540 .and_then(|n| n.parse::<u32>().ok());
541 if let Some(id) = id_of {
542 if id < new_base {
543 if name.ends_with(".dat") {
544 if let Ok(m) = entry.metadata() {
545 bytes_reclaimed += m.len();
546 }
547 }
548 let _ = fs::remove_file(entry.path());
549 }
550 }
551 }
552 }
553
554 let segments_after = (cur_id - new_base + 1) as usize;
555 Ok(CompactStats {
556 live_objects,
557 dropped_objects: total_before.saturating_sub(live_objects),
558 bytes_reclaimed,
559 segments_after,
560 })
561 }
562}
563
// Unit tests for `SegmentStore`. They exercise on-disk behaviour
// (reopen, rollover, torn tails, tampering, compaction) in temporary
// directories.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Stores `content` under its own `blake2b` digest and returns that digest.
    fn put_get_hash(s: &SegmentStore, content: &[u8]) -> String {
        let h = blake2b(content);
        s.put(&h, content).unwrap();
        h
    }

    // A stored object reads back intact; an unknown hash yields `Ok(None)`.
    #[test]
    fn put_get_roundtrip() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open(dir.path()).unwrap();
        let h = put_get_hash(&s, b"hello nedb v3");
        assert_eq!(s.get(&h).unwrap().unwrap(), b"hello nedb v3");
        assert!(s.contains(&h));
        assert!(s.get(&"0".repeat(64)).unwrap().is_none());
    }

    // Storing the same content twice keeps a single index entry.
    #[test]
    fn idempotent_put() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open(dir.path()).unwrap();
        let h1 = put_get_hash(&s, b"dup");
        let h2 = put_get_hash(&s, b"dup");
        assert_eq!(h1, h2);
        assert_eq!(s.all_hashes().len(), 1);
    }

    // After a sync and drop, a fresh open rebuilds the index from disk.
    #[test]
    fn index_rebuilt_on_reopen() {
        let dir = tempdir().unwrap();
        let h = {
            let s = SegmentStore::open(dir.path()).unwrap();
            let h = put_get_hash(&s, b"persisted");
            s.sync().unwrap();
            h
        };
        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert_eq!(s2.get(&h).unwrap().unwrap(), b"persisted");
    }

    // With a 32-byte limit, each 12-byte record ("record-N" plus its 4-byte
    // prefix) forces frequent rollovers. Sealed segments must get `.idx`
    // sidecars, and a reopen must still find every object.
    #[test]
    fn rollover_writes_idx_and_reopen_uses_it() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open_with_max(dir.path(), 32).unwrap();
        let mut hashes = Vec::new();
        for i in 0..8u32 {
            hashes.push(put_get_hash(&s, format!("record-{}", i).as_bytes()));
        }
        s.sync().unwrap();
        let idx_files = fs::read_dir(dir.path().join("segments"))
            .unwrap()
            .flatten()
            .filter(|e| e.file_name().to_string_lossy().ends_with(".idx"))
            .count();
        assert!(idx_files >= 1, "expected at least one sealed .idx");
        let s2 = SegmentStore::open(dir.path()).unwrap();
        for h in &hashes {
            assert!(s2.get(h).unwrap().is_some());
        }
    }

    // Overwriting every sidecar with garbage must not lose objects: open has
    // to fall back to scanning the segment data.
    #[test]
    fn corrupt_idx_falls_back_to_scan() {
        let dir = tempdir().unwrap();
        let mut hashes = Vec::new();
        {
            let s = SegmentStore::open_with_max(dir.path(), 32).unwrap();
            for i in 0..6u32 {
                hashes.push(put_get_hash(&s, format!("rec-{}", i).as_bytes()));
            }
            s.sync().unwrap();
        }
        for e in fs::read_dir(dir.path().join("segments")).unwrap().flatten() {
            if e.file_name().to_string_lossy().ends_with(".idx") {
                fs::write(e.path(), b"garbage").unwrap();
            }
        }
        let s2 = SegmentStore::open(dir.path()).unwrap();
        for h in &hashes {
            assert!(s2.get(h).unwrap().is_some(), "scan fallback must recover the object");
        }
    }

    // Simulate a crash mid-append: a length prefix of 9999 followed by only 5
    // bytes. Reopen must keep the good record and still accept new appends.
    #[test]
    fn torn_tail_is_truncated_on_open() {
        let dir = tempdir().unwrap();
        let good = {
            let s = SegmentStore::open(dir.path()).unwrap();
            let h = put_get_hash(&s, b"good record");
            s.sync().unwrap();
            h
        };
        let seg = dir.path().join("segments").join("seg-000000.dat");
        {
            let mut f = OpenOptions::new().append(true).open(&seg).unwrap();
            f.write_all(&9999u32.to_le_bytes()).unwrap();
            f.write_all(b"short").unwrap();
        }
        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert_eq!(s2.get(&good).unwrap().unwrap(), b"good record");
        let h2 = put_get_hash(&s2, b"after recovery");
        assert!(s2.get(&h2).unwrap().is_some());
    }

    // Flipping the last content byte changes the record's digest. Reopen
    // rescans the active segment and keys the record by its new digest, so
    // the original hash is typically absent (`Ok(None)`). A read that did
    // reach the bytes would fail verification (`Err`). Only returning
    // content under the original hash is a failure.
    #[test]
    fn tamper_detected_on_read() {
        let dir = tempdir().unwrap();
        let h = {
            let s = SegmentStore::open(dir.path()).unwrap();
            let h = put_get_hash(&s, b"authentic");
            s.sync().unwrap();
            h
        };
        let seg = dir.path().join("segments").join("seg-000000.dat");
        let mut bytes = fs::read(&seg).unwrap();
        let n = bytes.len();
        bytes[n - 1] ^= 0xff;
        fs::write(&seg, bytes).unwrap();
        let s2 = SegmentStore::open(dir.path()).unwrap();
        match s2.get(&h) {
            Ok(None) => {}
            Err(_) => {}
            Ok(Some(_)) => panic!("tampered content must not verify under original hash"),
        }
    }

    // Compaction keeps only `live` objects, both in memory and after a
    // reopen, and the original store keeps accepting writes afterwards.
    #[test]
    fn compaction_keeps_live_drops_dead() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open(dir.path()).unwrap();
        let keep = put_get_hash(&s, b"keep me");
        let _drop1 = put_get_hash(&s, b"drop me 1");
        let _drop2 = put_get_hash(&s, b"drop me 2");
        s.sync().unwrap();
        assert_eq!(s.all_hashes().len(), 3);

        let mut live = HashSet::new();
        live.insert(keep.clone());
        let stats = s.compact(&live).unwrap();
        assert_eq!(stats.live_objects, 1);
        assert_eq!(stats.dropped_objects, 2);

        assert_eq!(s.get(&keep).unwrap().unwrap(), b"keep me");
        assert_eq!(s.all_hashes().len(), 1);

        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert_eq!(s2.get(&keep).unwrap().unwrap(), b"keep me");
        assert!(s2.get(&_drop1).unwrap().is_none());

        let after = put_get_hash(&s, b"post-compaction");
        assert!(s.get(&after).unwrap().is_some());
    }

    // Compaction across many small (64-byte) segments: even-numbered objects
    // survive, odd-numbered ones are pruned.
    #[test]
    fn compaction_reclaims_and_writes_still_read() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open_with_max(dir.path(), 64).unwrap();
        let mut all = Vec::new();
        for i in 0..20u32 {
            all.push(put_get_hash(&s, format!("obj-{:03}", i).as_bytes()));
        }
        s.sync().unwrap();
        let mut live = HashSet::new();
        for (i, h) in all.iter().enumerate() {
            if i % 2 == 0 {
                live.insert(h.clone());
            }
        }
        let stats = s.compact(&live).unwrap();
        assert_eq!(stats.live_objects, 10);
        assert_eq!(stats.dropped_objects, 10);
        for (i, h) in all.iter().enumerate() {
            let got = s.get(h).unwrap();
            if i % 2 == 0 {
                assert!(got.is_some(), "live object {} must survive", i);
            } else {
                assert!(got.is_none(), "dead object {} must be pruned", i);
            }
        }
    }
}