1use memmap2::MmapOptions;
29use parking_lot::RwLock;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write as IoWrite};
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34use sochdb_core::{Result, SochDBError};
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
38#[repr(u8)]
39pub enum CompressionType {
40 None = 0,
41 LZ4 = 1,
42 ZSTD = 2,
43}
44
45impl CompressionType {
46 pub fn from_u8(value: u8) -> Result<Self> {
47 match value {
48 0 => Ok(CompressionType::None),
49 1 => Ok(CompressionType::LZ4),
50 2 => Ok(CompressionType::ZSTD),
51 _ => Err(SochDBError::InvalidArgument(format!(
52 "Invalid compression type: {}",
53 value
54 ))),
55 }
56 }
57}
58
59#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
61pub struct PayloadMeta {
62 pub edge_id: u128,
63 pub offset: u64,
64 pub length: u32,
65 pub compression: CompressionType,
66 pub uncompressed_length: u32,
67}
68
69pub trait PayloadIndex: Send + Sync {
73 fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()>;
74 fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>>;
75 fn contains_key(&self, edge_id: u128) -> bool;
76 fn len(&self) -> usize;
77 fn is_empty(&self) -> bool;
78 fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_>;
79 fn save(&self) -> Result<()>;
80}
81
82struct HashMapIndex {
84 inner: Arc<RwLock<std::collections::HashMap<u128, PayloadMeta>>>,
85 index_path: PathBuf,
86}
87
88impl HashMapIndex {
89 fn new(index_path: PathBuf) -> Result<Self> {
90 if let Some(parent) = index_path.parent() {
92 std::fs::create_dir_all(parent)?;
93 }
94
95 let inner = if index_path.exists() {
96 let loaded = Self::load_from_disk(&index_path)?;
97 tracing::info!(
98 "Loaded payload index with {} entries from {:?}",
99 loaded.len(),
100 index_path
101 );
102 Arc::new(RwLock::new(loaded))
103 } else {
104 tracing::info!(
105 "No existing payload index found at {:?}, starting fresh",
106 index_path
107 );
108 Arc::new(RwLock::new(std::collections::HashMap::new()))
109 };
110 Ok(Self { inner, index_path })
111 }
112
113 fn load_from_disk(path: &Path) -> Result<std::collections::HashMap<u128, PayloadMeta>> {
114 let file = File::open(path)?;
115 let mut reader = std::io::BufReader::new(file);
116
117 let mut magic = [0u8; 8];
119 reader.read_exact(&mut magic)?;
120 if &magic != b"CHRLPAY1" {
121 return Err(SochDBError::Corruption(
122 "Invalid payload index magic".into(),
123 ));
124 }
125
126 let mut count_bytes = [0u8; 8];
128 reader.read_exact(&mut count_bytes)?;
129 let count = u64::from_le_bytes(count_bytes);
130
131 let mut index = std::collections::HashMap::new();
132
133 for _ in 0..count {
134 let mut edge_id_bytes = [0u8; 16];
135 reader.read_exact(&mut edge_id_bytes)?;
136 let edge_id = u128::from_le_bytes(edge_id_bytes);
137
138 let mut offset_bytes = [0u8; 8];
139 reader.read_exact(&mut offset_bytes)?;
140 let offset = u64::from_le_bytes(offset_bytes);
141
142 let mut length_bytes = [0u8; 4];
143 reader.read_exact(&mut length_bytes)?;
144 let length = u32::from_le_bytes(length_bytes);
145
146 let mut compression_byte = [0u8; 1];
147 reader.read_exact(&mut compression_byte)?;
148 let compression = CompressionType::from_u8(compression_byte[0])?;
149
150 let mut uncompressed_bytes = [0u8; 4];
151 reader.read_exact(&mut uncompressed_bytes)?;
152 let uncompressed_length = u32::from_le_bytes(uncompressed_bytes);
153
154 index.insert(
155 edge_id,
156 PayloadMeta {
157 edge_id,
158 offset,
159 length,
160 compression,
161 uncompressed_length,
162 },
163 );
164 }
165
166 Ok(index)
167 }
168}
169
170impl PayloadIndex for HashMapIndex {
171 fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
172 self.inner.write().insert(edge_id, meta);
173 Ok(())
174 }
175
176 fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
177 Ok(self.inner.read().get(&edge_id).cloned())
178 }
179
180 fn contains_key(&self, edge_id: u128) -> bool {
181 self.inner.read().contains_key(&edge_id)
182 }
183
184 fn len(&self) -> usize {
185 self.inner.read().len()
186 }
187
188 fn is_empty(&self) -> bool {
189 self.inner.read().is_empty()
190 }
191
192 fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
193 let values: Vec<_> = self.inner.read().values().cloned().collect();
194 Box::new(values.into_iter())
195 }
196
197 fn save(&self) -> Result<()> {
198 let inner = self.inner.read();
199
200 if let Some(parent) = self.index_path.parent() {
202 std::fs::create_dir_all(parent)?;
203 }
204
205 let temp_path = self.index_path.with_extension("tmp");
207 let file = File::create(&temp_path)?;
208 let mut writer = std::io::BufWriter::new(file);
209
210 writer.write_all(b"CHRLPAY1")?;
212
213 writer.write_all(&(inner.len() as u64).to_le_bytes())?;
215
216 for meta in inner.values() {
218 writer.write_all(&meta.edge_id.to_le_bytes())?;
219 writer.write_all(&meta.offset.to_le_bytes())?;
220 writer.write_all(&meta.length.to_le_bytes())?;
221 writer.write_all(&[meta.compression as u8])?;
222 writer.write_all(&meta.uncompressed_length.to_le_bytes())?;
223 }
224
225 std::io::Write::flush(&mut writer)?;
227 let file = writer
228 .into_inner()
229 .map_err(|e| SochDBError::Internal(format!("Failed to flush: {}", e)))?;
230 file.sync_all()?;
231
232 std::fs::rename(&temp_path, &self.index_path)?;
234
235 tracing::debug!(
236 entries = inner.len(),
237 path = %self.index_path.display(),
238 "Saved payload index to disk"
239 );
240
241 Ok(())
242 }
243}
244
245struct SledIndex {
247 db: sled::Db,
248}
249
250impl SledIndex {
251 fn new(index_path: PathBuf) -> Result<Self> {
252 let db = sled::open(index_path)
253 .map_err(|e| SochDBError::Internal(format!("Failed to open sled: {}", e)))?;
254 Ok(Self { db })
255 }
256}
257
258impl PayloadIndex for SledIndex {
259 fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
260 let key = edge_id.to_le_bytes();
261 let value = bincode::serialize(&meta)
262 .map_err(|e| SochDBError::Corruption(format!("Serialization failed: {}", e)))?;
263 self.db
264 .insert(&key[..], value.as_slice())
265 .map_err(|e| SochDBError::Internal(format!("Sled insert failed: {}", e)))?;
266 Ok(())
267 }
268
269 fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
270 let key = edge_id.to_le_bytes();
271 let value = self
272 .db
273 .get(&key[..])
274 .map_err(|e| SochDBError::Internal(format!("Sled get failed: {}", e)))?;
275 match value {
276 Some(bytes) => {
277 let meta: PayloadMeta = bincode::deserialize(&bytes).map_err(|e| {
278 SochDBError::Corruption(format!("Deserialization failed: {}", e))
279 })?;
280 Ok(Some(meta))
281 }
282 None => Ok(None),
283 }
284 }
285
286 fn contains_key(&self, edge_id: u128) -> bool {
287 let key = edge_id.to_le_bytes();
288 self.db.contains_key(&key[..]).unwrap_or(false)
289 }
290
291 fn len(&self) -> usize {
292 self.db.len()
293 }
294
295 fn is_empty(&self) -> bool {
296 self.db.is_empty()
297 }
298
299 fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
300 Box::new(self.db.iter().filter_map(|result| {
301 result
302 .ok()
303 .and_then(|(_, value)| bincode::deserialize::<PayloadMeta>(&value).ok())
304 }))
305 }
306
307 fn save(&self) -> Result<()> {
308 self.db
309 .flush()
310 .map_err(|e| SochDBError::Internal(format!("Sled flush failed: {}", e)))?;
311 Ok(())
312 }
313}
314
/// Selects which index implementation a `PayloadStore` uses.
pub enum IndexBackend {
    /// Keep the whole index in an in-memory `HashMap`, saved to `payload.index`.
    HashMap,
    /// Keep the index in a sled database under `payload_sled`.
    Sled,
}
322
323pub struct PayloadStore {
353 data_file: Arc<RwLock<File>>,
354 #[allow(dead_code)]
355 data_path: PathBuf,
356 mmap: Arc<RwLock<Option<memmap2::Mmap>>>,
357 index: Arc<dyn PayloadIndex>,
358 next_offset: Arc<RwLock<u64>>,
359 memory_warning_logged: Arc<RwLock<(bool, bool, bool)>>, backend_type: IndexBackend,
363}
364
365impl PayloadStore {
366 pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
368 Self::open_with_backend(data_dir, IndexBackend::HashMap)
369 }
370
371 pub fn open_with_backend<P: AsRef<Path>>(data_dir: P, backend: IndexBackend) -> Result<Self> {
373 let data_dir = data_dir.as_ref();
374 std::fs::create_dir_all(data_dir)?;
375
376 let data_path = data_dir.join("payload.data");
377 let index_path = match backend {
378 IndexBackend::HashMap => data_dir.join("payload.index"),
379 IndexBackend::Sled => data_dir.join("payload_sled"),
380 };
381
382 let data_file = OpenOptions::new()
384 .read(true)
385 .write(true)
386 .create(true)
387 .truncate(false)
388 .open(&data_path)?;
389
390 let file_len = data_file.metadata()?.len();
391
392 let index: Arc<dyn PayloadIndex> = match backend {
394 IndexBackend::HashMap => Arc::new(HashMapIndex::new(index_path)?),
395 IndexBackend::Sled => Arc::new(SledIndex::new(index_path)?),
396 };
397
398 let mmap = if file_len > 0 {
400 let mmap = unsafe { MmapOptions::new().map(&data_file)? };
401 Some(mmap)
402 } else {
403 None
404 };
405
406 Ok(Self {
407 data_file: Arc::new(RwLock::new(data_file)),
408 data_path,
409 mmap: Arc::new(RwLock::new(mmap)),
410 index,
411 next_offset: Arc::new(RwLock::new(file_len)),
412 memory_warning_logged: Arc::new(RwLock::new((false, false, false))),
413 backend_type: backend,
414 })
415 }
416
417 pub fn append(
429 &self,
430 edge_id: u128,
431 data: &[u8],
432 compression: Option<CompressionType>,
433 ) -> Result<(u64, u32, u8)> {
434 let uncompressed_len = data.len() as u32;
435
436 let compression = compression.unwrap_or(if data.len() < 1024 {
438 CompressionType::None
439 } else if data.len() < 100_000 {
440 CompressionType::LZ4
441 } else {
442 CompressionType::ZSTD
443 });
444
445 let (compressed_data, actual_compression) = match compression {
447 CompressionType::None => (data.to_vec(), CompressionType::None),
448 CompressionType::LZ4 => {
449 match lz4::block::compress(data, None, false) {
450 Ok(compressed) => {
451 if compressed.len() < data.len() {
453 (compressed, CompressionType::LZ4)
454 } else {
455 (data.to_vec(), CompressionType::None)
456 }
457 }
458 Err(_) => (data.to_vec(), CompressionType::None),
459 }
460 }
461 CompressionType::ZSTD => {
462 match zstd::encode_all(data, 3) {
463 Ok(compressed) => {
465 if compressed.len() < data.len() {
466 (compressed, CompressionType::ZSTD)
467 } else {
468 (data.to_vec(), CompressionType::None)
469 }
470 }
471 Err(_) => (data.to_vec(), CompressionType::None),
472 }
473 }
474 };
475
476 let compressed_len = compressed_data.len() as u32;
477
478 let offset = {
480 let mut file = self.data_file.write();
481 let offset = *self.next_offset.read();
482
483 file.seek(SeekFrom::End(0))?;
484 file.write_all(&compressed_data)?;
485 file.sync_all()?;
486
487 offset
488 };
489
490 *self.next_offset.write() = offset + compressed_len as u64;
492
493 self.index.insert(
495 edge_id,
496 PayloadMeta {
497 edge_id,
498 offset,
499 length: compressed_len,
500 compression: actual_compression,
501 uncompressed_length: uncompressed_len,
502 },
503 )?;
504
505 self.remap_file()?;
507
508 if matches!(self.backend_type, IndexBackend::HashMap) {
511 let count = self.index.len();
512 let mut warnings = self.memory_warning_logged.write();
513
514 if count >= 10_000_000 && !warnings.2 {
515 warnings.2 = true;
516 tracing::warn!(
517 payload_count = count,
518 estimated_ram_mb = count * 50 / 1_000_000,
519 "CRITICAL: PayloadStore has 10M+ entries (~500MB RAM). \
520 Strongly recommend switching to Sled backend to prevent OOM. \
521 Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
522 );
523 } else if count >= 5_000_000 && !warnings.1 {
524 warnings.1 = true;
525 tracing::warn!(
526 payload_count = count,
527 estimated_ram_mb = count * 50 / 1_000_000,
528 "WARNING: PayloadStore has 5M+ entries (~250MB RAM). \
529 Consider switching to Sled backend for better memory efficiency. \
530 Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
531 );
532 } else if count >= 1_000_000 && !warnings.0 {
533 warnings.0 = true;
534 tracing::info!(
535 payload_count = count,
536 estimated_ram_mb = count * 50 / 1_000_000,
537 "INFO: PayloadStore has reached 1M entries (~50MB RAM). \
538 Memory usage will grow linearly. Consider Sled backend for 10M+ scale."
539 );
540 }
541 }
542
543 Ok((offset, compressed_len, actual_compression as u8))
554 }
555
556 pub fn get(&self, edge_id: u128) -> Result<Option<Vec<u8>>> {
558 let meta = match self.index.get(edge_id)? {
559 Some(m) => m,
560 None => return Ok(None),
561 };
562
563 self.get_at_offset(meta.offset, meta.length, meta.compression)
564 .map(Some)
565 }
566
567 pub fn get_at_offset(
572 &self,
573 offset: u64,
574 length: u32,
575 compression: CompressionType,
576 ) -> Result<Vec<u8>> {
577 let uncompressed_size = if compression == CompressionType::LZ4 {
579 self.index
581 .iter_values()
582 .find(|meta| meta.offset == offset && meta.length == length)
583 .map(|meta| meta.uncompressed_length as i32)
584 } else {
585 None
586 };
587
588 let mmap = self.mmap.read();
589
590 let compressed_data = match mmap.as_ref() {
591 Some(mmap) => {
592 let start = offset as usize;
593 let end = start + length as usize;
594
595 if end > mmap.len() {
596 return Err(SochDBError::InvalidArgument(format!(
597 "Payload offset {} + length {} exceeds file size {}",
598 offset,
599 length,
600 mmap.len()
601 )));
602 }
603
604 &mmap[start..end]
605 }
606 None => {
607 return Err(SochDBError::InvalidArgument("Payload file is empty".into()));
608 }
609 };
610
611 let data = match compression {
613 CompressionType::None => compressed_data.to_vec(),
614 CompressionType::LZ4 => {
615 lz4::block::decompress(compressed_data, uncompressed_size).map_err(|e| {
617 SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
618 })?
619 }
620 CompressionType::ZSTD => zstd::decode_all(compressed_data).map_err(|e| {
621 SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
622 })?,
623 };
624
625 Ok(data)
626 }
627
628 pub fn has_payload(&self, edge_id: u128) -> bool {
630 self.index.contains_key(edge_id)
631 }
632
633 pub fn len(&self) -> usize {
635 self.index.len()
636 }
637
638 pub fn is_empty(&self) -> bool {
640 self.index.is_empty()
641 }
642
643 pub fn save_index(&self) -> Result<()> {
645 self.index.save()
646 }
647
648 fn remap_file(&self) -> Result<()> {
650 let file = self.data_file.read();
651 let file_len = file.metadata()?.len();
652
653 if file_len > 0 {
654 let new_mmap = unsafe { MmapOptions::new().map(&*file)? };
655 *self.mmap.write() = Some(new_mmap);
656 }
657
658 Ok(())
659 }
660
661 pub fn stats(&self) -> PayloadStats {
663 let mut total_compressed: u64 = 0;
664 let mut total_uncompressed: u64 = 0;
665 let mut compression_counts = [0usize; 3];
666
667 for meta in self.index.iter_values() {
668 total_compressed += meta.length as u64;
669 total_uncompressed += meta.uncompressed_length as u64;
670 compression_counts[meta.compression as usize] += 1;
671 }
672
673 PayloadStats {
674 num_payloads: self.index.len(),
675 total_compressed_bytes: total_compressed,
676 total_uncompressed_bytes: total_uncompressed,
677 compression_ratio: if total_compressed > 0 {
678 total_uncompressed as f64 / total_compressed as f64
679 } else {
680 1.0
681 },
682 none_count: compression_counts[0],
683 lz4_count: compression_counts[1],
684 zstd_count: compression_counts[2],
685 }
686 }
687}
688
689impl Drop for PayloadStore {
690 fn drop(&mut self) {
691 let _ = self.index.save();
693 }
694}
695
/// Aggregate figures about the payloads recorded in a store's index.
#[derive(Debug, Clone)]
pub struct PayloadStats {
    /// Number of payloads in the index.
    pub num_payloads: usize,
    /// Sum of the stored (on-disk) lengths, in bytes.
    pub total_compressed_bytes: u64,
    /// Sum of the original payload lengths, in bytes.
    pub total_uncompressed_bytes: u64,
    /// `total_uncompressed_bytes / total_compressed_bytes`; `1.0` when no
    /// bytes are stored.
    pub compression_ratio: f64,
    /// Payloads stored without compression.
    pub none_count: usize,
    /// Payloads stored with LZ4.
    pub lz4_count: usize,
    /// Payloads stored with ZSTD.
    pub zstd_count: usize,
}
706
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a store with the Sled backend in `dir`.
    fn open_sled(dir: &Path) -> PayloadStore {
        PayloadStore::open_with_backend(dir, IndexBackend::Sled).unwrap()
    }

    /// Reads back a payload that is expected to exist.
    fn fetch(store: &PayloadStore, edge_id: u128) -> Vec<u8> {
        store.get(edge_id).unwrap().unwrap()
    }

    /// A small payload is stored raw at offset 0 and reads back unchanged.
    #[test]
    fn test_payload_store_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data = b"Hello, SochDB!";
        let (offset, length, compression) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);
        assert_eq!(compression, CompressionType::None as u8);

        assert_eq!(fetch(&store, 1), data);
    }

    /// Repetitive data shrinks under LZ4 and round-trips.
    #[test]
    fn test_payload_compression() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data: Vec<u8> = b"ABCD".repeat(1000);

        let (_, length, compression) = store
            .append(1, &data, Some(CompressionType::LZ4))
            .unwrap();

        assert!(length < data.len() as u32);
        assert_eq!(compression, CompressionType::LZ4 as u8);

        assert_eq!(fetch(&store, 1), data);
    }

    /// Payloads written by one store instance are visible after reopening.
    #[test]
    fn test_payload_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First payload";
        let data2 = b"Second payload";

        // First session: write, then drop the store.
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
        }

        // Second session: everything must still be readable.
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            assert_eq!(fetch(&store, 1), data1);
            assert_eq!(fetch(&store, 2), data2);
        }
    }

    /// Stats count both payloads and report a ratio above 1 once something
    /// compressible is stored.
    #[test]
    fn test_payload_stats() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        store.append(1, b"small", None).unwrap();
        let compressible = b"A".repeat(2000);
        store
            .append(2, &compressible, Some(CompressionType::LZ4))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.num_payloads, 2);
        assert!(stats.compression_ratio > 1.0);
    }

    /// Same basic round trip as above, using the Sled backend.
    #[test]
    fn test_sled_backend_basic() {
        let dir = tempdir().unwrap();
        let store = open_sled(dir.path());

        let data = b"Hello from Sled!";
        let (offset, length, _) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);

        assert_eq!(fetch(&store, 1), data);
    }

    /// The Sled index survives an explicit save followed by a reopen.
    #[test]
    fn test_sled_backend_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First sled payload";
        let data2 = b"Second sled payload";

        {
            let store = open_sled(dir.path());
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
            store.save_index().unwrap();
        }

        {
            let store = open_sled(dir.path());
            assert_eq!(fetch(&store, 1), data1);
            assert_eq!(fetch(&store, 2), data2);
            assert_eq!(store.len(), 2);
        }
    }

    /// A thousand entries can be written and individually retrieved.
    #[test]
    fn test_sled_backend_large_dataset() {
        let dir = tempdir().unwrap();
        let store = open_sled(dir.path());

        for i in 0..1000 {
            let data = format!("Payload {}", i);
            store.append(i, data.as_bytes(), None).unwrap();
        }

        assert_eq!(store.len(), 1000);
        assert_eq!(fetch(&store, 500), b"Payload 500");

        let stats = store.stats();
        assert_eq!(stats.num_payloads, 1000);
    }

    /// The two backends keep separate indexes in the same directory: entries
    /// written through one are not visible through the other.
    #[test]
    fn test_backend_interoperability() {
        let dir = tempdir().unwrap();

        {
            let store =
                PayloadStore::open_with_backend(dir.path(), IndexBackend::HashMap).unwrap();
            store.append(1, b"HashMap data", None).unwrap();
        }

        {
            let store = open_sled(dir.path());
            assert_eq!(store.len(), 0);

            store.append(2, b"Sled data", None).unwrap();
            assert_eq!(store.len(), 1);
        }
    }
}