
hermes_core/segment/store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Configurable compression level (default 7) for a good compression ratio
//! - Large block size (256KB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! The writer stores documents in compressed blocks. The reader loads only the
//! block index into memory; blocks are fetched on demand.
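//!
//! Within a block, each document is stored as a little-endian `u32` length
//! prefix followed by its serialized bytes (see `serialize_document`).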

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries
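
// On-disk layout, as written by the writers below and parsed by `AsyncStoreReader::open`:
//
//   [compressed blocks ...]
//   [dictionary: u32 length + bytes]        (only present when a dictionary is used)
//   [index: u32 num_blocks, then per block:
//        u32 first_doc_id, u64 offset, u32 length, u32 num_docs]
//   [footer, 32 bytes: u64 data_end_offset, u64 dict_offset,
//        u32 num_docs, u32 has_dict, u32 version, u32 magic]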

/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size in bytes (4KB)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

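/// Serialize a document for storage (currently JSON via `serde_json`; the
/// schema parameter is unused for now).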
pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Compressed block result
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents.
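///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest); `docs` and `schema` are
/// assumed to exist in the surrounding code:
///
/// ```ignore
/// let mut buf: Vec<u8> = Vec::new();
/// let mut writer = EagerParallelStoreWriter::new(&mut buf, 4);
/// for doc in &docs {
///     writer.store(doc, &schema)?;
/// }
/// let num_docs = writer.finish()?;
/// ```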
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

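    /// Append a document to the current block and return its assigned doc id.
    ///
    /// Spawns a background compression task whenever the block buffer reaches
    /// `STORE_BLOCK_SIZE`.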
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                // A panic in a compression thread is surfaced as an error rather
                // than silently dropping the block.
                let block = handle.join().map_err(|_| {
                    io::Error::new(io::ErrorKind::Other, "compression thread panicked")
                })?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Write empty store
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index
        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

/// Block index entry for document store
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

/// Async document store reader - loads blocks on demand
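///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); `handle` is assumed to be a
/// `LazyFileHandle` for an existing store file and `schema` a matching `Schema`:
///
/// ```ignore
/// let reader = AsyncStoreReader::open(handle, 32).await?;
/// if let Some(doc) = reader.get(0, &schema).await? {
///     // use `doc`
/// }
/// ```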
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}

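/// Simple LRU-style cache of decompressed blocks, keyed by each block's first
/// doc id. Eviction scans `access_order` linearly, which is acceptable for the
/// small `max_blocks` values used here.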
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
    /// Open a document store from a LazyFileHandle
    /// Only loads the footer and block index into memory; data blocks are fetched on demand
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes)
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load dictionary if present and compute where the index starts
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Get a document by doc_id (async - may load block)
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        // Find block containing this doc_id
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Find document within block
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        // Check cache
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

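/// Deserialize a document previously written by `serialize_document`.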
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks at a high compression level
///
/// Limitations:
/// - Source stores must not use dictionaries (the merged store is written without one)
/// - Doc IDs are remapped sequentially
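///
/// # Example
///
/// A minimal sketch (not compiled as a doctest); `store_a` and `store_b` are
/// assumed to be open `AsyncStoreReader`s whose `has_dict()` is false:
///
/// ```ignore
/// let mut out: Vec<u8> = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// for reader in [&store_a, &store_b] {
///     merger.append_store(reader.data_slice(), &reader.raw_blocks()).await?;
/// }
/// let total_docs = merger.finish()?;
/// ```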
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Append raw compressed blocks from a store file
    ///
    /// `data_slice` should be the data portion of the store (before index/footer)
    /// `blocks` contains the block metadata from the source store
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            // Read raw compressed block data
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            // Write to output
            self.writer.write_all(compressed_data.as_slice())?;

            // Add to index with remapped doc IDs
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Finish writing the merged store
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // No dictionary support for merged stores (would need same dict across all sources)
        let dict_offset = 0u64;

        // Write index
        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Get raw block metadata for merging (without loading block data)
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Get the data slice for raw block access
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// Check if this store uses a dictionary (incompatible with raw merging)
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}