hermes_core/segment/store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - High compression level by default (7, configurable) for a good compression ratio
//! - Large block sizes (256KB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! The writer stores documents in compressed blocks.
//! The reader only loads the index into memory; blocks are loaded on demand.
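//!
//! A minimal round-trip sketch (the `docs`, `schema`, and `handle` values are
//! hypothetical; error handling elided):
//!
//! ```ignore
//! // Write: documents are buffered and flushed as compressed blocks.
//! let mut buf: Vec<u8> = Vec::new();
//! let mut writer = EagerParallelStoreWriter::new(&mut buf, 4);
//! for doc in &docs {
//!     writer.store(doc, &schema)?;
//! }
//! let num_docs = writer.finish()?;
//!
//! // Read: only the footer and block index are loaded eagerly; blocks are
//! // fetched and decompressed on demand (here with a 32-block cache).
//! let reader = AsyncStoreReader::open(handle, 32).await?;
//! let first = reader.get(0, &schema).await?;
//! ```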

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries

/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size (4KB)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

/// Serialize a document to bytes (currently JSON; the schema is unused)
pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Compressed block result
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    ///
    /// `_num_threads` is currently unused: one thread is spawned per block
    /// rather than drawing from a fixed-size pool.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Create with dictionary
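    ///
    /// A sketch of dictionary-backed construction; `dict_bytes` and `out` are
    /// assumed to come from elsewhere (a pre-trained Zstd dictionary and any
    /// `Write` sink):
    ///
    /// ```ignore
    /// let dict = CompressionDict::from_bytes(dict_bytes);
    /// let mut writer = EagerParallelStoreWriter::with_dict(&mut out, dict, 4);
    /// ```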
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

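    /// Serialize a document into the current block buffer and return its
    /// assigned `DocId`; a compressed block is flushed once the buffer
    /// reaches `STORE_BLOCK_SIZE`.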
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        // Length-prefixed document record: u32 length, then the bytes
        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Compress on a dedicated thread (one per block); a panic here surfaces
        // as a join error when the block is collected
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks, propagating worker panics as
    /// I/O errors instead of silently dropping their blocks
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

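    /// On-disk layout written by `finish` (all integers little-endian):
    ///
    /// ```text
    /// [compressed blocks ...]
    /// [dict_len: u32][dict bytes]                  -- only when a dictionary is set
    /// [num_blocks: u32]
    /// [per block: first_doc_id u32 | offset u64 | length u32 | num_docs u32]
    /// [data_end u64 | dict_offset u64 | num_docs u32 | has_dict u32 | version u32 | magic u32]
    /// ```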
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Write empty store
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index
        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

/// Block index entry for document store
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

/// Async document store reader - loads blocks on demand
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}

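/// Small LRU cache of decompressed blocks, keyed by each block's first doc id.
/// Recency is tracked in a plain `Vec`, so each touch is O(n) in the number of
/// cached blocks; fine for the small caches used here.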
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            // Move the touched block to the back (most recently used)
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        // Evict least recently used blocks until there is room
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
    /// Open a document store from a LazyFileHandle
    /// Only loads the footer and index into memory; data blocks are fetched on demand
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes)
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load the dictionary if present and locate the start of the index,
        // which directly follows either the dictionary or the data section
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset as usize)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Get a document by doc_id (async - may load a block)
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        // Find block containing this doc_id
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Skip preceding length-prefixed documents within the block
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        // Check cache
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

/// Deserialize a document from bytes (currently JSON; the schema is unused)
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks
///
/// Limitations:
/// - All source stores must NOT use dictionaries (or must use the same dictionary)
/// - Doc IDs are remapped sequentially
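///
/// A minimal merge sketch (the `readers` of `AsyncStoreReader` are
/// hypothetical; none may use a dictionary):
///
/// ```ignore
/// let mut out: Vec<u8> = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// for reader in &readers {
///     assert!(!reader.has_dict());
///     merger
///         .append_store(reader.data_slice(), &reader.raw_blocks())
///         .await?;
/// }
/// let total_docs = merger.finish()?;
/// ```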
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Append raw compressed blocks from a store file
    ///
    /// `data_slice` should be the data portion of the store (before index/footer)
    /// `blocks` contains the block metadata from the source store
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            // Read raw compressed block data
            let start = block.offset as usize;
            let end = start + block.length as usize;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            // Write to output
            self.writer.write_all(compressed_data.as_slice())?;

            // Add to index with remapped doc IDs
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Finish writing the merged store
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // No dictionary support for merged stores (would need the same dict across all sources)
        let dict_offset = 0u64;

        // Write index
        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Get raw block metadata for merging (without loading block data)
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Get the data slice for raw block access
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// Check if this store uses a dictionary (incompatible with raw merging)
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}
641}