Skip to main content

nedb_engine/
segment.rs

1//! segment.rs — NEDB v3 packed object substrate.
2//!
3//! v3 keeps the v2 logical model intact (content-addressed, immutable,
4//! BLAKE2b-verified DAG nodes) and changes only *where the bytes live*: instead
5//! of one filesystem object per node — which caps throughput at the OS
6//! small-file metadata rate — many immutable objects are appended into
7//! **segment files** addressed through an in-memory `hash -> (segment, offset,
8//! len)` index. The hash is still `BLAKE2b(content)` and is re-verified on every
9//! read, so content-addressing and tamper-evidence are unchanged.
10//!
11//! This module knows nothing about `Node`, JSON, or encryption: callers
12//! (`ObjectStore`) pass already-serialized/encrypted `content` bytes and the
13//! precomputed hash. That keeps the segment store a pure content<->location
14//! layer and leaves all crypto/serialization in `store.rs`.
15//!
16//! Phases:
17//!   1. Segment append + in-memory index + startup scan + tail-truncation.
18//!   2. Compaction/pruning — rewrite the live object set into fresh segments,
19//!      reclaiming dead (superseded/spent) records. `compact(live)`.
20//!   3. On-disk `.idx` sidecars — a sealed segment gets a checksummed
21//!      `hash -> (offset,len)` index file so cold start loads it instead of
22//!      rescanning every record. Missing/corrupt `.idx` falls back to a scan.
23//!
24//! Opt-in only: `ObjectStore` instantiates this when `NEDB_DAG_V3` is set
25//! (surfaced as the `--dag-v3` flag). Default storage is byte-for-byte v2.
26
use std::collections::HashSet;
use std::fs::{self, File, OpenOptions};
use std::io::{BufReader, ErrorKind, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;

use anyhow::{bail, Context, Result};
use blake2::{Blake2b512, Digest};
use dashmap::DashMap;
36
/// Segment rollover threshold: 256 MiB.
const DEFAULT_MAX_SEGMENT_BYTES: u64 = 256 << 20;

/// Leading magic of every `.idx` sidecar ("NEDB v3 index", on-disk format 1).
const IDX_MAGIC: &[u8; 4] = b"NIX1";
/// Size of one `.idx` entry: raw 32-byte digest, then a u64 offset, then a u32 length.
const IDX_ENTRY_BYTES: usize = 32 + std::mem::size_of::<u64>() + std::mem::size_of::<u32>();
/// Size of the `.idx` header: the 4-byte magic followed by a u64 entry count.
/// The file ends with an `IDX_CHECKSUM_BYTES` trailer over header + entries.
const IDX_HEADER_BYTES: usize = 4 + std::mem::size_of::<u64>();
/// Size of the `.idx` trailer checksum (a 32-byte digest).
const IDX_CHECKSUM_BYTES: usize = 32;
47
/// Where one stored object lives: which segment file, where its content bytes
/// begin, and how many of them there are.
#[derive(Debug, Clone, Copy)]
struct SegmentLocation {
    /// Id of the segment holding the record (file `seg-{id:06}.dat`).
    segment_id: u32,
    /// Offset of the first CONTENT byte, i.e. just past the record's u32 length prefix.
    offset: u64,
    /// Content length in bytes (the value stored in the length prefix).
    len: u32,
}
56
/// Summary returned by a `SegmentStore::compact` pass.
#[derive(Debug, Default, Clone, Copy)]
pub struct CompactStats {
    /// Objects listed as live that were copied into the fresh segments.
    pub live_objects: usize,
    /// Objects that existed before the pass but were not kept (superseded / pruned).
    pub dropped_objects: usize,
    /// Combined size of the old segment data files that were deleted.
    pub bytes_reclaimed: u64,
    /// Number of segment files remaining once compaction has finished.
    pub segments_after: usize,
}
69
70/// BLAKE2b-256 (first 32 bytes of Blake2b-512) raw digest.
71fn blake2b_raw(data: &[u8]) -> [u8; 32] {
72    let mut h = Blake2b512::new();
73    h.update(data);
74    let out = h.finalize();
75    let mut a = [0u8; 32];
76    a.copy_from_slice(&out[..32]);
77    a
78}
79
80/// Hex-encoded BLAKE2b-256. MUST match `store::blake2b` so segment hashes equal
81/// loose-object hashes.
82fn blake2b(data: &[u8]) -> String {
83    hex::encode(blake2b_raw(data))
84}
85
/// Mutable state of the segment that is currently receiving appends.
struct Active {
    /// Segment id of `file`.
    id: u32,
    /// Read/write handle; new records are written at its cursor.
    file: File,
    /// Next append position (the logical end of the segment). It must track the
    /// file cursor: records are written at the cursor but indexed at this offset.
    offset: u64,
}
93
94/// Append-only, content-addressed packed object store with an in-memory index.
95pub struct SegmentStore {
96    dir: PathBuf,
97    index: DashMap<String, SegmentLocation>,
98    active: Mutex<Active>,
99    max_segment_bytes: u64,
100}
101
102impl SegmentStore {
103    fn seg_path(dir: &Path, id: u32) -> PathBuf {
104        dir.join(format!("seg-{:06}.dat", id))
105    }
106    fn idx_path(dir: &Path, id: u32) -> PathBuf {
107        dir.join(format!("seg-{:06}.idx", id))
108    }
109
110    /// Open (or create) the segment store under `{objects_root}/segments`.
111    pub fn open(objects_root: &Path) -> Result<Self> {
112        Self::open_with_max(objects_root, DEFAULT_MAX_SEGMENT_BYTES)
113    }
114
115    /// Like `open`, with an explicit rollover size (used by tests).
116    pub fn open_with_max(objects_root: &Path, max_segment_bytes: u64) -> Result<Self> {
117        let dir = objects_root.join("segments");
118        fs::create_dir_all(&dir).context("create objects/segments dir")?;
119
120        // Discover existing segment ids.
121        let mut ids: Vec<u32> = Vec::new();
122        for entry in fs::read_dir(&dir).context("read segments dir")? {
123            let entry = entry?;
124            let name = entry.file_name().to_string_lossy().to_string();
125            if let Some(rest) = name.strip_prefix("seg-") {
126                if let Some(num) = rest.strip_suffix(".dat") {
127                    if let Ok(id) = num.parse::<u32>() {
128                        ids.push(id);
129                    }
130                }
131            }
132        }
133        ids.sort_unstable();
134
135        let index: DashMap<String, SegmentLocation> = DashMap::new();
136        let mut active_id: u32 = 0;
137        let mut active_end: u64 = 0;
138
139        for (pos, &id) in ids.iter().enumerate() {
140            let is_last = pos + 1 == ids.len();
141            if is_last {
142                // Active (last) segment: always scan (no .idx — it's still
143                // mutable) and truncate any torn tail from a crash.
144                let (valid_end, entries) = Self::scan_segment(&dir, id)?;
145                for (h, o, l) in entries {
146                    index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
147                }
148                let path = Self::seg_path(&dir, id);
149                let file_len = fs::metadata(&path)?.len();
150                if valid_end < file_len {
151                    let f = OpenOptions::new().write(true).open(&path)?;
152                    f.set_len(valid_end)?;
153                }
154                active_id = id;
155                active_end = valid_end;
156            } else {
157                // Sealed segment: load its checksummed .idx if present+valid;
158                // otherwise scan it and heal by writing a fresh .idx.
159                match Self::load_idx(&dir, id) {
160                    Ok(Some(entries)) => {
161                        for (h, o, l) in entries {
162                            index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
163                        }
164                    }
165                    _ => {
166                        let (_ve, entries) = Self::scan_segment(&dir, id)?;
167                        for (h, o, l) in &entries {
168                            index.insert(h.clone(), SegmentLocation { segment_id: id, offset: *o, len: *l });
169                        }
170                        let _ = Self::write_idx(&dir, id, &entries); // best-effort heal
171                    }
172                }
173            }
174        }
175
176        // Open (creating if necessary) the active segment for appending.
177        let active_path = Self::seg_path(&dir, active_id);
178        let mut file = OpenOptions::new()
179            .create(true)
180            .read(true)
181            .write(true)
182            .open(&active_path)
183            .with_context(|| format!("open active segment {:?}", active_path))?;
184        file.seek(SeekFrom::Start(active_end))?;
185
186        Ok(Self {
187            dir,
188            index,
189            active: Mutex::new(Active { id: active_id, file, offset: active_end }),
190            max_segment_bytes,
191        })
192    }
193
194    /// Scan one segment, returning (valid_end_offset, records). Records past a
195    /// torn tail are not returned; `valid_end` marks where to truncate.
196    fn scan_segment(dir: &Path, id: u32) -> Result<(u64, Vec<(String, u64, u32)>)> {
197        let path = Self::seg_path(dir, id);
198        let mut f = match File::open(&path) {
199            Ok(f) => f,
200            Err(_) => return Ok((0, Vec::new())),
201        };
202        let file_len = f.metadata()?.len();
203        let mut pos: u64 = 0;
204        let mut entries: Vec<(String, u64, u32)> = Vec::new();
205        loop {
206            if pos + 4 > file_len {
207                break; // no room for a length prefix → torn tail
208            }
209            f.seek(SeekFrom::Start(pos))?;
210            let mut len_buf = [0u8; 4];
211            if f.read_exact(&mut len_buf).is_err() {
212                break;
213            }
214            let len = u32::from_le_bytes(len_buf);
215            let content_off = pos + 4;
216            if content_off + (len as u64) > file_len {
217                break; // declared length overruns EOF → torn content
218            }
219            let mut content = vec![0u8; len as usize];
220            if f.read_exact(&mut content).is_err() {
221                break;
222            }
223            entries.push((blake2b(&content), content_off, len));
224            pos = content_off + len as u64;
225        }
226        Ok((pos, entries))
227    }
228
229    /// Read+verify the raw content at a location. Errors on tamper.
230    fn read_content(dir: &Path, loc: &SegmentLocation, expect_hash: &str) -> Result<Vec<u8>> {
231        let path = Self::seg_path(dir, loc.segment_id);
232        let mut f = File::open(&path).with_context(|| format!("open segment {:?}", path))?;
233        f.seek(SeekFrom::Start(loc.offset))?;
234        let mut content = vec![0u8; loc.len as usize];
235        f.read_exact(&mut content)
236            .with_context(|| format!("read record from segment {}", loc.segment_id))?;
237        let actual = blake2b(&content);
238        if actual != expect_hash {
239            bail!("segment object {} tampered: recomputed {}", expect_hash, actual);
240        }
241        Ok(content)
242    }
243
244    // ── Phase 3: .idx sidecars ────────────────────────────────────────────────
245
246    /// Write a checksummed `.idx` for a SEALED segment (atomic via tmp→rename).
247    /// Best-effort: a failure just means the next open scans the segment.
248    fn write_idx(dir: &Path, id: u32, entries: &[(String, u64, u32)]) -> Result<()> {
249        let mut body: Vec<u8> = Vec::with_capacity(IDX_HEADER_BYTES + entries.len() * IDX_ENTRY_BYTES);
250        body.extend_from_slice(IDX_MAGIC);
251        body.extend_from_slice(&(entries.len() as u64).to_le_bytes());
252        for (hash, off, len) in entries {
253            let raw = hex::decode(hash).map_err(|_| anyhow::anyhow!("bad hash hex in idx write"))?;
254            if raw.len() != 32 {
255                bail!("idx write: hash not 32 bytes");
256            }
257            body.extend_from_slice(&raw);
258            body.extend_from_slice(&off.to_le_bytes());
259            body.extend_from_slice(&len.to_le_bytes());
260        }
261        let checksum = blake2b_raw(&body);
262        body.extend_from_slice(&checksum);
263
264        let path = Self::idx_path(dir, id);
265        let tmp = path.with_extension("idx.tmp");
266        fs::write(&tmp, &body)?;
267        fs::rename(&tmp, &path)?;
268        Ok(())
269    }
270
271    /// Load a `.idx` if present and checksum-valid. Returns Ok(None) if absent
272    /// or in any way unusable (caller then scans the segment).
273    fn load_idx(dir: &Path, id: u32) -> Result<Option<Vec<(String, u64, u32)>>> {
274        let path = Self::idx_path(dir, id);
275        let data = match fs::read(&path) {
276            Ok(d) => d,
277            Err(_) => return Ok(None),
278        };
279        if data.len() < IDX_HEADER_BYTES + IDX_CHECKSUM_BYTES {
280            return Ok(None);
281        }
282        if &data[0..4] != IDX_MAGIC {
283            return Ok(None);
284        }
285        let count = u64::from_le_bytes(data[4..12].try_into().unwrap()) as usize;
286        let expected = IDX_HEADER_BYTES + count * IDX_ENTRY_BYTES + IDX_CHECKSUM_BYTES;
287        if data.len() != expected {
288            return Ok(None);
289        }
290        let body = &data[..data.len() - IDX_CHECKSUM_BYTES];
291        let stored: [u8; 32] = match data[data.len() - IDX_CHECKSUM_BYTES..].try_into() {
292            Ok(a) => a,
293            Err(_) => return Ok(None),
294        };
295        if blake2b_raw(body) != stored {
296            return Ok(None); // corrupt/stale → fall back to scan
297        }
298        let mut entries = Vec::with_capacity(count);
299        let mut p = IDX_HEADER_BYTES;
300        for _ in 0..count {
301            let hash = hex::encode(&data[p..p + 32]);
302            let off = u64::from_le_bytes(data[p + 32..p + 40].try_into().unwrap());
303            let len = u32::from_le_bytes(data[p + 40..p + 44].try_into().unwrap());
304            entries.push((hash, off, len));
305            p += IDX_ENTRY_BYTES;
306        }
307        Ok(Some(entries))
308    }
309
310    /// Collect the index entries belonging to one segment (for sealing → .idx).
311    fn entries_for_segment(&self, id: u32) -> Vec<(String, u64, u32)> {
312        self.index
313            .iter()
314            .filter(|e| e.value().segment_id == id)
315            .map(|e| (e.key().clone(), e.value().offset, e.value().len))
316            .collect()
317    }
318
319    // ── core API ──────────────────────────────────────────────────────────────
320
321    /// True if this hash is already stored in a segment.
322    pub fn contains(&self, hash: &str) -> bool {
323        self.index.contains_key(hash)
324    }
325
326    /// Append `content` under `hash` (idempotent). `hash` must equal
327    /// `BLAKE2b(content)`; the caller computes it (parallel, outside the lock).
328    pub fn put(&self, hash: &str, content: &[u8]) -> Result<()> {
329        if self.index.contains_key(hash) {
330            return Ok(());
331        }
332        let len = content.len() as u32;
333        let record_size = 4u64 + content.len() as u64;
334
335        let mut active = self.active.lock().unwrap();
336        if self.index.contains_key(hash) {
337            return Ok(());
338        }
339
340        // Roll over if this record would push the active segment past the cap.
341        if active.offset > 0 && active.offset + record_size > self.max_segment_bytes {
342            let _ = active.file.flush();
343            let _ = active.file.sync_all();
344            // Seal: write the .idx for the segment we're leaving behind.
345            let sealed_id = active.id;
346            let entries = self.entries_for_segment(sealed_id);
347            let _ = Self::write_idx(&self.dir, sealed_id, &entries);
348            let next_id = sealed_id + 1;
349            let path = Self::seg_path(&self.dir, next_id);
350            let file = OpenOptions::new()
351                .create(true)
352                .read(true)
353                .write(true)
354                .open(&path)
355                .with_context(|| format!("open new segment {:?}", path))?;
356            *active = Active { id: next_id, file, offset: 0 };
357        }
358
359        let content_off = active.offset + 4;
360        let mut rec = Vec::with_capacity(4 + content.len());
361        rec.extend_from_slice(&len.to_le_bytes());
362        rec.extend_from_slice(content);
363        active.file.write_all(&rec)?;
364
365        let seg_id = active.id;
366        active.offset += record_size;
367        self.index.insert(
368            hash.to_string(),
369            SegmentLocation { segment_id: seg_id, offset: content_off, len },
370        );
371        Ok(())
372    }
373
374    /// Read the raw content bytes for `hash`, or `None` if not stored in any
375    /// segment (caller then falls back to the loose-object path). Re-verifies.
376    pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
377        let loc = match self.index.get(hash) {
378            Some(entry) => *entry.value(),
379            None => return Ok(None),
380        };
381        Ok(Some(Self::read_content(&self.dir, &loc, hash)?))
382    }
383
384    /// All hashes currently stored in segments.
385    pub fn all_hashes(&self) -> Vec<String> {
386        self.index.iter().map(|e| e.key().clone()).collect()
387    }
388
389    /// Flush + fsync the active segment. One durability point per batch.
390    pub fn sync(&self) -> Result<()> {
391        let mut active = self.active.lock().unwrap();
392        let _ = active.file.flush();
393        active.file.sync_all().context("fsync active segment")?;
394        Ok(())
395    }
396
397    // ── Phase 2: compaction / pruning ─────────────────────────────────────────
398
399    /// Rewrite the **live** object set into fresh segments and drop everything
400    /// else, reclaiming dead (superseded/spent/pruned) records.
401    ///
402    /// `live` is the set of hashes to KEEP — typically the current version of
403    /// every document (from the id-index). Hashes not in `live` are pruned, so
404    /// historical versions / AS OF / TRACE for dropped objects are discarded by
405    /// design (that is what reclaims space).
406    ///
407    /// Crash-safe: new segments are written + fsynced BEFORE any old segment is
408    /// deleted, so live data is never lost. A crash mid-compaction leaves both
409    /// the old and new segments (a re-open re-indexes the union — dead objects
410    /// merely linger until the next compaction); it never loses a live object.
411    ///
412    /// Must be called when the store is quiescent (no concurrent reads): writes
413    /// are blocked for the duration via the active lock, and the in-memory index
414    /// is swapped in place.
415    pub fn compact(&self, live: &HashSet<String>) -> Result<CompactStats> {
416        let mut active = self.active.lock().unwrap();
417
418        let total_before = self.index.len();
419        let old_max = active.id;
420        let new_base = old_max + 1;
421
422        // Snapshot the live entries to copy forward.
423        let to_copy: Vec<(String, SegmentLocation)> = self
424            .index
425            .iter()
426            .filter(|e| live.contains(e.key()))
427            .map(|e| (e.key().clone(), *e.value()))
428            .collect();
429
430        // Write live objects into fresh segments starting at new_base.
431        let new_index: DashMap<String, SegmentLocation> = DashMap::new();
432        let mut cur_id = new_base;
433        let mut cur_path = Self::seg_path(&self.dir, cur_id);
434        let mut cur_file = OpenOptions::new()
435            .create(true)
436            .truncate(true)
437            .read(true)
438            .write(true)
439            .open(&cur_path)
440            .with_context(|| format!("open compaction segment {:?}", cur_path))?;
441        let mut cur_off: u64 = 0;
442
443        for (hash, loc) in &to_copy {
444            let content = Self::read_content(&self.dir, loc, hash)?;
445            let len = content.len() as u32;
446            let record_size = 4u64 + content.len() as u64;
447
448            if cur_off > 0 && cur_off + record_size > self.max_segment_bytes {
449                let _ = cur_file.flush();
450                cur_file.sync_all().context("fsync sealed compaction segment")?;
451                let entries: Vec<(String, u64, u32)> = new_index
452                    .iter()
453                    .filter(|e| e.value().segment_id == cur_id)
454                    .map(|e| (e.key().clone(), e.value().offset, e.value().len))
455                    .collect();
456                let _ = Self::write_idx(&self.dir, cur_id, &entries);
457                cur_id += 1;
458                cur_path = Self::seg_path(&self.dir, cur_id);
459                cur_file = OpenOptions::new()
460                    .create(true)
461                    .truncate(true)
462                    .read(true)
463                    .write(true)
464                    .open(&cur_path)
465                    .with_context(|| format!("open compaction segment {:?}", cur_path))?;
466                cur_off = 0;
467            }
468
469            let content_off = cur_off + 4;
470            let mut rec = Vec::with_capacity(4 + content.len());
471            rec.extend_from_slice(&len.to_le_bytes());
472            rec.extend_from_slice(&content);
473            cur_file.write_all(&rec)?;
474            new_index.insert(hash.clone(), SegmentLocation { segment_id: cur_id, offset: content_off, len });
475            cur_off += record_size;
476        }
477        let _ = cur_file.flush();
478        cur_file.sync_all().context("fsync active compaction segment")?;
479
480        // The last new segment becomes the active one (reuse its handle).
481        let live_objects = to_copy.len();
482
483        // Swap the in-memory index to the rebuilt one.
484        self.index.clear();
485        for e in new_index.iter() {
486            self.index.insert(e.key().clone(), *e.value());
487        }
488        *active = Active { id: cur_id, file: cur_file, offset: cur_off };
489
490        // Delete every old segment (id < new_base) + its .idx, after the new
491        // ones are durable. Re-list so we only touch files that actually exist.
492        let mut bytes_reclaimed: u64 = 0;
493        if let Ok(rd) = fs::read_dir(&self.dir) {
494            for entry in rd.flatten() {
495                let name = entry.file_name().to_string_lossy().to_string();
496                let id_of = name
497                    .strip_prefix("seg-")
498                    .and_then(|r| r.strip_suffix(".dat").or_else(|| r.strip_suffix(".idx")))
499                    .and_then(|n| n.parse::<u32>().ok());
500                if let Some(id) = id_of {
501                    if id < new_base {
502                        if name.ends_with(".dat") {
503                            if let Ok(m) = entry.metadata() {
504                                bytes_reclaimed += m.len();
505                            }
506                        }
507                        let _ = fs::remove_file(entry.path());
508                    }
509                }
510            }
511        }
512
513        let segments_after = (cur_id - new_base + 1) as usize;
514        Ok(CompactStats {
515            live_objects,
516            dropped_objects: total_before.saturating_sub(live_objects),
517            bytes_reclaimed,
518            segments_after,
519        })
520    }
521}
522
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Store `content` under its BLAKE2b hash and return that hash.
    fn put_get_hash(s: &SegmentStore, content: &[u8]) -> String {
        let h = blake2b(content);
        s.put(&h, content).unwrap();
        h
    }

    #[test]
    fn put_get_roundtrip() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open(dir.path()).unwrap();
        let h = put_get_hash(&s, b"hello nedb v3");
        assert_eq!(s.get(&h).unwrap().unwrap(), b"hello nedb v3");
        assert!(s.contains(&h));
        assert!(s.get(&"0".repeat(64)).unwrap().is_none());
    }

    #[test]
    fn idempotent_put() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open(dir.path()).unwrap();
        let h1 = put_get_hash(&s, b"dup");
        let h2 = put_get_hash(&s, b"dup");
        assert_eq!(h1, h2);
        assert_eq!(s.all_hashes().len(), 1);
    }

    #[test]
    fn index_rebuilt_on_reopen() {
        let dir = tempdir().unwrap();
        let h = {
            let s = SegmentStore::open(dir.path()).unwrap();
            let h = put_get_hash(&s, b"persisted");
            s.sync().unwrap();
            h
        };
        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert_eq!(s2.get(&h).unwrap().unwrap(), b"persisted");
    }

    #[test]
    fn rollover_writes_idx_and_reopen_uses_it() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open_with_max(dir.path(), 32).unwrap();
        let mut hashes = Vec::new();
        for i in 0..8u32 {
            hashes.push(put_get_hash(&s, format!("record-{}", i).as_bytes()));
        }
        s.sync().unwrap();
        // Rollover should have produced sealed .idx sidecars.
        let idx_files = fs::read_dir(dir.path().join("segments"))
            .unwrap()
            .flatten()
            .filter(|e| e.file_name().to_string_lossy().ends_with(".idx"))
            .count();
        assert!(idx_files >= 1, "expected at least one sealed .idx");
        // Reopen (loads sealed segments via .idx): every object still reads.
        let s2 = SegmentStore::open(dir.path()).unwrap();
        for h in &hashes {
            assert!(s2.get(h).unwrap().is_some());
        }
    }

    #[test]
    fn corrupt_idx_falls_back_to_scan() {
        let dir = tempdir().unwrap();
        let mut hashes = Vec::new();
        {
            let s = SegmentStore::open_with_max(dir.path(), 32).unwrap();
            for i in 0..6u32 {
                hashes.push(put_get_hash(&s, format!("rec-{}", i).as_bytes()));
            }
            s.sync().unwrap();
        }
        // Corrupt every .idx (overwrite with garbage). Reopen must still work via scan.
        for e in fs::read_dir(dir.path().join("segments")).unwrap().flatten() {
            if e.file_name().to_string_lossy().ends_with(".idx") {
                fs::write(e.path(), b"garbage").unwrap();
            }
        }
        let s2 = SegmentStore::open(dir.path()).unwrap();
        for h in &hashes {
            assert!(s2.get(h).unwrap().is_some(), "scan fallback must recover the object");
        }
    }

    #[test]
    fn torn_tail_is_truncated_on_open() {
        let dir = tempdir().unwrap();
        let good = {
            let s = SegmentStore::open(dir.path()).unwrap();
            let h = put_get_hash(&s, b"good record");
            s.sync().unwrap();
            h
        };
        let seg = dir.path().join("segments").join("seg-000000.dat");
        {
            // A length prefix promising 9999 bytes, followed by only 5.
            let mut f = OpenOptions::new().append(true).open(&seg).unwrap();
            f.write_all(&9999u32.to_le_bytes()).unwrap();
            f.write_all(b"short").unwrap();
        }
        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert_eq!(s2.get(&good).unwrap().unwrap(), b"good record");
        let h2 = put_get_hash(&s2, b"after recovery");
        assert!(s2.get(&h2).unwrap().is_some());
    }

    /// The active (last) segment is rescanned on open. A flipped byte therefore
    /// re-indexes the record under a different hash, and the original hash must
    /// never resolve to the tampered bytes.
    #[test]
    fn tamper_detected_on_read() {
        let dir = tempdir().unwrap();
        let h = {
            let s = SegmentStore::open(dir.path()).unwrap();
            let h = put_get_hash(&s, b"authentic");
            s.sync().unwrap();
            h
        };
        let seg = dir.path().join("segments").join("seg-000000.dat");
        let mut bytes = fs::read(&seg).unwrap();
        let n = bytes.len();
        bytes[n - 1] ^= 0xff;
        fs::write(&seg, bytes).unwrap();
        let s2 = SegmentStore::open(dir.path()).unwrap();
        match s2.get(&h) {
            Ok(None) => {}
            Err(_) => {}
            Ok(Some(_)) => panic!("tampered content must not verify under original hash"),
        }
    }

    /// Sealed segments are indexed from their `.idx`, which still carries the
    /// ORIGINAL hash. Reading a tampered record must therefore fail verification
    /// rather than silently disappear.
    #[test]
    fn tamper_in_sealed_segment_errors_on_read() {
        let dir = tempdir().unwrap();
        let segs = dir.path().join("segments");
        let h = {
            let s = SegmentStore::open_with_max(dir.path(), 32).unwrap();
            let h = put_get_hash(&s, b"authentic sealed"); // 4 + 16 = 20 bytes
            put_get_hash(&s, b"forces rollover"); // 20 + 19 > 32 → seals seg 0
            s.sync().unwrap();
            h
        };
        assert!(segs.join("seg-000000.idx").exists(), "segment 0 should be sealed");
        let seg0 = segs.join("seg-000000.dat");
        let mut bytes = fs::read(&seg0).unwrap();
        let n = bytes.len();
        bytes[n - 1] ^= 0xff;
        fs::write(&seg0, bytes).unwrap();
        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert!(s2.get(&h).is_err(), "sealed tamper must be reported as an error");
    }

    #[test]
    fn compaction_keeps_live_drops_dead() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open(dir.path()).unwrap();
        let keep = put_get_hash(&s, b"keep me");
        let drop1 = put_get_hash(&s, b"drop me 1");
        let drop2 = put_get_hash(&s, b"drop me 2");
        s.sync().unwrap();
        assert_eq!(s.all_hashes().len(), 3);

        let mut live = HashSet::new();
        live.insert(keep.clone());
        let stats = s.compact(&live).unwrap();
        assert_eq!(stats.live_objects, 1);
        assert_eq!(stats.dropped_objects, 2);

        // Live object survives; dead ones are gone.
        assert_eq!(s.get(&keep).unwrap().unwrap(), b"keep me");
        assert_eq!(s.all_hashes().len(), 1);

        // And it survives a reopen (new segments + index swap persisted).
        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert_eq!(s2.get(&keep).unwrap().unwrap(), b"keep me");
        assert!(s2.get(&drop1).unwrap().is_none());
        assert!(s2.get(&drop2).unwrap().is_none());

        // Writes still work after compaction.
        let after = put_get_hash(&s, b"post-compaction");
        assert!(s.get(&after).unwrap().is_some());
    }

    #[test]
    fn compaction_reclaims_and_writes_still_read() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open_with_max(dir.path(), 64).unwrap();
        let mut all = Vec::new();
        for i in 0..20u32 {
            all.push(put_get_hash(&s, format!("obj-{:03}", i).as_bytes()));
        }
        s.sync().unwrap();
        // Keep only the even-indexed ones.
        let live: HashSet<String> = all
            .iter()
            .enumerate()
            .filter(|(i, _)| i % 2 == 0)
            .map(|(_, h)| h.clone())
            .collect();
        let stats = s.compact(&live).unwrap();
        assert_eq!(stats.live_objects, 10);
        assert_eq!(stats.dropped_objects, 10);
        for (i, h) in all.iter().enumerate() {
            let got = s.get(h).unwrap();
            if i % 2 == 0 {
                assert!(got.is_some(), "live object {} must survive", i);
            } else {
                assert!(got.is_none(), "dead object {} must be pruned", i);
            }
        }
    }

    #[test]
    fn compaction_with_empty_live_set_drops_everything() {
        let dir = tempdir().unwrap();
        let s = SegmentStore::open(dir.path()).unwrap();
        let gone = put_get_hash(&s, b"ephemeral");
        s.sync().unwrap();

        let stats = s.compact(&HashSet::new()).unwrap();
        assert_eq!(stats.live_objects, 0);
        assert_eq!(stats.dropped_objects, 1);
        assert_eq!(stats.segments_after, 1);
        assert!(s.get(&gone).unwrap().is_none());

        // The (empty) fresh segment is a usable append target, also after reopen.
        let after = put_get_hash(&s, b"still writable");
        s.sync().unwrap();
        let s2 = SegmentStore::open(dir.path()).unwrap();
        assert!(s2.get(&gone).unwrap().is_none());
        assert_eq!(s2.get(&after).unwrap().unwrap(), b"still writable");
    }
}