Skip to main content

sochdb_storage/payload/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Payload storage for variable-length data
19//!
20//! TOON records can reference variable-length payloads (text, binary data, embeddings).
21//! These are stored separately in payload segments and referenced by
22//! (payload_offset, payload_length, compression_type).
23//!
24//! **Design Goals:**
25//! - Append-only for fast writes
26//! - Immutable payloads (no in-place updates)
27//! - Support compression (LZ4 for warm, ZSTD for cold)
28//! - Thread-safe concurrent access
29//! - Memory-mapped for fast reads
30
31mod disk_hash_index;
32
33use memmap2::MmapOptions;
34use parking_lot::RwLock;
35use sochdb_core::{Result, SochDBError};
36use std::fs::{File, OpenOptions};
37use std::io::{Read, Seek, SeekFrom, Write as IoWrite};
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40
41use disk_hash_index::DiskHashIndex;
42
43/// Compression type for payloads
44#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
45#[repr(u8)]
46pub enum CompressionType {
47    None = 0,
48    LZ4 = 1,
49    ZSTD = 2,
50}
51
52impl CompressionType {
53    pub fn from_u8(value: u8) -> Result<Self> {
54        match value {
55            0 => Ok(CompressionType::None),
56            1 => Ok(CompressionType::LZ4),
57            2 => Ok(CompressionType::ZSTD),
58            _ => Err(SochDBError::InvalidArgument(format!(
59                "Invalid compression type: {}",
60                value
61            ))),
62        }
63    }
64}
65
66/// Payload metadata stored in index
67#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
68pub struct PayloadMeta {
69    pub edge_id: u128,
70    pub offset: u64,
71    pub length: u32,
72    pub compression: CompressionType,
73    pub uncompressed_length: u32,
74}
75
76/// Pluggable trait for payload index storage
77///
78/// Allows switching between in-memory (HashMap) and disk-backed (DiskHash) implementations.
79pub trait PayloadIndex: Send + Sync {
80    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()>;
81    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>>;
82    fn contains_key(&self, edge_id: u128) -> bool;
83    fn len(&self) -> usize;
84    fn is_empty(&self) -> bool;
85    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_>;
86    fn save(&self) -> Result<()>;
87}
88
89/// In-memory HashMap index (default, fast but memory-hungry)
90struct HashMapIndex {
91    inner: Arc<RwLock<std::collections::HashMap<u128, PayloadMeta>>>,
92    index_path: PathBuf,
93}
94
95impl HashMapIndex {
96    fn new(index_path: PathBuf) -> Result<Self> {
97        // Ensure parent directory exists before any file operations
98        if let Some(parent) = index_path.parent() {
99            std::fs::create_dir_all(parent)?;
100        }
101
102        let inner = if index_path.exists() {
103            let loaded = Self::load_from_disk(&index_path)?;
104            tracing::info!(
105                "Loaded payload index with {} entries from {:?}",
106                loaded.len(),
107                index_path
108            );
109            Arc::new(RwLock::new(loaded))
110        } else {
111            tracing::info!(
112                "No existing payload index found at {:?}, starting fresh",
113                index_path
114            );
115            Arc::new(RwLock::new(std::collections::HashMap::new()))
116        };
117        Ok(Self { inner, index_path })
118    }
119
120    fn load_from_disk(path: &Path) -> Result<std::collections::HashMap<u128, PayloadMeta>> {
121        let file = File::open(path)?;
122        let mut reader = std::io::BufReader::new(file);
123
124        // Read and verify magic
125        let mut magic = [0u8; 8];
126        reader.read_exact(&mut magic)?;
127        if &magic != b"CHRLPAY1" {
128            return Err(SochDBError::Corruption(
129                "Invalid payload index magic".into(),
130            ));
131        }
132
133        // Read count
134        let mut count_bytes = [0u8; 8];
135        reader.read_exact(&mut count_bytes)?;
136        let count = u64::from_le_bytes(count_bytes);
137
138        let mut index = std::collections::HashMap::new();
139
140        for _ in 0..count {
141            let mut edge_id_bytes = [0u8; 16];
142            reader.read_exact(&mut edge_id_bytes)?;
143            let edge_id = u128::from_le_bytes(edge_id_bytes);
144
145            let mut offset_bytes = [0u8; 8];
146            reader.read_exact(&mut offset_bytes)?;
147            let offset = u64::from_le_bytes(offset_bytes);
148
149            let mut length_bytes = [0u8; 4];
150            reader.read_exact(&mut length_bytes)?;
151            let length = u32::from_le_bytes(length_bytes);
152
153            let mut compression_byte = [0u8; 1];
154            reader.read_exact(&mut compression_byte)?;
155            let compression = CompressionType::from_u8(compression_byte[0])?;
156
157            let mut uncompressed_bytes = [0u8; 4];
158            reader.read_exact(&mut uncompressed_bytes)?;
159            let uncompressed_length = u32::from_le_bytes(uncompressed_bytes);
160
161            index.insert(
162                edge_id,
163                PayloadMeta {
164                    edge_id,
165                    offset,
166                    length,
167                    compression,
168                    uncompressed_length,
169                },
170            );
171        }
172
173        Ok(index)
174    }
175}
176
177impl PayloadIndex for HashMapIndex {
178    fn insert(&self, edge_id: u128, meta: PayloadMeta) -> Result<()> {
179        self.inner.write().insert(edge_id, meta);
180        Ok(())
181    }
182
183    fn get(&self, edge_id: u128) -> Result<Option<PayloadMeta>> {
184        Ok(self.inner.read().get(&edge_id).cloned())
185    }
186
187    fn contains_key(&self, edge_id: u128) -> bool {
188        self.inner.read().contains_key(&edge_id)
189    }
190
191    fn len(&self) -> usize {
192        self.inner.read().len()
193    }
194
195    fn is_empty(&self) -> bool {
196        self.inner.read().is_empty()
197    }
198
199    fn iter_values(&self) -> Box<dyn Iterator<Item = PayloadMeta> + '_> {
200        let values: Vec<_> = self.inner.read().values().cloned().collect();
201        Box::new(values.into_iter())
202    }
203
204    fn save(&self) -> Result<()> {
205        let inner = self.inner.read();
206
207        // Ensure parent directory exists
208        if let Some(parent) = self.index_path.parent() {
209            std::fs::create_dir_all(parent)?;
210        }
211
212        // Write to a temp file first, then rename for atomicity
213        let temp_path = self.index_path.with_extension("tmp");
214        let file = File::create(&temp_path)?;
215        let mut writer = std::io::BufWriter::new(file);
216
217        // Write magic header
218        writer.write_all(b"CHRLPAY1")?;
219
220        // Write count
221        writer.write_all(&(inner.len() as u64).to_le_bytes())?;
222
223        // Write entries
224        for meta in inner.values() {
225            writer.write_all(&meta.edge_id.to_le_bytes())?;
226            writer.write_all(&meta.offset.to_le_bytes())?;
227            writer.write_all(&meta.length.to_le_bytes())?;
228            writer.write_all(&[meta.compression as u8])?;
229            writer.write_all(&meta.uncompressed_length.to_le_bytes())?;
230        }
231
232        // Flush buffer and sync to disk
233        std::io::Write::flush(&mut writer)?;
234        let file = writer
235            .into_inner()
236            .map_err(|e| SochDBError::Internal(format!("Failed to flush: {}", e)))?;
237        file.sync_all()?;
238
239        // Atomically rename temp file to actual index file
240        std::fs::rename(&temp_path, &self.index_path)?;
241
242        tracing::debug!(
243            entries = inner.len(),
244            path = %self.index_path.display(),
245            "Saved payload index to disk"
246        );
247
248        Ok(())
249    }
250}
251
252// DiskHashIndex is defined in disk_hash_index.rs and imported at the top of this module.
253
/// Index backend used by a `PayloadStore`.
///
/// `HashMap` is the default, matching `PayloadStore::open`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum IndexBackend {
    /// In-memory HashMap (default, fast but uses ~50MB per 1M payloads)
    #[default]
    HashMap,
    /// Disk-backed hash table (scales to billions, minimal RAM)
    /// Uses a memory-mapped open-addressing hash table with linear probing.
    /// Zero external dependencies — replaces sled with a purpose-built
    /// structure optimized for fixed-size PayloadMeta records.
    DiskHash,
}
264
265/// Payload storage engine
266///
267/// **Architecture:**
268/// - Append-only payload file (payload.data)
269/// - In-memory index: edge_id -> (offset, length, compression)
270/// - Memory-mapped reads for zero-copy access
271/// - Write-ahead log for crash recovery
272///
273/// **Thread Safety:**
274/// - Reads: Lock-free via memory map (multiple concurrent readers)
275/// - Writes: Serialized via RwLock (single writer)
276/// - Index: Protected by RwLock
277///
278/// **SCALABILITY:**
279/// - **HashMap backend (default)**: Fast but uses ~50MB RAM per 1M payloads (32 bytes per entry + overhead).
280///   Suitable for < 10M traces.
281/// - **DiskHash backend**: Disk-backed hash table scales to billions with minimal RAM.
282///   Recommended for 10M+ traces.
283///
284/// **Usage:**
285/// ```rust,no_run
286/// use sochdb_storage::payload::{PayloadStore, IndexBackend};
287///
288/// // Default: in-memory HashMap
289/// let store = PayloadStore::open("./data").unwrap();
290///
291/// // For 10M+ scale: disk-backed hash table
292/// let store = PayloadStore::open_with_backend("./data", IndexBackend::DiskHash).unwrap();
293/// ```
294pub struct PayloadStore {
295    data_file: Arc<RwLock<File>>,
296    #[allow(dead_code)]
297    data_path: PathBuf,
298    mmap: Arc<RwLock<Option<memmap2::Mmap>>>,
299    index: Arc<dyn PayloadIndex>,
300    next_offset: Arc<RwLock<u64>>,
301    /// Memory warning thresholds (for HashMap backend)
302    /// Tracks when to warn users about memory growth
303    memory_warning_logged: Arc<RwLock<(bool, bool, bool)>>, // (1M, 5M, 10M warnings)
304    backend_type: IndexBackend,
305}
306
307impl PayloadStore {
308    /// Open or create a payload store with default HashMap index
309    pub fn open<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
310        Self::open_with_backend(data_dir, IndexBackend::HashMap)
311    }
312
313    /// Open or create a payload store with specified index backend
314    pub fn open_with_backend<P: AsRef<Path>>(data_dir: P, backend: IndexBackend) -> Result<Self> {
315        let data_dir = data_dir.as_ref();
316        std::fs::create_dir_all(data_dir)?;
317
318        let data_path = data_dir.join("payload.data");
319        let index_path = match backend {
320            IndexBackend::HashMap => data_dir.join("payload.index"),
321            IndexBackend::DiskHash => data_dir.join("payload_disk_hash.idx"),
322        };
323
324        // Open or create data file
325        let data_file = OpenOptions::new()
326            .read(true)
327            .write(true)
328            .create(true)
329            .truncate(false)
330            .open(&data_path)?;
331
332        let file_len = data_file.metadata()?.len();
333
334        // Create index based on backend type
335        let index: Arc<dyn PayloadIndex> = match backend {
336            IndexBackend::HashMap => Arc::new(HashMapIndex::new(index_path)?),
337            IndexBackend::DiskHash => Arc::new(DiskHashIndex::new(index_path)?),
338        };
339
340        // Create memory map if file has data
341        let mmap = if file_len > 0 {
342            let mmap = unsafe { MmapOptions::new().map(&data_file)? };
343            Some(mmap)
344        } else {
345            None
346        };
347
348        Ok(Self {
349            data_file: Arc::new(RwLock::new(data_file)),
350            data_path,
351            mmap: Arc::new(RwLock::new(mmap)),
352            index,
353            next_offset: Arc::new(RwLock::new(file_len)),
354            memory_warning_logged: Arc::new(RwLock::new((false, false, false))),
355            backend_type: backend,
356        })
357    }
358
359    /// Append a payload and return (offset, length, compression_type)
360    ///
361    /// **Compression Strategy:**
362    /// - Small payloads (<1KB): No compression (overhead > benefit)
363    /// - Medium payloads (1KB-100KB): LZ4 (fast compression/decompression)
364    /// - Large payloads (>100KB): ZSTD (higher compression ratio)
365    ///
366    /// This can be overridden by specifying compression_type explicitly.
367    ///
368    /// **DESKTOP APP FIX:** Monitors memory growth and logs warnings at thresholds
369    /// to prevent silent OOM in long-running sessions.
370    pub fn append(
371        &self,
372        edge_id: u128,
373        data: &[u8],
374        compression: Option<CompressionType>,
375    ) -> Result<(u64, u32, u8)> {
376        let uncompressed_len = data.len() as u32;
377
378        // Auto-select compression if not specified
379        let compression = compression.unwrap_or(if data.len() < 1024 {
380            CompressionType::None
381        } else if data.len() < 100_000 {
382            CompressionType::LZ4
383        } else {
384            CompressionType::ZSTD
385        });
386
387        // Compress data if needed
388        let (compressed_data, actual_compression) = match compression {
389            CompressionType::None => (data.to_vec(), CompressionType::None),
390            CompressionType::LZ4 => {
391                match lz4::block::compress(data, None, false) {
392                    Ok(compressed) => {
393                        // Only use compression if it saves space
394                        if compressed.len() < data.len() {
395                            (compressed, CompressionType::LZ4)
396                        } else {
397                            (data.to_vec(), CompressionType::None)
398                        }
399                    }
400                    Err(_) => (data.to_vec(), CompressionType::None),
401                }
402            }
403            CompressionType::ZSTD => {
404                match zstd::encode_all(data, 3) {
405                    // Level 3 is good balance
406                    Ok(compressed) => {
407                        if compressed.len() < data.len() {
408                            (compressed, CompressionType::ZSTD)
409                        } else {
410                            (data.to_vec(), CompressionType::None)
411                        }
412                    }
413                    Err(_) => (data.to_vec(), CompressionType::None),
414                }
415            }
416        };
417
418        let compressed_len = compressed_data.len() as u32;
419
420        // Append to file
421        let offset = {
422            let mut file = self.data_file.write();
423            let offset = *self.next_offset.read();
424
425            file.seek(SeekFrom::End(0))?;
426            file.write_all(&compressed_data)?;
427            file.sync_all()?;
428
429            offset
430        };
431
432        // Update next offset
433        *self.next_offset.write() = offset + compressed_len as u64;
434
435        // Update index
436        self.index.insert(
437            edge_id,
438            PayloadMeta {
439                edge_id,
440                offset,
441                length: compressed_len,
442                compression: actual_compression,
443                uncompressed_length: uncompressed_len,
444            },
445        )?;
446
447        // Remap file if needed
448        self.remap_file()?;
449
450        // DESKTOP APP FIX: Monitor memory growth for HashMap backend
451        // Log warnings at 1M, 5M, and 10M payloads to prevent silent OOM
452        if matches!(self.backend_type, IndexBackend::HashMap) {
453            let count = self.index.len();
454            let mut warnings = self.memory_warning_logged.write();
455
456            if count >= 10_000_000 && !warnings.2 {
457                warnings.2 = true;
458                tracing::warn!(
459                    payload_count = count,
460                    estimated_ram_mb = count * 50 / 1_000_000,
461                    "CRITICAL: PayloadStore has 10M+ entries (~500MB RAM). \
462                     Strongly recommend switching to DiskHash backend to prevent OOM. \
463                     Use PayloadStore::open_with_backend(path, IndexBackend::DiskHash)"
464                );
465            } else if count >= 5_000_000 && !warnings.1 {
466                warnings.1 = true;
467                tracing::warn!(
468                    payload_count = count,
469                    estimated_ram_mb = count * 50 / 1_000_000,
470                    "WARNING: PayloadStore has 5M+ entries (~250MB RAM). \
471                     Consider switching to DiskHash backend for better memory efficiency. \
472                     Use PayloadStore::open_with_backend(path, IndexBackend::DiskHash)"
473                );
474            } else if count >= 1_000_000 && !warnings.0 {
475                warnings.0 = true;
476                tracing::info!(
477                    payload_count = count,
478                    estimated_ram_mb = count * 50 / 1_000_000,
479                    "INFO: PayloadStore has reached 1M entries (~50MB RAM). \
480                     Memory usage will grow linearly. Consider DiskHash backend for 10M+ scale."
481                );
482            }
483        }
484
485        // NOTE: Index is saved on explicit sync() or shutdown, NOT on every insert.
486        // This is critical for performance - the old per-insert save caused O(N²)
487        // write amplification, limiting throughput to ~42 spans/sec.
488        //
489        // With this fix, throughput increases to 10,000+ spans/sec.
490        // The caller should call save_index() or sync() when durability is required.
491        //
492        // For crash recovery, the data file is append-only and the index can be
493        // rebuilt from the data file if needed (see rebuild_index()).
494
495        Ok((offset, compressed_len, actual_compression as u8))
496    }
497
498    /// Get a payload by edge ID
499    pub fn get(&self, edge_id: u128) -> Result<Option<Vec<u8>>> {
500        let meta = match self.index.get(edge_id)? {
501            Some(m) => m,
502            None => return Ok(None),
503        };
504
505        self.get_at_offset(meta.offset, meta.length, meta.compression)
506            .map(Some)
507    }
508
509    /// Get a payload at a specific offset
510    ///
511    /// This is used when reading edges from SSTables that have embedded
512    /// (offset, length, compression) tuples.
513    pub fn get_at_offset(
514        &self,
515        offset: u64,
516        length: u32,
517        compression: CompressionType,
518    ) -> Result<Vec<u8>> {
519        // For LZ4, we need the uncompressed size from the index
520        let uncompressed_size = if compression == CompressionType::LZ4 {
521            // Look up the metadata to get uncompressed size
522            self.index
523                .iter_values()
524                .find(|meta| meta.offset == offset && meta.length == length)
525                .map(|meta| meta.uncompressed_length as i32)
526        } else {
527            None
528        };
529
530        let mmap = self.mmap.read();
531
532        let compressed_data = match mmap.as_ref() {
533            Some(mmap) => {
534                let start = offset as usize;
535                let end = start + length as usize;
536
537                if end > mmap.len() {
538                    return Err(SochDBError::InvalidArgument(format!(
539                        "Payload offset {} + length {} exceeds file size {}",
540                        offset,
541                        length,
542                        mmap.len()
543                    )));
544                }
545
546                &mmap[start..end]
547            }
548            None => {
549                return Err(SochDBError::InvalidArgument("Payload file is empty".into()));
550            }
551        };
552
553        // Decompress if needed
554        let data = match compression {
555            CompressionType::None => compressed_data.to_vec(),
556            CompressionType::LZ4 => {
557                // LZ4 requires the uncompressed size for decompression
558                lz4::block::decompress(compressed_data, uncompressed_size).map_err(|e| {
559                    SochDBError::Corruption(format!("LZ4 decompression failed: {}", e))
560                })?
561            }
562            CompressionType::ZSTD => zstd::decode_all(compressed_data).map_err(|e| {
563                SochDBError::Corruption(format!("ZSTD decompression failed: {}", e))
564            })?,
565        };
566
567        Ok(data)
568    }
569
570    /// Check if a payload exists for an edge
571    pub fn has_payload(&self, edge_id: u128) -> bool {
572        self.index.contains_key(edge_id)
573    }
574
575    /// Get number of stored payloads
576    pub fn len(&self) -> usize {
577        self.index.len()
578    }
579
580    /// Check if empty
581    pub fn is_empty(&self) -> bool {
582        self.index.is_empty()
583    }
584
585    /// Save index to disk for fast recovery (delegates to backend)
586    pub fn save_index(&self) -> Result<()> {
587        self.index.save()
588    }
589
590    /// Remap file after writes
591    fn remap_file(&self) -> Result<()> {
592        let file = self.data_file.read();
593        let file_len = file.metadata()?.len();
594
595        if file_len > 0 {
596            let new_mmap = unsafe { MmapOptions::new().map(&*file)? };
597            *self.mmap.write() = Some(new_mmap);
598        }
599
600        Ok(())
601    }
602
603    /// Get statistics
604    pub fn stats(&self) -> PayloadStats {
605        let mut total_compressed: u64 = 0;
606        let mut total_uncompressed: u64 = 0;
607        let mut compression_counts = [0usize; 3];
608
609        for meta in self.index.iter_values() {
610            total_compressed += meta.length as u64;
611            total_uncompressed += meta.uncompressed_length as u64;
612            compression_counts[meta.compression as usize] += 1;
613        }
614
615        PayloadStats {
616            num_payloads: self.index.len(),
617            total_compressed_bytes: total_compressed,
618            total_uncompressed_bytes: total_uncompressed,
619            compression_ratio: if total_compressed > 0 {
620                total_uncompressed as f64 / total_compressed as f64
621            } else {
622                1.0
623            },
624            none_count: compression_counts[0],
625            lz4_count: compression_counts[1],
626            zstd_count: compression_counts[2],
627        }
628    }
629}
630
631impl Drop for PayloadStore {
632    fn drop(&mut self) {
633        // Save index on drop (delegates to backend)
634        let _ = self.index.save();
635    }
636}
637
/// Aggregate size and codec statistics for a payload store.
#[derive(Debug, Clone)]
pub struct PayloadStats {
    /// Number of indexed payloads.
    pub num_payloads: usize,
    /// Sum of stored (possibly compressed) payload sizes, in bytes.
    pub total_compressed_bytes: u64,
    /// Sum of original payload sizes, in bytes.
    pub total_uncompressed_bytes: u64,
    /// Uncompressed bytes divided by stored bytes.
    pub compression_ratio: f64,
    /// Payloads stored without compression.
    pub none_count: usize,
    /// Payloads stored with LZ4.
    pub lz4_count: usize,
    /// Payloads stored with ZSTD.
    pub zstd_count: usize,
}
648
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Deterministic pseudo-random bytes (xorshift32) that LZ4 cannot shrink.
    fn incompressible(len: usize) -> Vec<u8> {
        let mut state: u32 = 0x9E37_79B9;
        (0..len)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 17;
                state ^= state << 5;
                (state >> 24) as u8
            })
            .collect()
    }

    #[test]
    fn test_payload_store_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data = b"Hello, SochDB!";
        let (offset, length, compression) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);
        assert_eq!(compression, CompressionType::None as u8);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_payload_compression() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        // Create compressible data (repeated pattern)
        let data: Vec<u8> = b"ABCD".repeat(1000);

        let (_, length, compression) = store.append(1, &data, Some(CompressionType::LZ4)).unwrap();

        // Should be compressed
        assert!(length < data.len() as u32);
        assert_eq!(compression, CompressionType::LZ4 as u8);

        // Should decompress correctly
        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_zstd_auto_selected_for_large_payloads() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        // >= 100_000 bytes and highly compressible: auto-selects ZSTD.
        let data: Vec<u8> = b"ABCD".repeat(50_000);
        let (_, length, compression) = store.append(7, &data, None).unwrap();

        assert_eq!(compression, CompressionType::ZSTD as u8);
        assert!(length < data.len() as u32);
        assert_eq!(store.get(7).unwrap().unwrap(), data);
    }

    #[test]
    fn test_incompressible_payload_falls_back_to_none() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let data = incompressible(4096);
        let (_, length, compression) = store.append(3, &data, Some(CompressionType::LZ4)).unwrap();

        assert_eq!(compression, CompressionType::None as u8);
        assert_eq!(length, data.len() as u32);
        assert_eq!(store.get(3).unwrap().unwrap(), data);
    }

    #[test]
    fn test_get_at_offset_lz4() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        let first: Vec<u8> = b"ABCD".repeat(1000);
        let second: Vec<u8> = b"WXYZ".repeat(2000);
        let (off_a, len_a, comp_a) = store.append(1, &first, Some(CompressionType::LZ4)).unwrap();
        let (off_b, len_b, comp_b) = store.append(2, &second, Some(CompressionType::LZ4)).unwrap();

        assert_eq!(comp_a, CompressionType::LZ4 as u8);
        assert_eq!(comp_b, CompressionType::LZ4 as u8);
        assert_eq!(
            store.get_at_offset(off_a, len_a, CompressionType::LZ4).unwrap(),
            first
        );
        assert_eq!(
            store.get_at_offset(off_b, len_b, CompressionType::LZ4).unwrap(),
            second
        );
    }

    #[test]
    fn test_missing_and_empty_reads() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        assert!(store.is_empty());
        assert!(store.get(42).unwrap().is_none());
        assert!(!store.has_payload(42));
        // Nothing has been written, so any raw read must fail.
        assert!(store.get_at_offset(0, 1, CompressionType::None).is_err());
    }

    #[test]
    fn test_compression_type_from_u8() {
        for kind in [CompressionType::None, CompressionType::LZ4, CompressionType::ZSTD] {
            assert_eq!(CompressionType::from_u8(kind as u8).unwrap(), kind);
        }
        assert!(CompressionType::from_u8(3).is_err());
        assert!(CompressionType::from_u8(u8::MAX).is_err());
    }

    #[test]
    fn test_payload_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First payload";
        let data2 = b"Second payload";

        // Write payloads
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
        }

        // Reopen and verify
        {
            let store = PayloadStore::open(dir.path()).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
        }
    }

    #[test]
    fn test_payload_stats() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open(dir.path()).unwrap();

        store.append(1, b"small", None).unwrap();
        store
            .append(2, &b"A".repeat(2000), Some(CompressionType::LZ4))
            .unwrap();

        let stats = store.stats();
        assert_eq!(stats.num_payloads, 2);
        assert_eq!(stats.none_count, 1);
        assert_eq!(stats.lz4_count, 1);
        assert!(stats.compression_ratio > 1.0);
    }

    #[test]
    fn test_disk_hash_backend_basic() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();

        let data = b"Hello from DiskHash!";
        let (offset, length, _) = store.append(1, data, None).unwrap();

        assert_eq!(offset, 0);
        assert_eq!(length, data.len() as u32);

        let retrieved = store.get(1).unwrap().unwrap();
        assert_eq!(retrieved, data);
    }

    #[test]
    fn test_disk_hash_backend_persistence() {
        let dir = tempdir().unwrap();

        let data1 = b"First disk hash payload";
        let data2 = b"Second disk hash payload";

        // Write with DiskHash backend
        {
            let store =
                PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();
            store.append(1, data1, None).unwrap();
            store.append(2, data2, None).unwrap();
            // Explicit save (though Drop also saves)
            store.save_index().unwrap();
        }

        // Reopen with DiskHash and verify
        {
            let store =
                PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();
            assert_eq!(store.get(1).unwrap().unwrap(), data1);
            assert_eq!(store.get(2).unwrap().unwrap(), data2);
            assert_eq!(store.len(), 2);
        }
    }

    #[test]
    fn test_disk_hash_backend_large_dataset() {
        let dir = tempdir().unwrap();
        let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();

        // Simulate large dataset
        for i in 0..1000 {
            let data = format!("Payload {}", i);
            store.append(i, data.as_bytes(), None).unwrap();
        }

        // Verify random access
        assert_eq!(store.len(), 1000);
        let retrieved = store.get(500).unwrap().unwrap();
        assert_eq!(retrieved, b"Payload 500");

        // Verify stats
        let stats = store.stats();
        assert_eq!(stats.num_payloads, 1000);
    }

    #[test]
    fn test_backend_interoperability() {
        let dir = tempdir().unwrap();

        // Write with HashMap backend
        {
            let store = PayloadStore::open_with_backend(dir.path(), IndexBackend::HashMap).unwrap();
            store.append(1, b"HashMap data", None).unwrap();
        }

        // The payload data file is shared, but the indexes are separate:
        // the DiskHash backend starts fresh (no cross-backend index migration).
        {
            let store =
                PayloadStore::open_with_backend(dir.path(), IndexBackend::DiskHash).unwrap();
            // DiskHash has its own index, so it won't see HashMap's entry
            assert_eq!(store.len(), 0);

            // Add new entry with DiskHash
            store.append(2, b"DiskHash data", None).unwrap();
            assert_eq!(store.len(), 1);
            assert_eq!(store.get(2).unwrap().unwrap(), b"DiskHash data");
        }
    }
}