1mod disk_hash_index;
32
33use memmap2::MmapOptions;
34use parking_lot::RwLock;
35use sochdb_core::{Result, SochDBError};
36use std::fs::{File, OpenOptions};
37use std::io::{Read, Seek, SeekFrom, Write as IoWrite};
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40
41use disk_hash_index::DiskHashIndex;
42
/// Compression codec applied to a stored payload.
///
/// The explicit discriminants are the codes written to the payload index
/// (`meta.compression as u8`) and decoded by [`CompressionType::from_u8`];
/// they must not be renumbered. Variant order is also what the serde derive
/// sees, so do not reorder the variants either.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(u8)]
pub enum CompressionType {
    /// Bytes are stored verbatim.
    None = 0,
    /// LZ4 block format, written without a size prefix (the uncompressed
    /// length has to come from the index metadata).
    LZ4 = 1,
    /// Zstandard, produced by `zstd::encode_all`.
    ZSTD = 2,
}
51
52impl CompressionType {
53 pub fn from_u8(value: u8) -> Result<Self> {
54 match value {
55 0 => Ok(CompressionType::None),
56 1 => Ok(CompressionType::LZ4),
57 2 => Ok(CompressionType::ZSTD),
58 _ => Err(SochDBError::InvalidArgument(format!(
59 "Invalid compression type: {}",
60 value
61 ))),
62 }
63 }
64}
65
/// Location and encoding of one stored payload inside `payload.data`.
///
/// Field order is significant for the serde derive; do not reorder.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PayloadMeta {
    /// Edge identifier this payload belongs to (also the index key).
    pub edge_id: u128,
    /// Byte offset of the stored (possibly compressed) bytes in the data file.
    pub offset: u64,
    /// Number of bytes stored on disk, i.e. the post-compression length.
    pub length: u32,
    /// Codec actually applied to the stored bytes.
    pub compression: CompressionType,
    /// Length of the payload before compression.
    pub uncompressed_length: u32,
}
75
/// Lookup table from edge id to [`PayloadMeta`], implemented by every index
/// backend.
///
/// All methods take `&self`, so implementations that support `insert` must use
/// interior mutability; `Send + Sync` lets the index be shared behind an `Arc`.
pub trait PayloadIndex: Send + Sync {
    /// Records `meta` for `edge_id` (the HashMap backend replaces any
    /// existing entry).
    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()>;
    /// Returns the metadata for `edge_id`, if present.
    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>>;
    /// Returns `true` if `edge_id` has an entry.
    fn contains_key(&self, edge_id: u128) -> bool;
    /// Number of entries.
    fn len(&self) -> usize;
    /// Returns `true` when the index holds no entries.
    fn is_empty(&self) -> bool;
    /// Yields every stored [`PayloadMeta`] by value; this trait guarantees no
    /// particular order.
    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_>;
    /// Persists the index to its backing file.
    fn save(&self) -> Result<()>;
}
88
/// In-memory [`PayloadIndex`] backed by a `HashMap`, loaded from and saved to a
/// flat binary file at `index_path`.
struct HashMapIndex {
    /// Edge id -> metadata, guarded by a `parking_lot::RwLock`.
    inner: Arc<RwLock<std::collections::HashMap<u128, PayloadMeta>>>,
    /// File the index is read from on construction and written to on save.
    index_path: PathBuf,
}
94
95impl HashMapIndex {
96 fn new(index_path: PathBuf) -> Result<Self> {
97 if let Some(parent) = index_path.parent() {
99 std::fs::create_dir_all(parent)?;
100 }
101
102 let inner = if index_path.exists() {
103 let loaded = Self::load_from_disk(&index_path)?;
104 tracing::info!(
105 "Loaded payload index with {} entries from {:?}",
106 loaded.len(),
107 index_path
108 );
109 Arc::new(RwLock::new(loaded))
110 } else {
111 tracing::info!(
112 "No existing payload index found at {:?}, starting fresh",
113 index_path
114 );
115 Arc::new(RwLock::new(std::collections::HashMap::new()))
116 };
117 Ok(Self { inner, index_path })
118 }
119
120 fn load_from_disk(path: &Path) -> Result<std::collections::HashMap<u128, PayloadMeta>> {
121 let file = File::open(path)?;
122 let mut reader = std::io::BufReader::new(file);
123
124 let mut magic = [0u8; 8];
126 reader.read_exact(&mut magic)?;
127 if &magic != b"CHRLPAY1" {
128 return Err(SochDBError::Corruption(
129 "Invalid payload index magic".into(),
130 ));
131 }
132
133 let mut count_bytes = [0u8; 8];
135 reader.read_exact(&mut count_bytes)?;
136 let count = u64::from_le_bytes(count_bytes);
137
138 let mut index = std::collections::HashMap::new();
139
140 for _ in 0..count {
141 let mut edge_id_bytes = [0u8; 16];
142 reader.read_exact(&mut edge_id_bytes)?;
143 let edge_id = u128::from_le_bytes(edge_id_bytes);
144
145 let mut offset_bytes = [0u8; 8];
146 reader.read_exact(&mut offset_bytes)?;
147 let offset = u64::from_le_bytes(offset_bytes);
148
149 let mut length_bytes = [0u8; 4];
150 reader.read_exact(&mut length_bytes)?;
151 let length = u32::from_le_bytes(length_bytes);
152
153 let mut compression_byte = [0u8; 1];
154 reader.read_exact(&mut compression_byte)?;
155 let compression = CompressionType::from_u8(compression_byte[0])?;
156
157 let mut uncompressed_bytes = [0u8; 4];
158 reader.read_exact(&mut uncompressed_bytes)?;
159 let uncompressed_length = u32::from_le_bytes(uncompressed_bytes);
160
161 index.insert(
162 edge_id,
163 PayloadMeta {
164 edge_id,
165 offset,
166 length,
167 compression,
168 uncompressed_length,
169 },
170 );
171 }
172
173 Ok(index)
174 }
175}
176
177impl PayloadIndex for HashMapIndex {
178 fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
179 self.inner.write().insert(edge_id, meta);
180 Ok(())
181 }
182
183 fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
184 Ok(self.inner.read().get(&edge_id).cloned())
185 }
186
187 fn contains_key(&self, edge_id: u128) -> bool {
188 self.inner.read().contains_key(&edge_id)
189 }
190
191 fn len(&self) -> usize {
192 self.inner.read().len()
193 }
194
195 fn is_empty(&self) -> bool {
196 self.inner.read().is_empty()
197 }
198
199 fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
200 let values: Vec<_> = self.inner.read().values().cloned().collect();
201 Box::new(values.into_iter())
202 }
203
204 fn save(&self) -> Result<()> {
205 let inner = self.inner.read();
206
207 if let Some(parent) = self.index_path.parent() {
209 std::fs::create_dir_all(parent)?;
210 }
211
212 let temp_path = self.index_path.with_extension("tmp");
214 let file = File::create(&temp_path)?;
215 let mut writer = std::io::BufWriter::new(file);
216
217 writer.write_all(b"CHRLPAY1")?;
219
220 writer.write_all(&(inner.len() as u64).to_le_bytes())?;
222
223 for meta in inner.values() {
225 writer.write_all(&meta.edge_id.to_le_bytes())?;
226 writer.write_all(&meta.offset.to_le_bytes())?;
227 writer.write_all(&meta.length.to_le_bytes())?;
228 writer.write_all(&[meta.compression as u8])?;
229 writer.write_all(&meta.uncompressed_length.to_le_bytes())?;
230 }
231
232 std::io::Write::flush(&mut writer)?;
234 let file = writer
235 .into_inner()
236 .map_err(|e| SochDBError::Internal(format!("Failed to flush: {}", e)))?;
237 file.sync_all()?;
238
239 std::fs::rename(&temp_path, &self.index_path)?;
241
242 tracing::debug!(
243 entries = inner.len(),
244 path = %self.index_path.display(),
245 "Saved payload index to disk"
246 );
247
248 Ok(())
249 }
250}
251
/// Selects which [`PayloadIndex`] implementation a [`PayloadStore`] uses.
///
/// Each backend keeps its own index file in the store directory
/// (`payload.index` vs `payload_disk_hash.idx`). Opening an existing directory
/// with the other backend therefore starts from an empty index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexBackend {
    /// In-memory `HashMap` index, written to a flat file on save. RAM grows
    /// with the number of entries.
    HashMap,
    /// Disk-backed hash index (`DiskHashIndex`). The store's memory warnings
    /// recommend it at large entry counts.
    DiskHash,
}
264
/// Append-only payload store.
///
/// Payload bytes are appended to `payload.data` and read back through a
/// memory map. Per-edge locations are kept in a pluggable [`PayloadIndex`].
pub struct PayloadStore {
    /// Handle to `payload.data`, opened read/write without truncation.
    data_file: Arc<RwLock<File>>,
    // Path of the data file; not read anywhere in this file, hence the allow.
    #[allow(dead_code)]
    data_path: PathBuf,
    /// Read-only mapping of the data file; `None` until the file is non-empty.
    mmap: Arc<RwLock<Option<memmap2::Mmap>>>,
    /// Edge id -> payload location and encoding.
    index: Arc<dyn PayloadIndex>,
    /// Data-file length as tracked by appends; initialised to the file length
    /// at open.
    next_offset: Arc<RwLock<u64>>,
    /// Whether the 1M, 5M and 10M entry messages (in that tuple order) have
    /// already been logged.
    memory_warning_logged: Arc<RwLock<(bool, bool, bool)>>,
    /// Backend selected at open time; only the HashMap backend triggers the
    /// memory-usage messages.
    backend_type: IndexBackend,
}
306
307impl PayloadStore {
308 pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
310 Self::open_with_backend(data_dir, IndexBackend::HashMap)
311 }
312
313 pub fn open_with_backend<P: AsRef<Path>>(data_dir: P, backend: IndexBackend) -> Result<Self> {
315 let data_dir = data_dir.as_ref();
316 std::fs::create_dir_all(data_dir)?;
317
318 let data_path = data_dir.join("payload.data");
319 let index_path = match backend {
320 IndexBackend::HashMap => data_dir.join("payload.index"),
321 IndexBackend::DiskHash => data_dir.join("payload_disk_hash.idx"),
322 };
323
324 let data_file = OpenOptions::new()
326 .read(true)
327 .write(true)
328 .create(true)
329 .truncate(false)
330 .open(&data_path)?;
331
332 let file_len = data_file.metadata()?.len();
333
334 let index: Arc<dyn PayloadIndex> = match backend {
336 IndexBackend::HashMap => Arc::new(HashMapIndex::new(index_path)?),
337 IndexBackend::DiskHash => Arc::new(DiskHashIndex::new(index_path)?),
338 };
339
340 let mmap = if file_len > 0 {
342 let mmap = unsafe { MmapOptions::new().map(&data_file)? };
343 Some(mmap)
344 } else {
345 None
346 };
347
348 Ok(Self {
349 data_file: Arc::new(RwLock::new(data_file)),
350 data_path,
351 mmap: Arc::new(RwLock::new(mmap)),
352 index,
353 next_offset: Arc::new(RwLock::new(file_len)),
354 memory_warning_logged: Arc::new(RwLock::new((false, false, false))),
355 backend_type: backend,
356 })
357 }
358
359 pub fn append(
371 &self,
372 edge_id: u128,
373 data: &[u8],
374 compression: Option<CompressionType>,
375 ) -> Result<(u64, u32, u8)> {
376 let uncompressed_len = data.len() as u32;
377
378 let compression = compression.unwrap_or(if data.len() < 1024 {
380 CompressionType::None
381 } else if data.len() < 100_000 {
382 CompressionType::LZ4
383 } else {
384 CompressionType::ZSTD
385 });
386
387 let (compressed_data, actual_compression) = match compression {
389 CompressionType::None => (data.to_vec(), CompressionType::None),
390 CompressionType::LZ4 => {
391 match lz4::block::compress(data, None, false) {
392 Ok(compressed) => {
393 if compressed.len() < data.len() {
395 (compressed, CompressionType::LZ4)
396 } else {
397 (data.to_vec(), CompressionType::None)
398 }
399 }
400 Err(_) => (data.to_vec(), CompressionType::None),
401 }
402 }
403 CompressionType::ZSTD => {
404 match zstd::encode_all(data, 3) {
405 Ok(compressed) => {
407 if compressed.len() < data.len() {
408 (compressed, CompressionType::ZSTD)
409 } else {
410 (data.to_vec(), CompressionType::None)
411 }
412 }
413 Err(_) => (data.to_vec(), CompressionType::None),
414 }
415 }
416 };
417
418 let compressed_len = compressed_data.len() as u32;
419
420 let offset = {
422 let mut file = self.data_file.write();
423 let offset = *self.next_offset.read();
424
425 file.seek(SeekFrom::End(0))?;
426 file.write_all(&compressed_data)?;
427 file.sync_all()?;
428
429 offset
430 };
431
432 *self.next_offset.write() = offset + compressed_len as u64;
434
435 self.index.insert(
437 edge_id,
438 PayloadMeta {
439 edge_id,
440 offset,
441 length: compressed_len,
442 compression: actual_compression,
443 uncompressed_length: uncompressed_len,
444 },
445 )?;
446
447 self.remap_file()?;
449
450 if matches!(self.backend_type, IndexBackend::HashMap) {
453 let count = self.index.len();
454 let mut warnings = self.memory_warning_logged.write();
455
456 if count >= 10_000_000 && !warnings.2 {
457 warnings.2 = true;
458 tracing::warn!(
459 payload_count = count,
460 estimated_ram_mb = count * 50 / 1_000_000,
461 "CRITICAL: PayloadStore has 10M+ entries (~500MB RAM). \
462 Strongly recommend switching to DiskHash backend to prevent OOM. \
463 Use PayloadStore::open_with_backend(path, IndexBackend::DiskHash)"
464 );
465 } else if count >= 5_000_000 && !warnings.1 {
466 warnings.1 = true;
467 tracing::warn!(
468 payload_count = count,
469 estimated_ram_mb = count * 50 / 1_000_000,
470 "WARNING: PayloadStore has 5M+ entries (~250MB RAM). \
471 Consider switching to DiskHash backend for better memory efficiency. \
472 Use PayloadStore::open_with_backend(path, IndexBackend::DiskHash)"
473 );
474 } else if count >= 1_000_000 && !warnings.0 {
475 warnings.0 = true;
476 tracing::info!(
477 payload_count = count,
478 estimated_ram_mb = count * 50 / 1_000_000,
479 "INFO: PayloadStore has reached 1M entries (~50MB RAM). \
480 Memory usage will grow linearly. Consider DiskHash backend for 10M+ scale."
481 );
482 }
483 }
484
485 Ok((offset, compressed_len, actual_compression as u8))
496 }
497
498 pub fn get(&self, edge_id: u128) -> Result<Option<Vec<u8>>> {
500 let meta = match self.index.get(edge_id)? {
501 Some(m) => m,
502 None => return Ok(None),
503 };
504
505 self.get_at_offset(meta.offset, meta.length, meta.compression)
506 .map(Some)
507 }
508
509 pub fn get_at_offset(
514 &self,
515 offset: u64,
516 length: u32,
517 compression: CompressionType,
518 ) -> Result<Vec<u8>> {
519 let uncompressed_size = if compression == CompressionType::LZ4 {
521 self.index
523 .iter_values()
524 .find(|meta| meta.offset == offset && meta.length == length)
525 .map(|meta| meta.uncompressed_length as i32)
526 } else {
527 None
528 };
529
530 let mmap = self.mmap.read();
531
532 let compressed_data = match mmap.as_ref() {
533 Some(mmap) => {
534 let start = offset as usize;
535 let end = start + length as usize;
536
537 if end > mmap.len() {
538 return Err(SochDBError::InvalidArgument(format!(
539 "Payload offset {} + length {} exceeds file size {}",
540 offset,
541 length,
542 mmap.len()
543 )));
544 }
545
546 &mmap[start..end]
547 }
548 None => {
549 return Err(SochDBError::InvalidArgument("Payload file is empty".into()));
550 }
551 };
552
553 let data = match compression {
555 CompressionType::None => compressed_data.to_vec(),
556 CompressionType::LZ4 => {
557 lz4::block::decompress(compressed_data, uncompressed_size).map_err(|e| {
559 SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
560 })?
561 }
562 CompressionType::ZSTD => zstd::decode_all(compressed_data).map_err(|e| {
563 SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
564 })?,
565 };
566
567 Ok(data)
568 }
569
570 pub fn has_payload(&self, edge_id: u128) -> bool {
572 self.index.contains_key(edge_id)
573 }
574
575 pub fn len(&self) -> usize {
577 self.index.len()
578 }
579
580 pub fn is_empty(&self) -> bool {
582 self.index.is_empty()
583 }
584
585 pub fn save_index(&self) -> Result<()> {
587 self.index.save()
588 }
589
590 fn remap_file(&self) -> Result<()> {
592 let file = self.data_file.read();
593 let file_len = file.metadata()?.len();
594
595 if file_len > 0 {
596 let new_mmap = unsafe { MmapOptions::new().map(&*file)? };
597 *self.mmap.write() = Some(new_mmap);
598 }
599
600 Ok(())
601 }
602
603 pub fn stats(&self) -> PayloadStats {
605 let mut total_compressed: u64 = 0;
606 let mut total_uncompressed: u64 = 0;
607 let mut compression_counts = [0usize; 3];
608
609 for meta in self.index.iter_values() {
610 total_compressed += meta.length as u64;
611 total_uncompressed += meta.uncompressed_length as u64;
612 compression_counts[meta.compression as usize] += 1;
613 }
614
615 PayloadStats {
616 num_payloads: self.index.len(),
617 total_compressed_bytes: total_compressed,
618 total_uncompressed_bytes: total_uncompressed,
619 compression_ratio: if total_compressed > 0 {
620 total_uncompressed as f64 / total_compressed as f64
621 } else {
622 1.0
623 },
624 none_count: compression_counts[0],
625 lz4_count: compression_counts[1],
626 zstd_count: compression_counts[2],
627 }
628 }
629}
630
631impl Drop for PayloadStore {
632 fn drop(&mut self) {
633 let _ = self.index.save();
635 }
636}
637
/// Aggregate statistics returned by [`PayloadStore::stats`].
#[derive(Debug, Clone)]
pub struct PayloadStats {
    /// Number of entries in the index.
    pub num_payloads: usize,
    /// Sum of stored (on-disk) payload lengths.
    pub total_compressed_bytes: u64,
    /// Sum of original, pre-compression payload lengths.
    pub total_uncompressed_bytes: u64,
    /// `total_uncompressed_bytes / total_compressed_bytes`, or `1.0` when
    /// `total_compressed_bytes` is zero.
    pub compression_ratio: f64,
    /// Payloads stored raw.
    pub none_count: usize,
    /// Payloads stored LZ4-compressed.
    pub lz4_count: usize,
    /// Payloads stored ZSTD-compressed.
    pub zstd_count: usize,
}
648
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_payload_store_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data = b"Hello, SochDB!";
        let (offset, length, compression) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);
        assert_eq!(compression, CompressionType::None as u8);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_payload_compression() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        // Highly repetitive input so LZ4 is guaranteed to shrink it.
        let data: Vec<u8> = b"ABCD".repeat(1000);

        let (_, length, compression) = store.append(1, &data, Some(CompressionType::LZ4)).unwrap();

        assert!(length < data.len() as u32);
        assert_eq!(compression, CompressionType::LZ4 as u8);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_payload_zstd_roundtrip() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        // Inputs of 100_000 bytes or more select ZSTD automatically.
        let data = vec![7u8; 200_000];
        let (_, length, compression) = store.append(1, &data, None).unwrap();

        assert_eq!(compression, CompressionType::ZSTD as u8);
        assert!((length as usize) < data.len());
        assert_eq!(store.get(1).unwrap().unwrap(), data);
    }

    #[test]
    fn test_get_at_offset_matches_get() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data = b"xyz".repeat(1000);
        let (offset, length, code) = store.append(7, &data, Some(CompressionType::LZ4)).unwrap();
        let compression = CompressionType::from_u8(code).unwrap();

        assert_eq!(store.get_at_offset(offset, length, compression).unwrap(), data);
        assert_eq!(store.get(7).unwrap().unwrap(), data);
    }

    #[test]
    fn test_append_offsets_are_contiguous() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let (first_offset, first_len, _) = store.append(1, b"first", None).unwrap();
        let (second_offset, _, _) = store.append(2, b"second", None).unwrap();

        assert_eq!(first_offset, 0);
        assert_eq!(second_offset, first_offset + first_len as u64);
        assert!(store.has_payload(1));
        assert!(!store.has_payload(3));
        assert!(store.get(3).unwrap().is_none());
    }

    #[test]
    fn test_payload_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First payload";
        let data2 = b"Second payload";

        {
            // The index is saved when the store is dropped at the end of scope.
            let store = PayloadStore::open(dir.path()).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
        }

        {
            let store = PayloadStore::open(dir.path()).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
        }
    }

    #[test]
    fn test_payload_stats() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        store.append(1, b"small", None).unwrap();
        store
            .append(2, &b"A".repeat(2000), Some(CompressionType::LZ4))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.num_payloads, 2);
        assert!(stats.compression_ratio > 1.0);
        assert_eq!(stats.none_count, 1);
        assert_eq!(stats.lz4_count, 1);
        assert_eq!(stats.zstd_count, 0);
    }

    #[test]
    fn test_disk_hash_backend_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();

        let data = b"Hello from DiskHash!";
        let (offset, length, _) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_disk_hash_backend_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First disk hash payload";
        let data2 = b"Second disk hash payload";

        {
            let store =
                PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
            store.save_index().unwrap();
        }

        {
            let store =
                PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
            assert_eq!(store.len(), 2);
        }
    }

    #[test]
    fn test_disk_hash_backend_large_dataset() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();

        for i in 0..1000 {
            let data = format!("Payload {}", i);
            store.append(i, data.as_bytes(), None).unwrap();
        }

        assert_eq!(store.len(), 1000);
        let retrieved = store.get(500).unwrap().unwrap();
        assert_eq!(retrieved, b"Payload 500");

        let stats = store.stats();
        assert_eq!(stats.num_payloads, 1000);
    }

    #[test]
    fn test_backend_interoperability() {
        let dir = tempdir().unwrap();

        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::HashMap).unwrap();
            store.append(1, b"HashMap data", None).unwrap();
        }

        {
            // Each backend has its own index file, so DiskHash starts empty
            // even though the shared data file already holds a payload.
            let store =
                PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();
            assert_eq!(store.len(), 0);

            store.append(2, b"DiskHash data", None).unwrap();
            assert_eq!(store.len(), 1);
        }
    }
}