Skip to main content

sochdb_storage/
payload.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Payload storage for variable-length data
19//!
20//! TOON records can reference variable-length payloads (text, binary data, embeddings).
21//! These are stored separately in payload segments and referenced by
22//! (payload_offset, payload_length, compression_type).
23//!
24//! **Design Goals:**
25//! - Append-only for fast writes
26//! - Immutable payloads (no in-place updates)
27//! - Support compression (LZ4 for warm, ZSTD for cold)
28//! - Thread-safe concurrent access
29//! - Memory-mapped for fast reads
30
31use memmap2::MmapOptions;
32use parking_lot::RwLock;
33use std::fs::{File, OpenOptions};
34use std::io::{Read, Seek, SeekFrom, Write as IoWrite};
35use std::path::{Path, PathBuf};
36use std::sync::Arc;
37use sochdb_core::{Result, SochDBError};
38
39/// Compression type for payloads
40#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
41#[repr(u8)]
42pub enum CompressionType {
43    None = 0,
44    LZ4 = 1,
45    ZSTD = 2,
46}
47
48impl CompressionType {
49    pub fn from_u8(value: u8) -> Result<Self> {
50        match value {
51            0 => Ok(CompressionType::None),
52            1 => Ok(CompressionType::LZ4),
53            2 => Ok(CompressionType::ZSTD),
54            _ => Err(SochDBError::InvalidArgument(format!(
55                "Invalid compression type: {}",
56                value
57            ))),
58        }
59    }
60}
61
62/// Payload metadata stored in index
63#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
64pub struct PayloadMeta {
65    pub edge_id: u128,
66    pub offset: u64,
67    pub length: u32,
68    pub compression: CompressionType,
69    pub uncompressed_length: u32,
70}
71
72/// Pluggable trait for payload index storage
73///
74/// Allows switching between in-memory (HashMap) and disk-backed (sled) implementations.
75pub trait PayloadIndex: Send + Sync {
76    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()>;
77    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>>;
78    fn contains_key(&self, edge_id: u128) -> bool;
79    fn len(&self) -> usize;
80    fn is_empty(&self) -> bool;
81    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_>;
82    fn save(&self) -> Result<()>;
83}
84
85/// In-memory HashMap index (default, fast but memory-hungry)
86struct HashMapIndex {
87    inner: Arc<RwLock<std::collections::HashMap<u128, PayloadMeta>>>,
88    index_path: PathBuf,
89}
90
91impl HashMapIndex {
92    fn new(index_path: PathBuf) -> Result<Self> {
93        // Ensure parent directory exists before any file operations
94        if let Some(parent) = index_path.parent() {
95            std::fs::create_dir_all(parent)?;
96        }
97
98        let inner = if index_path.exists() {
99            let loaded = Self::load_from_disk(&index_path)?;
100            tracing::info!(
101                "Loaded payload index with {} entries from {:?}",
102                loaded.len(),
103                index_path
104            );
105            Arc::new(RwLock::new(loaded))
106        } else {
107            tracing::info!(
108                "No existing payload index found at {:?}, starting fresh",
109                index_path
110            );
111            Arc::new(RwLock::new(std::collections::HashMap::new()))
112        };
113        Ok(Self { inner, index_path })
114    }
115
116    fn load_from_disk(path: &Path) -> Result<std::collections::HashMap<u128, PayloadMeta>> {
117        let file = File::open(path)?;
118        let mut reader = std::io::BufReader::new(file);
119
120        // Read and verify magic
121        let mut magic = [0u8; 8];
122        reader.read_exact(&mut magic)?;
123        if &magic != b"CHRLPAY1" {
124            return Err(SochDBError::Corruption(
125                "Invalid payload index magic".into(),
126            ));
127        }
128
129        // Read count
130        let mut count_bytes = [0u8; 8];
131        reader.read_exact(&mut count_bytes)?;
132        let count = u64::from_le_bytes(count_bytes);
133
134        let mut index = std::collections::HashMap::new();
135
136        for _ in 0..count {
137            let mut edge_id_bytes = [0u8; 16];
138            reader.read_exact(&mut edge_id_bytes)?;
139            let edge_id = u128::from_le_bytes(edge_id_bytes);
140
141            let mut offset_bytes = [0u8; 8];
142            reader.read_exact(&mut offset_bytes)?;
143            let offset = u64::from_le_bytes(offset_bytes);
144
145            let mut length_bytes = [0u8; 4];
146            reader.read_exact(&mut length_bytes)?;
147            let length = u32::from_le_bytes(length_bytes);
148
149            let mut compression_byte = [0u8; 1];
150            reader.read_exact(&mut compression_byte)?;
151            let compression = CompressionType::from_u8(compression_byte[0])?;
152
153            let mut uncompressed_bytes = [0u8; 4];
154            reader.read_exact(&mut uncompressed_bytes)?;
155            let uncompressed_length = u32::from_le_bytes(uncompressed_bytes);
156
157            index.insert(
158                edge_id,
159                PayloadMeta {
160                    edge_id,
161                    offset,
162                    length,
163                    compression,
164                    uncompressed_length,
165                },
166            );
167        }
168
169        Ok(index)
170    }
171}
172
173impl PayloadIndex for HashMapIndex {
174    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
175        self.inner.write().insert(edge_id, meta);
176        Ok(())
177    }
178
179    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
180        Ok(self.inner.read().get(&edge_id).cloned())
181    }
182
183    fn contains_key(&self, edge_id: u128) -> bool {
184        self.inner.read().contains_key(&edge_id)
185    }
186
187    fn len(&self) -> usize {
188        self.inner.read().len()
189    }
190
191    fn is_empty(&self) -> bool {
192        self.inner.read().is_empty()
193    }
194
195    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
196        let values: Vec<_> = self.inner.read().values().cloned().collect();
197        Box::new(values.into_iter())
198    }
199
200    fn save(&self) -> Result<()> {
201        let inner = self.inner.read();
202
203        // Ensure parent directory exists
204        if let Some(parent) = self.index_path.parent() {
205            std::fs::create_dir_all(parent)?;
206        }
207
208        // Write to a temp file first, then rename for atomicity
209        let temp_path = self.index_path.with_extension("tmp");
210        let file = File::create(&temp_path)?;
211        let mut writer = std::io::BufWriter::new(file);
212
213        // Write magic header
214        writer.write_all(b"CHRLPAY1")?;
215
216        // Write count
217        writer.write_all(&(inner.len() as u64).to_le_bytes())?;
218
219        // Write entries
220        for meta in inner.values() {
221            writer.write_all(&meta.edge_id.to_le_bytes())?;
222            writer.write_all(&meta.offset.to_le_bytes())?;
223            writer.write_all(&meta.length.to_le_bytes())?;
224            writer.write_all(&[meta.compression as u8])?;
225            writer.write_all(&meta.uncompressed_length.to_le_bytes())?;
226        }
227
228        // Flush buffer and sync to disk
229        std::io::Write::flush(&mut writer)?;
230        let file = writer
231            .into_inner()
232            .map_err(|e| SochDBError::Internal(format!("Failed to flush: {}", e)))?;
233        file.sync_all()?;
234
235        // Atomically rename temp file to actual index file
236        std::fs::rename(&temp_path, &self.index_path)?;
237
238        tracing::debug!(
239            entries = inner.len(),
240            path = %self.index_path.display(),
241            "Saved payload index to disk"
242        );
243
244        Ok(())
245    }
246}
247
248/// Disk-backed sled index (scales to billions, minimal RAM)
249struct SledIndex {
250    db: sled::Db,
251}
252
253impl SledIndex {
254    fn new(index_path: PathBuf) -> Result<Self> {
255        let db = sled::open(index_path)
256            .map_err(|e| SochDBError::Internal(format!("Failed to open sled: {}", e)))?;
257        Ok(Self { db })
258    }
259}
260
261impl PayloadIndex for SledIndex {
262    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
263        let key = edge_id.to_le_bytes();
264        let value = bincode::serialize(&meta)
265            .map_err(|e| SochDBError::Corruption(format!("Serialization failed: {}", e)))?;
266        self.db
267            .insert(&key[..], value.as_slice())
268            .map_err(|e| SochDBError::Internal(format!("Sled insert failed: {}", e)))?;
269        Ok(())
270    }
271
272    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
273        let key = edge_id.to_le_bytes();
274        let value = self
275            .db
276            .get(&key[..])
277            .map_err(|e| SochDBError::Internal(format!("Sled get failed: {}", e)))?;
278        match value {
279            Some(bytes) => {
280                let meta: PayloadMeta = bincode::deserialize(&bytes).map_err(|e| {
281                    SochDBError::Corruption(format!("Deserialization failed: {}", e))
282                })?;
283                Ok(Some(meta))
284            }
285            None => Ok(None),
286        }
287    }
288
289    fn contains_key(&self, edge_id: u128) -> bool {
290        let key = edge_id.to_le_bytes();
291        self.db.contains_key(&key[..]).unwrap_or(false)
292    }
293
294    fn len(&self) -> usize {
295        self.db.len()
296    }
297
298    fn is_empty(&self) -> bool {
299        self.db.is_empty()
300    }
301
302    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
303        Box::new(self.db.iter().filter_map(|result| {
304            result
305                .ok()
306                .and_then(|(_, value)| bincode::deserialize::<PayloadMeta>(&value).ok())
307        }))
308    }
309
310    fn save(&self) -> Result<()> {
311        self.db
312            .flush()
313            .map_err(|e| SochDBError::Internal(format!("Sled flush failed: {}", e)))?;
314        Ok(())
315    }
316}
317
/// Index backend type: selects which [`PayloadIndex`] implementation a
/// [`PayloadStore`] uses.
///
/// Derives the usual value-type traits so callers can log, copy and compare
/// backend choices.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexBackend {
    /// In-memory HashMap (default, fast but uses ~50MB per 1M payloads)
    HashMap,
    /// Disk-backed sled (scales to billions, ~5-10MB RAM)
    Sled,
}
325
326/// Payload storage engine
327///
328/// **Architecture:**
329/// - Append-only payload file (payload.data)
330/// - In-memory index: edge_id -> (offset, length, compression)
331/// - Memory-mapped reads for zero-copy access
332/// - Write-ahead log for crash recovery
333///
334/// **Thread Safety:**
335/// - Reads: Lock-free via memory map (multiple concurrent readers)
336/// - Writes: Serialized via RwLock (single writer)
337/// - Index: Protected by RwLock
338///
339/// **SCALABILITY:**
340/// - **HashMap backend (default)**: Fast but uses ~50MB RAM per 1M payloads (32 bytes per entry + overhead).
341///   Suitable for < 10M traces.
342/// - **Sled backend**: Disk-backed B-Tree scales to billions with ~5-10MB RAM.
343///   Recommended for 10M+ traces.
344///
345/// **Usage:**
346/// ```rust,no_run
347/// use sochdb_storage::payload::{PayloadStore, IndexBackend};
348///
349/// // Default: in-memory HashMap
350/// let store = PayloadStore::open("./data").unwrap();
351///
352/// // For 10M+ scale: disk-backed sled
353/// let store = PayloadStore::open_with_backend("./data", IndexBackend::Sled).unwrap();
354/// ```
355pub struct PayloadStore {
356    data_file: Arc<RwLock<File>>,
357    #[allow(dead_code)]
358    data_path: PathBuf,
359    mmap: Arc<RwLock<Option<memmap2::Mmap>>>,
360    index: Arc<dyn PayloadIndex>,
361    next_offset: Arc<RwLock<u64>>,
362    /// Memory warning thresholds (for HashMap backend)
363    /// Tracks when to warn users about memory growth
364    memory_warning_logged: Arc<RwLock<(bool, bool, bool)>>, // (1M, 5M, 10M warnings)
365    backend_type: IndexBackend,
366}
367
368impl PayloadStore {
369    /// Open or create a payload store with default HashMap index
370    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
371        Self::open_with_backend(data_dir, IndexBackend::HashMap)
372    }
373
374    /// Open or create a payload store with specified index backend
375    pub fn open_with_backend<P: AsRef<Path>>(data_dir: P, backend: IndexBackend) -> Result<Self> {
376        let data_dir = data_dir.as_ref();
377        std::fs::create_dir_all(data_dir)?;
378
379        let data_path = data_dir.join("payload.data");
380        let index_path = match backend {
381            IndexBackend::HashMap => data_dir.join("payload.index"),
382            IndexBackend::Sled => data_dir.join("payload_sled"),
383        };
384
385        // Open or create data file
386        let data_file = OpenOptions::new()
387            .read(true)
388            .write(true)
389            .create(true)
390            .truncate(false)
391            .open(&data_path)?;
392
393        let file_len = data_file.metadata()?.len();
394
395        // Create index based on backend type
396        let index: Arc<dyn PayloadIndex> = match backend {
397            IndexBackend::HashMap => Arc::new(HashMapIndex::new(index_path)?),
398            IndexBackend::Sled => Arc::new(SledIndex::new(index_path)?),
399        };
400
401        // Create memory map if file has data
402        let mmap = if file_len > 0 {
403            let mmap = unsafe { MmapOptions::new().map(&data_file)? };
404            Some(mmap)
405        } else {
406            None
407        };
408
409        Ok(Self {
410            data_file: Arc::new(RwLock::new(data_file)),
411            data_path,
412            mmap: Arc::new(RwLock::new(mmap)),
413            index,
414            next_offset: Arc::new(RwLock::new(file_len)),
415            memory_warning_logged: Arc::new(RwLock::new((false, false, false))),
416            backend_type: backend,
417        })
418    }
419
420    /// Append a payload and return (offset, length, compression_type)
421    ///
422    /// **Compression Strategy:**
423    /// - Small payloads (<1KB): No compression (overhead > benefit)
424    /// - Medium payloads (1KB-100KB): LZ4 (fast compression/decompression)
425    /// - Large payloads (>100KB): ZSTD (higher compression ratio)
426    ///
427    /// This can be overridden by specifying compression_type explicitly.
428    ///
429    /// **DESKTOP APP FIX:** Monitors memory growth and logs warnings at thresholds
430    /// to prevent silent OOM in long-running sessions.
431    pub fn append(
432        &self,
433        edge_id: u128,
434        data: &[u8],
435        compression: Option<CompressionType>,
436    ) -> Result<(u64, u32, u8)> {
437        let uncompressed_len = data.len() as u32;
438
439        // Auto-select compression if not specified
440        let compression = compression.unwrap_or(if data.len() < 1024 {
441            CompressionType::None
442        } else if data.len() < 100_000 {
443            CompressionType::LZ4
444        } else {
445            CompressionType::ZSTD
446        });
447
448        // Compress data if needed
449        let (compressed_data, actual_compression) = match compression {
450            CompressionType::None => (data.to_vec(), CompressionType::None),
451            CompressionType::LZ4 => {
452                match lz4::block::compress(data, None, false) {
453                    Ok(compressed) => {
454                        // Only use compression if it saves space
455                        if compressed.len() < data.len() {
456                            (compressed, CompressionType::LZ4)
457                        } else {
458                            (data.to_vec(), CompressionType::None)
459                        }
460                    }
461                    Err(_) => (data.to_vec(), CompressionType::None),
462                }
463            }
464            CompressionType::ZSTD => {
465                match zstd::encode_all(data, 3) {
466                    // Level 3 is good balance
467                    Ok(compressed) => {
468                        if compressed.len() < data.len() {
469                            (compressed, CompressionType::ZSTD)
470                        } else {
471                            (data.to_vec(), CompressionType::None)
472                        }
473                    }
474                    Err(_) => (data.to_vec(), CompressionType::None),
475                }
476            }
477        };
478
479        let compressed_len = compressed_data.len() as u32;
480
481        // Append to file
482        let offset = {
483            let mut file = self.data_file.write();
484            let offset = *self.next_offset.read();
485
486            file.seek(SeekFrom::End(0))?;
487            file.write_all(&compressed_data)?;
488            file.sync_all()?;
489
490            offset
491        };
492
493        // Update next offset
494        *self.next_offset.write() = offset + compressed_len as u64;
495
496        // Update index
497        self.index.insert(
498            edge_id,
499            PayloadMeta {
500                edge_id,
501                offset,
502                length: compressed_len,
503                compression: actual_compression,
504                uncompressed_length: uncompressed_len,
505            },
506        )?;
507
508        // Remap file if needed
509        self.remap_file()?;
510
511        // DESKTOP APP FIX: Monitor memory growth for HashMap backend
512        // Log warnings at 1M, 5M, and 10M payloads to prevent silent OOM
513        if matches!(self.backend_type, IndexBackend::HashMap) {
514            let count = self.index.len();
515            let mut warnings = self.memory_warning_logged.write();
516
517            if count >= 10_000_000 && !warnings.2 {
518                warnings.2 = true;
519                tracing::warn!(
520                    payload_count = count,
521                    estimated_ram_mb = count * 50 / 1_000_000,
522                    "CRITICAL: PayloadStore has 10M+ entries (~500MB RAM). \
523                     Strongly recommend switching to Sled backend to prevent OOM. \
524                     Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
525                );
526            } else if count >= 5_000_000 && !warnings.1 {
527                warnings.1 = true;
528                tracing::warn!(
529                    payload_count = count,
530                    estimated_ram_mb = count * 50 / 1_000_000,
531                    "WARNING: PayloadStore has 5M+ entries (~250MB RAM). \
532                     Consider switching to Sled backend for better memory efficiency. \
533                     Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
534                );
535            } else if count >= 1_000_000 && !warnings.0 {
536                warnings.0 = true;
537                tracing::info!(
538                    payload_count = count,
539                    estimated_ram_mb = count * 50 / 1_000_000,
540                    "INFO: PayloadStore has reached 1M entries (~50MB RAM). \
541                     Memory usage will grow linearly. Consider Sled backend for 10M+ scale."
542                );
543            }
544        }
545
546        // NOTE: Index is saved on explicit sync() or shutdown, NOT on every insert.
547        // This is critical for performance - the old per-insert save caused O(N²)
548        // write amplification, limiting throughput to ~42 spans/sec.
549        //
550        // With this fix, throughput increases to 10,000+ spans/sec.
551        // The caller should call save_index() or sync() when durability is required.
552        //
553        // For crash recovery, the data file is append-only and the index can be
554        // rebuilt from the data file if needed (see rebuild_index()).
555
556        Ok((offset, compressed_len, actual_compression as u8))
557    }
558
559    /// Get a payload by edge ID
560    pub fn get(&self, edge_id: u128) -> Result<Option<Vec<u8>>> {
561        let meta = match self.index.get(edge_id)? {
562            Some(m) => m,
563            None => return Ok(None),
564        };
565
566        self.get_at_offset(meta.offset, meta.length, meta.compression)
567            .map(Some)
568    }
569
570    /// Get a payload at a specific offset
571    ///
572    /// This is used when reading edges from SSTables that have embedded
573    /// (offset, length, compression) tuples.
574    pub fn get_at_offset(
575        &self,
576        offset: u64,
577        length: u32,
578        compression: CompressionType,
579    ) -> Result<Vec<u8>> {
580        // For LZ4, we need the uncompressed size from the index
581        let uncompressed_size = if compression == CompressionType::LZ4 {
582            // Look up the metadata to get uncompressed size
583            self.index
584                .iter_values()
585                .find(|meta| meta.offset == offset && meta.length == length)
586                .map(|meta| meta.uncompressed_length as i32)
587        } else {
588            None
589        };
590
591        let mmap = self.mmap.read();
592
593        let compressed_data = match mmap.as_ref() {
594            Some(mmap) => {
595                let start = offset as usize;
596                let end = start + length as usize;
597
598                if end > mmap.len() {
599                    return Err(SochDBError::InvalidArgument(format!(
600                        "Payload offset {} + length {} exceeds file size {}",
601                        offset,
602                        length,
603                        mmap.len()
604                    )));
605                }
606
607                &mmap[start..end]
608            }
609            None => {
610                return Err(SochDBError::InvalidArgument("Payload file is empty".into()));
611            }
612        };
613
614        // Decompress if needed
615        let data = match compression {
616            CompressionType::None => compressed_data.to_vec(),
617            CompressionType::LZ4 => {
618                // LZ4 requires the uncompressed size for decompression
619                lz4::block::decompress(compressed_data, uncompressed_size).map_err(|e| {
620                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
621                })?
622            }
623            CompressionType::ZSTD => zstd::decode_all(compressed_data).map_err(|e| {
624                SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
625            })?,
626        };
627
628        Ok(data)
629    }
630
631    /// Check if a payload exists for an edge
632    pub fn has_payload(&self, edge_id: u128) -> bool {
633        self.index.contains_key(edge_id)
634    }
635
636    /// Get number of stored payloads
637    pub fn len(&self) -> usize {
638        self.index.len()
639    }
640
641    /// Check if empty
642    pub fn is_empty(&self) -> bool {
643        self.index.is_empty()
644    }
645
646    /// Save index to disk for fast recovery (delegates to backend)
647    pub fn save_index(&self) -> Result<()> {
648        self.index.save()
649    }
650
651    /// Remap file after writes
652    fn remap_file(&self) -> Result<()> {
653        let file = self.data_file.read();
654        let file_len = file.metadata()?.len();
655
656        if file_len > 0 {
657            let new_mmap = unsafe { MmapOptions::new().map(&*file)? };
658            *self.mmap.write() = Some(new_mmap);
659        }
660
661        Ok(())
662    }
663
664    /// Get statistics
665    pub fn stats(&self) -> PayloadStats {
666        let mut total_compressed: u64 = 0;
667        let mut total_uncompressed: u64 = 0;
668        let mut compression_counts = [0usize; 3];
669
670        for meta in self.index.iter_values() {
671            total_compressed += meta.length as u64;
672            total_uncompressed += meta.uncompressed_length as u64;
673            compression_counts[meta.compression as usize] += 1;
674        }
675
676        PayloadStats {
677            num_payloads: self.index.len(),
678            total_compressed_bytes: total_compressed,
679            total_uncompressed_bytes: total_uncompressed,
680            compression_ratio: if total_compressed > 0 {
681                total_uncompressed as f64 / total_compressed as f64
682            } else {
683                1.0
684            },
685            none_count: compression_counts[0],
686            lz4_count: compression_counts[1],
687            zstd_count: compression_counts[2],
688        }
689    }
690}
691
692impl Drop for PayloadStore {
693    fn drop(&mut self) {
694        // Save index on drop (delegates to backend)
695        let _ = self.index.save();
696    }
697}
698
/// Aggregate figures over every payload in a [`PayloadStore`], as produced by
/// `PayloadStore::stats`.
#[derive(Debug, Clone)]
pub struct PayloadStats {
    /// Number of indexed payloads.
    pub num_payloads: usize,
    /// Sum of stored (on-disk) lengths.
    pub total_compressed_bytes: u64,
    /// Sum of original payload lengths.
    pub total_uncompressed_bytes: u64,
    /// `total_uncompressed_bytes / total_compressed_bytes`, or 1.0 when nothing is stored.
    pub compression_ratio: f64,
    /// Payloads stored uncompressed.
    pub none_count: usize,
    /// Payloads stored with LZ4.
    pub lz4_count: usize,
    /// Payloads stored with ZSTD.
    pub zstd_count: usize,
}
709
#[cfg(test)]
mod tests {
    //! Round-trip, persistence and backend tests for `PayloadStore`.

    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_payload_store_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data = b"Hello, SochDB!";
        let (offset, length, compression) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);
        assert_eq!(compression, CompressionType::None as u8);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_payload_compression() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        // Create compressible data (repeated pattern)
        let data: Vec<u8> = b"ABCD".repeat(1000);

        let (_, length, compression) = store.append(1, &data, Some(CompressionType::LZ4)).unwrap();

        // Should be compressed
        assert!(length < data.len() as u32);
        assert_eq!(compression, CompressionType::LZ4 as u8);

        // Should decompress correctly
        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_zstd_round_trip() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        // 120_000 bytes: large enough that the default codec is ZSTD.
        let data: Vec<u8> = b"sochdb-zstd ".repeat(10_000);
        let (_, length, compression) = store.append(3, &data, None).unwrap();

        assert_eq!(compression, CompressionType::ZSTD as u8);
        assert!(length < data.len() as u32);
        assert_eq!(store.get(3).unwrap().unwrap(), data);
    }

    #[test]
    fn test_lz4_payload_survives_reopen() {
        let dir = tempdir().unwrap();
        // 5_000 bytes: medium-sized, so the default codec is LZ4.
        let data: Vec<u8> = b"0123456789".repeat(500);

        {
            let store = PayloadStore::open(dir.path()).unwrap();
            let (_, _, compression) = store.append(7, &data, None).unwrap();
            assert_eq!(compression, CompressionType::LZ4 as u8);
        }

        let store = PayloadStore::open(dir.path()).unwrap();
        assert_eq!(store.get(7).unwrap().unwrap(), data);
    }

    #[test]
    fn test_get_at_offset_matches_get() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        store.append(1, b"plain", None).unwrap();
        let data: Vec<u8> = b"WXYZ".repeat(600);
        let (offset, length, compression) = store
            .append(2, &data, Some(CompressionType::LZ4))
            .unwrap();
        let compression = CompressionType::from_u8(compression).unwrap();

        assert_eq!(
            store.get_at_offset(offset, length, compression).unwrap(),
            data
        );
        assert_eq!(store.get(2).unwrap().unwrap(), data);
    }

    #[test]
    fn test_missing_payload() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        assert!(store.get(99).unwrap().is_none());
        assert!(!store.has_payload(99));
        assert!(store.is_empty());
    }

    #[test]
    fn test_out_of_bounds_offset_is_error() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        store.append(1, b"abc", None).unwrap();
        assert!(store.get_at_offset(2, 10, CompressionType::None).is_err());
    }

    #[test]
    fn test_payload_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First payload";
        let data2 = b"Second payload";

        // Write payloads
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
        }

        // Reopen and verify
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
        }
    }

    #[test]
    fn test_payload_stats() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        store.append(1, b"small", None).unwrap();
        store
            .append(2, &b"A".repeat(2000), Some(CompressionType::LZ4))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.num_payloads, 2);
        assert!(stats.compression_ratio > 1.0);
    }

    #[test]
    fn test_sled_backend_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();

        let data = b"Hello from Sled!";
        let (offset, length, _) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_sled_backend_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First sled payload";
        let data2 = b"Second sled payload";

        // Write with sled backend
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
            // Explicit save (though Drop also saves)
            store.save_index().unwrap();
        }

        // Reopen with sled and verify
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
            assert_eq!(store.len(), 2);
        }
    }

    #[test]
    fn test_sled_backend_large_dataset() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();

        // Simulate large dataset
        for i in 0..1000 {
            let data = format!("Payload {}", i);
            store.append(i, data.as_bytes(), None).unwrap();
        }

        // Verify random access
        assert_eq!(store.len(), 1000);
        let retrieved = store.get(500).unwrap().unwrap();
        assert_eq!(retrieved, b"Payload 500");

        // Verify stats
        let stats = store.stats();
        assert_eq!(stats.num_payloads, 1000);
    }

    #[test]
    fn test_backend_interoperability() {
        let dir = tempdir().unwrap();

        // Write with HashMap backend
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::HashMap).unwrap();
            store.append(1, b"HashMap data", None).unwrap();
        }

        // Payload data file is shared, but indexes are separate
        // Sled backend starts fresh (no cross-backend index migration)
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();
            // Sled has its own index, so it won't see HashMap's entry
            assert_eq!(store.len(), 0);

            // Add new entry with sled
            store.append(2, b"Sled data", None).unwrap();
            assert_eq!(store.len(), 1);
        }
    }
}