1use memmap2::MmapOptions;
32use parking_lot::RwLock;
33use std::fs::{File, OpenOptions};
34use std::io::{Read, Seek, SeekFrom, Write as IoWrite};
35use std::path::{Path, PathBuf};
36use std::sync::Arc;
37use sochdb_core::{Result, SochDBError};
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
41#[repr(u8)]
42pub enum CompressionType {
43 None = 0,
44 LZ4 = 1,
45 ZSTD = 2,
46}
47
48impl CompressionType {
49 pub fn from_u8(value: u8) -> Result<Self> {
50 match value {
51 0 => Ok(CompressionType::None),
52 1 => Ok(CompressionType::LZ4),
53 2 => Ok(CompressionType::ZSTD),
54 _ => Err(SochDBError::InvalidArgument(format!(
55 "Invalid compression type: {}",
56 value
57 ))),
58 }
59 }
60}
61
62#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
64pub struct PayloadMeta {
65 pub edge_id: u128,
66 pub offset: u64,
67 pub length: u32,
68 pub compression: CompressionType,
69 pub uncompressed_length: u32,
70}
71
72pub trait PayloadIndex: Send + Sync {
76 fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()>;
77 fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>>;
78 fn contains_key(&self, edge_id: u128) -> bool;
79 fn len(&self) -> usize;
80 fn is_empty(&self) -> bool;
81 fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_>;
82 fn save(&self) -> Result<()>;
83}
84
85struct HashMapIndex {
87 inner: Arc<RwLock<std::collections::HashMap<u128, PayloadMeta>>>,
88 index_path: PathBuf,
89}
90
91impl HashMapIndex {
92 fn new(index_path: PathBuf) -> Result<Self> {
93 if let Some(parent) = index_path.parent() {
95 std::fs::create_dir_all(parent)?;
96 }
97
98 let inner = if index_path.exists() {
99 let loaded = Self::load_from_disk(&index_path)?;
100 tracing::info!(
101 "Loaded payload index with {} entries from {:?}",
102 loaded.len(),
103 index_path
104 );
105 Arc::new(RwLock::new(loaded))
106 } else {
107 tracing::info!(
108 "No existing payload index found at {:?}, starting fresh",
109 index_path
110 );
111 Arc::new(RwLock::new(std::collections::HashMap::new()))
112 };
113 Ok(Self { inner, index_path })
114 }
115
116 fn load_from_disk(path: &Path) -> Result<std::collections::HashMap<u128, PayloadMeta>> {
117 let file = File::open(path)?;
118 let mut reader = std::io::BufReader::new(file);
119
120 let mut magic = [0u8; 8];
122 reader.read_exact(&mut magic)?;
123 if &magic != b"CHRLPAY1" {
124 return Err(SochDBError::Corruption(
125 "Invalid payload index magic".into(),
126 ));
127 }
128
129 let mut count_bytes = [0u8; 8];
131 reader.read_exact(&mut count_bytes)?;
132 let count = u64::from_le_bytes(count_bytes);
133
134 let mut index = std::collections::HashMap::new();
135
136 for _ in 0..count {
137 let mut edge_id_bytes = [0u8; 16];
138 reader.read_exact(&mut edge_id_bytes)?;
139 let edge_id = u128::from_le_bytes(edge_id_bytes);
140
141 let mut offset_bytes = [0u8; 8];
142 reader.read_exact(&mut offset_bytes)?;
143 let offset = u64::from_le_bytes(offset_bytes);
144
145 let mut length_bytes = [0u8; 4];
146 reader.read_exact(&mut length_bytes)?;
147 let length = u32::from_le_bytes(length_bytes);
148
149 let mut compression_byte = [0u8; 1];
150 reader.read_exact(&mut compression_byte)?;
151 let compression = CompressionType::from_u8(compression_byte[0])?;
152
153 let mut uncompressed_bytes = [0u8; 4];
154 reader.read_exact(&mut uncompressed_bytes)?;
155 let uncompressed_length = u32::from_le_bytes(uncompressed_bytes);
156
157 index.insert(
158 edge_id,
159 PayloadMeta {
160 edge_id,
161 offset,
162 length,
163 compression,
164 uncompressed_length,
165 },
166 );
167 }
168
169 Ok(index)
170 }
171}
172
173impl PayloadIndex for HashMapIndex {
174 fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
175 self.inner.write().insert(edge_id, meta);
176 Ok(())
177 }
178
179 fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
180 Ok(self.inner.read().get(&edge_id).cloned())
181 }
182
183 fn contains_key(&self, edge_id: u128) -> bool {
184 self.inner.read().contains_key(&edge_id)
185 }
186
187 fn len(&self) -> usize {
188 self.inner.read().len()
189 }
190
191 fn is_empty(&self) -> bool {
192 self.inner.read().is_empty()
193 }
194
195 fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
196 let values: Vec<_> = self.inner.read().values().cloned().collect();
197 Box::new(values.into_iter())
198 }
199
200 fn save(&self) -> Result<()> {
201 let inner = self.inner.read();
202
203 if let Some(parent) = self.index_path.parent() {
205 std::fs::create_dir_all(parent)?;
206 }
207
208 let temp_path = self.index_path.with_extension("tmp");
210 let file = File::create(&temp_path)?;
211 let mut writer = std::io::BufWriter::new(file);
212
213 writer.write_all(b"CHRLPAY1")?;
215
216 writer.write_all(&(inner.len() as u64).to_le_bytes())?;
218
219 for meta in inner.values() {
221 writer.write_all(&meta.edge_id.to_le_bytes())?;
222 writer.write_all(&meta.offset.to_le_bytes())?;
223 writer.write_all(&meta.length.to_le_bytes())?;
224 writer.write_all(&[meta.compression as u8])?;
225 writer.write_all(&meta.uncompressed_length.to_le_bytes())?;
226 }
227
228 std::io::Write::flush(&mut writer)?;
230 let file = writer
231 .into_inner()
232 .map_err(|e| SochDBError::Internal(format!("Failed to flush: {}", e)))?;
233 file.sync_all()?;
234
235 std::fs::rename(&temp_path, &self.index_path)?;
237
238 tracing::debug!(
239 entries = inner.len(),
240 path = %self.index_path.display(),
241 "Saved payload index to disk"
242 );
243
244 Ok(())
245 }
246}
247
248struct SledIndex {
250 db: sled::Db,
251}
252
253impl SledIndex {
254 fn new(index_path: PathBuf) -> Result<Self> {
255 let db = sled::open(index_path)
256 .map_err(|e| SochDBError::Internal(format!("Failed to open sled: {}", e)))?;
257 Ok(Self { db })
258 }
259}
260
261impl PayloadIndex for SledIndex {
262 fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
263 let key = edge_id.to_le_bytes();
264 let value = bincode::serialize(&meta)
265 .map_err(|e| SochDBError::Corruption(format!("Serialization failed: {}", e)))?;
266 self.db
267 .insert(&key[..], value.as_slice())
268 .map_err(|e| SochDBError::Internal(format!("Sled insert failed: {}", e)))?;
269 Ok(())
270 }
271
272 fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
273 let key = edge_id.to_le_bytes();
274 let value = self
275 .db
276 .get(&key[..])
277 .map_err(|e| SochDBError::Internal(format!("Sled get failed: {}", e)))?;
278 match value {
279 Some(bytes) => {
280 let meta: PayloadMeta = bincode::deserialize(&bytes).map_err(|e| {
281 SochDBError::Corruption(format!("Deserialization failed: {}", e))
282 })?;
283 Ok(Some(meta))
284 }
285 None => Ok(None),
286 }
287 }
288
289 fn contains_key(&self, edge_id: u128) -> bool {
290 let key = edge_id.to_le_bytes();
291 self.db.contains_key(&key[..]).unwrap_or(false)
292 }
293
294 fn len(&self) -> usize {
295 self.db.len()
296 }
297
298 fn is_empty(&self) -> bool {
299 self.db.is_empty()
300 }
301
302 fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
303 Box::new(self.db.iter().filter_map(|result| {
304 result
305 .ok()
306 .and_then(|(_, value)| bincode::deserialize::<PayloadMeta>(&value).ok())
307 }))
308 }
309
310 fn save(&self) -> Result<()> {
311 self.db
312 .flush()
313 .map_err(|e| SochDBError::Internal(format!("Sled flush failed: {}", e)))?;
314 Ok(())
315 }
316}
317
/// Storage engine used for the payload index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexBackend {
    /// In-memory `HashMap`, persisted to a single `payload.index` file on save
    /// (and when the store is dropped). Memory use grows with the payload count.
    HashMap,
    /// On-disk sled database in the `payload_sled` directory.
    Sled,
}
325
326pub struct PayloadStore {
356 data_file: Arc<RwLock<File>>,
357 #[allow(dead_code)]
358 data_path: PathBuf,
359 mmap: Arc<RwLock<Option<memmap2::Mmap>>>,
360 index: Arc<dyn PayloadIndex>,
361 next_offset: Arc<RwLock<u64>>,
362 memory_warning_logged: Arc<RwLock<(bool, bool, bool)>>, backend_type: IndexBackend,
366}
367
368impl PayloadStore {
369 pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
371 Self::open_with_backend(data_dir, IndexBackend::HashMap)
372 }
373
374 pub fn open_with_backend<P: AsRef<Path>>(data_dir: P, backend: IndexBackend) -> Result<Self> {
376 let data_dir = data_dir.as_ref();
377 std::fs::create_dir_all(data_dir)?;
378
379 let data_path = data_dir.join("payload.data");
380 let index_path = match backend {
381 IndexBackend::HashMap => data_dir.join("payload.index"),
382 IndexBackend::Sled => data_dir.join("payload_sled"),
383 };
384
385 let data_file = OpenOptions::new()
387 .read(true)
388 .write(true)
389 .create(true)
390 .truncate(false)
391 .open(&data_path)?;
392
393 let file_len = data_file.metadata()?.len();
394
395 let index: Arc<dyn PayloadIndex> = match backend {
397 IndexBackend::HashMap => Arc::new(HashMapIndex::new(index_path)?),
398 IndexBackend::Sled => Arc::new(SledIndex::new(index_path)?),
399 };
400
401 let mmap = if file_len > 0 {
403 let mmap = unsafe { MmapOptions::new().map(&data_file)? };
404 Some(mmap)
405 } else {
406 None
407 };
408
409 Ok(Self {
410 data_file: Arc::new(RwLock::new(data_file)),
411 data_path,
412 mmap: Arc::new(RwLock::new(mmap)),
413 index,
414 next_offset: Arc::new(RwLock::new(file_len)),
415 memory_warning_logged: Arc::new(RwLock::new((false, false, false))),
416 backend_type: backend,
417 })
418 }
419
420 pub fn append(
432 &self,
433 edge_id: u128,
434 data: &[u8],
435 compression: Option<CompressionType>,
436 ) -> Result<(u64, u32, u8)> {
437 let uncompressed_len = data.len() as u32;
438
439 let compression = compression.unwrap_or(if data.len() < 1024 {
441 CompressionType::None
442 } else if data.len() < 100_000 {
443 CompressionType::LZ4
444 } else {
445 CompressionType::ZSTD
446 });
447
448 let (compressed_data, actual_compression) = match compression {
450 CompressionType::None => (data.to_vec(), CompressionType::None),
451 CompressionType::LZ4 => {
452 match lz4::block::compress(data, None, false) {
453 Ok(compressed) => {
454 if compressed.len() < data.len() {
456 (compressed, CompressionType::LZ4)
457 } else {
458 (data.to_vec(), CompressionType::None)
459 }
460 }
461 Err(_) => (data.to_vec(), CompressionType::None),
462 }
463 }
464 CompressionType::ZSTD => {
465 match zstd::encode_all(data, 3) {
466 Ok(compressed) => {
468 if compressed.len() < data.len() {
469 (compressed, CompressionType::ZSTD)
470 } else {
471 (data.to_vec(), CompressionType::None)
472 }
473 }
474 Err(_) => (data.to_vec(), CompressionType::None),
475 }
476 }
477 };
478
479 let compressed_len = compressed_data.len() as u32;
480
481 let offset = {
483 let mut file = self.data_file.write();
484 let offset = *self.next_offset.read();
485
486 file.seek(SeekFrom::End(0))?;
487 file.write_all(&compressed_data)?;
488 file.sync_all()?;
489
490 offset
491 };
492
493 *self.next_offset.write() = offset + compressed_len as u64;
495
496 self.index.insert(
498 edge_id,
499 PayloadMeta {
500 edge_id,
501 offset,
502 length: compressed_len,
503 compression: actual_compression,
504 uncompressed_length: uncompressed_len,
505 },
506 )?;
507
508 self.remap_file()?;
510
511 if matches!(self.backend_type, IndexBackend::HashMap) {
514 let count = self.index.len();
515 let mut warnings = self.memory_warning_logged.write();
516
517 if count >= 10_000_000 && !warnings.2 {
518 warnings.2 = true;
519 tracing::warn!(
520 payload_count = count,
521 estimated_ram_mb = count * 50 / 1_000_000,
522 "CRITICAL: PayloadStore has 10M+ entries (~500MB RAM). \
523 Strongly recommend switching to Sled backend to prevent OOM. \
524 Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
525 );
526 } else if count >= 5_000_000 && !warnings.1 {
527 warnings.1 = true;
528 tracing::warn!(
529 payload_count = count,
530 estimated_ram_mb = count * 50 / 1_000_000,
531 "WARNING: PayloadStore has 5M+ entries (~250MB RAM). \
532 Consider switching to Sled backend for better memory efficiency. \
533 Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
534 );
535 } else if count >= 1_000_000 && !warnings.0 {
536 warnings.0 = true;
537 tracing::info!(
538 payload_count = count,
539 estimated_ram_mb = count * 50 / 1_000_000,
540 "INFO: PayloadStore has reached 1M entries (~50MB RAM). \
541 Memory usage will grow linearly. Consider Sled backend for 10M+ scale."
542 );
543 }
544 }
545
546 Ok((offset, compressed_len, actual_compression as u8))
557 }
558
559 pub fn get(&self, edge_id: u128) -> Result<Option<Vec<u8>>> {
561 let meta = match self.index.get(edge_id)? {
562 Some(m) => m,
563 None => return Ok(None),
564 };
565
566 self.get_at_offset(meta.offset, meta.length, meta.compression)
567 .map(Some)
568 }
569
570 pub fn get_at_offset(
575 &self,
576 offset: u64,
577 length: u32,
578 compression: CompressionType,
579 ) -> Result<Vec<u8>> {
580 let uncompressed_size = if compression == CompressionType::LZ4 {
582 self.index
584 .iter_values()
585 .find(|meta| meta.offset == offset && meta.length == length)
586 .map(|meta| meta.uncompressed_length as i32)
587 } else {
588 None
589 };
590
591 let mmap = self.mmap.read();
592
593 let compressed_data = match mmap.as_ref() {
594 Some(mmap) => {
595 let start = offset as usize;
596 let end = start + length as usize;
597
598 if end > mmap.len() {
599 return Err(SochDBError::InvalidArgument(format!(
600 "Payload offset {} + length {} exceeds file size {}",
601 offset,
602 length,
603 mmap.len()
604 )));
605 }
606
607 &mmap[start..end]
608 }
609 None => {
610 return Err(SochDBError::InvalidArgument("Payload file is empty".into()));
611 }
612 };
613
614 let data = match compression {
616 CompressionType::None => compressed_data.to_vec(),
617 CompressionType::LZ4 => {
618 lz4::block::decompress(compressed_data, uncompressed_size).map_err(|e| {
620 SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
621 })?
622 }
623 CompressionType::ZSTD => zstd::decode_all(compressed_data).map_err(|e| {
624 SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
625 })?,
626 };
627
628 Ok(data)
629 }
630
631 pub fn has_payload(&self, edge_id: u128) -> bool {
633 self.index.contains_key(edge_id)
634 }
635
636 pub fn len(&self) -> usize {
638 self.index.len()
639 }
640
641 pub fn is_empty(&self) -> bool {
643 self.index.is_empty()
644 }
645
646 pub fn save_index(&self) -> Result<()> {
648 self.index.save()
649 }
650
651 fn remap_file(&self) -> Result<()> {
653 let file = self.data_file.read();
654 let file_len = file.metadata()?.len();
655
656 if file_len > 0 {
657 let new_mmap = unsafe { MmapOptions::new().map(&*file)? };
658 *self.mmap.write() = Some(new_mmap);
659 }
660
661 Ok(())
662 }
663
664 pub fn stats(&self) -> PayloadStats {
666 let mut total_compressed: u64 = 0;
667 let mut total_uncompressed: u64 = 0;
668 let mut compression_counts = [0usize; 3];
669
670 for meta in self.index.iter_values() {
671 total_compressed += meta.length as u64;
672 total_uncompressed += meta.uncompressed_length as u64;
673 compression_counts[meta.compression as usize] += 1;
674 }
675
676 PayloadStats {
677 num_payloads: self.index.len(),
678 total_compressed_bytes: total_compressed,
679 total_uncompressed_bytes: total_uncompressed,
680 compression_ratio: if total_compressed > 0 {
681 total_uncompressed as f64 / total_compressed as f64
682 } else {
683 1.0
684 },
685 none_count: compression_counts[0],
686 lz4_count: compression_counts[1],
687 zstd_count: compression_counts[2],
688 }
689 }
690}
691
692impl Drop for PayloadStore {
693 fn drop(&mut self) {
694 let _ = self.index.save();
696 }
697}
698
/// Summary produced by `PayloadStore::stats`.
#[derive(Debug, Clone)]
pub struct PayloadStats {
    /// Number of indexed payloads.
    pub num_payloads: usize,
    /// Sum of stored (possibly compressed) byte lengths.
    pub total_compressed_bytes: u64,
    /// Sum of original payload lengths.
    pub total_uncompressed_bytes: u64,
    /// `total_uncompressed_bytes / total_compressed_bytes`, or `1.0` when nothing is stored.
    pub compression_ratio: f64,
    /// Payloads stored raw.
    pub none_count: usize,
    /// Payloads stored as LZ4 blocks.
    pub lz4_count: usize,
    /// Payloads stored as ZSTD frames.
    pub zstd_count: usize,
}
709
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a store in `dir` using the sled-backed index.
    fn open_sled(dir: &std::path::Path) -> PayloadStore {
        PayloadStore::open_with_backend(dir, IndexBackend::Sled).unwrap()
    }

    #[test]
    fn test_payload_store_basic() {
        let tmp = tempdir().unwrap();
        let store = PayloadStore::open(tmp.path()).unwrap();

        let payload = b"Hello, SochDB!";
        let (offset, length, codec) = store.append(1, payload, None).unwrap();

        // A small first payload lands uncompressed at the start of the file.
        assert_eq!(offset, 0);
        assert_eq!(length, payload.len() as u32);
        assert_eq!(codec, CompressionType::None as u8);

        let fetched = store.get(1).unwrap().unwrap();
        assert_eq!(fetched, payload);
    }

    #[test]
    fn test_payload_compression() {
        let tmp = tempdir().unwrap();
        let store = PayloadStore::open(tmp.path()).unwrap();

        // Highly repetitive input, so LZ4 is guaranteed to shrink it.
        let payload: Vec<u8> = b"ABCD".repeat(1000);

        let (_, length, codec) = store
            .append(1, &payload, Some(CompressionType::LZ4))
            .unwrap();
        assert!(length < payload.len() as u32);
        assert_eq!(codec, CompressionType::LZ4 as u8);

        let fetched = store.get(1).unwrap().unwrap();
        assert_eq!(fetched, payload);
    }

    #[test]
    fn test_payload_persistence() {
        let tmp = tempdir().unwrap();
        let first = b"First payload";
        let second = b"Second payload";

        // Write, then let the store drop (which saves the index).
        {
            let writer = PayloadStore::open(tmp.path()).unwrap();
            writer.append(1, first, None).unwrap();
            writer.append(2, second, None).unwrap();
        }

        // Reopen and read back from disk.
        {
            let reader = PayloadStore::open(tmp.path()).unwrap();
            assert_eq!(reader.get(1).unwrap().unwrap(), first);
            assert_eq!(reader.get(2).unwrap().unwrap(), second);
        }
    }

    #[test]
    fn test_payload_stats() {
        let tmp = tempdir().unwrap();
        let store = PayloadStore::open(tmp.path()).unwrap();

        store.append(1, b"small", None).unwrap();
        store
            .append(2, &b"A".repeat(2000), Some(CompressionType::LZ4))
            .unwrap();

        let summary = store.stats();
        assert_eq!(summary.num_payloads, 2);
        assert!(summary.compression_ratio > 1.0);
    }

    #[test]
    fn test_sled_backend_basic() {
        let tmp = tempdir().unwrap();
        let store = open_sled(tmp.path());

        let payload = b"Hello from Sled!";
        let (offset, length, _) = store.append(1, payload, None).unwrap();
        assert_eq!(offset, 0);
        assert_eq!(length, payload.len() as u32);

        let fetched = store.get(1).unwrap().unwrap();
        assert_eq!(fetched, payload);
    }

    #[test]
    fn test_sled_backend_persistence() {
        let tmp = tempdir().unwrap();
        let first = b"First sled payload";
        let second = b"Second sled payload";

        {
            let writer = open_sled(tmp.path());
            writer.append(1, first, None).unwrap();
            writer.append(2, second, None).unwrap();
            writer.save_index().unwrap();
        }

        {
            let reader = open_sled(tmp.path());
            assert_eq!(reader.get(1).unwrap().unwrap(), first);
            assert_eq!(reader.get(2).unwrap().unwrap(), second);
            assert_eq!(reader.len(), 2);
        }
    }

    #[test]
    fn test_sled_backend_large_dataset() {
        let tmp = tempdir().unwrap();
        let store = open_sled(tmp.path());

        for id in 0..1000 {
            let body = format!("Payload {}", id);
            store.append(id, body.as_bytes(), None).unwrap();
        }

        assert_eq!(store.len(), 1000);
        let fetched = store.get(500).unwrap().unwrap();
        assert_eq!(fetched, b"Payload 500");

        let summary = store.stats();
        assert_eq!(summary.num_payloads, 1000);
    }

    #[test]
    fn test_backend_interoperability() {
        let tmp = tempdir().unwrap();

        {
            let hashmap_store =
                PayloadStore::open_with_backend(tmp.path(), IndexBackend::HashMap).unwrap();
            hashmap_store.append(1, b"HashMap data", None).unwrap();
        }

        // Each backend keeps its own index, so the sled index starts empty even
        // though the shared data file already holds bytes.
        {
            let sled_store = open_sled(tmp.path());
            assert_eq!(sled_store.len(), 0);

            sled_store.append(2, b"Sled data", None).unwrap();
            assert_eq!(sled_store.len(), 1);
        }
    }
}