1use std::collections::HashSet;
28use std::fs::{self, File, OpenOptions};
29use std::io::{Read, Seek, SeekFrom, Write};
30use std::path::{Path, PathBuf};
31use std::sync::{Arc, Mutex};
32
33use anyhow::{bail, Context, Result};
34use blake2::{Blake2b512, Digest};
35use dashmap::DashMap;
36
/// Segment size cap used by `SegmentStore::open`: 256 MiB.
const DEFAULT_MAX_SEGMENT_BYTES: u64 = 256 << 20;

/// Magic bytes at the start of every `.idx` sidecar file.
const IDX_MAGIC: &[u8; 4] = b"NIX1";
/// Bytes per `.idx` entry: 32-byte raw hash, `u64` offset, `u32` length.
const IDX_ENTRY_BYTES: usize = 32 + 8 + 4;
/// Bytes in the `.idx` header: 4-byte magic followed by a `u64` entry count.
const IDX_HEADER_BYTES: usize = 4 + 8;
/// Bytes in the checksum that closes an `.idx` file.
const IDX_CHECKSUM_BYTES: usize = 32;
47
/// Where one object's content lives on disk.
#[derive(Clone, Copy, Debug)]
struct SegmentLocation {
    /// Id of the segment file (`seg-<id>.dat`) holding the record.
    segment_id: u32,
    /// Byte offset of the content itself, i.e. just past the 4-byte length prefix.
    offset: u64,
    /// Content length in bytes.
    len: u32,
}
56
/// Summary returned by `SegmentStore::compact`.
#[derive(Clone, Copy, Debug, Default)]
pub struct CompactStats {
    /// Number of objects copied into the new segments.
    pub live_objects: usize,
    /// Number of indexed objects that were not copied.
    pub dropped_objects: usize,
    /// Combined size of the old `.dat` files that compaction removed.
    pub bytes_reclaimed: u64,
    /// Number of segment files written by the compaction.
    pub segments_after: usize,
}
69
70fn blake2b_raw(data: &[u8]) -> [u8; 32] {
72 let mut h = Blake2b512::new();
73 h.update(data);
74 let out = h.finalize();
75 let mut a = [0u8; 32];
76 a.copy_from_slice(&out[..32]);
77 a
78}
79
80fn blake2b(data: &[u8]) -> String {
83 hex::encode(blake2b_raw(data))
84}
85
/// Fills `buf` from `f` starting at byte `offset`, without using the file's cursor.
///
/// Fails if the file ends before `buf` is full.
#[cfg(unix)]
fn read_at(f: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    std::os::unix::fs::FileExt::read_exact_at(f, buf, offset)
}
93
/// Fills `buf` from `f` starting at byte `offset`.
///
/// `seek_read` may return fewer bytes than requested, so it is called repeatedly
/// until the buffer is full. A zero-length read before that point is reported as
/// `UnexpectedEof`.
#[cfg(windows)]
fn read_at(f: &File, buf: &mut [u8], offset: u64) -> std::io::Result<()> {
    use std::os::windows::fs::FileExt;
    let mut filled = 0usize;
    loop {
        if filled >= buf.len() {
            return Ok(());
        }
        match f.seek_read(&mut buf[filled..], offset + filled as u64)? {
            0 => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::UnexpectedEof,
                    "eof mid-record in segment read",
                ));
            }
            got => filled += got,
        }
    }
}
116
/// The segment currently being appended to.
struct Active {
    /// Id of the segment file.
    id: u32,
    /// Open handle that records are written through.
    file: File,
    /// Number of bytes written so far; the next record starts here.
    offset: u64,
}
124
125pub struct SegmentStore {
127 dir: PathBuf,
128 index: DashMap<String, SegmentLocation>,
129 active: Mutex<Active>,
130 max_segment_bytes: u64,
131 fast_fsync: bool,
135 read_handles: DashMap<u32, Arc<File>>,
146}
147
/// Flushes `file` to stable storage.
///
/// On macOS with `fast` set, this calls plain `fsync(2)` instead of
/// `File::sync_all` (presumably to avoid the slower full flush std performs
/// there; confirm against the std documentation). On every other platform
/// `fast` is ignored and `File::sync_all` is used.
fn durable_sync(file: &File, fast: bool) -> std::io::Result<()> {
    #[cfg(target_os = "macos")]
    if fast {
        use std::os::unix::io::AsRawFd;
        // SAFETY: the descriptor comes from `file`, which is borrowed for the
        // whole call and therefore still open; `fsync` only takes the fd.
        let status = unsafe { libc::fsync(file.as_raw_fd()) };
        return match status {
            0 => Ok(()),
            _ => Err(std::io::Error::last_os_error()),
        };
    }
    // Silence the unused-parameter warning where the fast path is compiled out.
    #[cfg(not(target_os = "macos"))]
    let _ = fast;
    file.sync_all()
}
174
175impl SegmentStore {
/// Path of the data file for segment `id`: `<dir>/seg-NNNNNN.dat`, id zero-padded to six digits.
fn seg_path(dir: &Path, id: u32) -> PathBuf {
    let mut path = dir.to_path_buf();
    path.push(format!("seg-{:06}.dat", id));
    path
}
/// Path of the index sidecar for segment `id`: `<dir>/seg-NNNNNN.idx`, id zero-padded to six digits.
fn idx_path(dir: &Path, id: u32) -> PathBuf {
    let mut path = dir.to_path_buf();
    path.push(format!("seg-{:06}.idx", id));
    path
}
182
183 pub fn open(objects_root: &Path) -> Result<Self> {
185 Self::open_with_max(objects_root, DEFAULT_MAX_SEGMENT_BYTES)
186 }
187
188 pub fn open_with_max(objects_root: &Path, max_segment_bytes: u64) -> Result<Self> {
190 let dir = objects_root.join("segments");
191 fs::create_dir_all(&dir).context("create objects/segments dir")?;
192
193 let mut ids: Vec<u32> = Vec::new();
195 for entry in fs::read_dir(&dir).context("read segments dir")? {
196 let entry = entry?;
197 let name = entry.file_name().to_string_lossy().to_string();
198 if let Some(rest) = name.strip_prefix("seg-") {
199 if let Some(num) = rest.strip_suffix(".dat") {
200 if let Ok(id) = num.parse::<u32>() {
201 ids.push(id);
202 }
203 }
204 }
205 }
206 ids.sort_unstable();
207
208 let index: DashMap<String, SegmentLocation> = DashMap::new();
209 let mut active_id: u32 = 0;
210 let mut active_end: u64 = 0;
211
212 for (pos, &id) in ids.iter().enumerate() {
213 let is_last = pos + 1 == ids.len();
214 if is_last {
215 let (valid_end, entries) = Self::scan_segment(&dir, id)?;
218 for (h, o, l) in entries {
219 index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
220 }
221 let path = Self::seg_path(&dir, id);
222 let file_len = fs::metadata(&path)?.len();
223 if valid_end < file_len {
224 let f = OpenOptions::new().write(true).open(&path)?;
225 f.set_len(valid_end)?;
226 }
227 active_id = id;
228 active_end = valid_end;
229 } else {
230 match Self::load_idx(&dir, id) {
233 Ok(Some(entries)) => {
234 for (h, o, l) in entries {
235 index.insert(h, SegmentLocation { segment_id: id, offset: o, len: l });
236 }
237 }
238 _ => {
239 let (_ve, entries) = Self::scan_segment(&dir, id)?;
240 for (h, o, l) in &entries {
241 index.insert(h.clone(), SegmentLocation { segment_id: id, offset: *o, len: *l });
242 }
243 let _ = Self::write_idx(&dir, id, &entries); }
245 }
246 }
247 }
248
249 let active_path = Self::seg_path(&dir, active_id);
251 let mut file = OpenOptions::new()
252 .create(true)
253 .read(true)
254 .write(true)
255 .open(&active_path)
256 .with_context(|| format!("open active segment {:?}", active_path))?;
257 file.seek(SeekFrom::Start(active_end))?;
258
259 let fast_fsync = std::env::var("NEDB_FAST_FSYNC")
260 .map(|v| {
261 let v = v.trim();
262 v == "1" || v.eq_ignore_ascii_case("true")
263 || v.eq_ignore_ascii_case("on")
264 || v.eq_ignore_ascii_case("yes")
265 })
266 .unwrap_or(false);
267
268 Ok(Self {
269 dir,
270 index,
271 active: Mutex::new(Active { id: active_id, file, offset: active_end }),
272 max_segment_bytes,
273 fast_fsync,
274 read_handles: DashMap::new(),
275 })
276 }
277
278 fn scan_segment(dir: &Path, id: u32) -> Result<(u64, Vec<(String, u64, u32)>)> {
281 let path = Self::seg_path(dir, id);
282 let mut f = match File::open(&path) {
283 Ok(f) => f,
284 Err(_) => return Ok((0, Vec::new())),
285 };
286 let file_len = f.metadata()?.len();
287 let mut pos: u64 = 0;
288 let mut entries: Vec<(String, u64, u32)> = Vec::new();
289 loop {
290 if pos + 4 > file_len {
291 break; }
293 f.seek(SeekFrom::Start(pos))?;
294 let mut len_buf = [0u8; 4];
295 if f.read_exact(&mut len_buf).is_err() {
296 break;
297 }
298 let len = u32::from_le_bytes(len_buf);
299 let content_off = pos + 4;
300 if content_off + (len as u64) > file_len {
301 break; }
303 let mut content = vec![0u8; len as usize];
304 if f.read_exact(&mut content).is_err() {
305 break;
306 }
307 entries.push((blake2b(&content), content_off, len));
308 pos = content_off + len as u64;
309 }
310 Ok((pos, entries))
311 }
312
313 fn read_handle(&self, id: u32) -> Result<Arc<File>> {
317 if let Some(h) = self.read_handles.get(&id) {
318 return Ok(Arc::clone(h.value()));
319 }
320 let path = Self::seg_path(&self.dir, id);
321 let f = Arc::new(File::open(&path).with_context(|| format!("open segment {:?}", path))?);
322 Ok(Arc::clone(self.read_handles.entry(id).or_insert(f).value()))
323 }
324
325 fn read_content(&self, loc: &SegmentLocation, expect_hash: &str) -> Result<Vec<u8>> {
332 let f = self.read_handle(loc.segment_id)?;
333 let mut content = vec![0u8; loc.len as usize];
334 read_at(&f, &mut content, loc.offset)
335 .with_context(|| format!("read record from segment {}", loc.segment_id))?;
336 let actual = blake2b(&content);
337 if actual != expect_hash {
338 bail!("segment object {} tampered: recomputed {}", expect_hash, actual);
339 }
340 Ok(content)
341 }
342
343 fn write_idx(dir: &Path, id: u32, entries: &[(String, u64, u32)]) -> Result<()> {
348 let mut body: Vec<u8> = Vec::with_capacity(IDX_HEADER_BYTES + entries.len() * IDX_ENTRY_BYTES);
349 body.extend_from_slice(IDX_MAGIC);
350 body.extend_from_slice(&(entries.len() as u64).to_le_bytes());
351 for (hash, off, len) in entries {
352 let raw = hex::decode(hash).map_err(|_| anyhow::anyhow!("bad hash hex in idx write"))?;
353 if raw.len() != 32 {
354 bail!("idx write: hash not 32 bytes");
355 }
356 body.extend_from_slice(&raw);
357 body.extend_from_slice(&off.to_le_bytes());
358 body.extend_from_slice(&len.to_le_bytes());
359 }
360 let checksum = blake2b_raw(&body);
361 body.extend_from_slice(&checksum);
362
363 let path = Self::idx_path(dir, id);
364 let tmp = path.with_extension("idx.tmp");
365 fs::write(&tmp, &body)?;
366 fs::rename(&tmp, &path)?;
367 Ok(())
368 }
369
370 fn load_idx(dir: &Path, id: u32) -> Result<Option<Vec<(String, u64, u32)>>> {
373 let path = Self::idx_path(dir, id);
374 let data = match fs::read(&path) {
375 Ok(d) => d,
376 Err(_) => return Ok(None),
377 };
378 if data.len() < IDX_HEADER_BYTES + IDX_CHECKSUM_BYTES {
379 return Ok(None);
380 }
381 if &data[0..4] != IDX_MAGIC {
382 return Ok(None);
383 }
384 let count = u64::from_le_bytes(data[4..12].try_into().unwrap()) as usize;
385 let expected = IDX_HEADER_BYTES + count * IDX_ENTRY_BYTES + IDX_CHECKSUM_BYTES;
386 if data.len() != expected {
387 return Ok(None);
388 }
389 let body = &data[..data.len() - IDX_CHECKSUM_BYTES];
390 let stored: [u8; 32] = match data[data.len() - IDX_CHECKSUM_BYTES..].try_into() {
391 Ok(a) => a,
392 Err(_) => return Ok(None),
393 };
394 if blake2b_raw(body) != stored {
395 return Ok(None); }
397 let mut entries = Vec::with_capacity(count);
398 let mut p = IDX_HEADER_BYTES;
399 for _ in 0..count {
400 let hash = hex::encode(&data[p..p + 32]);
401 let off = u64::from_le_bytes(data[p + 32..p + 40].try_into().unwrap());
402 let len = u32::from_le_bytes(data[p + 40..p + 44].try_into().unwrap());
403 entries.push((hash, off, len));
404 p += IDX_ENTRY_BYTES;
405 }
406 Ok(Some(entries))
407 }
408
409 fn entries_for_segment(&self, id: u32) -> Vec<(String, u64, u32)> {
411 self.index
412 .iter()
413 .filter(|e| e.value().segment_id == id)
414 .map(|e| (e.key().clone(), e.value().offset, e.value().len))
415 .collect()
416 }
417
418 pub fn contains(&self, hash: &str) -> bool {
422 self.index.contains_key(hash)
423 }
424
425 pub fn put(&self, hash: &str, content: &[u8]) -> Result<()> {
428 if self.index.contains_key(hash) {
429 return Ok(());
430 }
431 let len = content.len() as u32;
432 let record_size = 4u64 + content.len() as u64;
433
434 let mut active = self.active.lock().unwrap();
435 if self.index.contains_key(hash) {
436 return Ok(());
437 }
438
439 if active.offset > 0 && active.offset + record_size > self.max_segment_bytes {
441 let _ = active.file.flush();
442 let _ = durable_sync(&active.file, self.fast_fsync);
443 let sealed_id = active.id;
445 let entries = self.entries_for_segment(sealed_id);
446 let _ = Self::write_idx(&self.dir, sealed_id, &entries);
447 let next_id = sealed_id + 1;
448 let path = Self::seg_path(&self.dir, next_id);
449 let file = OpenOptions::new()
450 .create(true)
451 .read(true)
452 .write(true)
453 .open(&path)
454 .with_context(|| format!("open new segment {:?}", path))?;
455 *active = Active { id: next_id, file, offset: 0 };
456 }
457
458 let content_off = active.offset + 4;
459 let mut rec = Vec::with_capacity(4 + content.len());
460 rec.extend_from_slice(&len.to_le_bytes());
461 rec.extend_from_slice(content);
462 active.file.write_all(&rec)?;
463
464 let seg_id = active.id;
465 active.offset += record_size;
466 self.index.insert(
467 hash.to_string(),
468 SegmentLocation { segment_id: seg_id, offset: content_off, len },
469 );
470 Ok(())
471 }
472
473 pub fn get(&self, hash: &str) -> Result<Option<Vec<u8>>> {
476 let loc = match self.index.get(hash) {
477 Some(entry) => *entry.value(),
478 None => return Ok(None),
479 };
480 Ok(Some(self.read_content(&loc, hash)?))
481 }
482
483 pub fn all_hashes(&self) -> Vec<String> {
485 self.index.iter().map(|e| e.key().clone()).collect()
486 }
487
488 pub fn sync(&self) -> Result<()> {
490 let mut active = self.active.lock().unwrap();
491 let _ = active.file.flush();
492 durable_sync(&active.file, self.fast_fsync).context("fsync active segment")?;
493 Ok(())
494 }
495
496 pub fn compact(&self, live: &HashSet<String>) -> Result<CompactStats> {
515 let mut active = self.active.lock().unwrap();
516
517 let total_before = self.index.len();
518 let old_max = active.id;
519 let new_base = old_max + 1;
520
521 let to_copy: Vec<(String, SegmentLocation)> = self
523 .index
524 .iter()
525 .filter(|e| live.contains(e.key()))
526 .map(|e| (e.key().clone(), *e.value()))
527 .collect();
528
529 let new_index: DashMap<String, SegmentLocation> = DashMap::new();
531 let mut cur_id = new_base;
532 let mut cur_path = Self::seg_path(&self.dir, cur_id);
533 let mut cur_file = OpenOptions::new()
534 .create(true)
535 .truncate(true)
536 .read(true)
537 .write(true)
538 .open(&cur_path)
539 .with_context(|| format!("open compaction segment {:?}", cur_path))?;
540 let mut cur_off: u64 = 0;
541
542 for (hash, loc) in &to_copy {
543 let content = self.read_content(loc, hash)?;
544 let len = content.len() as u32;
545 let record_size = 4u64 + content.len() as u64;
546
547 if cur_off > 0 && cur_off + record_size > self.max_segment_bytes {
548 let _ = cur_file.flush();
549 durable_sync(&cur_file, self.fast_fsync).context("fsync sealed compaction segment")?;
550 let entries: Vec<(String, u64, u32)> = new_index
551 .iter()
552 .filter(|e| e.value().segment_id == cur_id)
553 .map(|e| (e.key().clone(), e.value().offset, e.value().len))
554 .collect();
555 let _ = Self::write_idx(&self.dir, cur_id, &entries);
556 cur_id += 1;
557 cur_path = Self::seg_path(&self.dir, cur_id);
558 cur_file = OpenOptions::new()
559 .create(true)
560 .truncate(true)
561 .read(true)
562 .write(true)
563 .open(&cur_path)
564 .with_context(|| format!("open compaction segment {:?}", cur_path))?;
565 cur_off = 0;
566 }
567
568 let content_off = cur_off + 4;
569 let mut rec = Vec::with_capacity(4 + content.len());
570 rec.extend_from_slice(&len.to_le_bytes());
571 rec.extend_from_slice(&content);
572 cur_file.write_all(&rec)?;
573 new_index.insert(hash.clone(), SegmentLocation { segment_id: cur_id, offset: content_off, len });
574 cur_off += record_size;
575 }
576 let _ = cur_file.flush();
577 durable_sync(&cur_file, self.fast_fsync).context("fsync active compaction segment")?;
578
579 let live_objects = to_copy.len();
581
582 self.index.clear();
584 for e in new_index.iter() {
585 self.index.insert(e.key().clone(), *e.value());
586 }
587 *active = Active { id: cur_id, file: cur_file, offset: cur_off };
588
589 self.read_handles.clear();
597
598 let mut bytes_reclaimed: u64 = 0;
601 if let Ok(rd) = fs::read_dir(&self.dir) {
602 for entry in rd.flatten() {
603 let name = entry.file_name().to_string_lossy().to_string();
604 let id_of = name
605 .strip_prefix("seg-")
606 .and_then(|r| r.strip_suffix(".dat").or_else(|| r.strip_suffix(".idx")))
607 .and_then(|n| n.parse::<u32>().ok());
608 if let Some(id) = id_of {
609 if id < new_base {
610 if name.ends_with(".dat") {
611 if let Ok(m) = entry.metadata() {
612 bytes_reclaimed += m.len();
613 }
614 }
615 let _ = fs::remove_file(entry.path());
616 }
617 }
618 }
619 }
620
621 let segments_after = (cur_id - new_base + 1) as usize;
622 Ok(CompactStats {
623 live_objects,
624 dropped_objects: total_before.saturating_sub(live_objects),
625 bytes_reclaimed,
626 segments_after,
627 })
628 }
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634 use tempfile::tempdir;
635
636 fn put_get_hash(s: &SegmentStore, content: &[u8]) -> String {
637 let h = blake2b(content);
638 s.put(&h, content).unwrap();
639 h
640 }
641
/// A stored object reads back unchanged; an unknown hash reads as `None`.
#[test]
fn put_get_roundtrip() {
    let tmp = tempdir().unwrap();
    let store = SegmentStore::open(tmp.path()).unwrap();
    let payload: &[u8] = b"hello nedb v3";
    let hash = put_get_hash(&store, payload);
    assert_eq!(store.get(&hash).unwrap().unwrap(), payload);
    assert!(store.contains(&hash));
    let unknown = "0".repeat(64);
    assert!(store.get(&unknown).unwrap().is_none());
}
651
/// Storing the same content twice leaves a single indexed object.
#[test]
fn idempotent_put() {
    let tmp = tempdir().unwrap();
    let store = SegmentStore::open(tmp.path()).unwrap();
    let first = put_get_hash(&store, b"dup");
    let second = put_get_hash(&store, b"dup");
    assert_eq!(first, second);
    assert_eq!(store.all_hashes().len(), 1);
}
661
/// An object written and synced by one store instance is found by a new one.
#[test]
fn index_rebuilt_on_reopen() {
    let tmp = tempdir().unwrap();
    let hash = {
        let writer = SegmentStore::open(tmp.path()).unwrap();
        let hash = put_get_hash(&writer, b"persisted");
        writer.sync().unwrap();
        hash
    };
    let reopened = SegmentStore::open(tmp.path()).unwrap();
    assert_eq!(reopened.get(&hash).unwrap().unwrap(), b"persisted");
}
674
/// With a tiny segment cap, writes roll over, at least one `.idx` is produced,
/// and every object is still readable after reopening.
#[test]
fn rollover_writes_idx_and_reopen_uses_it() {
    let tmp = tempdir().unwrap();
    let store = SegmentStore::open_with_max(tmp.path(), 32).unwrap();
    let hashes: Vec<String> = (0..8u32)
        .map(|i| put_get_hash(&store, format!("record-{}", i).as_bytes()))
        .collect();
    store.sync().unwrap();

    let mut idx_files = 0usize;
    for entry in fs::read_dir(tmp.path().join("segments")).unwrap().flatten() {
        if entry.file_name().to_string_lossy().ends_with(".idx") {
            idx_files += 1;
        }
    }
    assert!(idx_files >= 1, "expected at least one sealed .idx");

    let reopened = SegmentStore::open(tmp.path()).unwrap();
    for hash in &hashes {
        assert!(reopened.get(hash).unwrap().is_some());
    }
}
697
/// Overwriting every `.idx` with junk must not lose any object on reopen.
#[test]
fn corrupt_idx_falls_back_to_scan() {
    let tmp = tempdir().unwrap();
    let hashes: Vec<String> = {
        let store = SegmentStore::open_with_max(tmp.path(), 32).unwrap();
        let written = (0..6u32)
            .map(|i| put_get_hash(&store, format!("rec-{}", i).as_bytes()))
            .collect();
        store.sync().unwrap();
        written
    };

    let segments_dir = tmp.path().join("segments");
    for entry in fs::read_dir(segments_dir).unwrap().flatten() {
        let is_idx = entry.file_name().to_string_lossy().ends_with(".idx");
        if is_idx {
            fs::write(entry.path(), b"garbage").unwrap();
        }
    }

    let reopened = SegmentStore::open(tmp.path()).unwrap();
    for hash in &hashes {
        assert!(reopened.get(hash).unwrap().is_some(), "scan fallback must recover the object");
    }
}
720
/// An incomplete record appended after a good one is discarded on open, and the
/// store keeps accepting writes afterwards.
#[test]
fn torn_tail_is_truncated_on_open() {
    let tmp = tempdir().unwrap();
    let good = {
        let store = SegmentStore::open(tmp.path()).unwrap();
        let hash = put_get_hash(&store, b"good record");
        store.sync().unwrap();
        hash
    };

    // Length prefix promises 9999 bytes but only five follow.
    let segment = tmp.path().join("segments").join("seg-000000.dat");
    {
        let mut tail = OpenOptions::new().append(true).open(&segment).unwrap();
        tail.write_all(&9999u32.to_le_bytes()).unwrap();
        tail.write_all(b"short").unwrap();
    }

    let recovered = SegmentStore::open(tmp.path()).unwrap();
    assert_eq!(recovered.get(&good).unwrap().unwrap(), b"good record");
    let later = put_get_hash(&recovered, b"after recovery");
    assert!(recovered.get(&later).unwrap().is_some());
}
741
/// Flipping a content byte on disk means the original hash no longer yields data:
/// the lookup either misses or errors, but never returns content.
#[test]
fn tamper_detected_on_read() {
    let tmp = tempdir().unwrap();
    let hash = {
        let store = SegmentStore::open(tmp.path()).unwrap();
        let hash = put_get_hash(&store, b"authentic");
        store.sync().unwrap();
        hash
    };

    let segment = tmp.path().join("segments").join("seg-000000.dat");
    let mut raw = fs::read(&segment).unwrap();
    if let Some(last) = raw.last_mut() {
        *last ^= 0xff;
    }
    fs::write(&segment, raw).unwrap();

    let reopened = SegmentStore::open(tmp.path()).unwrap();
    if let Ok(Some(_)) = reopened.get(&hash) {
        panic!("tampered content must not verify under original hash");
    }
}
763
/// Compaction keeps exactly the live set, survives a reopen, and the store
/// still accepts writes afterwards.
#[test]
fn compaction_keeps_live_drops_dead() {
    let tmp = tempdir().unwrap();
    let store = SegmentStore::open(tmp.path()).unwrap();
    let keep = put_get_hash(&store, b"keep me");
    let dropped = put_get_hash(&store, b"drop me 1");
    let _ = put_get_hash(&store, b"drop me 2");
    store.sync().unwrap();
    assert_eq!(store.all_hashes().len(), 3);

    let live: HashSet<String> = std::iter::once(keep.clone()).collect();
    let stats = store.compact(&live).unwrap();
    assert_eq!(stats.live_objects, 1);
    assert_eq!(stats.dropped_objects, 2);

    assert_eq!(store.get(&keep).unwrap().unwrap(), b"keep me");
    assert_eq!(store.all_hashes().len(), 1);

    let reopened = SegmentStore::open(tmp.path()).unwrap();
    assert_eq!(reopened.get(&keep).unwrap().unwrap(), b"keep me");
    assert!(reopened.get(&dropped).unwrap().is_none());

    let after = put_get_hash(&store, b"post-compaction");
    assert!(store.get(&after).unwrap().is_some());
}
793
/// Across many small segments, compaction keeps every even-numbered object and
/// prunes every odd-numbered one.
#[test]
fn compaction_reclaims_and_writes_still_read() {
    let tmp = tempdir().unwrap();
    let store = SegmentStore::open_with_max(tmp.path(), 64).unwrap();
    let all: Vec<String> = (0..20u32)
        .map(|i| put_get_hash(&store, format!("obj-{:03}", i).as_bytes()))
        .collect();
    store.sync().unwrap();

    let live: HashSet<String> = all.iter().step_by(2).cloned().collect();
    let stats = store.compact(&live).unwrap();
    assert_eq!(stats.live_objects, 10);
    assert_eq!(stats.dropped_objects, 10);

    for (i, hash) in all.iter().enumerate() {
        let got = store.get(hash).unwrap();
        let expect_live = i % 2 == 0;
        if expect_live {
            assert!(got.is_some(), "live object {} must survive", i);
        } else {
            assert!(got.is_none(), "dead object {} must be pruned", i);
        }
    }
}
822
/// Four threads read every record twice through the shared store, then a
/// compaction that keeps everything must leave all records readable.
#[test]
fn concurrent_reads_share_cached_handles() {
    let tmp = tempdir().unwrap();
    let store = Arc::new(SegmentStore::open_with_max(tmp.path(), 256).unwrap());
    let hashes: Vec<String> = (0..64u32)
        .map(|i| put_get_hash(&store, format!("concurrent-record-{:04}", i).as_bytes()))
        .collect();
    store.sync().unwrap();

    let hashes = Arc::new(hashes);
    let workers: Vec<_> = (0..4)
        .map(|t| {
            let reader = Arc::clone(&store);
            let wanted = Arc::clone(&hashes);
            std::thread::spawn(move || {
                for pass in 0..2 {
                    for (i, h) in wanted.iter().enumerate() {
                        let got = reader.get(h).unwrap().unwrap_or_else(|| {
                            panic!("thread {} pass {} record {}: missing", t, pass, i)
                        });
                        assert_eq!(
                            got,
                            format!("concurrent-record-{:04}", i).as_bytes(),
                            "thread {} pass {} record {}: wrong bytes",
                            t,
                            pass,
                            i
                        );
                    }
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    let live: HashSet<String> = hashes.iter().cloned().collect();
    let stats = store.compact(&live).unwrap();
    assert_eq!(stats.live_objects, 64);
    for (i, h) in hashes.iter().enumerate() {
        assert_eq!(
            store.get(h).unwrap().unwrap(),
            format!("concurrent-record-{:04}", i).as_bytes(),
            "record {} must read correctly through fresh post-compact handles",
            i
        );
    }
}
868}