hermes_core/segment/store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Configurable compression level (default 7, see `DEFAULT_COMPRESSION_LEVEL`)
//! - Large block sizes (256KB, see `STORE_BLOCK_SIZE`) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! The writer stores documents in compressed blocks.
//! The reader only loads the index into memory; blocks are loaded on demand.

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries

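// On-disk layout, as produced by the writers below (version 2):
//
//   [compressed blocks][dict?: len u32 + bytes][index: count u32 + entries][footer]
//
// Each index entry is: first_doc_id u32 | offset u64 | length u32 | num_docs u32.
// The 32-byte footer is: data_end u64 | dict_offset u64 | num_docs u32 |
// has_dict u32 | version u32 | magic u32. All integers are little-endian.
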
/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size (4KB)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);

/// Binary document format:
///   num_fields: u16
///   per field: field_id: u16, type_tag: u8, value data
///     0=Text:         len:u32 + utf8
///     1=U64:          u64 LE
///     2=I64:          i64 LE
///     3=F64:          f64 LE
///     4=Bytes:        len:u32 + raw
///     5=SparseVector: count:u32 + count*(u32+f32)
///     6=DenseVector:  count:u32 + count*f32
///     7=Json:         len:u32 + json utf8
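///
/// For example, a document whose only stored field is field 0 with the text
/// value "hi" serializes to 11 bytes (all integers little-endian):
///
/// ```text
/// 01 00           num_fields = 1
/// 00 00           field_id   = 0
/// 00              type_tag   = 0 (Text)
/// 02 00 00 00     len        = 2
/// 68 69           "hi"
/// ```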
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    use crate::dsl::FieldValue;

    let stored: Vec<_> = doc
        .field_values()
        .iter()
        .filter(|(field, value)| {
            // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
            if matches!(value, crate::dsl::FieldValue::DenseVector(_)) {
                return false;
            }
            schema.get_field_entry(*field).is_some_and(|e| e.stored)
        })
        .collect();

    let mut buf = Vec::with_capacity(256);
    buf.write_u16::<LittleEndian>(stored.len() as u16)?;

    for (field, value) in &stored {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                // Unreachable in practice: dense vectors are filtered out
                // above. Kept so the format can still emit tag 6 if needed.
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // Write raw f32 bytes directly
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(buf)
}

/// Compressed block result
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents.
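///
/// # Example
///
/// A minimal sketch, assuming a `Schema` and some `Document`s built elsewhere:
///
/// ```ignore
/// let mut out = Vec::new();
/// let mut writer = EagerParallelStoreWriter::new(&mut out, 4);
/// for doc in &docs {
///     let doc_id = writer.store(doc, &schema)?;
/// }
/// let num_docs = writer.finish()?; // flushes, then writes index + footer
/// ```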
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    ///
    /// `_num_threads` is currently unused: one thread is spawned per pending block.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                // A panicked compression thread must not be ignored: silently
                // dropping its block would corrupt the store.
                let block = handle.join().expect("compression thread panicked");
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed();

        // Wait for all remaining compression tasks, surfacing panics as errors
        for handle in self.pending_handles.drain(..) {
            let block = handle.join().map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "compression thread panicked")
            })?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Write empty store
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index
        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

/// Block index entry for document store
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

/// Async document store reader - loads blocks on demand
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}

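/// Simple LRU cache of decompressed blocks, keyed by each block's first doc id.
/// `access_order` tracks recency; the front entry is evicted first.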
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
    /// Open a document store from a LazyFileHandle
    ///
    /// Only loads the footer and index into memory; data blocks are fetched on demand.
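    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `LazyFileHandle` and `Schema` obtained elsewhere:
    ///
    /// ```ignore
    /// let reader = AsyncStoreReader::open(handle, 32).await?; // cache up to 32 blocks
    /// if let Some(doc) = reader.get(0, &schema).await? {
    ///     // first document, lazily decompressed from its block
    /// }
    /// ```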
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes)
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }
        // Load dictionary if present and compute where the index starts,
        // reading the dictionary length only once
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_offset..dict_offset + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_offset + 4..dict_offset + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_offset + 4 + dict_len,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Number of blocks currently in the cache
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Get a document by doc_id (async - may load block)
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        // Find block containing this doc_id
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Find document within block
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        // Check cache
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

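/// Deserialize a document previously written by [`serialize_document`].
///
/// The schema parameter is currently unused; the type tags embedded in the
/// data determine how each field is decoded.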
pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let field = Field(field_id as u32);
        let type_tag = reader.read_u8()?;

        match type_tag {
            0 => {
                // Text
                let len = reader.read_u32::<LittleEndian>()? as usize;
                let s = std::str::from_utf8(&reader[..len])
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                doc.add_text(field, s);
                reader = &reader[len..];
            }
            1 => {
                // U64
                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
            }
            2 => {
                // I64
                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
            }
            3 => {
                // F64
                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
            }
            4 => {
                // Bytes
                let len = reader.read_u32::<LittleEndian>()? as usize;
                doc.add_bytes(field, reader[..len].to_vec());
                reader = &reader[len..];
            }
            5 => {
                // SparseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let mut entries = Vec::with_capacity(count);
                for _ in 0..count {
                    let idx = reader.read_u32::<LittleEndian>()?;
                    let val = reader.read_f32::<LittleEndian>()?;
                    entries.push((idx, val));
                }
                doc.add_sparse_vector(field, entries);
            }
            6 => {
                // DenseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if reader.len() < byte_len {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!(
                            "dense vector field {}: need {} bytes but only {} remain",
                            field.0,
                            byte_len,
                            reader.len()
                        ),
                    ));
                }
                let mut values = vec![0.0f32; count];
                // Read raw f32 bytes directly
                unsafe {
                    std::ptr::copy_nonoverlapping(
                        reader.as_ptr(),
                        values.as_mut_ptr() as *mut u8,
                        byte_len,
                    );
                }
                reader = &reader[byte_len..];
                doc.add_dense_vector(field, values);
            }
            7 => {
                // Json
                let len = reader.read_u32::<LittleEndian>()? as usize;
                let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                doc.add_json(field, v);
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks
///
/// Limitations:
/// - Source stores must NOT use dictionaries, because the merged store is
///   written without one; use `append_store_recompressing` for dict stores
/// - Doc IDs are remapped sequentially
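///
/// # Example
///
/// A minimal sketch, assuming two dictionary-free `AsyncStoreReader`s `a` and `b`:
///
/// ```ignore
/// let mut out = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// merger.append_store(a.data_slice(), &a.raw_blocks()).await?;
/// merger.append_store(b.data_slice(), &b.raw_blocks()).await?;
/// let total_docs = merger.finish()?;
/// ```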
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Append raw compressed blocks from a store file
    ///
    /// `data_slice` should be the data portion of the store (before index/footer)
    /// `blocks` contains the block metadata from the source store
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            // Read raw compressed block data
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            // Write to output
            self.writer.write_all(compressed_data.as_slice())?;

            // Add to index with remapped doc IDs
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Append blocks from a dict-compressed store by decompressing and recompressing.
    ///
    /// For stores that use dictionary compression, raw blocks can't be stacked
    /// directly because the decompressor needs the original dictionary.
    /// This method decompresses each block with the source dict, then
    /// recompresses without a dictionary so the merged output is self-contained.
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            // Decompress with source dict (or without if no dict)
            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            // Recompress without dictionary
            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Finish writing the merged store
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // Merged stores never carry a dictionary (dict-compressed sources
        // go through append_store_recompressing instead)
        let dict_offset = 0u64;

        // Write index
        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Get raw block metadata for merging (without loading block data)
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Get the data slice for raw block access
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// Check if this store uses a dictionary (incompatible with raw merging)
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    /// Get the decompression dictionary (if any)
    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    /// Get block index for iteration
    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}