Skip to main content

nedb_engine/
segment.rs

1//! segment.rs — NEDB v3 packed object substrate.
2//!
3//! v3 keeps the v2 logical model intact (content-addressed, immutable,
4//! BLAKE2b-verified DAG nodes) and changes only *where the bytes live*: instead
5//! of one filesystem object per node — which caps throughput at the OS
6//! small-file metadata rate — many immutable objects are appended into
7//! **segment files** addressed through an in-memory `hash -> (segment, offset,
8//! len)` index. The hash is still `BLAKE2b(content)` and is re-verified on every
9//! read, so content-addressing and tamper-evidence are unchanged.
10//!
11//! This module knows nothing about `Node`, JSON, or encryption: callers
12//! (`ObjectStore`) pass already-serialized/encrypted `content` bytes and the
13//! precomputed hash. That keeps the segment store a pure content<->location
14//! layer and leaves all crypto/serialization in `store.rs`.
15//!
16//! Phases:
17//!   1. Segment append + in-memory index + startup scan + tail-truncation.
18//!   2. Compaction/pruning — rewrite the live object set into fresh segments,
19//!      reclaiming dead (superseded/spent) records. `compact(live)`.
20//!   3. On-disk `.idx` sidecars — a sealed segment gets a checksummed
21//!      `hash -> (offset,len)` index file so cold start loads it instead of
22//!      rescanning every record. Missing/corrupt `.idx` falls back to a scan.
23//!
24//! Opt-in only: `ObjectStore` instantiates this when `NEDB_DAG_V3` is set
25//! (surfaced as the `--dag-v3` flag). Default storage is byte-for-byte v2.
26
27use std::collections::HashSet;
28use std::fs::{self, File, OpenOptions};
29use std::io::{Read, Seek, SeekFrom, Write};
30use std::path::{Path, PathBuf};
31use std::sync::Mutex;
32
33use anyhow::{bail, Context, Result};
34use blake2::{Blake2b512, Digest};
35use dashmap::DashMap;
36
/// Segment rollover threshold used by `SegmentStore::open`: 256 MiB.
const DEFAULT_MAX_SEGMENT_BYTES: u64 = 256 << 20;

/// Four-byte magic that opens every `.idx` sidecar (NEDB v3 index, format 1).
const IDX_MAGIC: &[u8; 4] = b"NIX1";
/// Size of one `.idx` entry: raw 32-byte digest, then a u64 offset and a u32 length.
const IDX_ENTRY_BYTES: usize = 32 + std::mem::size_of::<u64>() + std::mem::size_of::<u32>();
/// Size of the `.idx` header: the 4-byte magic followed by a u64 entry count.
const IDX_HEADER_BYTES: usize = 4 + std::mem::size_of::<u64>();
/// Size of the trailing BLAKE2b-256 checksum that seals each `.idx` file.
const IDX_CHECKSUM_BYTES: usize = 32;
47
/// Where a single content record lives within the set of segment files.
#[derive(Debug, Clone, Copy)]
struct SegmentLocation {
    /// Id of the segment file holding the record (`seg-{id:06}.dat`).
    segment_id: u32,
    /// Offset of the first CONTENT byte, i.e. just past the record's u32 length prefix.
    offset: u64,
    /// Number of content bytes (the value stored in the length prefix).
    len: u32,
}
56
/// Summary of one `SegmentStore::compact` pass.
///
/// A plain value type: it derives `PartialEq`/`Eq` so callers and tests can
/// compare whole results instead of field-by-field.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CompactStats {
    /// Live objects copied forward into fresh segments.
    pub live_objects: usize,
    /// Dead objects dropped (superseded versions / pruned history).
    pub dropped_objects: usize,
    /// Bytes reclaimed by deleting the old segment files.
    pub bytes_reclaimed: u64,
    /// Number of segment files after compaction.
    pub segments_after: usize,
}
69
70/// BLAKE2b-256 (first 32 bytes of Blake2b-512) raw digest.
71fn blake2b_raw(data: &[u8]) -> [u8; 32] {
72    let mut h = Blake2b512::new();
73    h.update(data);
74    let out = h.finalize();
75    let mut a = [0u8; 32];
76    a.copy_from_slice(&out[..32]);
77    a
78}
79
80/// Hex-encoded BLAKE2b-256. MUST match `store::blake2b` so segment hashes equal
81/// loose-object hashes.
82fn blake2b(data: &[u8]) -> String {
83    hex::encode(blake2b_raw(data))
84}
85
/// The segment file currently receiving appends.
struct Active {
    /// Numeric id of the segment (`seg-{id:06}.dat`).
    id: u32,
    /// Open read/write handle positioned for the next append.
    file: File,
    /// Position where the next record will be written (logical end of file);
    /// expected to match the handle's cursor.
    offset: u64,
}
93
94/// Append-only, content-addressed packed object store with an in-memory index.
95pub struct SegmentStore {
96    dir: PathBuf,
97    index: DashMap<String, SegmentLocation>,
98    active: Mutex<Active>,
99    max_segment_bytes: u64,
100    /// macOS fast-fsync opt-in (from `NEDB_FAST_FSYNC`). When true, durability
101    /// points use a plain `fsync(2)` instead of std's `sync_all` (= F_FULLFSYNC
102    /// on macOS). Off by default → identical, full-durability behavior.
103    fast_fsync: bool,
104}
105
/// Make everything written to `file` durable.
///
/// With `fast == false` (the default) this is always `File::sync_all()`. That
/// is the power-loss-safe choice, but on macOS it becomes `F_FULLFSYNC`, a full
/// drive-cache flush that is often far slower than the plain `fsync(2)` used
/// on other platforms.
///
/// With `fast == true` (opted into via `NEDB_FAST_FSYNC`), macOS issues a
/// plain `fsync(2)` instead. The data reaches the drive, but it may still sit
/// in the drive's volatile cache, so a power cut can drop the most recent
/// unsynced batch. NEDB v3 accepts that risk for three reasons: objects are
/// content-addressed, a torn tail is truncated on open, and chainstate can be
/// re-synced from peers. On every other OS the flag does nothing, because
/// `sync_all` is already the plain call there.
fn durable_sync(file: &File, fast: bool) -> std::io::Result<()> {
    #[cfg(target_os = "macos")]
    {
        if fast {
            use std::os::unix::io::AsRawFd;
            // SAFETY: the descriptor belongs to `file`, which is borrowed and
            // therefore stays open for the whole call.
            return match unsafe { libc::fsync(file.as_raw_fd()) } {
                0 => Ok(()),
                _ => Err(std::io::Error::last_os_error()),
            };
        }
    }
    #[cfg(not(target_os = "macos"))]
    {
        let _ = fast;
    }
    file.sync_all()
}
132
133impl SegmentStore {
134    fn seg_path(dir: &Path, id: u32) -> PathBuf {
135        dir.join(format!("seg-{:06}.dat", id))
136    }
137    fn idx_path(dir: &Path, id: u32) -> PathBuf {
138        dir.join(format!("seg-{:06}.idx", id))
139    }
140
141    /// Open (or create) the segment store under `{objects_root}/segments`.
142    pub fn open(objects_root: &Path) -> Result<Self> {
143        Self::open_with_max(objects_root, DEFAULT_MAX_SEGMENT_BYTES)
144    }
145
146    /// Like `open`, with an explicit rollover size (used by tests).
147    pub fn open_with_max(objects_root: &Path, max_segment_bytes: u64) -> Result<Self> {
148        let dir = objects_root.join("segments");
149        fs::create_dir_all(&dir).context("create objects/segments dir")?;
150
151        // Discover existing segment ids.
152        let mut ids: Vec<u32> = Vec::new();
153        for entry in fs::read_dir(&dir).context("read segments dir")? {
154            let entry = entry?;
155            let name = entry.file_name().to_string_lossy().to_string();
156            if let Some(rest) = name.strip_prefix("seg-") {
157                if let Some(num) = rest.strip_suffix(".dat") {
158                    if let Ok(id) = num.parse::<u32>() {
159                        ids.push(id);
160                    }
161                }
162            }
163        }
164        ids.sort_unstable();
165
166        let index: DashMap<String, SegmentLocation> = DashMap::new();
167        let mut active_id: u32 = 0;
168        let mut active_end: u64 = 0;
169
170        for (pos, &id) in ids.iter().enumerate() {
171            let is_last = pos + 1 == ids.len();
172            if is_last {
173                // Active (last) segment: always scan (no .idx — it's still
174                // mutable) and truncate any torn tail from a crash.
175                let (valid_end, entries) = Self::scan_segment(&dir, id)?;
176                for (h, o, l) in entries {
177                    index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
178                }
179                let path = Self::seg_path(&dir, id);
180                let file_len = fs::metadata(&path)?.len();
181                if valid_end < file_len {
182                    let f = OpenOptions::new().write(true).open(&path)?;
183                    f.set_len(valid_end)?;
184                }
185                active_id = id;
186                active_end = valid_end;
187            } else {
188                // Sealed segment: load its checksummed .idx if present+valid;
189                // otherwise scan it and heal by writing a fresh .idx.
190                match Self::load_idx(&dir, id) {
191                    Ok(Some(entries)) => {
192                        for (h, o, l) in entries {
193                            index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
194                        }
195                    }
196                    _ => {
197                        let (_ve, entries) = Self::scan_segment(&dir, id)?;
198                        for (h, o, l) in &entries {
199                            index.insert(h.clone(), SegmentLocation { segment_id: id, offset: *o, len: *l });
200                        }
201                        let _ = Self::write_idx(&dir, id, &entries); // best-effort heal
202                    }
203                }
204            }
205        }
206
207        // Open (creating if necessary) the active segment for appending.
208        let active_path = Self::seg_path(&dir, active_id);
209        let mut file = OpenOptions::new()
210            .create(true)
211            .read(true)
212            .write(true)
213            .open(&active_path)
214            .with_context(|| format!("open active segment {:?}", active_path))?;
215        file.seek(SeekFrom::Start(active_end))?;
216
217        let fast_fsync = std::env::var("NEDB_FAST_FSYNC")
218            .map(|v| {
219                let v = v.trim();
220                v == "1" || v.eq_ignore_ascii_case("true")
221                         || v.eq_ignore_ascii_case("on")
222                         || v.eq_ignore_ascii_case("yes")
223            })
224            .unwrap_or(false);
225
226        Ok(Self {
227            dir,
228            index,
229            active: Mutex::new(Active { id: active_id, file, offset: active_end }),
230            max_segment_bytes,
231            fast_fsync,
232        })
233    }
234
235    /// Scan one segment, returning (valid_end_offset, records). Records past a
236    /// torn tail are not returned; `valid_end` marks where to truncate.
237    fn scan_segment(dir: &Path, id: u32) -> Result<(u64, Vec<(String, u64, u32)>)> {
238        let path = Self::seg_path(dir, id);
239        let mut f = match File::open(&path) {
240            Ok(f) => f,
241            Err(_) => return Ok((0, Vec::new())),
242        };
243        let file_len = f.metadata()?.len();
244        let mut pos: u64 = 0;
245        let mut entries: Vec<(String, u64, u32)> = Vec::new();
246        loop {
247            if pos + 4 > file_len {
248                break; // no room for a length prefix → torn tail
249            }
250            f.seek(SeekFrom::Start(pos))?;
251            let mut len_buf = [0u8; 4];
252            if f.read_exact(&mut len_buf).is_err() {
253                break;
254            }
255            let len = u32::from_le_bytes(len_buf);
256            let content_off = pos + 4;
257            if content_off + (len as u64) > file_len {
258                break; // declared length overruns EOF → torn content
259            }
260            let mut content = vec![0u8; len as usize];
261            if f.read_exact(&mut content).is_err() {
262                break;
263            }
264            entries.push((blake2b(&content), content_off, len));
265            pos = content_off + len as u64;
266        }
267        Ok((pos, entries))
268    }
269
270    /// Read+verify the raw content at a location. Errors on tamper.
271    fn read_content(dir: &Path, loc: &SegmentLocation, expect_hash: &str) -> Result<Vec<u8>> {
272        let path = Self::seg_path(dir, loc.segment_id);
273        let mut f = File::open(&path).with_context(|| format!("open segment {:?}", path))?;
274        f.seek(SeekFrom::Start(loc.offset))?;
275        let mut content = vec![0u8; loc.len as usize];
276        f.read_exact(&mut content)
277            .with_context(|| format!("read record from segment {}", loc.segment_id))?;
278        let actual = blake2b(&content);
279        if actual != expect_hash {
280            bail!("segment object {} tampered: recomputed {}", expect_hash, actual);
281        }
282        Ok(content)
283    }
284
285    // ── Phase 3: .idx sidecars ────────────────────────────────────────────────
286
287    /// Write a checksummed `.idx` for a SEALED segment (atomic via tmp→rename).
288    /// Best-effort: a failure just means the next open scans the segment.
289    fn write_idx(dir: &Path, id: u32, entries: &[(String, u64, u32)]) -> Result<()> {
290        let mut body: Vec<u8> = Vec::with_capacity(IDX_HEADER_BYTES + entries.len() * IDX_ENTRY_BYTES);
291        body.extend_from_slice(IDX_MAGIC);
292        body.extend_from_slice(&(entries.len() as u64).to_le_bytes());
293        for (hash, off, len) in entries {
294            let raw = hex::decode(hash).map_err(|_| anyhow::anyhow!("bad hash hex in idx write"))?;
295            if raw.len() != 32 {
296                bail!("idx write: hash not 32 bytes");
297            }
298            body.extend_from_slice(&raw);
299            body.extend_from_slice(&off.to_le_bytes());
300            body.extend_from_slice(&len.to_le_bytes());
301        }
302        let checksum = blake2b_raw(&body);
303        body.extend_from_slice(&checksum);
304
305        let path = Self::idx_path(dir, id);
306        let tmp = path.with_extension("idx.tmp");
307        fs::write(&tmp, &body)?;
308        fs::rename(&tmp, &path)?;
309        Ok(())
310    }
311
312    /// Load a `.idx` if present and checksum-valid. Returns Ok(None) if absent
313    /// or in any way unusable (caller then scans the segment).
314    fn load_idx(dir: &Path, id: u32) -> Result<Option<Vec<(String, u64, u32)>>> {
315        let path = Self::idx_path(dir, id);
316        let data = match fs::read(&path) {
317            Ok(d) => d,
318            Err(_) => return Ok(None),
319        };
320        if data.len() < IDX_HEADER_BYTES + IDX_CHECKSUM_BYTES {
321            return Ok(None);
322        }
323        if &data[0..4] != IDX_MAGIC {
324            return Ok(None);
325        }
326        let count = u64::from_le_bytes(data[4..12].try_into().unwrap()) as usize;
327        let expected = IDX_HEADER_BYTES + count * IDX_ENTRY_BYTES + IDX_CHECKSUM_BYTES;
328        if data.len() != expected {
329            return Ok(None);
330        }
331        let body = &data[..data.len() - IDX_CHECKSUM_BYTES];
332        let stored: [u8; 32] = match data[data.len() - IDX_CHECKSUM_BYTES..].try_into() {
333            Ok(a) => a,
334            Err(_) => return Ok(None),
335        };
336        if blake2b_raw(body) != stored {
337            return Ok(None); // corrupt/stale → fall back to scan
338        }
339        let mut entries = Vec::with_capacity(count);
340        let mut p = IDX_HEADER_BYTES;
341        for _ in 0..count {
342            let hash = hex::encode(&data[p..p + 32]);
343            let off = u64::from_le_bytes(data[p + 32..p + 40].try_into().unwrap());
344            let len = u32::from_le_bytes(data[p + 40..p + 44].try_into().unwrap());
345            entries.push((hash, off, len));
346            p += IDX_ENTRY_BYTES;
347        }
348        Ok(Some(entries))
349    }
350
351    /// Collect the index entries belonging to one segment (for sealing → .idx).
352    fn entries_for_segment(&self, id: u32) -> Vec<(String, u64, u32)> {
353        self.index
354            .iter()
355            .filter(|e| e.value().segment_id == id)
356            .map(|e| (e.key().clone(), e.value().offset, e.value().len))
357            .collect()
358    }
359
360    // ── core API ──────────────────────────────────────────────────────────────
361
362    /// True if this hash is already stored in a segment.
363    pub fn contains(&self, hash: &str) -> bool {
364        self.index.contains_key(hash)
365    }
366
367    /// Append `content` under `hash` (idempotent). `hash` must equal
368    /// `BLAKE2b(content)`; the caller computes it (parallel, outside the lock).
369    pub fn put(&self, hash: &str, content: &[u8]) -> Result<()> {
370        if self.index.contains_key(hash) {
371            return Ok(());
372        }
373        let len = content.len() as u32;
374        let record_size = 4u64 + content.len() as u64;
375
376        let mut active = self.active.lock().unwrap();
377        if self.index.contains_key(hash) {
378            return Ok(());
379        }
380
381        // Roll over if this record would push the active segment past the cap.
382        if active.offset > 0 && active.offset + record_size > self.max_segment_bytes {
383            let _ = active.file.flush();
384            let _ = durable_sync(&active.file, self.fast_fsync);
385            // Seal: write the .idx for the segment we're leaving behind.
386            let sealed_id = active.id;
387            let entries = self.entries_for_segment(sealed_id);
388            let _ = Self::write_idx(&self.dir, sealed_id, &entries);
389            let next_id = sealed_id + 1;
390            let path = Self::seg_path(&self.dir, next_id);
391            let file = OpenOptions::new()
392                .create(true)
393                .read(true)
394                .write(true)
395                .open(&path)
396                .with_context(|| format!("open new segment {:?}", path))?;
397            *active = Active { id: next_id, file, offset: 0 };
398        }
399
400        let content_off = active.offset + 4;
401        let mut rec = Vec::with_capacity(4 + content.len());
402        rec.extend_from_slice(&len.to_le_bytes());
403        rec.extend_from_slice(content);
404        active.file.write_all(&rec)?;
405
406        let seg_id = active.id;
407        active.offset += record_size;
408        self.index.insert(
409            hash.to_string(),
410            SegmentLocation { segment_id: seg_id, offset: content_off, len },
411        );
412        Ok(())
413    }
414
415    /// Read the raw content bytes for `hash`, or `None` if not stored in any
416    /// segment (caller then falls back to the loose-object path). Re-verifies.
417    pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
418        let loc = match self.index.get(hash) {
419            Some(entry) => *entry.value(),
420            None => return Ok(None),
421        };
422        Ok(Some(Self::read_content(&self.dir, &loc, hash)?))
423    }
424
425    /// All hashes currently stored in segments.
426    pub fn all_hashes(&self) -> Vec<String> {
427        self.index.iter().map(|e| e.key().clone()).collect()
428    }
429
430    /// Flush + fsync the active segment. One durability point per batch.
431    pub fn sync(&self) -> Result<()> {
432        let mut active = self.active.lock().unwrap();
433        let _ = active.file.flush();
434        durable_sync(&active.file, self.fast_fsync).context("fsync active segment")?;
435        Ok(())
436    }
437
438    // ── Phase 2: compaction / pruning ─────────────────────────────────────────
439
440    /// Rewrite the **live** object set into fresh segments and drop everything
441    /// else, reclaiming dead (superseded/spent/pruned) records.
442    ///
443    /// `live` is the set of hashes to KEEP — typically the current version of
444    /// every document (from the id-index). Hashes not in `live` are pruned, so
445    /// historical versions / AS OF / TRACE for dropped objects are discarded by
446    /// design (that is what reclaims space).
447    ///
448    /// Crash-safe: new segments are written + fsynced BEFORE any old segment is
449    /// deleted, so live data is never lost. A crash mid-compaction leaves both
450    /// the old and new segments (a re-open re-indexes the union — dead objects
451    /// merely linger until the next compaction); it never loses a live object.
452    ///
453    /// Must be called when the store is quiescent (no concurrent reads): writes
454    /// are blocked for the duration via the active lock, and the in-memory index
455    /// is swapped in place.
456    pub fn compact(&self, live: &HashSet<String>) -> Result<CompactStats> {
457        let mut active = self.active.lock().unwrap();
458
459        let total_before = self.index.len();
460        let old_max = active.id;
461        let new_base = old_max + 1;
462
463        // Snapshot the live entries to copy forward.
464        let to_copy: Vec<(String, SegmentLocation)> = self
465            .index
466            .iter()
467            .filter(|e| live.contains(e.key()))
468            .map(|e| (e.key().clone(), *e.value()))
469            .collect();
470
471        // Write live objects into fresh segments starting at new_base.
472        let new_index: DashMap<String, SegmentLocation> = DashMap::new();
473        let mut cur_id = new_base;
474        let mut cur_path = Self::seg_path(&self.dir, cur_id);
475        let mut cur_file = OpenOptions::new()
476            .create(true)
477            .truncate(true)
478            .read(true)
479            .write(true)
480            .open(&cur_path)
481            .with_context(|| format!("open compaction segment {:?}", cur_path))?;
482        let mut cur_off: u64 = 0;
483
484        for (hash, loc) in &to_copy {
485            let content = Self::read_content(&self.dir, loc, hash)?;
486            let len = content.len() as u32;
487            let record_size = 4u64 + content.len() as u64;
488
489            if cur_off > 0 && cur_off + record_size > self.max_segment_bytes {
490                let _ = cur_file.flush();
491                durable_sync(&cur_file, self.fast_fsync).context("fsync sealed compaction segment")?;
492                let entries: Vec<(String, u64, u32)> = new_index
493                    .iter()
494                    .filter(|e| e.value().segment_id == cur_id)
495                    .map(|e| (e.key().clone(), e.value().offset, e.value().len))
496                    .collect();
497                let _ = Self::write_idx(&self.dir, cur_id, &entries);
498                cur_id += 1;
499                cur_path = Self::seg_path(&self.dir, cur_id);
500                cur_file = OpenOptions::new()
501                    .create(true)
502                    .truncate(true)
503                    .read(true)
504                    .write(true)
505                    .open(&cur_path)
506                    .with_context(|| format!("open compaction segment {:?}", cur_path))?;
507                cur_off = 0;
508            }
509
510            let content_off = cur_off + 4;
511            let mut rec = Vec::with_capacity(4 + content.len());
512            rec.extend_from_slice(&len.to_le_bytes());
513            rec.extend_from_slice(&content);
514            cur_file.write_all(&rec)?;
515            new_index.insert(hash.clone(), SegmentLocation { segment_id: cur_id, offset: content_off, len });
516            cur_off += record_size;
517        }
518        let _ = cur_file.flush();
519        durable_sync(&cur_file, self.fast_fsync).context("fsync active compaction segment")?;
520
521        // The last new segment becomes the active one (reuse its handle).
522        let live_objects = to_copy.len();
523
524        // Swap the in-memory index to the rebuilt one.
525        self.index.clear();
526        for e in new_index.iter() {
527            self.index.insert(e.key().clone(), *e.value());
528        }
529        *active = Active { id: cur_id, file: cur_file, offset: cur_off };
530
531        // Delete every old segment (id < new_base) + its .idx, after the new
532        // ones are durable. Re-list so we only touch files that actually exist.
533        let mut bytes_reclaimed: u64 = 0;
534        if let Ok(rd) = fs::read_dir(&self.dir) {
535            for entry in rd.flatten() {
536                let name = entry.file_name().to_string_lossy().to_string();
537                let id_of = name
538                    .strip_prefix("seg-")
539                    .and_then(|r| r.strip_suffix(".dat").or_else(|| r.strip_suffix(".idx")))
540                    .and_then(|n| n.parse::<u32>().ok());
541                if let Some(id) = id_of {
542                    if id < new_base {
543                        if name.ends_with(".dat") {
544                            if let Ok(m) = entry.metadata() {
545                                bytes_reclaimed += m.len();
546                            }
547                        }
548                        let _ = fs::remove_file(entry.path());
549                    }
550                }
551            }
552        }
553
554        let segments_after = (cur_id - new_base + 1) as usize;
555        Ok(CompactStats {
556            live_objects,
557            dropped_objects: total_before.saturating_sub(live_objects),
558            bytes_reclaimed,
559            segments_after,
560        })
561    }
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Store `bytes` under their own BLAKE2b hash and return that hash.
    fn store_blob(store: &SegmentStore, bytes: &[u8]) -> String {
        let digest = blake2b(bytes);
        store.put(&digest, bytes).unwrap();
        digest
    }

    /// The `segments` directory the store creates under a test root.
    fn segments_dir(root: &Path) -> PathBuf {
        root.join("segments")
    }

    #[test]
    fn put_get_roundtrip() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open(tmp.path()).unwrap();
        let key = store_blob(&store, b"hello nedb v3");
        let fetched = store.get(&key).unwrap();
        assert_eq!(fetched.unwrap(), b"hello nedb v3");
        assert!(store.contains(&key));
        let missing = "0".repeat(64);
        assert!(store.get(&missing).unwrap().is_none());
    }

    #[test]
    fn idempotent_put() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open(tmp.path()).unwrap();
        let (first, second) = (store_blob(&store, b"dup"), store_blob(&store, b"dup"));
        assert_eq!(first, second);
        assert_eq!(store.all_hashes().len(), 1);
    }

    #[test]
    fn index_rebuilt_on_reopen() {
        let tmp = tempdir().unwrap();
        let key;
        {
            let store = SegmentStore::open(tmp.path()).unwrap();
            key = store_blob(&store, b"persisted");
            store.sync().unwrap();
        }
        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert_eq!(reopened.get(&key).unwrap().unwrap(), b"persisted");
    }

    #[test]
    fn rollover_writes_idx_and_reopen_uses_it() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open_with_max(tmp.path(), 32).unwrap();
        let keys: Vec<String> = (0..8u32)
            .map(|i| store_blob(&store, format!("record-{}", i).as_bytes()))
            .collect();
        store.sync().unwrap();

        // Sealing a segment on rollover must leave a .idx sidecar behind.
        let sidecars = fs::read_dir(segments_dir(tmp.path()))
            .unwrap()
            .flatten()
            .filter(|entry| entry.file_name().to_string_lossy().ends_with(".idx"))
            .count();
        assert!(sidecars >= 1, "expected at least one sealed .idx");

        // A fresh open goes through the sidecars; every object must still read.
        let reopened = SegmentStore::open(tmp.path()).unwrap();
        for key in &keys {
            assert!(reopened.get(key).unwrap().is_some());
        }
    }

    #[test]
    fn corrupt_idx_falls_back_to_scan() {
        let tmp = tempdir().unwrap();
        let keys: Vec<String> = {
            let store = SegmentStore::open_with_max(tmp.path(), 32).unwrap();
            let keys: Vec<String> = (0..6u32)
                .map(|i| store_blob(&store, format!("rec-{}", i).as_bytes()))
                .collect();
            store.sync().unwrap();
            keys
        };

        // Clobber every sidecar; reopening must recover everything by scanning.
        fs::read_dir(segments_dir(tmp.path()))
            .unwrap()
            .flatten()
            .filter(|entry| entry.file_name().to_string_lossy().ends_with(".idx"))
            .for_each(|entry| fs::write(entry.path(), b"garbage").unwrap());

        let reopened = SegmentStore::open(tmp.path()).unwrap();
        for key in &keys {
            assert!(reopened.get(key).unwrap().is_some(), "scan fallback must recover the object");
        }
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let tmp = tempdir().unwrap();
        let good = {
            let store = SegmentStore::open(tmp.path()).unwrap();
            let key = store_blob(&store, b"good record");
            store.sync().unwrap();
            key
        };

        // Append a record header that promises far more bytes than follow it.
        let seg_file = segments_dir(tmp.path()).join("seg-000000.dat");
        let mut torn = 9999u32.to_le_bytes().to_vec();
        torn.extend_from_slice(b"short");
        OpenOptions::new().append(true).open(&seg_file).unwrap().write_all(&torn).unwrap();

        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert_eq!(reopened.get(&good).unwrap().unwrap(), b"good record");
        let later = store_blob(&reopened, b"after recovery");
        assert!(reopened.get(&later).unwrap().is_some());
    }

    #[test]
    fn tamper_detected_on_read() {
        let tmp = tempdir().unwrap();
        let key = {
            let store = SegmentStore::open(tmp.path()).unwrap();
            let key = store_blob(&store, b"authentic");
            store.sync().unwrap();
            key
        };

        // Flip the final content byte on disk.
        let seg_file = segments_dir(tmp.path()).join("seg-000000.dat");
        let mut raw = fs::read(&seg_file).unwrap();
        *raw.last_mut().unwrap() ^= 0xff;
        fs::write(&seg_file, raw).unwrap();

        // Either an error or "not found" is acceptable; returning bytes is not.
        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert!(
            !matches!(reopened.get(&key), Ok(Some(_))),
            "tampered content must not verify under original hash"
        );
    }

    #[test]
    fn compaction_keeps_live_drops_dead() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open(tmp.path()).unwrap();
        let keep = store_blob(&store, b"keep me");
        let dead_a = store_blob(&store, b"drop me 1");
        let _dead_b = store_blob(&store, b"drop me 2");
        store.sync().unwrap();
        assert_eq!(store.all_hashes().len(), 3);

        let live: HashSet<String> = std::iter::once(keep.clone()).collect();
        let stats = store.compact(&live).unwrap();
        assert_eq!((stats.live_objects, stats.dropped_objects), (1, 2));

        // Only the live object remains in memory...
        assert_eq!(store.get(&keep).unwrap().unwrap(), b"keep me");
        assert_eq!(store.all_hashes().len(), 1);

        // ...and on disk, as seen by an independent open.
        let reopened = SegmentStore::open(tmp.path()).unwrap();
        assert_eq!(reopened.get(&keep).unwrap().unwrap(), b"keep me");
        assert!(reopened.get(&dead_a).unwrap().is_none());

        // The compacted store still accepts writes.
        let fresh = store_blob(&store, b"post-compaction");
        assert!(store.get(&fresh).unwrap().is_some());
    }

    #[test]
    fn compaction_reclaims_and_writes_still_read() {
        let tmp = tempdir().unwrap();
        let store = SegmentStore::open_with_max(tmp.path(), 64).unwrap();
        let all: Vec<String> = (0..20u32)
            .map(|i| store_blob(&store, format!("obj-{:03}", i).as_bytes()))
            .collect();
        store.sync().unwrap();

        // Keep only the even-indexed objects.
        let live: HashSet<String> = all.iter().step_by(2).cloned().collect();
        let stats = store.compact(&live).unwrap();
        assert_eq!(stats.live_objects, 10);
        assert_eq!(stats.dropped_objects, 10);

        for (i, key) in all.iter().enumerate() {
            let got = store.get(key).unwrap();
            if i % 2 == 0 {
                assert!(got.is_some(), "live object {} must survive", i);
            } else {
                assert!(got.is_none(), "dead object {} must be pruned", i);
            }
        }
    }
}
755}