Skip to main content

nedb_engine/
segment.rs

1//! segment.rs — NEDB v3 packed object substrate.
2//!
3//! v3 keeps the v2 logical model intact (content-addressed, immutable,
4//! BLAKE2b-verified DAG nodes) and changes only *where the bytes live*: instead
5//! of one filesystem object per node — which caps throughput at the OS
6//! small-file metadata rate — many immutable objects are appended into
7//! **segment files** addressed through an in-memory `hash -> (segment, offset,
8//! len)` index. The hash is still `BLAKE2b(content)` and is re-verified on every
9//! read, so content-addressing and tamper-evidence are unchanged.
10//!
11//! This module knows nothing about `Node`, JSON, or encryption: callers
12//! (`ObjectStore`) pass already-serialized/encrypted `content` bytes and the
13//! precomputed hash. That keeps the segment store a pure content<->location
14//! layer and leaves all crypto/serialization in `store.rs`.
15//!
16//! Phases:
17//!   1. Segment append + in-memory index + startup scan + tail-truncation.
18//!   2. Compaction/pruning — rewrite the live object set into fresh segments,
19//!      reclaiming dead (superseded/spent) records. `compact(live)`.
20//!   3. On-disk `.idx` sidecars — a sealed segment gets a checksummed
21//!      `hash -> (offset,len)` index file so cold start loads it instead of
22//!      rescanning every record. Missing/corrupt `.idx` falls back to a scan.
23//!
24//! Opt-in only: `ObjectStore` instantiates this when `NEDB_DAG_V3` is set
25//! (surfaced as the `--dag-v3` flag). Default storage is byte-for-byte v2.
26
27use std::collections::HashSet;
28use std::fs::{self, File, OpenOptions};
29use std::io::{Read, Seek, SeekFrom, Write};
30use std::path::{Path, PathBuf};
31use std::sync::{Arc, Mutex};
32
33use anyhow::{bail, Context, Result};
34use blake2::{Blake2b512, Digest};
35use dashmap::DashMap;
36
/// Segment rollover threshold: an append that would push the active segment
/// past this many bytes starts a new segment instead (256 MiB).
const DEFAULT_MAX_SEGMENT_BYTES: u64 = 256 << 20;

/// Four bytes that open every `.idx` sidecar (NEDB v3 index, format 1).
const IDX_MAGIC: &[u8; 4] = b"NIX1";
/// Size of one `.idx` entry: the raw 32-byte digest, then a little-endian
/// u64 content offset, then a little-endian u32 content length.
const IDX_ENTRY_BYTES: usize = 32 + std::mem::size_of::<u64>() + std::mem::size_of::<u32>();
/// Size of the `.idx` header: the magic followed by a little-endian u64
/// entry count.
const IDX_HEADER_BYTES: usize = IDX_MAGIC.len() + std::mem::size_of::<u64>();
/// Size of the `.idx` trailer: a 32-byte digest (`blake2b_raw`) over the
/// header plus all entries.
const IDX_CHECKSUM_BYTES: usize = 32;
47
/// Where a single content record lives within the segment set.
#[derive(Debug, Clone, Copy)]
struct SegmentLocation {
    /// Numeric id of the `seg-NNNNNN.dat` file holding the record.
    segment_id: u32,
    /// Absolute offset of the content bytes, i.e. just past the record's
    /// 4-byte length prefix.
    offset: u64,
    /// Number of content bytes (the value of the record's length prefix).
    len: u32,
}
56
/// Summary returned by one `compact()` run.
#[derive(Debug, Default, Clone, Copy)]
pub struct CompactStats {
    /// How many live objects were rewritten into the fresh segments.
    pub live_objects: usize,
    /// How many indexed objects were not carried forward (superseded
    /// versions / pruned history).
    pub dropped_objects: usize,
    /// Total size of the old `.dat` files that were deleted.
    pub bytes_reclaimed: u64,
    /// How many segment files exist once compaction has finished.
    pub segments_after: usize,
}
69
70/// BLAKE2b-256 (first 32 bytes of Blake2b-512) raw digest.
71fn blake2b_raw(data: &[u8]) -> [u8; 32] {
72    let mut h = Blake2b512::new();
73    h.update(data);
74    let out = h.finalize();
75    let mut a = [0u8; 32];
76    a.copy_from_slice(&out[..32]);
77    a
78}
79
80/// Hex-encoded BLAKE2b-256. MUST match `store::blake2b` so segment hashes equal
81/// loose-object hashes.
82fn blake2b(data: &[u8]) -> String {
83    hex::encode(blake2b_raw(data))
84}
85
/// Fill all of `buf` from `f`, starting at absolute byte `offset`.
///
/// Uses `pread`-style positional I/O: the handle's cursor is neither consulted
/// nor moved, so any number of threads can share one `File` without locking.
/// Fails with `UnexpectedEof` if the file ends before `buf` is full.
#[cfg(unix)]
fn read_at(f: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    std::os::unix::fs::FileExt::read_exact_at(f, buf, offset)
}
93
/// Windows counterpart of the Unix positional read: fill all of `buf` from
/// absolute `offset` using `seek_read` (ReadFile with an OVERLAPPED offset).
///
/// `seek_read` may return fewer bytes than requested, so this keeps issuing
/// reads at the advancing offset until `buf` is full. A zero-byte read means
/// the file ended early and is reported as `UnexpectedEof`. `seek_read` does
/// move this handle's file pointer. That is harmless because the cached read
/// handles are only ever used through this function, and every call supplies
/// its own absolute offset. The appender writes through a separate handle
/// with its own cursor.
#[cfg(windows)]
fn read_at(f: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::os::windows::fs::FileExt;
    let mut filled = 0usize;
    while filled < buf.len() {
        let at = offset + filled as u64;
        match f.seek_read(&mut buf[filled..], at)? {
            0 => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "eof mid-record in segment read",
                ))
            }
            got => filled += got,
        }
    }
    Ok(())
}
116
/// The segment that new records are currently appended to.
struct Active {
    /// Id of the segment (`seg-NNNNNN.dat`) being written.
    id: u32,
    /// Writer handle for that segment; its cursor is the append position.
    file: File,
    /// Next append position (current end of the segment). Kept equal to the
    /// writer handle's cursor.
    offset: u64,
}
124
125/// Append-only, content-addressed packed object store with an in-memory index.
126pub struct SegmentStore {
127    dir: PathBuf,
128    index: DashMap<String, SegmentLocation>,
129    active: Mutex<Active>,
130    max_segment_bytes: u64,
131    /// macOS fast-fsync opt-in (from `NEDB_FAST_FSYNC`). When true, durability
132    /// points use a plain `fsync(2)` instead of std's `sync_all` (= F_FULLFSYNC
133    /// on macOS). Off by default → identical, full-durability behavior.
134    fast_fsync: bool,
135    /// Read-handle cache: one shared, read-only `File` per segment. Reads go
136    /// through positional I/O (`pread` on Unix, `seek_read` on Windows) with an
137    /// explicit offset per call, so a single handle serves any number of threads
138    /// with no cursor state and no lock. Before this cache every `get()` paid
139    /// `open + seek + read + close` — 3-4 syscalls and fd churn per point read,
140    /// on the path itcd's -dagv3 chainstate reads live on (and `CreateFile` is
141    /// the expensive one on Windows). These handles are NEVER used for writes:
142    /// the active segment's writer keeps its own separate handle (own cursor)
143    /// behind the `active` mutex. Cleared on `compact()` — old segment ids are
144    /// never referenced again after the index swap.
145    read_handles: DashMap<u32, Arc<File>>,
146}
147
/// Make everything written so far to `file` durable.
///
/// With `fast == false` (the default) this is `File::sync_all()` on every
/// platform, which is the power-loss-safe choice. On macOS that call is
/// `F_FULLFSYNC`, a full drive-cache flush that is often 10-100x slower than
/// the plain `fsync(2)` Linux/Windows already do, especially on Fusion/SATA
/// disks.
///
/// With `fast == true` (from `NEDB_FAST_FSYNC`), macOS issues a plain
/// `fsync(2)` instead. The data reaches the drive, so it survives a process or
/// OS crash, but may still sit in the drive's volatile cache. A sudden power
/// cut could then lose the last unsynced batch. NEDB v3 tolerates that:
/// objects are content-addressed, torn tails are truncated on open, and the
/// chainstate can be re-synced from peers. The same trade-off is why Bitcoin
/// Core uses plain fsync for LevelDB on macOS. Off macOS the flag has no
/// effect, since `sync_all` is already a plain fsync / FlushFileBuffers.
fn durable_sync(file: &File, fast: bool) -> std::io::Result<()> {
    #[cfg(target_os = "macos")]
    {
        if fast {
            use std::os::unix::io::AsRawFd;
            // SAFETY: `file` is a valid, open descriptor for the duration of the call.
            let rc = unsafe { libc::fsync(file.as_raw_fd()) };
            if rc != 0 {
                return Err(std::io::Error::last_os_error());
            }
            return Ok(());
        }
    }
    #[cfg(not(target_os = "macos"))]
    {
        // The fast path only exists on macOS; elsewhere the flag is ignored.
        let _ = fast;
    }
    file.sync_all()
}
174
175impl SegmentStore {
176    fn seg_path(dir: &Path, id: u32) -> PathBuf {
177        dir.join(format!("seg-{:06}.dat", id))
178    }
179    fn idx_path(dir: &Path, id: u32) -> PathBuf {
180        dir.join(format!("seg-{:06}.idx", id))
181    }
182
183    /// Open (or create) the segment store under `{objects_root}/segments`.
184    pub fn open(objects_root: &Path) -> Result<Self> {
185        Self::open_with_max(objects_root, DEFAULT_MAX_SEGMENT_BYTES)
186    }
187
188    /// Like `open`, with an explicit rollover size (used by tests).
189    pub fn open_with_max(objects_root: &Path, max_segment_bytes: u64) -> Result<Self> {
190        let dir = objects_root.join("segments");
191        fs::create_dir_all(&dir).context("create objects/segments dir")?;
192
193        // Discover existing segment ids.
194        let mut ids: Vec<u32> = Vec::new();
195        for entry in fs::read_dir(&dir).context("read segments dir")? {
196            let entry = entry?;
197            let name = entry.file_name().to_string_lossy().to_string();
198            if let Some(rest) = name.strip_prefix("seg-") {
199                if let Some(num) = rest.strip_suffix(".dat") {
200                    if let Ok(id) = num.parse::<u32>() {
201                        ids.push(id);
202                    }
203                }
204            }
205        }
206        ids.sort_unstable();
207
208        let index: DashMap<String, SegmentLocation> = DashMap::new();
209        let mut active_id: u32 = 0;
210        let mut active_end: u64 = 0;
211
212        for (pos, &id) in ids.iter().enumerate() {
213            let is_last = pos + 1 == ids.len();
214            if is_last {
215                // Active (last) segment: always scan (no .idx — it's still
216                // mutable) and truncate any torn tail from a crash.
217                let (valid_end, entries) = Self::scan_segment(&dir, id)?;
218                for (h, o, l) in entries {
219                    index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
220                }
221                let path = Self::seg_path(&dir, id);
222                let file_len = fs::metadata(&path)?.len();
223                if valid_end < file_len {
224                    let f = OpenOptions::new().write(true).open(&path)?;
225                    f.set_len(valid_end)?;
226                }
227                active_id = id;
228                active_end = valid_end;
229            } else {
230                // Sealed segment: load its checksummed .idx if present+valid;
231                // otherwise scan it and heal by writing a fresh .idx.
232                match Self::load_idx(&dir, id) {
233                    Ok(Some(entries)) => {
234                        for (h, o, l) in entries {
235                            index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
236                        }
237                    }
238                    _ => {
239                        let (_ve, entries) = Self::scan_segment(&dir, id)?;
240                        for (h, o, l) in &entries {
241                            index.insert(h.clone(), SegmentLocation { segment_id: id, offset: *o, len: *l });
242                        }
243                        let _ = Self::write_idx(&dir, id, &entries); // best-effort heal
244                    }
245                }
246            }
247        }
248
249        // Open (creating if necessary) the active segment for appending.
250        let active_path = Self::seg_path(&dir, active_id);
251        let mut file = OpenOptions::new()
252            .create(true)
253            .read(true)
254            .write(true)
255            .open(&active_path)
256            .with_context(|| format!("open active segment {:?}", active_path))?;
257        file.seek(SeekFrom::Start(active_end))?;
258
259        let fast_fsync = std::env::var("NEDB_FAST_FSYNC")
260            .map(|v| {
261                let v = v.trim();
262                v == "1" || v.eq_ignore_ascii_case("true")
263                         || v.eq_ignore_ascii_case("on")
264                         || v.eq_ignore_ascii_case("yes")
265            })
266            .unwrap_or(false);
267
268        Ok(Self {
269            dir,
270            index,
271            active: Mutex::new(Active { id: active_id, file, offset: active_end }),
272            max_segment_bytes,
273            fast_fsync,
274            read_handles: DashMap::new(),
275        })
276    }
277
278    /// Scan one segment, returning (valid_end_offset, records). Records past a
279    /// torn tail are not returned; `valid_end` marks where to truncate.
280    fn scan_segment(dir: &Path, id: u32) -> Result<(u64, Vec<(String, u64, u32)>)> {
281        let path = Self::seg_path(dir, id);
282        let mut f = match File::open(&path) {
283            Ok(f) => f,
284            Err(_) => return Ok((0, Vec::new())),
285        };
286        let file_len = f.metadata()?.len();
287        let mut pos: u64 = 0;
288        let mut entries: Vec<(String, u64, u32)> = Vec::new();
289        loop {
290            if pos + 4 > file_len {
291                break; // no room for a length prefix → torn tail
292            }
293            f.seek(SeekFrom::Start(pos))?;
294            let mut len_buf = [0u8; 4];
295            if f.read_exact(&mut len_buf).is_err() {
296                break;
297            }
298            let len = u32::from_le_bytes(len_buf);
299            let content_off = pos + 4;
300            if content_off + (len as u64) > file_len {
301                break; // declared length overruns EOF → torn content
302            }
303            let mut content = vec![0u8; len as usize];
304            if f.read_exact(&mut content).is_err() {
305                break;
306            }
307            entries.push((blake2b(&content), content_off, len));
308            pos = content_off + len as u64;
309        }
310        Ok((pos, entries))
311    }
312
313    /// Get (or open and cache) the shared read-only handle for a segment.
314    /// A double-open race between two missing threads is harmless: `or_insert`
315    /// keeps the first handle and the loser's `File` is simply dropped (closed).
316    fn read_handle(&self, id: u32) -> Result<Arc<File>> {
317        if let Some(h) = self.read_handles.get(&id) {
318            return Ok(Arc::clone(h.value()));
319        }
320        let path = Self::seg_path(&self.dir, id);
321        let f = Arc::new(File::open(&path).with_context(|| format!("open segment {:?}", path))?);
322        Ok(Arc::clone(self.read_handles.entry(id).or_insert(f).value()))
323    }
324
325    /// Read+verify the raw content at a location through the shared handle
326    /// cache. Positional reads only — no seek, no cursor, no per-read open.
327    /// Errors on tamper. Safe against a concurrent appender on the active
328    /// segment: index entries are inserted only AFTER `write_all` returns, and
329    /// read-after-write through a second handle of the same file is coherent
330    /// through the page cache on both Unix and Windows.
331    fn read_content(&self, loc: &SegmentLocation, expect_hash: &str) -> Result<Vec<u8>> {
332        let f = self.read_handle(loc.segment_id)?;
333        let mut content = vec![0u8; loc.len as usize];
334        read_at(&f, &mut content, loc.offset)
335            .with_context(|| format!("read record from segment {}", loc.segment_id))?;
336        let actual = blake2b(&content);
337        if actual != expect_hash {
338            bail!("segment object {} tampered: recomputed {}", expect_hash, actual);
339        }
340        Ok(content)
341    }
342
343    // ── Phase 3: .idx sidecars ────────────────────────────────────────────────
344
345    /// Write a checksummed `.idx` for a SEALED segment (atomic via tmp→rename).
346    /// Best-effort: a failure just means the next open scans the segment.
347    fn write_idx(dir: &Path, id: u32, entries: &[(String, u64, u32)]) -> Result<()> {
348        let mut body: Vec<u8> = Vec::with_capacity(IDX_HEADER_BYTES + entries.len() * IDX_ENTRY_BYTES);
349        body.extend_from_slice(IDX_MAGIC);
350        body.extend_from_slice(&(entries.len() as u64).to_le_bytes());
351        for (hash, off, len) in entries {
352            let raw = hex::decode(hash).map_err(|_| anyhow::anyhow!("bad hash hex in idx write"))?;
353            if raw.len() != 32 {
354                bail!("idx write: hash not 32 bytes");
355            }
356            body.extend_from_slice(&raw);
357            body.extend_from_slice(&off.to_le_bytes());
358            body.extend_from_slice(&len.to_le_bytes());
359        }
360        let checksum = blake2b_raw(&body);
361        body.extend_from_slice(&checksum);
362
363        let path = Self::idx_path(dir, id);
364        let tmp = path.with_extension("idx.tmp");
365        fs::write(&tmp, &body)?;
366        fs::rename(&tmp, &path)?;
367        Ok(())
368    }
369
370    /// Load a `.idx` if present and checksum-valid. Returns Ok(None) if absent
371    /// or in any way unusable (caller then scans the segment).
372    fn load_idx(dir: &Path, id: u32) -> Result<Option<Vec<(String, u64, u32)>>> {
373        let path = Self::idx_path(dir, id);
374        let data = match fs::read(&path) {
375            Ok(d) => d,
376            Err(_) => return Ok(None),
377        };
378        if data.len() < IDX_HEADER_BYTES + IDX_CHECKSUM_BYTES {
379            return Ok(None);
380        }
381        if &data[0..4] != IDX_MAGIC {
382            return Ok(None);
383        }
384        let count = u64::from_le_bytes(data[4..12].try_into().unwrap()) as usize;
385        let expected = IDX_HEADER_BYTES + count * IDX_ENTRY_BYTES + IDX_CHECKSUM_BYTES;
386        if data.len() != expected {
387            return Ok(None);
388        }
389        let body = &data[..data.len() - IDX_CHECKSUM_BYTES];
390        let stored: [u8; 32] = match data[data.len() - IDX_CHECKSUM_BYTES..].try_into() {
391            Ok(a) => a,
392            Err(_) => return Ok(None),
393        };
394        if blake2b_raw(body) != stored {
395            return Ok(None); // corrupt/stale → fall back to scan
396        }
397        let mut entries = Vec::with_capacity(count);
398        let mut p = IDX_HEADER_BYTES;
399        for _ in 0..count {
400            let hash = hex::encode(&data[p..p + 32]);
401            let off = u64::from_le_bytes(data[p + 32..p + 40].try_into().unwrap());
402            let len = u32::from_le_bytes(data[p + 40..p + 44].try_into().unwrap());
403            entries.push((hash, off, len));
404            p += IDX_ENTRY_BYTES;
405        }
406        Ok(Some(entries))
407    }
408
409    /// Collect the index entries belonging to one segment (for sealing → .idx).
410    fn entries_for_segment(&self, id: u32) -> Vec<(String, u64, u32)> {
411        self.index
412            .iter()
413            .filter(|e| e.value().segment_id == id)
414            .map(|e| (e.key().clone(), e.value().offset, e.value().len))
415            .collect()
416    }
417
418    // ── core API ──────────────────────────────────────────────────────────────
419
420    /// True if this hash is already stored in a segment.
421    pub fn contains(&self, hash: &str) -> bool {
422        self.index.contains_key(hash)
423    }
424
425    /// Append `content` under `hash` (idempotent). `hash` must equal
426    /// `BLAKE2b(content)`; the caller computes it (parallel, outside the lock).
427    pub fn put(&self, hash: &str, content: &[u8]) -> Result<()> {
428        if self.index.contains_key(hash) {
429            return Ok(());
430        }
431        let len = content.len() as u32;
432        let record_size = 4u64 + content.len() as u64;
433
434        let mut active = self.active.lock().unwrap();
435        if self.index.contains_key(hash) {
436            return Ok(());
437        }
438
439        // Roll over if this record would push the active segment past the cap.
440        if active.offset > 0 && active.offset + record_size > self.max_segment_bytes {
441            let _ = active.file.flush();
442            let _ = durable_sync(&active.file, self.fast_fsync);
443            // Seal: write the .idx for the segment we're leaving behind.
444            let sealed_id = active.id;
445            let entries = self.entries_for_segment(sealed_id);
446            let _ = Self::write_idx(&self.dir, sealed_id, &entries);
447            let next_id = sealed_id + 1;
448            let path = Self::seg_path(&self.dir, next_id);
449            let file = OpenOptions::new()
450                .create(true)
451                .read(true)
452                .write(true)
453                .open(&path)
454                .with_context(|| format!("open new segment {:?}", path))?;
455            *active = Active { id: next_id, file, offset: 0 };
456        }
457
458        let content_off = active.offset + 4;
459        let mut rec = Vec::with_capacity(4 + content.len());
460        rec.extend_from_slice(&len.to_le_bytes());
461        rec.extend_from_slice(content);
462        active.file.write_all(&rec)?;
463
464        let seg_id = active.id;
465        active.offset += record_size;
466        self.index.insert(
467            hash.to_string(),
468            SegmentLocation { segment_id: seg_id, offset: content_off, len },
469        );
470        Ok(())
471    }
472
473    /// Read the raw content bytes for `hash`, or `None` if not stored in any
474    /// segment (caller then falls back to the loose-object path). Re-verifies.
475    pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
476        let loc = match self.index.get(hash) {
477            Some(entry) => *entry.value(),
478            None => return Ok(None),
479        };
480        Ok(Some(self.read_content(&loc, hash)?))
481    }
482
483    /// All hashes currently stored in segments.
484    pub fn all_hashes(&self) -> Vec<String> {
485        self.index.iter().map(|e| e.key().clone()).collect()
486    }
487
488    /// Flush + fsync the active segment. One durability point per batch.
489    pub fn sync(&self) -> Result<()> {
490        let mut active = self.active.lock().unwrap();
491        let _ = active.file.flush();
492        durable_sync(&active.file, self.fast_fsync).context("fsync active segment")?;
493        Ok(())
494    }
495
496    // ── Phase 2: compaction / pruning ─────────────────────────────────────────
497
498    /// Rewrite the **live** object set into fresh segments and drop everything
499    /// else, reclaiming dead (superseded/spent/pruned) records.
500    ///
501    /// `live` is the set of hashes to KEEP — typically the current version of
502    /// every document (from the id-index). Hashes not in `live` are pruned, so
503    /// historical versions / AS OF / TRACE for dropped objects are discarded by
504    /// design (that is what reclaims space).
505    ///
506    /// Crash-safe: new segments are written + fsynced BEFORE any old segment is
507    /// deleted, so live data is never lost. A crash mid-compaction leaves both
508    /// the old and new segments (a re-open re-indexes the union — dead objects
509    /// merely linger until the next compaction); it never loses a live object.
510    ///
511    /// Must be called when the store is quiescent (no concurrent reads): writes
512    /// are blocked for the duration via the active lock, and the in-memory index
513    /// is swapped in place.
514    pub fn compact(&self, live: &HashSet<String>) -> Result<CompactStats> {
515        let mut active = self.active.lock().unwrap();
516
517        let total_before = self.index.len();
518        let old_max = active.id;
519        let new_base = old_max + 1;
520
521        // Snapshot the live entries to copy forward.
522        let to_copy: Vec<(String, SegmentLocation)> = self
523            .index
524            .iter()
525            .filter(|e| live.contains(e.key()))
526            .map(|e| (e.key().clone(), *e.value()))
527            .collect();
528
529        // Write live objects into fresh segments starting at new_base.
530        let new_index: DashMap<String, SegmentLocation> = DashMap::new();
531        let mut cur_id = new_base;
532        let mut cur_path = Self::seg_path(&self.dir, cur_id);
533        let mut cur_file = OpenOptions::new()
534            .create(true)
535            .truncate(true)
536            .read(true)
537            .write(true)
538            .open(&cur_path)
539            .with_context(|| format!("open compaction segment {:?}", cur_path))?;
540        let mut cur_off: u64 = 0;
541
542        for (hash, loc) in &to_copy {
543            let content = self.read_content(loc, hash)?;
544            let len = content.len() as u32;
545            let record_size = 4u64 + content.len() as u64;
546
547            if cur_off > 0 && cur_off + record_size > self.max_segment_bytes {
548                let _ = cur_file.flush();
549                durable_sync(&cur_file, self.fast_fsync).context("fsync sealed compaction segment")?;
550                let entries: Vec<(String, u64, u32)> = new_index
551                    .iter()
552                    .filter(|e| e.value().segment_id == cur_id)
553                    .map(|e| (e.key().clone(), e.value().offset, e.value().len))
554                    .collect();
555                let _ = Self::write_idx(&self.dir, cur_id, &entries);
556                cur_id += 1;
557                cur_path = Self::seg_path(&self.dir, cur_id);
558                cur_file = OpenOptions::new()
559                    .create(true)
560                    .truncate(true)
561                    .read(true)
562                    .write(true)
563                    .open(&cur_path)
564                    .with_context(|| format!("open compaction segment {:?}", cur_path))?;
565                cur_off = 0;
566            }
567
568            let content_off = cur_off + 4;
569            let mut rec = Vec::with_capacity(4 + content.len());
570            rec.extend_from_slice(&len.to_le_bytes());
571            rec.extend_from_slice(&content);
572            cur_file.write_all(&rec)?;
573            new_index.insert(hash.clone(), SegmentLocation { segment_id: cur_id, offset: content_off, len });
574            cur_off += record_size;
575        }
576        let _ = cur_file.flush();
577        durable_sync(&cur_file, self.fast_fsync).context("fsync active compaction segment")?;
578
579        // The last new segment becomes the active one (reuse its handle).
580        let live_objects = to_copy.len();
581
582        // Swap the in-memory index to the rebuilt one.
583        self.index.clear();
584        for e in new_index.iter() {
585            self.index.insert(e.key().clone(), *e.value());
586        }
587        *active = Active { id: cur_id, file: cur_file, offset: cur_off };
588
589        // Drop every cached read handle: old segment ids are never referenced
590        // again after the index swap (ids strictly increase), and releasing the
591        // handles frees their fds before the files are deleted below. (Deletion
592        // would succeed even with handles open — Unix unlink semantics; Rust's
593        // std opens with FILE_SHARE_DELETE on Windows — this is hygiene, not
594        // correctness.) Fresh handles for the new segments open lazily on the
595        // next read.
596        self.read_handles.clear();
597
598        // Delete every old segment (id < new_base) + its .idx, after the new
599        // ones are durable. Re-list so we only touch files that actually exist.
600        let mut bytes_reclaimed: u64 = 0;
601        if let Ok(rd) = fs::read_dir(&self.dir) {
602            for entry in rd.flatten() {
603                let name = entry.file_name().to_string_lossy().to_string();
604                let id_of = name
605                    .strip_prefix("seg-")
606                    .and_then(|r| r.strip_suffix(".dat").or_else(|| r.strip_suffix(".idx")))
607                    .and_then(|n| n.parse::<u32>().ok());
608                if let Some(id) = id_of {
609                    if id < new_base {
610                        if name.ends_with(".dat") {
611                            if let Ok(m) = entry.metadata() {
612                                bytes_reclaimed += m.len();
613                            }
614                        }
615                        let _ = fs::remove_file(entry.path());
616                    }
617                }
618            }
619        }
620
621        let segments_after = (cur_id - new_base + 1) as usize;
622        Ok(CompactStats {
623            live_objects,
624            dropped_objects: total_before.saturating_sub(live_objects),
625            bytes_reclaimed,
626            segments_after,
627        })
628    }
629}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634    use tempfile::tempdir;
635
636    fn put_get_hash(s: &SegmentStore, content: &[u8]) -> String {
637        let h = blake2b(content);
638        s.put(&h, content).unwrap();
639        h
640    }
641
642    #[test]
643    fn put_get_roundtrip() {
644        let dir = tempdir().unwrap();
645        let s = SegmentStore::open(dir.path()).unwrap();
646        let h = put_get_hash(&s, b"hello nedb v3");
647        assert_eq!(s.get(&h).unwrap().unwrap(), b"hello nedb v3");
648        assert!(s.contains(&h));
649        assert!(s.get(&"0".repeat(64)).unwrap().is_none());
650    }
651
652    #[test]
653    fn idempotent_put() {
654        let dir = tempdir().unwrap();
655        let s = SegmentStore::open(dir.path()).unwrap();
656        let h1 = put_get_hash(&s, b"dup");
657        let h2 = put_get_hash(&s, b"dup");
658        assert_eq!(h1, h2);
659        assert_eq!(s.all_hashes().len(), 1);
660    }
661
662    #[test]
663    fn index_rebuilt_on_reopen() {
664        let dir = tempdir().unwrap();
665        let h = {
666            let s = SegmentStore::open(dir.path()).unwrap();
667            let h = put_get_hash(&s, b"persisted");
668            s.sync().unwrap();
669            h
670        };
671        let s2 = SegmentStore::open(dir.path()).unwrap();
672        assert_eq!(s2.get(&h).unwrap().unwrap(), b"persisted");
673    }
674
675    #[test]
676    fn rollover_writes_idx_and_reopen_uses_it() {
677        let dir = tempdir().unwrap();
678        let s = SegmentStore::open_with_max(dir.path(), 32).unwrap();
679        let mut hashes = Vec::new();
680        for i in 0..8u32 {
681            hashes.push(put_get_hash(&s, format!("record-{}", i).as_bytes()));
682        }
683        s.sync().unwrap();
684        // Rollover should have produced sealed .idx sidecars.
685        let idx_files = fs::read_dir(dir.path().join("segments"))
686            .unwrap()
687            .flatten()
688            .filter(|e| e.file_name().to_string_lossy().ends_with(".idx"))
689            .count();
690        assert!(idx_files >= 1, "expected at least one sealed .idx");
691        // Reopen (loads sealed segments via .idx) — every object still reads.
692        let s2 = SegmentStore::open(dir.path()).unwrap();
693        for h in &hashes {
694            assert!(s2.get(h).unwrap().is_some());
695        }
696    }
697
698    #[test]
699    fn corrupt_idx_falls_back_to_scan() {
700        let dir = tempdir().unwrap();
701        let mut hashes = Vec::new();
702        {
703            let s = SegmentStore::open_with_max(dir.path(), 32).unwrap();
704            for i in 0..6u32 {
705                hashes.push(put_get_hash(&s, format!("rec-{}", i).as_bytes()));
706            }
707            s.sync().unwrap();
708        }
709        // Corrupt every .idx (truncate to garbage). Reopen must still work via scan.
710        for e in fs::read_dir(dir.path().join("segments")).unwrap().flatten() {
711            if e.file_name().to_string_lossy().ends_with(".idx") {
712                fs::write(e.path(), b"garbage").unwrap();
713            }
714        }
715        let s2 = SegmentStore::open(dir.path()).unwrap();
716        for h in &hashes {
717            assert!(s2.get(h).unwrap().is_some(), "scan fallback must recover the object");
718        }
719    }
720
721    #[test]
722    fn torn_tail_is_truncated_on_open() {
723        let dir = tempdir().unwrap();
724        let good = {
725            let s = SegmentStore::open(dir.path()).unwrap();
726            let h = put_get_hash(&s, b"good record");
727            s.sync().unwrap();
728            h
729        };
730        let seg = dir.path().join("segments").join("seg-000000.dat");
731        {
732            let mut f = OpenOptions::new().append(true).open(&seg).unwrap();
733            f.write_all(&9999u32.to_le_bytes()).unwrap();
734            f.write_all(b"short").unwrap();
735        }
736        let s2 = SegmentStore::open(dir.path()).unwrap();
737        assert_eq!(s2.get(&good).unwrap().unwrap(), b"good record");
738        let h2 = put_get_hash(&s2, b"after recovery");
739        assert!(s2.get(&h2).unwrap().is_some());
740    }
741
742    #[test]
743    fn tamper_detected_on_read() {
744        let dir = tempdir().unwrap();
745        let h = {
746            let s = SegmentStore::open(dir.path()).unwrap();
747            let h = put_get_hash(&s, b"authentic");
748            s.sync().unwrap();
749            h
750        };
751        let seg = dir.path().join("segments").join("seg-000000.dat");
752        let mut bytes = fs::read(&seg).unwrap();
753        let n = bytes.len();
754        bytes[n - 1] ^= 0xff;
755        fs::write(&seg, bytes).unwrap();
756        let s2 = SegmentStore::open(dir.path()).unwrap();
757        match s2.get(&h) {
758            Ok(None) => {}
759            Err(_) => {}
760            Ok(Some(_)) => panic!("tampered content must not verify under original hash"),
761        }
762    }
763
764    #[test]
765    fn compaction_keeps_live_drops_dead() {
766        let dir = tempdir().unwrap();
767        let s = SegmentStore::open(dir.path()).unwrap();
768        let keep = put_get_hash(&s, b"keep me");
769        let _drop1 = put_get_hash(&s, b"drop me 1");
770        let _drop2 = put_get_hash(&s, b"drop me 2");
771        s.sync().unwrap();
772        assert_eq!(s.all_hashes().len(), 3);
773
774        let mut live = HashSet::new();
775        live.insert(keep.clone());
776        let stats = s.compact(&live).unwrap();
777        assert_eq!(stats.live_objects, 1);
778        assert_eq!(stats.dropped_objects, 2);
779
780        // Live object survives; dead ones are gone.
781        assert_eq!(s.get(&keep).unwrap().unwrap(), b"keep me");
782        assert_eq!(s.all_hashes().len(), 1);
783
784        // And it survives a reopen (new segments + index swap persisted).
785        let s2 = SegmentStore::open(dir.path()).unwrap();
786        assert_eq!(s2.get(&keep).unwrap().unwrap(), b"keep me");
787        assert!(s2.get(&_drop1).unwrap().is_none());
788
789        // Writes still work after compaction.
790        let after = put_get_hash(&s, b"post-compaction");
791        assert!(s.get(&after).unwrap().is_some());
792    }
793
794    #[test]
795    fn compaction_reclaims_and_writes_still_read() {
796        let dir = tempdir().unwrap();
797        let s = SegmentStore::open_with_max(dir.path(), 64).unwrap();
798        let mut all = Vec::new();
799        for i in 0..20u32 {
800            all.push(put_get_hash(&s, format!("obj-{:03}", i).as_bytes()));
801        }
802        s.sync().unwrap();
803        // Keep only the even-indexed ones.
804        let mut live = HashSet::new();
805        for (i, h) in all.iter().enumerate() {
806            if i % 2 == 0 {
807                live.insert(h.clone());
808            }
809        }
810        let stats = s.compact(&live).unwrap();
811        assert_eq!(stats.live_objects, 10);
812        assert_eq!(stats.dropped_objects, 10);
813        for (i, h) in all.iter().enumerate() {
814            let got = s.get(h).unwrap();
815            if i % 2 == 0 {
816                assert!(got.is_some(), "live object {} must survive", i);
817            } else {
818                assert!(got.is_none(), "dead object {} must be pruned", i);
819            }
820        }
821    }
822
823    /// The read-handle cache serves many threads through ONE shared handle per
824    /// segment via positional reads — no cursor, no lock, no per-read open.
825    /// Small rollover size forces multiple segments so the cache holds several
826    /// handles, and reads cover both sealed segments and the active one
827    /// (read-after-write coherence through a second handle of the same file).
828    #[test]
829    fn concurrent_reads_share_cached_handles() {
830        let dir = tempdir().unwrap();
831        let s = Arc::new(SegmentStore::open_with_max(dir.path(), 256).unwrap());
832        let mut hashes = Vec::new();
833        for i in 0..64u32 {
834            hashes.push(put_get_hash(&s, format!("concurrent-record-{:04}", i).as_bytes()));
835        }
836        s.sync().unwrap();
837
838        let hashes = Arc::new(hashes);
839        let mut joins = vec![];
840        for t in 0..4 {
841            let s2 = Arc::clone(&s);
842            let hs = Arc::clone(&hashes);
843            joins.push(std::thread::spawn(move || {
844                // Every thread reads every record, twice — the second pass runs
845                // entirely on warm cached handles.
846                for pass in 0..2 {
847                    for (i, h) in hs.iter().enumerate() {
848                        let got = s2.get(h).unwrap()
849                            .unwrap_or_else(|| panic!("thread {} pass {} record {}: missing", t, pass, i));
850                        assert_eq!(got, format!("concurrent-record-{:04}", i).as_bytes(),
851                                   "thread {} pass {} record {}: wrong bytes", t, pass, i);
852                    }
853                }
854            }));
855        }
856        for j in joins { j.join().unwrap(); }
857
858        // And the cache must not survive a compaction (old ids never return).
859        let live: HashSet<String> = hashes.iter().cloned().collect();
860        let stats = s.compact(&live).unwrap();
861        assert_eq!(stats.live_objects, 64);
862        for (i, h) in hashes.iter().enumerate() {
863            assert_eq!(s.get(h).unwrap().unwrap(),
864                       format!("concurrent-record-{:04}", i).as_bytes(),
865                       "record {} must read correctly through fresh post-compact handles", i);
866        }
867    }
868}