Skip to main content

hermes_core/segment/
store.rs

1//! Document store with Zstd compression and lazy loading
2//!
3//! Optimized for static indexes:
4//! - Configurable compression level (see `DEFAULT_COMPRESSION_LEVEL`, default 7)
5//! - Large block sizes (256KB, see `STORE_BLOCK_SIZE`) for better compression efficiency
6//! - Optional trained dictionary support for even better compression
7//! - Parallel compression support for faster indexing
8//!
9//! Writer stores documents in compressed blocks.
10//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
23use crate::dsl::{Document, Schema};
24
/// Magic number closing every store file: the ASCII bytes "STOR" as a big-endian `u32`.
const STORE_MAGIC: u32 = u32::from_be_bytes(*b"STOR");
/// On-disk format version. Version 2 supports dictionaries.
const STORE_VERSION: u32 = 2;

/// Block size for the document store (256KB).
///
/// A block is handed off for compression once its uncompressed buffer reaches
/// this many bytes. Larger blocks give a better compression ratio but cost
/// more memory per block load.
pub const STORE_BLOCK_SIZE: usize = 256 << 10;

/// Default dictionary size in bytes (4KB).
///
/// NOTE(review): the earlier comment on this constant claimed 64KB while the
/// value has always been `4 * 1024`; confirm which size is intended.
pub const DEFAULT_DICT_SIZE: usize = 4 << 10;

/// Compression level used when the caller does not pick one explicitly.
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
38
39/// Binary document format:
40///   num_fields: u16
41///   per field: field_id: u16, type_tag: u8, value data
42///     0=Text:         len:u32 + utf8
43///     1=U64:          u64 LE
44///     2=I64:          i64 LE
45///     3=F64:          f64 LE
46///     4=Bytes:        len:u32 + raw
47///     5=SparseVector: count:u32 + count*(u32+f32)
48///     6=DenseVector:  count:u32 + count*f32
49///     7=Json:         len:u32 + json utf8
50pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
51    use crate::dsl::FieldValue;
52
53    let stored: Vec<_> = doc
54        .field_values()
55        .iter()
56        .filter(|(field, value)| {
57            // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
58            if matches!(value, crate::dsl::FieldValue::DenseVector(_)) {
59                return false;
60            }
61            schema.get_field_entry(*field).is_some_and(|e| e.stored)
62        })
63        .collect();
64
65    let mut buf = Vec::with_capacity(256);
66    buf.write_u16::<LittleEndian>(stored.len() as u16)?;
67
68    for (field, value) in &stored {
69        buf.write_u16::<LittleEndian>(field.0 as u16)?;
70        match value {
71            FieldValue::Text(s) => {
72                buf.push(0);
73                let bytes = s.as_bytes();
74                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
75                buf.extend_from_slice(bytes);
76            }
77            FieldValue::U64(v) => {
78                buf.push(1);
79                buf.write_u64::<LittleEndian>(*v)?;
80            }
81            FieldValue::I64(v) => {
82                buf.push(2);
83                buf.write_i64::<LittleEndian>(*v)?;
84            }
85            FieldValue::F64(v) => {
86                buf.push(3);
87                buf.write_f64::<LittleEndian>(*v)?;
88            }
89            FieldValue::Bytes(b) => {
90                buf.push(4);
91                buf.write_u32::<LittleEndian>(b.len() as u32)?;
92                buf.extend_from_slice(b);
93            }
94            FieldValue::SparseVector(entries) => {
95                buf.push(5);
96                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
97                for (idx, val) in entries {
98                    buf.write_u32::<LittleEndian>(*idx)?;
99                    buf.write_f32::<LittleEndian>(*val)?;
100                }
101            }
102            FieldValue::DenseVector(values) => {
103                buf.push(6);
104                buf.write_u32::<LittleEndian>(values.len() as u32)?;
105                // Write raw f32 bytes directly
106                let byte_slice = unsafe {
107                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
108                };
109                buf.extend_from_slice(byte_slice);
110            }
111            FieldValue::Json(v) => {
112                buf.push(7);
113                let json_bytes = serde_json::to_vec(v)
114                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
115                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
116                buf.extend_from_slice(&json_bytes);
117            }
118        }
119    }
120
121    Ok(buf)
122}
123
/// Result of compressing one block on a background thread.
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Position of the block in queue order; blocks are sorted by it before writing
    seq: usize,
    /// Doc id of the first document in the block
    first_doc_id: DocId,
    /// Number of documents in the block
    num_docs: u32,
    /// Compressed block bytes
    compressed: Vec<u8>,
}
132
/// Parallel document store writer - compresses blocks immediately when queued
///
/// Documents are appended to an in-memory block buffer. Once the buffer
/// reaches `STORE_BLOCK_SIZE` bytes it is handed to a newly spawned thread for
/// compression, so document ingestion overlaps with compression. `finish`
/// joins every thread and writes blocks, optional dictionary, index and footer.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    /// Uncompressed bytes of the block currently being filled
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks; each yields the block or the
    /// compression error
    pending_handles: Vec<std::thread::JoinHandle<io::Result<CompressedBlock>>>,
    next_seq: usize,
    next_doc_id: DocId,
    /// Doc id of the first document in `block_buffer`
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer using the default compression level.
    ///
    /// `_num_threads` is accepted for API compatibility but is not used: one
    /// thread is spawned per block.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, None, compression_level)
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, Some(Arc::new(dict)), compression_level)
    }

    /// Shared constructor body for all public constructors.
    fn from_parts(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Appends `doc` to the current block and returns the doc id assigned to it.
    ///
    /// # Errors
    ///
    /// Fails if the document cannot be serialized, if its serialized form does
    /// not fit the `u32` length prefix, or if the doc id space is exhausted.
    /// On failure no doc id is consumed and the block buffer is left untouched,
    /// so ids stay aligned with block contents.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        // Do every fallible step before touching writer state.
        let doc_bytes = serialize_document(doc, schema)?;
        if doc_bytes.len() > u32::MAX as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("document too large for store: {} bytes", doc_bytes.len()),
            ));
        }
        let doc_id = self.next_doc_id;
        let following_doc_id = doc_id.checked_add(1).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "too many documents in store")
        })?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);
        self.next_doc_id = following_doc_id;

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();
        let level = self.compression_level;

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // The thread returns the compression error instead of panicking so that
        // `finish` can report it to the caller.
        let handle = std::thread::spawn(move || -> io::Result<CompressedBlock> {
            let compressed = match dict {
                Some(ref d) => crate::compression::compress_with_dict(&data, level, d)?,
                None => crate::compression::compress(&data, level)?,
            };

            Ok(CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            })
        });

        self.pending_handles.push(handle);
    }

    /// Waits for one compression task and surfaces both compression errors and
    /// thread panics as `io::Error`.
    fn join_block(
        handle: std::thread::JoinHandle<io::Result<CompressedBlock>>,
    ) -> io::Result<CompressedBlock> {
        handle.join().map_err(|_| {
            io::Error::new(io::ErrorKind::Other, "compression thread panicked")
        })?
    }

    /// Collect any completed compression tasks
    ///
    /// Tasks that are still running stay in `pending_handles`.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::with_capacity(self.pending_handles.len());
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                self.compressed_blocks.push(Self::join_block(handle)?);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

    /// Flushes the last block, waits for all compression tasks and writes the store.
    ///
    /// Output layout: compressed blocks, then (if a dictionary is set)
    /// `dict_len:u32 + dict bytes`, then the index (`num_blocks:u32` followed by
    /// `first_doc_id:u32, offset:u64, length:u32, num_docs:u32` per block), then
    /// the 32-byte footer (`data_end:u64, dict_offset:u64, num_docs:u32,
    /// has_dict:u32, version:u32, magic:u32`).
    ///
    /// Returns the number of documents written.
    ///
    /// # Errors
    ///
    /// Fails if any block failed to compress, a compression thread panicked, a
    /// length does not fit its on-disk field, or the underlying writer fails.
    /// A failed block is never dropped silently: that would leave the footer
    /// counting documents the index cannot locate.
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            self.compressed_blocks.push(Self::join_block(handle)?);
        }

        if self.compressed_blocks.is_empty() {
            // Write empty store. The dictionary is intentionally not written here:
            // the reader only loads one when `dict_offset > 0`.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            if block.compressed.len() > u32::MAX as usize {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "compressed block too large for store index: {} bytes",
                        block.compressed.len()
                    ),
                ));
            }
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let dict_bytes = dict.as_bytes();
            if dict_bytes.len() > u32::MAX as usize {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("dictionary too large for store: {} bytes", dict_bytes.len()),
                ));
            }
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(current_offset)
        } else {
            None
        };

        // Write index
        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}
359
360/// Block index entry for document store
361#[derive(Debug, Clone)]
362pub(crate) struct StoreBlockIndex {
363    pub(crate) first_doc_id: DocId,
364    pub(crate) offset: u64,
365    pub(crate) length: u32,
366    pub(crate) num_docs: u32,
367}
368
369/// Async document store reader - loads blocks on demand
370pub struct AsyncStoreReader {
371    /// LazyFileSlice for the data portion - fetches ranges on demand
372    data_slice: LazyFileSlice,
373    /// Block index
374    index: Vec<StoreBlockIndex>,
375    num_docs: u32,
376    /// Optional compression dictionary
377    dict: Option<CompressionDict>,
378    /// Block cache
379    cache: RwLock<StoreBlockCache>,
380}
381
382/// FIFO block cache — O(1) lookup, insert, and eviction.
383///
384/// Uses VecDeque for eviction order (pop_front = O(1)) instead of
385/// Vec::remove(0) which is O(n). get() is &self for read-lock compatibility.
386struct StoreBlockCache {
387    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
388    insert_order: std::collections::VecDeque<DocId>,
389    max_blocks: usize,
390}
391
392impl StoreBlockCache {
393    fn new(max_blocks: usize) -> Self {
394        Self {
395            blocks: FxHashMap::default(),
396            insert_order: std::collections::VecDeque::with_capacity(max_blocks),
397            max_blocks,
398        }
399    }
400
401    fn get(&self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
402        self.blocks.get(&first_doc_id).map(Arc::clone)
403    }
404
405    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
406        if self.blocks.contains_key(&first_doc_id) {
407            return; // already cached
408        }
409        while self.blocks.len() >= self.max_blocks {
410            if let Some(evict) = self.insert_order.pop_front() {
411                self.blocks.remove(&evict);
412            } else {
413                break;
414            }
415        }
416        self.blocks.insert(first_doc_id, block);
417        self.insert_order.push_back(first_doc_id);
418    }
419}
420
421impl AsyncStoreReader {
422    /// Open a document store from LazyFileHandle
423    /// Only loads footer and index into memory, data blocks are fetched on-demand
424    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
425        let file_len = file_handle.len();
426        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
427        if file_len < 32 {
428            return Err(io::Error::new(
429                io::ErrorKind::InvalidData,
430                "Store too small",
431            ));
432        }
433
434        // Read footer (32 bytes)
435        let footer = file_handle
436            .read_bytes_range(file_len - 32..file_len)
437            .await?;
438        let mut reader = footer.as_slice();
439        let data_end_offset = reader.read_u64::<LittleEndian>()?;
440        let dict_offset = reader.read_u64::<LittleEndian>()?;
441        let num_docs = reader.read_u32::<LittleEndian>()?;
442        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
443        let version = reader.read_u32::<LittleEndian>()?;
444        let magic = reader.read_u32::<LittleEndian>()?;
445
446        if magic != STORE_MAGIC {
447            return Err(io::Error::new(
448                io::ErrorKind::InvalidData,
449                "Invalid store magic",
450            ));
451        }
452        if version != STORE_VERSION {
453            return Err(io::Error::new(
454                io::ErrorKind::InvalidData,
455                format!("Unsupported store version: {}", version),
456            ));
457        }
458
459        // Load dictionary if present
460        let dict = if has_dict && dict_offset > 0 {
461            let dict_start = dict_offset;
462            let dict_len_bytes = file_handle
463                .read_bytes_range(dict_start..dict_start + 4)
464                .await?;
465            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
466            let dict_bytes = file_handle
467                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
468                .await?;
469            Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
470        } else {
471            None
472        };
473
474        // Calculate index location
475        let index_start = if has_dict && dict_offset > 0 {
476            let dict_start = dict_offset;
477            let dict_len_bytes = file_handle
478                .read_bytes_range(dict_start..dict_start + 4)
479                .await?;
480            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
481            dict_start + 4 + dict_len
482        } else {
483            data_end_offset
484        };
485        let index_end = file_len - 32;
486
487        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
488        let mut reader = index_bytes.as_slice();
489
490        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
491        let mut index = Vec::with_capacity(num_blocks);
492
493        for _ in 0..num_blocks {
494            let first_doc_id = reader.read_u32::<LittleEndian>()?;
495            let offset = reader.read_u64::<LittleEndian>()?;
496            let length = reader.read_u32::<LittleEndian>()?;
497            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
498
499            index.push(StoreBlockIndex {
500                first_doc_id,
501                offset,
502                length,
503                num_docs: num_docs_in_block,
504            });
505        }
506
507        // Create lazy slice for data portion only
508        let data_slice = file_handle.slice(0..data_end_offset);
509
510        Ok(Self {
511            data_slice,
512            index,
513            num_docs,
514            dict,
515            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
516        })
517    }
518
519    /// Number of documents
520    pub fn num_docs(&self) -> u32 {
521        self.num_docs
522    }
523
524    /// Number of blocks currently in the cache
525    pub fn cached_blocks(&self) -> usize {
526        self.cache.read().blocks.len()
527    }
528
529    /// Get a document by doc_id (async - may load block)
530    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
531        if doc_id >= self.num_docs {
532            return Ok(None);
533        }
534
535        // Find block containing this doc_id
536        let block_idx = self
537            .index
538            .binary_search_by(|entry| {
539                if doc_id < entry.first_doc_id {
540                    std::cmp::Ordering::Greater
541                } else if doc_id >= entry.first_doc_id + entry.num_docs {
542                    std::cmp::Ordering::Less
543                } else {
544                    std::cmp::Ordering::Equal
545                }
546            })
547            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
548
549        let entry = &self.index[block_idx];
550        let block_data = self.load_block(entry).await?;
551
552        // Find document within block
553        let doc_offset_in_block = doc_id - entry.first_doc_id;
554        let mut reader = &block_data[..];
555
556        for _ in 0..doc_offset_in_block {
557            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
558            if doc_len > reader.len() {
559                return Err(io::Error::new(
560                    io::ErrorKind::InvalidData,
561                    "Invalid doc length",
562                ));
563            }
564            reader = &reader[doc_len..];
565        }
566
567        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
568        let doc_bytes = &reader[..doc_len];
569
570        deserialize_document(doc_bytes, schema).map(Some)
571    }
572
573    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
574        // Check cache first (read lock — concurrent cache hits don't serialize)
575        {
576            if let Some(block) = self.cache.read().get(entry.first_doc_id) {
577                return Ok(block);
578            }
579        }
580
581        // Load from FileSlice
582        let start = entry.offset;
583        let end = start + entry.length as u64;
584        let compressed = self.data_slice.read_bytes_range(start..end).await?;
585
586        // Use dictionary decompression if available
587        let decompressed = if let Some(ref dict) = self.dict {
588            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
589        } else {
590            crate::compression::decompress(compressed.as_slice())?
591        };
592
593        let block = Arc::new(decompressed);
594
595        // Insert into cache
596        {
597            let mut cache = self.cache.write();
598            cache.insert(entry.first_doc_id, Arc::clone(&block));
599        }
600
601        Ok(block)
602    }
603}
604
605pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
606    use crate::dsl::Field;
607
608    let mut reader = data;
609    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
610    let mut doc = Document::new();
611
612    for _ in 0..num_fields {
613        let field_id = reader.read_u16::<LittleEndian>()?;
614        let field = Field(field_id as u32);
615        let type_tag = reader.read_u8()?;
616
617        match type_tag {
618            0 => {
619                // Text
620                let len = reader.read_u32::<LittleEndian>()? as usize;
621                let s = std::str::from_utf8(&reader[..len])
622                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
623                doc.add_text(field, s);
624                reader = &reader[len..];
625            }
626            1 => {
627                // U64
628                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
629            }
630            2 => {
631                // I64
632                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
633            }
634            3 => {
635                // F64
636                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
637            }
638            4 => {
639                // Bytes
640                let len = reader.read_u32::<LittleEndian>()? as usize;
641                doc.add_bytes(field, reader[..len].to_vec());
642                reader = &reader[len..];
643            }
644            5 => {
645                // SparseVector
646                let count = reader.read_u32::<LittleEndian>()? as usize;
647                let mut entries = Vec::with_capacity(count);
648                for _ in 0..count {
649                    let idx = reader.read_u32::<LittleEndian>()?;
650                    let val = reader.read_f32::<LittleEndian>()?;
651                    entries.push((idx, val));
652                }
653                doc.add_sparse_vector(field, entries);
654            }
655            6 => {
656                // DenseVector
657                let count = reader.read_u32::<LittleEndian>()? as usize;
658                let byte_len = count * 4;
659                if reader.len() < byte_len {
660                    return Err(io::Error::new(
661                        io::ErrorKind::UnexpectedEof,
662                        format!(
663                            "dense vector field {}: need {} bytes but only {} remain",
664                            field.0,
665                            byte_len,
666                            reader.len()
667                        ),
668                    ));
669                }
670                let mut values = vec![0.0f32; count];
671                // Read raw f32 bytes directly
672                unsafe {
673                    std::ptr::copy_nonoverlapping(
674                        reader.as_ptr(),
675                        values.as_mut_ptr() as *mut u8,
676                        byte_len,
677                    );
678                }
679                reader = &reader[byte_len..];
680                doc.add_dense_vector(field, values);
681            }
682            7 => {
683                // Json
684                let len = reader.read_u32::<LittleEndian>()? as usize;
685                let v: serde_json::Value = serde_json::from_slice(&reader[..len])
686                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
687                doc.add_json(field, v);
688                reader = &reader[len..];
689            }
690            _ => {
691                return Err(io::Error::new(
692                    io::ErrorKind::InvalidData,
693                    format!("Unknown field type tag: {}", type_tag),
694                ));
695            }
696        }
697    }
698
699    Ok(doc)
700}
701
702/// Raw block info for store merging (without decompression)
703#[derive(Debug, Clone)]
704pub struct RawStoreBlock {
705    pub first_doc_id: DocId,
706    pub num_docs: u32,
707    pub offset: u64,
708    pub length: u32,
709}
710
711/// Store merger - concatenates compressed blocks from multiple stores without recompression
712///
713/// This is much faster than rebuilding stores since it avoids:
714/// - Decompressing blocks from source stores
715/// - Re-serializing documents
716/// - Re-compressing blocks at level 22
717///
718/// Limitations:
719/// - All source stores must NOT use dictionaries (or use the same dictionary)
720/// - Doc IDs are remapped sequentially
721pub struct StoreMerger<'a, W: Write> {
722    writer: &'a mut W,
723    index: Vec<StoreBlockIndex>,
724    current_offset: u64,
725    next_doc_id: DocId,
726}
727
728impl<'a, W: Write> StoreMerger<'a, W> {
729    pub fn new(writer: &'a mut W) -> Self {
730        Self {
731            writer,
732            index: Vec::new(),
733            current_offset: 0,
734            next_doc_id: 0,
735        }
736    }
737
738    /// Append raw compressed blocks from a store file
739    ///
740    /// `data_slice` should be the data portion of the store (before index/footer)
741    /// `blocks` contains the block metadata from the source store
742    pub async fn append_store<F: AsyncFileRead>(
743        &mut self,
744        data_slice: &F,
745        blocks: &[RawStoreBlock],
746    ) -> io::Result<()> {
747        for block in blocks {
748            // Read raw compressed block data
749            let start = block.offset;
750            let end = start + block.length as u64;
751            let compressed_data = data_slice.read_bytes_range(start..end).await?;
752
753            // Write to output
754            self.writer.write_all(compressed_data.as_slice())?;
755
756            // Add to index with remapped doc IDs
757            self.index.push(StoreBlockIndex {
758                first_doc_id: self.next_doc_id,
759                offset: self.current_offset,
760                length: block.length,
761                num_docs: block.num_docs,
762            });
763
764            self.current_offset += block.length as u64;
765            self.next_doc_id += block.num_docs;
766        }
767
768        Ok(())
769    }
770
771    /// Append blocks from a dict-compressed store by decompressing and recompressing.
772    ///
773    /// For stores that use dictionary compression, raw blocks can't be stacked
774    /// directly because the decompressor needs the original dictionary.
775    /// This method decompresses each block with the source dict, then
776    /// recompresses without a dictionary so the merged output is self-contained.
777    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
778        let dict = store.dict();
779        let data_slice = store.data_slice();
780        let blocks = store.block_index();
781
782        for block in blocks {
783            let start = block.offset;
784            let end = start + block.length as u64;
785            let compressed = data_slice.read_bytes_range(start..end).await?;
786
787            // Decompress with source dict (or without if no dict)
788            let decompressed = if let Some(d) = dict {
789                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
790            } else {
791                crate::compression::decompress(compressed.as_slice())?
792            };
793
794            // Recompress without dictionary
795            let recompressed = crate::compression::compress(
796                &decompressed,
797                crate::compression::CompressionLevel::default(),
798            )?;
799
800            self.writer.write_all(&recompressed)?;
801
802            self.index.push(StoreBlockIndex {
803                first_doc_id: self.next_doc_id,
804                offset: self.current_offset,
805                length: recompressed.len() as u32,
806                num_docs: block.num_docs,
807            });
808
809            self.current_offset += recompressed.len() as u64;
810            self.next_doc_id += block.num_docs;
811        }
812
813        Ok(())
814    }
815
816    /// Finish writing the merged store
817    pub fn finish(self) -> io::Result<u32> {
818        let data_end_offset = self.current_offset;
819
820        // No dictionary support for merged stores (would need same dict across all sources)
821        let dict_offset = 0u64;
822
823        // Write index
824        self.writer
825            .write_u32::<LittleEndian>(self.index.len() as u32)?;
826        for entry in &self.index {
827            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
828            self.writer.write_u64::<LittleEndian>(entry.offset)?;
829            self.writer.write_u32::<LittleEndian>(entry.length)?;
830            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
831        }
832
833        // Write footer
834        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
835        self.writer.write_u64::<LittleEndian>(dict_offset)?;
836        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
837        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
838        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
839        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
840
841        Ok(self.next_doc_id)
842    }
843}
844
845impl AsyncStoreReader {
846    /// Get raw block metadata for merging (without loading block data)
847    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
848        self.index
849            .iter()
850            .map(|entry| RawStoreBlock {
851                first_doc_id: entry.first_doc_id,
852                num_docs: entry.num_docs,
853                offset: entry.offset,
854                length: entry.length,
855            })
856            .collect()
857    }
858
859    /// Get the data slice for raw block access
860    pub fn data_slice(&self) -> &LazyFileSlice {
861        &self.data_slice
862    }
863
864    /// Check if this store uses a dictionary (incompatible with raw merging)
865    pub fn has_dict(&self) -> bool {
866        self.dict.is_some()
867    }
868
869    /// Get the decompression dictionary (if any)
870    pub fn dict(&self) -> Option<&CompressionDict> {
871        self.dict.as_ref()
872    }
873
874    /// Get block index for iteration
875    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
876        &self.index
877    }
878}