sochdb_storage/
payload.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Payload storage for variable-length data
16//!
17//! TOON records can reference variable-length payloads (text, binary data, embeddings).
18//! These are stored separately in payload segments and referenced by
19//! (payload_offset, payload_length, compression_type).
20//!
21//! **Design Goals:**
22//! - Append-only for fast writes
23//! - Immutable payloads (no in-place updates)
24//! - Support compression (LZ4 for warm, ZSTD for cold)
25//! - Thread-safe concurrent access
26//! - Memory-mapped for fast reads
27
28use memmap2::MmapOptions;
29use parking_lot::RwLock;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write as IoWrite};
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34use sochdb_core::{Result, SochDBError};
35
/// Compression type for payloads
///
/// The numeric discriminant is the single byte that is written to the on-disk
/// index and returned to callers of `PayloadStore::append`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[repr(u8)]
pub enum CompressionType {
    /// Payload bytes are stored exactly as provided.
    None = 0,
    /// Payload bytes are stored LZ4-compressed.
    LZ4 = 1,
    /// Payload bytes are stored Zstandard-compressed.
    ZSTD = 2,
}
44
45impl CompressionType {
46    pub fn from_u8(value: u8) -> Result<Self> {
47        match value {
48            0 => Ok(CompressionType::None),
49            1 => Ok(CompressionType::LZ4),
50            2 => Ok(CompressionType::ZSTD),
51            _ => Err(SochDBError::InvalidArgument(format!(
52                "Invalid compression type: {}",
53                value
54            ))),
55        }
56    }
57}
58
/// Payload metadata stored in index
///
/// One entry describes where a single payload lives inside the payload data
/// file and how to turn the stored bytes back into the original data.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PayloadMeta {
    /// Identifier of the edge this payload belongs to (also the index key).
    pub edge_id: u128,
    /// Byte offset of the stored payload within the data file.
    pub offset: u64,
    /// Number of bytes stored on disk (after compression, if any).
    pub length: u32,
    /// Compression that was actually applied to the stored bytes.
    pub compression: CompressionType,
    /// Size of the payload before compression, in bytes.
    pub uncompressed_length: u32,
}
68
/// Pluggable trait for payload index storage
///
/// Allows switching between in-memory (HashMap) and disk-backed (sled) implementations.
pub trait PayloadIndex: Send + Sync {
    /// Store (or overwrite) the metadata for `edge_id`.
    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()>;
    /// Look up the metadata for `edge_id`; `Ok(None)` when no entry exists.
    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>>;
    /// Whether an entry exists for `edge_id`. The return type cannot carry an
    /// error, so an implementation may report `false` when the lookup itself
    /// fails (the sled implementation in this file does).
    fn contains_key(&self, edge_id: u128) -> bool;
    /// Number of entries currently in the index.
    fn len(&self) -> usize;
    /// `true` when the index holds no entries.
    fn is_empty(&self) -> bool;
    /// Iterate over all stored metadata values, in no particular order.
    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_>;
    /// Persist the index so that it survives a restart.
    fn save(&self) -> Result<()>;
}
81
/// In-memory HashMap index (default, fast but memory-hungry)
struct HashMapIndex {
    /// edge_id -> metadata map, guarded by a reader/writer lock.
    inner: Arc<RwLock<std::collections::HashMap<u128, PayloadMeta>>>,
    /// File the map is loaded from on construction and written to by `save`.
    index_path: PathBuf,
}
87
88impl HashMapIndex {
89    fn new(index_path: PathBuf) -> Result<Self> {
90        // Ensure parent directory exists before any file operations
91        if let Some(parent) = index_path.parent() {
92            std::fs::create_dir_all(parent)?;
93        }
94
95        let inner = if index_path.exists() {
96            let loaded = Self::load_from_disk(&index_path)?;
97            tracing::info!(
98                "Loaded payload index with {} entries from {:?}",
99                loaded.len(),
100                index_path
101            );
102            Arc::new(RwLock::new(loaded))
103        } else {
104            tracing::info!(
105                "No existing payload index found at {:?}, starting fresh",
106                index_path
107            );
108            Arc::new(RwLock::new(std::collections::HashMap::new()))
109        };
110        Ok(Self { inner, index_path })
111    }
112
113    fn load_from_disk(path: &Path) -> Result<std::collections::HashMap<u128, PayloadMeta>> {
114        let file = File::open(path)?;
115        let mut reader = std::io::BufReader::new(file);
116
117        // Read and verify magic
118        let mut magic = [0u8; 8];
119        reader.read_exact(&mut magic)?;
120        if &magic != b"CHRLPAY1" {
121            return Err(SochDBError::Corruption(
122                "Invalid payload index magic".into(),
123            ));
124        }
125
126        // Read count
127        let mut count_bytes = [0u8; 8];
128        reader.read_exact(&mut count_bytes)?;
129        let count = u64::from_le_bytes(count_bytes);
130
131        let mut index = std::collections::HashMap::new();
132
133        for _ in 0..count {
134            let mut edge_id_bytes = [0u8; 16];
135            reader.read_exact(&mut edge_id_bytes)?;
136            let edge_id = u128::from_le_bytes(edge_id_bytes);
137
138            let mut offset_bytes = [0u8; 8];
139            reader.read_exact(&mut offset_bytes)?;
140            let offset = u64::from_le_bytes(offset_bytes);
141
142            let mut length_bytes = [0u8; 4];
143            reader.read_exact(&mut length_bytes)?;
144            let length = u32::from_le_bytes(length_bytes);
145
146            let mut compression_byte = [0u8; 1];
147            reader.read_exact(&mut compression_byte)?;
148            let compression = CompressionType::from_u8(compression_byte[0])?;
149
150            let mut uncompressed_bytes = [0u8; 4];
151            reader.read_exact(&mut uncompressed_bytes)?;
152            let uncompressed_length = u32::from_le_bytes(uncompressed_bytes);
153
154            index.insert(
155                edge_id,
156                PayloadMeta {
157                    edge_id,
158                    offset,
159                    length,
160                    compression,
161                    uncompressed_length,
162                },
163            );
164        }
165
166        Ok(index)
167    }
168}
169
170impl PayloadIndex for HashMapIndex {
171    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
172        self.inner.write().insert(edge_id, meta);
173        Ok(())
174    }
175
176    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
177        Ok(self.inner.read().get(&edge_id).cloned())
178    }
179
180    fn contains_key(&self, edge_id: u128) -> bool {
181        self.inner.read().contains_key(&edge_id)
182    }
183
184    fn len(&self) -> usize {
185        self.inner.read().len()
186    }
187
188    fn is_empty(&self) -> bool {
189        self.inner.read().is_empty()
190    }
191
192    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
193        let values: Vec<_> = self.inner.read().values().cloned().collect();
194        Box::new(values.into_iter())
195    }
196
197    fn save(&self) -> Result<()> {
198        let inner = self.inner.read();
199
200        // Ensure parent directory exists
201        if let Some(parent) = self.index_path.parent() {
202            std::fs::create_dir_all(parent)?;
203        }
204
205        // Write to a temp file first, then rename for atomicity
206        let temp_path = self.index_path.with_extension("tmp");
207        let file = File::create(&temp_path)?;
208        let mut writer = std::io::BufWriter::new(file);
209
210        // Write magic header
211        writer.write_all(b"CHRLPAY1")?;
212
213        // Write count
214        writer.write_all(&(inner.len() as u64).to_le_bytes())?;
215
216        // Write entries
217        for meta in inner.values() {
218            writer.write_all(&meta.edge_id.to_le_bytes())?;
219            writer.write_all(&meta.offset.to_le_bytes())?;
220            writer.write_all(&meta.length.to_le_bytes())?;
221            writer.write_all(&[meta.compression as u8])?;
222            writer.write_all(&meta.uncompressed_length.to_le_bytes())?;
223        }
224
225        // Flush buffer and sync to disk
226        std::io::Write::flush(&mut writer)?;
227        let file = writer
228            .into_inner()
229            .map_err(|e| SochDBError::Internal(format!("Failed to flush: {}", e)))?;
230        file.sync_all()?;
231
232        // Atomically rename temp file to actual index file
233        std::fs::rename(&temp_path, &self.index_path)?;
234
235        tracing::debug!(
236            entries = inner.len(),
237            path = %self.index_path.display(),
238            "Saved payload index to disk"
239        );
240
241        Ok(())
242    }
243}
244
/// Disk-backed sled index (scales to billions, minimal RAM)
///
/// Keys are the little-endian bytes of the edge id; values are
/// bincode-serialized `PayloadMeta` records.
struct SledIndex {
    /// Handle to the sled database that holds the index entries.
    db: sled::Db,
}
249
250impl SledIndex {
251    fn new(index_path: PathBuf) -> Result<Self> {
252        let db = sled::open(index_path)
253            .map_err(|e| SochDBError::Internal(format!("Failed to open sled: {}", e)))?;
254        Ok(Self { db })
255    }
256}
257
258impl PayloadIndex for SledIndex {
259    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
260        let key = edge_id.to_le_bytes();
261        let value = bincode::serialize(&meta)
262            .map_err(|e| SochDBError::Corruption(format!("Serialization failed: {}", e)))?;
263        self.db
264            .insert(&key[..], value.as_slice())
265            .map_err(|e| SochDBError::Internal(format!("Sled insert failed: {}", e)))?;
266        Ok(())
267    }
268
269    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
270        let key = edge_id.to_le_bytes();
271        let value = self
272            .db
273            .get(&key[..])
274            .map_err(|e| SochDBError::Internal(format!("Sled get failed: {}", e)))?;
275        match value {
276            Some(bytes) => {
277                let meta: PayloadMeta = bincode::deserialize(&bytes).map_err(|e| {
278                    SochDBError::Corruption(format!("Deserialization failed: {}", e))
279                })?;
280                Ok(Some(meta))
281            }
282            None => Ok(None),
283        }
284    }
285
286    fn contains_key(&self, edge_id: u128) -> bool {
287        let key = edge_id.to_le_bytes();
288        self.db.contains_key(&key[..]).unwrap_or(false)
289    }
290
291    fn len(&self) -> usize {
292        self.db.len()
293    }
294
295    fn is_empty(&self) -> bool {
296        self.db.is_empty()
297    }
298
299    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
300        Box::new(self.db.iter().filter_map(|result| {
301            result
302                .ok()
303                .and_then(|(_, value)| bincode::deserialize::<PayloadMeta>(&value).ok())
304        }))
305    }
306
307    fn save(&self) -> Result<()> {
308        self.db
309            .flush()
310            .map_err(|e| SochDBError::Internal(format!("Sled flush failed: {}", e)))?;
311        Ok(())
312    }
313}
314
/// Index backend type
///
/// Selects which `PayloadIndex` implementation a `PayloadStore` uses. Each
/// backend keeps its index under a different name inside the data directory
/// (`payload.index` for HashMap, `payload_sled` for sled), so the two indexes
/// are independent of each other.
pub enum IndexBackend {
    /// In-memory HashMap (default, fast but uses ~50MB per 1M payloads)
    HashMap,
    /// Disk-backed sled (scales to billions, ~5-10MB RAM)
    Sled,
}
322
/// Payload storage engine
///
/// **Architecture:**
/// - Append-only payload file (payload.data)
/// - Index: edge_id -> (offset, length, compression), held by the selected
///   `IndexBackend` (in-memory HashMap or disk-backed sled)
/// - Memory-mapped reads of the payload file
/// - NOTE(review): earlier documentation mentioned a write-ahead log for crash
///   recovery; no WAL is visible in this file — confirm where it lives.
///
/// **Thread Safety:**
/// - Reads: take a shared (read) lock on the memory map, so multiple readers
///   can proceed concurrently
/// - Writes: Serialized via RwLock (single writer)
/// - Index: Protected by RwLock (HashMap backend) or managed by sled
///
/// **SCALABILITY:**
/// - **HashMap backend (default)**: Fast but uses ~50MB RAM per 1M payloads (32 bytes per entry + overhead).
///   Suitable for < 10M traces.
/// - **Sled backend**: Disk-backed B-Tree scales to billions with ~5-10MB RAM.
///   Recommended for 10M+ traces.
///
/// **Usage:**
/// ```rust,no_run
/// use sochdb_storage::payload::{PayloadStore, IndexBackend};
///
/// // Default: in-memory HashMap
/// let store = PayloadStore::open("./data").unwrap();
///
/// // For 10M+ scale: disk-backed sled
/// let store = PayloadStore::open_with_backend("./data", IndexBackend::Sled).unwrap();
/// ```
pub struct PayloadStore {
    /// Handle to `payload.data`; the write lock serializes appends.
    data_file: Arc<RwLock<File>>,
    /// Path of the data file (kept for reference; currently unused).
    #[allow(dead_code)]
    data_path: PathBuf,
    /// Map of the data file used for reads; `None` while the file is empty.
    mmap: Arc<RwLock<Option<memmap2::Mmap>>>,
    /// edge_id -> metadata index (backend chosen at open time).
    index: Arc<dyn PayloadIndex>,
    /// Byte offset at which the next payload is expected to be written.
    next_offset: Arc<RwLock<u64>>,
    /// Memory warning thresholds (for HashMap backend)
    /// Tracks when to warn users about memory growth
    memory_warning_logged: Arc<RwLock<(bool, bool, bool)>>, // (1M, 5M, 10M warnings)
    /// Which backend `index` was created with.
    backend_type: IndexBackend,
}
364
365impl PayloadStore {
    /// Open or create a payload store with default HashMap index
    ///
    /// Equivalent to `open_with_backend(data_dir, IndexBackend::HashMap)`.
    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
        Self::open_with_backend(data_dir, IndexBackend::HashMap)
    }
370
371    /// Open or create a payload store with specified index backend
372    pub fn open_with_backend<P: AsRef<Path>>(data_dir: P, backend: IndexBackend) -> Result<Self> {
373        let data_dir = data_dir.as_ref();
374        std::fs::create_dir_all(data_dir)?;
375
376        let data_path = data_dir.join("payload.data");
377        let index_path = match backend {
378            IndexBackend::HashMap => data_dir.join("payload.index"),
379            IndexBackend::Sled => data_dir.join("payload_sled"),
380        };
381
382        // Open or create data file
383        let data_file = OpenOptions::new()
384            .read(true)
385            .write(true)
386            .create(true)
387            .truncate(false)
388            .open(&data_path)?;
389
390        let file_len = data_file.metadata()?.len();
391
392        // Create index based on backend type
393        let index: Arc<dyn PayloadIndex> = match backend {
394            IndexBackend::HashMap => Arc::new(HashMapIndex::new(index_path)?),
395            IndexBackend::Sled => Arc::new(SledIndex::new(index_path)?),
396        };
397
398        // Create memory map if file has data
399        let mmap = if file_len > 0 {
400            let mmap = unsafe { MmapOptions::new().map(&data_file)? };
401            Some(mmap)
402        } else {
403            None
404        };
405
406        Ok(Self {
407            data_file: Arc::new(RwLock::new(data_file)),
408            data_path,
409            mmap: Arc::new(RwLock::new(mmap)),
410            index,
411            next_offset: Arc::new(RwLock::new(file_len)),
412            memory_warning_logged: Arc::new(RwLock::new((false, false, false))),
413            backend_type: backend,
414        })
415    }
416
417    /// Append a payload and return (offset, length, compression_type)
418    ///
419    /// **Compression Strategy:**
420    /// - Small payloads (<1KB): No compression (overhead > benefit)
421    /// - Medium payloads (1KB-100KB): LZ4 (fast compression/decompression)
422    /// - Large payloads (>100KB): ZSTD (higher compression ratio)
423    ///
424    /// This can be overridden by specifying compression_type explicitly.
425    ///
426    /// **DESKTOP APP FIX:** Monitors memory growth and logs warnings at thresholds
427    /// to prevent silent OOM in long-running sessions.
428    pub fn append(
429        &self,
430        edge_id: u128,
431        data: &[u8],
432        compression: Option<CompressionType>,
433    ) -> Result<(u64, u32, u8)> {
434        let uncompressed_len = data.len() as u32;
435
436        // Auto-select compression if not specified
437        let compression = compression.unwrap_or(if data.len() < 1024 {
438            CompressionType::None
439        } else if data.len() < 100_000 {
440            CompressionType::LZ4
441        } else {
442            CompressionType::ZSTD
443        });
444
445        // Compress data if needed
446        let (compressed_data, actual_compression) = match compression {
447            CompressionType::None => (data.to_vec(), CompressionType::None),
448            CompressionType::LZ4 => {
449                match lz4::block::compress(data, None, false) {
450                    Ok(compressed) => {
451                        // Only use compression if it saves space
452                        if compressed.len() < data.len() {
453                            (compressed, CompressionType::LZ4)
454                        } else {
455                            (data.to_vec(), CompressionType::None)
456                        }
457                    }
458                    Err(_) => (data.to_vec(), CompressionType::None),
459                }
460            }
461            CompressionType::ZSTD => {
462                match zstd::encode_all(data, 3) {
463                    // Level 3 is good balance
464                    Ok(compressed) => {
465                        if compressed.len() < data.len() {
466                            (compressed, CompressionType::ZSTD)
467                        } else {
468                            (data.to_vec(), CompressionType::None)
469                        }
470                    }
471                    Err(_) => (data.to_vec(), CompressionType::None),
472                }
473            }
474        };
475
476        let compressed_len = compressed_data.len() as u32;
477
478        // Append to file
479        let offset = {
480            let mut file = self.data_file.write();
481            let offset = *self.next_offset.read();
482
483            file.seek(SeekFrom::End(0))?;
484            file.write_all(&compressed_data)?;
485            file.sync_all()?;
486
487            offset
488        };
489
490        // Update next offset
491        *self.next_offset.write() = offset + compressed_len as u64;
492
493        // Update index
494        self.index.insert(
495            edge_id,
496            PayloadMeta {
497                edge_id,
498                offset,
499                length: compressed_len,
500                compression: actual_compression,
501                uncompressed_length: uncompressed_len,
502            },
503        )?;
504
505        // Remap file if needed
506        self.remap_file()?;
507
508        // DESKTOP APP FIX: Monitor memory growth for HashMap backend
509        // Log warnings at 1M, 5M, and 10M payloads to prevent silent OOM
510        if matches!(self.backend_type, IndexBackend::HashMap) {
511            let count = self.index.len();
512            let mut warnings = self.memory_warning_logged.write();
513
514            if count >= 10_000_000 && !warnings.2 {
515                warnings.2 = true;
516                tracing::warn!(
517                    payload_count = count,
518                    estimated_ram_mb = count * 50 / 1_000_000,
519                    "CRITICAL: PayloadStore has 10M+ entries (~500MB RAM). \
520                     Strongly recommend switching to Sled backend to prevent OOM. \
521                     Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
522                );
523            } else if count >= 5_000_000 && !warnings.1 {
524                warnings.1 = true;
525                tracing::warn!(
526                    payload_count = count,
527                    estimated_ram_mb = count * 50 / 1_000_000,
528                    "WARNING: PayloadStore has 5M+ entries (~250MB RAM). \
529                     Consider switching to Sled backend for better memory efficiency. \
530                     Use PayloadStore::open_with_backend(path, IndexBackend::Sled)"
531                );
532            } else if count >= 1_000_000 && !warnings.0 {
533                warnings.0 = true;
534                tracing::info!(
535                    payload_count = count,
536                    estimated_ram_mb = count * 50 / 1_000_000,
537                    "INFO: PayloadStore has reached 1M entries (~50MB RAM). \
538                     Memory usage will grow linearly. Consider Sled backend for 10M+ scale."
539                );
540            }
541        }
542
543        // NOTE: Index is saved on explicit sync() or shutdown, NOT on every insert.
544        // This is critical for performance - the old per-insert save caused O(N²)
545        // write amplification, limiting throughput to ~42 spans/sec.
546        //
547        // With this fix, throughput increases to 10,000+ spans/sec.
548        // The caller should call save_index() or sync() when durability is required.
549        //
550        // For crash recovery, the data file is append-only and the index can be
551        // rebuilt from the data file if needed (see rebuild_index()).
552
553        Ok((offset, compressed_len, actual_compression as u8))
554    }
555
556    /// Get a payload by edge ID
557    pub fn get(&self, edge_id: u128) -> Result<Option<Vec<u8>>> {
558        let meta = match self.index.get(edge_id)? {
559            Some(m) => m,
560            None => return Ok(None),
561        };
562
563        self.get_at_offset(meta.offset, meta.length, meta.compression)
564            .map(Some)
565    }
566
567    /// Get a payload at a specific offset
568    ///
569    /// This is used when reading edges from SSTables that have embedded
570    /// (offset, length, compression) tuples.
571    pub fn get_at_offset(
572        &self,
573        offset: u64,
574        length: u32,
575        compression: CompressionType,
576    ) -> Result<Vec<u8>> {
577        // For LZ4, we need the uncompressed size from the index
578        let uncompressed_size = if compression == CompressionType::LZ4 {
579            // Look up the metadata to get uncompressed size
580            self.index
581                .iter_values()
582                .find(|meta| meta.offset == offset && meta.length == length)
583                .map(|meta| meta.uncompressed_length as i32)
584        } else {
585            None
586        };
587
588        let mmap = self.mmap.read();
589
590        let compressed_data = match mmap.as_ref() {
591            Some(mmap) => {
592                let start = offset as usize;
593                let end = start + length as usize;
594
595                if end > mmap.len() {
596                    return Err(SochDBError::InvalidArgument(format!(
597                        "Payload offset {} + length {} exceeds file size {}",
598                        offset,
599                        length,
600                        mmap.len()
601                    )));
602                }
603
604                &mmap[start..end]
605            }
606            None => {
607                return Err(SochDBError::InvalidArgument("Payload file is empty".into()));
608            }
609        };
610
611        // Decompress if needed
612        let data = match compression {
613            CompressionType::None => compressed_data.to_vec(),
614            CompressionType::LZ4 => {
615                // LZ4 requires the uncompressed size for decompression
616                lz4::block::decompress(compressed_data, uncompressed_size).map_err(|e| {
617                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
618                })?
619            }
620            CompressionType::ZSTD => zstd::decode_all(compressed_data).map_err(|e| {
621                SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
622            })?,
623        };
624
625        Ok(data)
626    }
627
    /// Check if a payload exists for an edge
    ///
    /// Consults only the index; the payload bytes are not read.
    pub fn has_payload(&self, edge_id: u128) -> bool {
        self.index.contains_key(edge_id)
    }
632
    /// Get number of stored payloads
    ///
    /// This is the number of entries in the index.
    pub fn len(&self) -> usize {
        self.index.len()
    }
637
    /// Check if empty
    ///
    /// `true` when the index contains no entries.
    pub fn is_empty(&self) -> bool {
        self.index.is_empty()
    }
642
    /// Save index to disk for fast recovery (delegates to backend)
    ///
    /// # Errors
    /// Returns whatever error the backend's `save` reports.
    pub fn save_index(&self) -> Result<()> {
        self.index.save()
    }
647
648    /// Remap file after writes
649    fn remap_file(&self) -> Result<()> {
650        let file = self.data_file.read();
651        let file_len = file.metadata()?.len();
652
653        if file_len > 0 {
654            let new_mmap = unsafe { MmapOptions::new().map(&*file)? };
655            *self.mmap.write() = Some(new_mmap);
656        }
657
658        Ok(())
659    }
660
661    /// Get statistics
662    pub fn stats(&self) -> PayloadStats {
663        let mut total_compressed: u64 = 0;
664        let mut total_uncompressed: u64 = 0;
665        let mut compression_counts = [0usize; 3];
666
667        for meta in self.index.iter_values() {
668            total_compressed += meta.length as u64;
669            total_uncompressed += meta.uncompressed_length as u64;
670            compression_counts[meta.compression as usize] += 1;
671        }
672
673        PayloadStats {
674            num_payloads: self.index.len(),
675            total_compressed_bytes: total_compressed,
676            total_uncompressed_bytes: total_uncompressed,
677            compression_ratio: if total_compressed > 0 {
678                total_uncompressed as f64 / total_compressed as f64
679            } else {
680                1.0
681            },
682            none_count: compression_counts[0],
683            lz4_count: compression_counts[1],
684            zstd_count: compression_counts[2],
685        }
686    }
687}
688
689impl Drop for PayloadStore {
690    fn drop(&mut self) {
691        // Save index on drop (delegates to backend)
692        let _ = self.index.save();
693    }
694}
695
/// Aggregate figures computed by `PayloadStore::stats` from the index entries.
#[derive(Debug, Clone)]
pub struct PayloadStats {
    /// Number of entries in the index.
    pub num_payloads: usize,
    /// Sum of the stored (on-disk) payload lengths, in bytes.
    pub total_compressed_bytes: u64,
    /// Sum of the original payload lengths, in bytes.
    pub total_uncompressed_bytes: u64,
    /// `total_uncompressed_bytes / total_compressed_bytes`, or 1.0 when no
    /// bytes are stored.
    pub compression_ratio: f64,
    /// Entries stored without compression.
    pub none_count: usize,
    /// Entries stored with LZ4.
    pub lz4_count: usize,
    /// Entries stored with ZSTD.
    pub zstd_count: usize,
}
706
// Integration-style tests that exercise `PayloadStore` against a temporary
// directory with both index backends.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // A small payload is stored uncompressed at offset 0 and reads back intact.
    #[test]
    fn test_payload_store_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data = b"Hello, SochDB!";
        let (offset, length, compression) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);
        assert_eq!(compression, CompressionType::None as u8);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Explicit LZ4 on repetitive data shrinks the payload and round-trips.
    #[test]
    fn test_payload_compression() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        // Create compressible data (repeated pattern)
        let data: Vec<u8> = b"ABCD".repeat(1000);

        let (_, length, compression) = store.append(1, &data, Some(CompressionType::LZ4)).unwrap();

        // Should be compressed
        assert!(length < data.len() as u32);
        assert_eq!(compression, CompressionType::LZ4 as u8);

        // Should decompress correctly
        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // Payloads survive closing and reopening the store; the index is written
    // when the first store is dropped at the end of its scope.
    #[test]
    fn test_payload_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First payload";
        let data2 = b"Second payload";

        // Write payloads
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
        }

        // Reopen and verify
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
        }
    }

    // Stats count every payload and report a ratio above 1.0 once a
    // compressible payload has been stored.
    #[test]
    fn test_payload_stats() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        store.append(1, b"small", None).unwrap();
        store
            .append(2, &b"A".repeat(2000), Some(CompressionType::LZ4))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.num_payloads, 2);
        assert!(stats.compression_ratio > 1.0);
    }

    // Same basic round-trip as above, using the sled-backed index.
    #[test]
    fn test_sled_backend_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();

        let data = b"Hello from Sled!";
        let (offset, length, _) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    // The sled index persists across a close/reopen cycle.
    #[test]
    fn test_sled_backend_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First sled payload";
        let data2 = b"Second sled payload";

        // Write with sled backend
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
            // Explicit save (though Drop also saves)
            store.save_index().unwrap();
        }

        // Reopen with sled and verify
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
            assert_eq!(store.len(), 2);
        }
    }

    // A thousand small payloads: checks the count, one lookup and the stats.
    #[test]
    fn test_sled_backend_large_dataset() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();

        // Simulate large dataset
        for i in 0..1000 {
            let data = format!("Payload {}", i);
            store.append(i, data.as_bytes(), None).unwrap();
        }

        // Verify random access
        assert_eq!(store.len(), 1000);
        let retrieved = store.get(500).unwrap().unwrap();
        assert_eq!(retrieved, b"Payload 500");

        // Verify stats
        let stats = store.stats();
        assert_eq!(stats.num_payloads, 1000);
    }

    // The two backends share the data file but keep separate indexes, so
    // entries written through one are not visible through the other.
    #[test]
    fn test_backend_interoperability() {
        let dir = tempdir().unwrap();

        // Write with HashMap backend
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::HashMap).unwrap();
            store.append(1, b"HashMap data", None).unwrap();
        }

        // Payload data file is shared, but indexes are separate
        // Sled backend starts fresh (no cross-backend index migration)
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::Sled).unwrap();
            // Sled has its own index, so it won't see HashMap's entry
            assert_eq!(store.len(), 0);

            // Add new entry with sled
            store.append(2, b"Sled data", None).unwrap();
            assert_eq!(store.len(), 1);
        }
    }
}
865        }
866    }
867}