hermes_core/segment/
store.rs

1//! Document store with Zstd compression and lazy loading
2//!
3//! Optimized for static indexes:
//! - Configurable Zstd compression level (default: 7)
//! - Large block sizes (256KB) for better compression efficiency
6//! - Optional trained dictionary support for even better compression
7//! - Parallel compression support for faster indexing
8//!
9//! Writer stores documents in compressed blocks.
10//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::{CompressionDict, CompressionLevel};
20use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
21use crate::dsl::{Document, Schema};
22
/// Magic number that closes every store file: the ASCII bytes "STOR" read big-endian.
const STORE_MAGIC: u32 = u32::from_be_bytes(*b"STOR");
/// On-disk format version. Version 2 supports dictionaries.
const STORE_VERSION: u32 = 2;

/// Block size for document store (256KB for better compression).
///
/// Larger blocks = better compression ratio but more memory per block load.
pub const STORE_BLOCK_SIZE: usize = 256 << 10;
29
30/// Default dictionary size (64KB is a good balance)
31pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;
32
33/// Default compression level for document store
34const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
35
36pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
37    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
38}
39
/// Output of one background compression task.
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Position of the block in submission order; used to restore ordering
    /// before the blocks are written out.
    seq: usize,
    /// Compressed bytes of the block.
    compressed: Vec<u8>,
    /// Id of the first document stored in the block.
    first_doc_id: DocId,
    /// Number of documents stored in the block.
    num_docs: u32,
}
48
/// Document store writer that compresses full blocks on background threads.
///
/// Documents are appended to an in-memory block buffer. As soon as the buffer
/// reaches `STORE_BLOCK_SIZE` it is handed to a freshly spawned thread for
/// compression, so ingestion on the calling thread overlaps with compression.
/// Nothing is written to the underlying writer until `finish` is called.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    /// Destination of the finished store (blocks, dictionary, index, footer).
    writer: &'a mut dyn Write,
    /// Optional dictionary shared with the compression threads.
    dict: Option<Arc<CompressionDict>>,
    /// Compression level applied to every block.
    compression_level: CompressionLevel,
    /// Uncompressed, length-prefixed documents of the block being filled.
    block_buffer: Vec<u8>,
    /// Id of the first document in `block_buffer`.
    block_first_doc: DocId,
    /// Id that will be assigned to the next stored document.
    next_doc_id: DocId,
    /// Sequence number of the next block handed to a compression thread.
    next_seq: usize,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
}
70
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer without a dictionary, using
    /// the default compression level.
    ///
    /// `_num_threads` is accepted for API compatibility but is not used: one
    /// thread is spawned per block.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, None, compression_level)
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, Some(Arc::new(dict)), compression_level)
    }

    /// Shared constructor body for all public constructors.
    fn from_parts(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Convert a length to the `u32` used by the on-disk format, failing
    /// instead of silently truncating.
    fn len_as_u32(len: usize, what: &'static str) -> io::Result<u32> {
        if len > u32::MAX as usize {
            Err(io::Error::new(io::ErrorKind::InvalidInput, what))
        } else {
            Ok(len as u32)
        }
    }

    /// Append a document to the current block and return its doc id.
    ///
    /// The id is only consumed once the document has been serialized and
    /// buffered, so a failed call leaves the writer unchanged and the block's
    /// document count always matches what the block actually contains.
    ///
    /// # Errors
    /// Returns the serialization error, or `InvalidInput` if the serialized
    /// document does not fit a `u32` length prefix.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_bytes = serialize_document(doc, schema)?;
        let doc_len = Self::len_as_u32(doc_bytes.len(), "Document too large for store")?;

        self.block_buffer.write_u32::<LittleEndian>(doc_len)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    ///
    /// Does nothing when the block buffer is empty.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();
        let level = self.compression_level;

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // A compression failure panics inside the worker; the panic is turned
        // into an `io::Error` when the handle is joined (see `join_block`).
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Join a compression thread, reporting a panicked worker as an error
    /// instead of silently dropping its block.
    fn join_block(handle: std::thread::JoinHandle<CompressedBlock>) -> io::Result<CompressedBlock> {
        handle.join().map_err(|_| {
            io::Error::new(
                io::ErrorKind::Other,
                "Store block compression thread panicked",
            )
        })
    }

    /// Collect any completed compression tasks
    ///
    /// # Errors
    /// Fails if a finished task panicked.
    fn collect_completed(&mut self) -> io::Result<()> {
        let (finished, running): (Vec<_>, Vec<_>) = std::mem::take(&mut self.pending_handles)
            .into_iter()
            .partition(|handle| handle.is_finished());
        self.pending_handles = running;

        for handle in finished {
            let block = Self::join_block(handle)?;
            self.compressed_blocks.push(block);
        }
        Ok(())
    }

    /// Write the block index followed by the 32-byte footer.
    fn write_trailer(
        &mut self,
        index: &[StoreBlockIndex],
        data_end_offset: u64,
        dict_offset: u64,
        num_docs: u32,
        has_dict: bool,
    ) -> io::Result<()> {
        // Index: block count, then one fixed-size entry per block.
        let num_blocks = Self::len_as_u32(index.len(), "Too many blocks in store")?;
        self.writer.write_u32::<LittleEndian>(num_blocks)?;
        for entry in index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(num_docs)?;
        self.writer.write_u32::<LittleEndian>(u32::from(has_dict))?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
        Ok(())
    }

    /// Compress any buffered documents, wait for all compression tasks and
    /// write blocks, optional dictionary, index and footer to the writer.
    ///
    /// Returns the number of documents stored.
    ///
    /// # Errors
    /// Fails on any write error, if a compression thread panicked, or if a
    /// length does not fit the on-disk `u32` fields.
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in std::mem::take(&mut self.pending_handles) {
            let block = Self::join_block(handle)?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Empty store: never write the dictionary here. It would land at
            // offset 0, which the footer cannot distinguish from "no dictionary".
            self.write_trailer(&[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order (sequence numbers are unique)
        self.compressed_blocks.sort_unstable_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            let length = Self::len_as_u32(block.compressed.len(), "Compressed block too large")?;
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += u64::from(length);
        }

        let data_end_offset = current_offset;

        // Write dictionary if present: u32 length prefix followed by the bytes,
        // placed directly after the data section.
        let has_dict = match self.dict {
            Some(ref dict) => {
                let dict_bytes = dict.as_bytes();
                let dict_len = Self::len_as_u32(dict_bytes.len(), "Dictionary too large")?;
                self.writer.write_u32::<LittleEndian>(dict_len)?;
                self.writer.write_all(dict_bytes)?;
                true
            }
            None => false,
        };
        let dict_offset = if has_dict { data_end_offset } else { 0 };

        let num_docs = self.next_doc_id;
        self.write_trailer(&index, data_end_offset, dict_offset, num_docs, has_dict)?;

        Ok(num_docs)
    }
}
275
276/// Block index entry for document store
277#[derive(Debug, Clone)]
278struct StoreBlockIndex {
279    first_doc_id: DocId,
280    offset: u64,
281    length: u32,
282    num_docs: u32,
283}
284
285/// Async document store reader - loads blocks on demand
286pub struct AsyncStoreReader {
287    /// LazyFileSlice for the data portion - fetches ranges on demand
288    data_slice: LazyFileSlice,
289    /// Block index
290    index: Vec<StoreBlockIndex>,
291    num_docs: u32,
292    /// Optional compression dictionary
293    dict: Option<CompressionDict>,
294    /// Block cache
295    cache: RwLock<StoreBlockCache>,
296}
297
298struct StoreBlockCache {
299    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
300    access_order: Vec<DocId>,
301    max_blocks: usize,
302}
303
304impl StoreBlockCache {
305    fn new(max_blocks: usize) -> Self {
306        Self {
307            blocks: FxHashMap::default(),
308            access_order: Vec::new(),
309            max_blocks,
310        }
311    }
312
313    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
314        if let Some(block) = self.blocks.get(&first_doc_id) {
315            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
316                self.access_order.remove(pos);
317                self.access_order.push(first_doc_id);
318            }
319            Some(Arc::clone(block))
320        } else {
321            None
322        }
323    }
324
325    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
326        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
327            let evict = self.access_order.remove(0);
328            self.blocks.remove(&evict);
329        }
330        self.blocks.insert(first_doc_id, block);
331        self.access_order.push(first_doc_id);
332    }
333}
334
335impl AsyncStoreReader {
336    /// Open a document store from LazyFileHandle
337    /// Only loads footer and index into memory, data blocks are fetched on-demand
338    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
339        let file_len = file_handle.len();
340        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
341        if file_len < 32 {
342            return Err(io::Error::new(
343                io::ErrorKind::InvalidData,
344                "Store too small",
345            ));
346        }
347
348        // Read footer (32 bytes)
349        let footer = file_handle
350            .read_bytes_range(file_len - 32..file_len)
351            .await?;
352        let mut reader = footer.as_slice();
353        let data_end_offset = reader.read_u64::<LittleEndian>()?;
354        let dict_offset = reader.read_u64::<LittleEndian>()?;
355        let num_docs = reader.read_u32::<LittleEndian>()?;
356        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
357        let version = reader.read_u32::<LittleEndian>()?;
358        let magic = reader.read_u32::<LittleEndian>()?;
359
360        if magic != STORE_MAGIC {
361            return Err(io::Error::new(
362                io::ErrorKind::InvalidData,
363                "Invalid store magic",
364            ));
365        }
366        if version != STORE_VERSION {
367            return Err(io::Error::new(
368                io::ErrorKind::InvalidData,
369                format!("Unsupported store version: {}", version),
370            ));
371        }
372
373        // Load dictionary if present
374        let dict = if has_dict && dict_offset > 0 {
375            let dict_start = dict_offset as usize;
376            let dict_len_bytes = file_handle
377                .read_bytes_range(dict_start..dict_start + 4)
378                .await?;
379            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
380            let dict_bytes = file_handle
381                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
382                .await?;
383            Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
384        } else {
385            None
386        };
387
388        // Calculate index location
389        let index_start = if has_dict && dict_offset > 0 {
390            let dict_start = dict_offset as usize;
391            let dict_len_bytes = file_handle
392                .read_bytes_range(dict_start..dict_start + 4)
393                .await?;
394            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
395            dict_start + 4 + dict_len
396        } else {
397            data_end_offset as usize
398        };
399        let index_end = file_len - 32;
400
401        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
402        let mut reader = index_bytes.as_slice();
403
404        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
405        let mut index = Vec::with_capacity(num_blocks);
406
407        for _ in 0..num_blocks {
408            let first_doc_id = reader.read_u32::<LittleEndian>()?;
409            let offset = reader.read_u64::<LittleEndian>()?;
410            let length = reader.read_u32::<LittleEndian>()?;
411            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
412
413            index.push(StoreBlockIndex {
414                first_doc_id,
415                offset,
416                length,
417                num_docs: num_docs_in_block,
418            });
419        }
420
421        // Create lazy slice for data portion only
422        let data_slice = file_handle.slice(0..data_end_offset as usize);
423
424        Ok(Self {
425            data_slice,
426            index,
427            num_docs,
428            dict,
429            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
430        })
431    }
432
433    /// Number of documents
434    pub fn num_docs(&self) -> u32 {
435        self.num_docs
436    }
437
438    /// Get a document by doc_id (async - may load block)
439    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
440        if doc_id >= self.num_docs {
441            return Ok(None);
442        }
443
444        // Find block containing this doc_id
445        let block_idx = self
446            .index
447            .binary_search_by(|entry| {
448                if doc_id < entry.first_doc_id {
449                    std::cmp::Ordering::Greater
450                } else if doc_id >= entry.first_doc_id + entry.num_docs {
451                    std::cmp::Ordering::Less
452                } else {
453                    std::cmp::Ordering::Equal
454                }
455            })
456            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
457
458        let entry = &self.index[block_idx];
459        let block_data = self.load_block(entry).await?;
460
461        // Find document within block
462        let doc_offset_in_block = doc_id - entry.first_doc_id;
463        let mut reader = &block_data[..];
464
465        for _ in 0..doc_offset_in_block {
466            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
467            if doc_len > reader.len() {
468                return Err(io::Error::new(
469                    io::ErrorKind::InvalidData,
470                    "Invalid doc length",
471                ));
472            }
473            reader = &reader[doc_len..];
474        }
475
476        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
477        let doc_bytes = &reader[..doc_len];
478
479        deserialize_document(doc_bytes, schema).map(Some)
480    }
481
482    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
483        // Check cache
484        {
485            let mut cache = self.cache.write();
486            if let Some(block) = cache.get(entry.first_doc_id) {
487                return Ok(block);
488            }
489        }
490
491        // Load from FileSlice
492        let start = entry.offset as usize;
493        let end = start + entry.length as usize;
494        let compressed = self.data_slice.read_bytes_range(start..end).await?;
495
496        // Use dictionary decompression if available
497        let decompressed = if let Some(ref dict) = self.dict {
498            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
499        } else {
500            crate::compression::decompress(compressed.as_slice())?
501        };
502
503        let block = Arc::new(decompressed);
504
505        // Insert into cache
506        {
507            let mut cache = self.cache.write();
508            cache.insert(entry.first_doc_id, Arc::clone(&block));
509        }
510
511        Ok(block)
512    }
513}
514
515pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
516    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
517}
518
519/// Raw block info for store merging (without decompression)
520#[derive(Debug, Clone)]
521pub struct RawStoreBlock {
522    pub first_doc_id: DocId,
523    pub num_docs: u32,
524    pub offset: u64,
525    pub length: u32,
526}
527
528/// Store merger - concatenates compressed blocks from multiple stores without recompression
529///
530/// This is much faster than rebuilding stores since it avoids:
531/// - Decompressing blocks from source stores
532/// - Re-serializing documents
533/// - Re-compressing blocks at level 22
534///
535/// Limitations:
536/// - All source stores must NOT use dictionaries (or use the same dictionary)
537/// - Doc IDs are remapped sequentially
538pub struct StoreMerger<'a, W: Write> {
539    writer: &'a mut W,
540    index: Vec<StoreBlockIndex>,
541    current_offset: u64,
542    next_doc_id: DocId,
543}
544
545impl<'a, W: Write> StoreMerger<'a, W> {
546    pub fn new(writer: &'a mut W) -> Self {
547        Self {
548            writer,
549            index: Vec::new(),
550            current_offset: 0,
551            next_doc_id: 0,
552        }
553    }
554
555    /// Append raw compressed blocks from a store file
556    ///
557    /// `data_slice` should be the data portion of the store (before index/footer)
558    /// `blocks` contains the block metadata from the source store
559    pub async fn append_store<F: AsyncFileRead>(
560        &mut self,
561        data_slice: &F,
562        blocks: &[RawStoreBlock],
563    ) -> io::Result<()> {
564        for block in blocks {
565            // Read raw compressed block data
566            let start = block.offset as usize;
567            let end = start + block.length as usize;
568            let compressed_data = data_slice.read_bytes_range(start..end).await?;
569
570            // Write to output
571            self.writer.write_all(compressed_data.as_slice())?;
572
573            // Add to index with remapped doc IDs
574            self.index.push(StoreBlockIndex {
575                first_doc_id: self.next_doc_id,
576                offset: self.current_offset,
577                length: block.length,
578                num_docs: block.num_docs,
579            });
580
581            self.current_offset += block.length as u64;
582            self.next_doc_id += block.num_docs;
583        }
584
585        Ok(())
586    }
587
588    /// Finish writing the merged store
589    pub fn finish(self) -> io::Result<u32> {
590        let data_end_offset = self.current_offset;
591
592        // No dictionary support for merged stores (would need same dict across all sources)
593        let dict_offset = 0u64;
594
595        // Write index
596        self.writer
597            .write_u32::<LittleEndian>(self.index.len() as u32)?;
598        for entry in &self.index {
599            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
600            self.writer.write_u64::<LittleEndian>(entry.offset)?;
601            self.writer.write_u32::<LittleEndian>(entry.length)?;
602            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
603        }
604
605        // Write footer
606        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
607        self.writer.write_u64::<LittleEndian>(dict_offset)?;
608        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
609        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
610        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
611        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
612
613        Ok(self.next_doc_id)
614    }
615}
616
617impl AsyncStoreReader {
618    /// Get raw block metadata for merging (without loading block data)
619    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
620        self.index
621            .iter()
622            .map(|entry| RawStoreBlock {
623                first_doc_id: entry.first_doc_id,
624                num_docs: entry.num_docs,
625                offset: entry.offset,
626                length: entry.length,
627            })
628            .collect()
629    }
630
631    /// Get the data slice for raw block access
632    pub fn data_slice(&self) -> &LazyFileSlice {
633        &self.data_slice
634    }
635
636    /// Check if this store uses a dictionary (incompatible with raw merging)
637    pub fn has_dict(&self) -> bool {
638        self.dict.is_some()
639    }
640}