Skip to main content

hermes_core/segment/
store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Configurable compression level (default: 7)
//! - Large block sizes (256KB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! Writer stores documents in compressed blocks.
//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
23use crate::dsl::{Document, Schema};
24
/// Magic number identifying a store file; written as the final `u32` of the footer.
const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
/// On-disk format version written to the footer; the reader rejects any other value.
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries
27
28/// Write block index + footer to a store file.
29///
30/// Shared by `StoreWriter::finish`, `StoreWriter::finish` (empty), and `StoreMerger::finish`.
31fn write_store_index_and_footer(
32    writer: &mut (impl Write + ?Sized),
33    index: &[StoreBlockIndex],
34    data_end_offset: u64,
35    dict_offset: u64,
36    num_docs: u32,
37    has_dict: bool,
38) -> io::Result<()> {
39    writer.write_u32::<LittleEndian>(index.len() as u32)?;
40    for entry in index {
41        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
42        writer.write_u64::<LittleEndian>(entry.offset)?;
43        writer.write_u32::<LittleEndian>(entry.length)?;
44        writer.write_u32::<LittleEndian>(entry.num_docs)?;
45    }
46    writer.write_u64::<LittleEndian>(data_end_offset)?;
47    writer.write_u64::<LittleEndian>(dict_offset)?;
48    writer.write_u32::<LittleEndian>(num_docs)?;
49    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
50    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
51    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
52    Ok(())
53}
54
/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size in bytes (4KB).
///
/// NOTE(review): this comment previously said "64KB is a good balance", which
/// does not match the value below — confirm which of the two is intended.
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
65
66/// Binary document format:
67///   num_fields: u16
68///   per field: field_id: u16, type_tag: u8, value data
69///     0=Text:         len:u32 + utf8
70///     1=U64:          u64 LE
71///     2=I64:          i64 LE
72///     3=F64:          f64 LE
73///     4=Bytes:        len:u32 + raw
74///     5=SparseVector: count:u32 + count*(u32+f32)
75///     6=DenseVector:  count:u32 + count*f32
76///     7=Json:         len:u32 + json utf8
77pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
78    use crate::dsl::FieldValue;
79
80    let stored: Vec<_> = doc
81        .field_values()
82        .iter()
83        .filter(|(field, value)| {
84            // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
85            if matches!(value, crate::dsl::FieldValue::DenseVector(_)) {
86                return false;
87            }
88            schema.get_field_entry(*field).is_some_and(|e| e.stored)
89        })
90        .collect();
91
92    let mut buf = Vec::with_capacity(256);
93    buf.write_u16::<LittleEndian>(stored.len() as u16)?;
94
95    for (field, value) in &stored {
96        buf.write_u16::<LittleEndian>(field.0 as u16)?;
97        match value {
98            FieldValue::Text(s) => {
99                buf.push(0);
100                let bytes = s.as_bytes();
101                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
102                buf.extend_from_slice(bytes);
103            }
104            FieldValue::U64(v) => {
105                buf.push(1);
106                buf.write_u64::<LittleEndian>(*v)?;
107            }
108            FieldValue::I64(v) => {
109                buf.push(2);
110                buf.write_i64::<LittleEndian>(*v)?;
111            }
112            FieldValue::F64(v) => {
113                buf.push(3);
114                buf.write_f64::<LittleEndian>(*v)?;
115            }
116            FieldValue::Bytes(b) => {
117                buf.push(4);
118                buf.write_u32::<LittleEndian>(b.len() as u32)?;
119                buf.extend_from_slice(b);
120            }
121            FieldValue::SparseVector(entries) => {
122                buf.push(5);
123                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
124                for (idx, val) in entries {
125                    buf.write_u32::<LittleEndian>(*idx)?;
126                    buf.write_f32::<LittleEndian>(*val)?;
127                }
128            }
129            FieldValue::DenseVector(values) => {
130                buf.push(6);
131                buf.write_u32::<LittleEndian>(values.len() as u32)?;
132                // Write raw f32 bytes directly
133                let byte_slice = unsafe {
134                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
135                };
136                buf.extend_from_slice(byte_slice);
137            }
138            FieldValue::Json(v) => {
139                buf.push(7);
140                let json_bytes = serde_json::to_vec(v)
141                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
142                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
143                buf.extend_from_slice(&json_bytes);
144            }
145        }
146    }
147
148    Ok(buf)
149}
150
/// Result of compressing one block on a background thread.
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Sequence number assigned when the block was queued; blocks are sorted
    /// by it before being written so output order matches ingestion order.
    seq: usize,
    /// Doc ID of the first document contained in the block.
    first_doc_id: DocId,
    /// Number of documents contained in the block.
    num_docs: u32,
    /// The compressed block bytes.
    compressed: Vec<u8>,
}
159
/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents. Nothing is written to `writer` until `finish` is called.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    /// Destination for the finished store file.
    writer: &'a mut dyn Write,
    /// Uncompressed bytes of the block currently being filled
    /// (each document is stored as `len: u32` followed by its serialized bytes).
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    /// Sequence number to assign to the next queued block.
    next_seq: usize,
    /// Doc ID to assign to the next stored document.
    next_doc_id: DocId,
    /// Doc ID of the first document in the block currently being filled.
    block_first_doc: DocId,
    /// Optional compression dictionary, shared with the compression threads.
    dict: Option<Arc<CompressionDict>>,
    /// Compression level passed to every block compression.
    compression_level: CompressionLevel,
}
181
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer with the default compression
    /// level and no dictionary.
    ///
    /// `_num_threads` is accepted but currently unused: one thread is spawned
    /// per full block.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, None, compression_level)
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, Some(Arc::new(dict)), compression_level)
    }

    /// Shared constructor body for all public constructors.
    fn from_parts(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Error reported when a background compression thread panicked.
    fn worker_panicked() -> io::Error {
        io::Error::new(
            io::ErrorKind::Other,
            "store block compression thread panicked",
        )
    }

    /// Serialize `doc`, append it to the current block and return its doc ID.
    ///
    /// When the block buffer reaches `STORE_BLOCK_SIZE` it is handed to a
    /// background compression thread.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails, the serialized document is
    /// longer than `u32::MAX` bytes, or the doc ID space is exhausted. In all
    /// of these cases the writer state is left untouched, so no doc ID is
    /// consumed for a document that was not actually written.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        // Serialize first: allocating the doc ID before a fallible step would make
        // the block's `num_docs` count a document that is not in the block.
        let doc_bytes = serialize_document(doc, schema)?;
        let doc_len = u32::try_from(doc_bytes.len()).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("serialized document too large: {} bytes", doc_bytes.len()),
            )
        })?;

        let doc_id = self.next_doc_id;
        let next_doc_id = doc_id.checked_add(1).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "document store is full")
        })?;

        self.block_buffer.write_u32::<LittleEndian>(doc_len)?;
        self.block_buffer.extend_from_slice(&doc_bytes);
        self.next_doc_id = next_doc_id;

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    ///
    /// No-op when the block buffer is empty. A compression failure panics the
    /// worker thread; that panic is surfaced as an error when the handle is
    /// joined.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks without blocking.
    ///
    /// # Errors
    ///
    /// Returns an error if any finished task panicked. Dropping such a block
    /// silently would produce a store with documents missing.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut still_running = Vec::new();
        let mut failure = None;
        for handle in self.pending_handles.drain(..) {
            if !handle.is_finished() {
                still_running.push(handle);
                continue;
            }
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(_) => failure = Some(Self::worker_panicked()),
            }
        }
        self.pending_handles = still_running;
        failure.map_or(Ok(()), Err)
    }

    /// Flush the last block, wait for all compression tasks, and write the
    /// store (blocks, optional dictionary, index, footer) to the writer.
    ///
    /// Returns the number of documents stored.
    ///
    /// # Errors
    ///
    /// Returns an error if a compression thread panicked, a block or the
    /// dictionary exceeds `u32::MAX` bytes, or the underlying writer fails.
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in std::mem::take(&mut self.pending_handles) {
            let block = handle.join().map_err(|_| Self::worker_panicked())?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order (sequence numbers are unique)
        self.compressed_blocks.sort_unstable_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            let length = u32::try_from(block.compressed.len()).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("compressed block too large: {} bytes", block.compressed.len()),
                )
            })?;

            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += u64::from(length);
        }

        let data_end_offset = current_offset;

        // Write dictionary if present: `len: u32` followed by the raw bytes,
        // placed directly after the data section.
        let mut dict_offset = 0u64;
        if let Some(ref dict) = self.dict {
            dict_offset = data_end_offset;
            let dict_bytes = dict.as_bytes();
            let dict_len = u32::try_from(dict_bytes.len()).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("compression dictionary too large: {} bytes", dict_bytes.len()),
                )
            })?;
            self.writer.write_u32::<LittleEndian>(dict_len)?;
            self.writer.write_all(dict_bytes)?;
        }

        // Write index + footer
        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}
369
/// Block index entry for document store
///
/// One entry per compressed block; serialized as
/// `first_doc_id: u32, offset: u64, length: u32, num_docs: u32`.
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    /// Doc ID of the first document in the block.
    pub(crate) first_doc_id: DocId,
    /// Byte offset of the compressed block, measured from the start of the data section.
    pub(crate) offset: u64,
    /// Length of the compressed block in bytes.
    pub(crate) length: u32,
    /// Number of documents stored in the block.
    pub(crate) num_docs: u32,
}
378
/// Async document store reader - loads blocks on demand
///
/// Opening a store reads only the footer, the optional dictionary and the
/// block index; compressed blocks are fetched and decompressed when a
/// document inside them is requested.
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    /// Total number of documents, as recorded in the footer
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache (decompressed blocks keyed by the block's first doc ID)
    cache: RwLock<StoreBlockCache>,
}
391
/// FIFO block cache — O(1) lookup, insert, and eviction.
///
/// Uses VecDeque for eviction order (pop_front = O(1)) instead of
/// Vec::remove(0) which is O(n). get() is &self for read-lock compatibility.
struct StoreBlockCache {
    /// Decompressed blocks keyed by the first doc ID of the block.
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    /// Keys in insertion order; the front is the next block to evict.
    insert_order: std::collections::VecDeque<DocId>,
    /// Maximum number of blocks to keep cached.
    max_blocks: usize,
}
401
402impl StoreBlockCache {
403    fn new(max_blocks: usize) -> Self {
404        Self {
405            blocks: FxHashMap::default(),
406            insert_order: std::collections::VecDeque::with_capacity(max_blocks),
407            max_blocks,
408        }
409    }
410
411    fn get(&self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
412        self.blocks.get(&first_doc_id).map(Arc::clone)
413    }
414
415    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
416        if self.blocks.contains_key(&first_doc_id) {
417            return; // already cached
418        }
419        while self.blocks.len() >= self.max_blocks {
420            if let Some(evict) = self.insert_order.pop_front() {
421                self.blocks.remove(&evict);
422            } else {
423                break;
424            }
425        }
426        self.blocks.insert(first_doc_id, block);
427        self.insert_order.push_back(first_doc_id);
428    }
429}
430
431impl AsyncStoreReader {
432    /// Open a document store from LazyFileHandle
433    /// Only loads footer and index into memory, data blocks are fetched on-demand
434    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
435        let file_len = file_handle.len();
436        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
437        if file_len < 32 {
438            return Err(io::Error::new(
439                io::ErrorKind::InvalidData,
440                "Store too small",
441            ));
442        }
443
444        // Read footer (32 bytes)
445        let footer = file_handle
446            .read_bytes_range(file_len - 32..file_len)
447            .await?;
448        let mut reader = footer.as_slice();
449        let data_end_offset = reader.read_u64::<LittleEndian>()?;
450        let dict_offset = reader.read_u64::<LittleEndian>()?;
451        let num_docs = reader.read_u32::<LittleEndian>()?;
452        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
453        let version = reader.read_u32::<LittleEndian>()?;
454        let magic = reader.read_u32::<LittleEndian>()?;
455
456        if magic != STORE_MAGIC {
457            return Err(io::Error::new(
458                io::ErrorKind::InvalidData,
459                "Invalid store magic",
460            ));
461        }
462        if version != STORE_VERSION {
463            return Err(io::Error::new(
464                io::ErrorKind::InvalidData,
465                format!("Unsupported store version: {}", version),
466            ));
467        }
468
469        // Load dictionary if present
470        let dict = if has_dict && dict_offset > 0 {
471            let dict_start = dict_offset;
472            let dict_len_bytes = file_handle
473                .read_bytes_range(dict_start..dict_start + 4)
474                .await?;
475            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
476            let dict_bytes = file_handle
477                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
478                .await?;
479            Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
480        } else {
481            None
482        };
483
484        // Calculate index location
485        let index_start = if has_dict && dict_offset > 0 {
486            let dict_start = dict_offset;
487            let dict_len_bytes = file_handle
488                .read_bytes_range(dict_start..dict_start + 4)
489                .await?;
490            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
491            dict_start + 4 + dict_len
492        } else {
493            data_end_offset
494        };
495        let index_end = file_len - 32;
496
497        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
498        let mut reader = index_bytes.as_slice();
499
500        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
501        let mut index = Vec::with_capacity(num_blocks);
502
503        for _ in 0..num_blocks {
504            let first_doc_id = reader.read_u32::<LittleEndian>()?;
505            let offset = reader.read_u64::<LittleEndian>()?;
506            let length = reader.read_u32::<LittleEndian>()?;
507            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
508
509            index.push(StoreBlockIndex {
510                first_doc_id,
511                offset,
512                length,
513                num_docs: num_docs_in_block,
514            });
515        }
516
517        // Create lazy slice for data portion only
518        let data_slice = file_handle.slice(0..data_end_offset);
519
520        Ok(Self {
521            data_slice,
522            index,
523            num_docs,
524            dict,
525            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
526        })
527    }
528
529    /// Number of documents
530    pub fn num_docs(&self) -> u32 {
531        self.num_docs
532    }
533
534    /// Number of blocks currently in the cache
535    pub fn cached_blocks(&self) -> usize {
536        self.cache.read().blocks.len()
537    }
538
539    /// Get a document by doc_id (async - may load block)
540    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
541        if doc_id >= self.num_docs {
542            return Ok(None);
543        }
544
545        // Find block containing this doc_id
546        let block_idx = self
547            .index
548            .binary_search_by(|entry| {
549                if doc_id < entry.first_doc_id {
550                    std::cmp::Ordering::Greater
551                } else if doc_id >= entry.first_doc_id + entry.num_docs {
552                    std::cmp::Ordering::Less
553                } else {
554                    std::cmp::Ordering::Equal
555                }
556            })
557            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
558
559        let entry = &self.index[block_idx];
560        let block_data = self.load_block(entry).await?;
561
562        // Find document within block
563        let doc_offset_in_block = doc_id - entry.first_doc_id;
564        let mut reader = &block_data[..];
565
566        for _ in 0..doc_offset_in_block {
567            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
568            if doc_len > reader.len() {
569                return Err(io::Error::new(
570                    io::ErrorKind::InvalidData,
571                    "Invalid doc length",
572                ));
573            }
574            reader = &reader[doc_len..];
575        }
576
577        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
578        let doc_bytes = &reader[..doc_len];
579
580        deserialize_document(doc_bytes, schema).map(Some)
581    }
582
583    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
584        // Check cache first (read lock — concurrent cache hits don't serialize)
585        {
586            if let Some(block) = self.cache.read().get(entry.first_doc_id) {
587                return Ok(block);
588            }
589        }
590
591        // Load from FileSlice
592        let start = entry.offset;
593        let end = start + entry.length as u64;
594        let compressed = self.data_slice.read_bytes_range(start..end).await?;
595
596        // Use dictionary decompression if available
597        let decompressed = if let Some(ref dict) = self.dict {
598            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
599        } else {
600            crate::compression::decompress(compressed.as_slice())?
601        };
602
603        let block = Arc::new(decompressed);
604
605        // Insert into cache
606        {
607            let mut cache = self.cache.write();
608            cache.insert(entry.first_doc_id, Arc::clone(&block));
609        }
610
611        Ok(block)
612    }
613}
614
615pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
616    use crate::dsl::Field;
617
618    let mut reader = data;
619    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
620    let mut doc = Document::new();
621
622    for _ in 0..num_fields {
623        let field_id = reader.read_u16::<LittleEndian>()?;
624        let field = Field(field_id as u32);
625        let type_tag = reader.read_u8()?;
626
627        match type_tag {
628            0 => {
629                // Text
630                let len = reader.read_u32::<LittleEndian>()? as usize;
631                let s = std::str::from_utf8(&reader[..len])
632                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
633                doc.add_text(field, s);
634                reader = &reader[len..];
635            }
636            1 => {
637                // U64
638                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
639            }
640            2 => {
641                // I64
642                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
643            }
644            3 => {
645                // F64
646                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
647            }
648            4 => {
649                // Bytes
650                let len = reader.read_u32::<LittleEndian>()? as usize;
651                doc.add_bytes(field, reader[..len].to_vec());
652                reader = &reader[len..];
653            }
654            5 => {
655                // SparseVector
656                let count = reader.read_u32::<LittleEndian>()? as usize;
657                let mut entries = Vec::with_capacity(count);
658                for _ in 0..count {
659                    let idx = reader.read_u32::<LittleEndian>()?;
660                    let val = reader.read_f32::<LittleEndian>()?;
661                    entries.push((idx, val));
662                }
663                doc.add_sparse_vector(field, entries);
664            }
665            6 => {
666                // DenseVector
667                let count = reader.read_u32::<LittleEndian>()? as usize;
668                let byte_len = count * 4;
669                if reader.len() < byte_len {
670                    return Err(io::Error::new(
671                        io::ErrorKind::UnexpectedEof,
672                        format!(
673                            "dense vector field {}: need {} bytes but only {} remain",
674                            field.0,
675                            byte_len,
676                            reader.len()
677                        ),
678                    ));
679                }
680                let mut values = vec![0.0f32; count];
681                // Read raw f32 bytes directly
682                unsafe {
683                    std::ptr::copy_nonoverlapping(
684                        reader.as_ptr(),
685                        values.as_mut_ptr() as *mut u8,
686                        byte_len,
687                    );
688                }
689                reader = &reader[byte_len..];
690                doc.add_dense_vector(field, values);
691            }
692            7 => {
693                // Json
694                let len = reader.read_u32::<LittleEndian>()? as usize;
695                let v: serde_json::Value = serde_json::from_slice(&reader[..len])
696                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
697                doc.add_json(field, v);
698                reader = &reader[len..];
699            }
700            _ => {
701                return Err(io::Error::new(
702                    io::ErrorKind::InvalidData,
703                    format!("Unknown field type tag: {}", type_tag),
704                ));
705            }
706        }
707    }
708
709    Ok(doc)
710}
711
/// Raw block info for store merging (without decompression)
///
/// Mirrors the fields of a block index entry of the source store.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    /// Doc ID of the first document in the block (in the source store's numbering).
    pub first_doc_id: DocId,
    /// Number of documents in the block.
    pub num_docs: u32,
    /// Byte offset of the compressed block within the source store's data section.
    pub offset: u64,
    /// Length of the compressed block in bytes.
    pub length: u32,
}
720
/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks
///
/// Limitations:
/// - The merged store is always written without a dictionary, so blocks
///   appended via `append_store` must not be dictionary-compressed; use
///   `append_store_recompressing` for dictionary-compressed sources
/// - Doc IDs are remapped sequentially
pub struct StoreMerger<'a, W: Write> {
    /// Destination for the merged store file.
    writer: &'a mut W,
    /// Index entries for the blocks written so far (with remapped doc IDs).
    index: Vec<StoreBlockIndex>,
    /// Number of block bytes written so far; the offset of the next block.
    current_offset: u64,
    /// Doc ID that the first document of the next appended block will receive.
    next_doc_id: DocId,
}
737
738impl<'a, W: Write> StoreMerger<'a, W> {
739    pub fn new(writer: &'a mut W) -> Self {
740        Self {
741            writer,
742            index: Vec::new(),
743            current_offset: 0,
744            next_doc_id: 0,
745        }
746    }
747
748    /// Append raw compressed blocks from a store file
749    ///
750    /// `data_slice` should be the data portion of the store (before index/footer)
751    /// `blocks` contains the block metadata from the source store
752    pub async fn append_store<F: AsyncFileRead>(
753        &mut self,
754        data_slice: &F,
755        blocks: &[RawStoreBlock],
756    ) -> io::Result<()> {
757        for block in blocks {
758            // Read raw compressed block data
759            let start = block.offset;
760            let end = start + block.length as u64;
761            let compressed_data = data_slice.read_bytes_range(start..end).await?;
762
763            // Write to output
764            self.writer.write_all(compressed_data.as_slice())?;
765
766            // Add to index with remapped doc IDs
767            self.index.push(StoreBlockIndex {
768                first_doc_id: self.next_doc_id,
769                offset: self.current_offset,
770                length: block.length,
771                num_docs: block.num_docs,
772            });
773
774            self.current_offset += block.length as u64;
775            self.next_doc_id += block.num_docs;
776        }
777
778        Ok(())
779    }
780
781    /// Append blocks from a dict-compressed store by decompressing and recompressing.
782    ///
783    /// For stores that use dictionary compression, raw blocks can't be stacked
784    /// directly because the decompressor needs the original dictionary.
785    /// This method decompresses each block with the source dict, then
786    /// recompresses without a dictionary so the merged output is self-contained.
787    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
788        let dict = store.dict();
789        let data_slice = store.data_slice();
790        let blocks = store.block_index();
791
792        for block in blocks {
793            let start = block.offset;
794            let end = start + block.length as u64;
795            let compressed = data_slice.read_bytes_range(start..end).await?;
796
797            // Decompress with source dict (or without if no dict)
798            let decompressed = if let Some(d) = dict {
799                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
800            } else {
801                crate::compression::decompress(compressed.as_slice())?
802            };
803
804            // Recompress without dictionary
805            let recompressed = crate::compression::compress(
806                &decompressed,
807                crate::compression::CompressionLevel::default(),
808            )?;
809
810            self.writer.write_all(&recompressed)?;
811
812            self.index.push(StoreBlockIndex {
813                first_doc_id: self.next_doc_id,
814                offset: self.current_offset,
815                length: recompressed.len() as u32,
816                num_docs: block.num_docs,
817            });
818
819            self.current_offset += recompressed.len() as u64;
820            self.next_doc_id += block.num_docs;
821        }
822
823        Ok(())
824    }
825
826    /// Finish writing the merged store
827    pub fn finish(self) -> io::Result<u32> {
828        let data_end_offset = self.current_offset;
829
830        // No dictionary support for merged stores (would need same dict across all sources)
831        let dict_offset = 0u64;
832
833        // Write index + footer
834        write_store_index_and_footer(
835            self.writer,
836            &self.index,
837            data_end_offset,
838            dict_offset,
839            self.next_doc_id,
840            false,
841        )?;
842
843        Ok(self.next_doc_id)
844    }
845}
846
847impl AsyncStoreReader {
848    /// Get raw block metadata for merging (without loading block data)
849    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
850        self.index
851            .iter()
852            .map(|entry| RawStoreBlock {
853                first_doc_id: entry.first_doc_id,
854                num_docs: entry.num_docs,
855                offset: entry.offset,
856                length: entry.length,
857            })
858            .collect()
859    }
860
861    /// Get the data slice for raw block access
862    pub fn data_slice(&self) -> &LazyFileSlice {
863        &self.data_slice
864    }
865
866    /// Check if this store uses a dictionary (incompatible with raw merging)
867    pub fn has_dict(&self) -> bool {
868        self.dict.is_some()
869    }
870
871    /// Get the decompression dictionary (if any)
872    pub fn dict(&self) -> Option<&CompressionDict> {
873        self.dict.as_ref()
874    }
875
876    /// Get block index for iteration
877    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
878        &self.index
879    }
880}