Skip to main content

hermes_core/segment/
store.rs

1//! Document store with Zstd compression and lazy loading
2//!
3//! Optimized for static indexes:
4//! - Maximum compression level (22) for best compression ratio
5//! - Larger block sizes (64KB) for better compression efficiency
6//! - Optional trained dictionary support for even better compression
7//! - Parallel compression support for faster indexing
8//!
9//! Writer stores documents in compressed blocks.
10//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
23use crate::dsl::{Document, Schema};
24
25const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
26const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries
27
28/// Block size for document store (256KB for better compression)
29/// Larger blocks = better compression ratio but more memory per block load
30pub const STORE_BLOCK_SIZE: usize = 256 * 1024;
31
32/// Default dictionary size (64KB is a good balance)
33pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;
34
35/// Default compression level for document store
36#[cfg(feature = "native")]
37const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
38
39/// Binary document format:
40///   num_fields: u16
41///   per field: field_id: u16, type_tag: u8, value data
42///     0=Text:         len:u32 + utf8
43///     1=U64:          u64 LE
44///     2=I64:          i64 LE
45///     3=F64:          f64 LE
46///     4=Bytes:        len:u32 + raw
47///     5=SparseVector: count:u32 + count*(u32+f32)
48///     6=DenseVector:  count:u32 + count*f32
49///     7=Json:         len:u32 + json utf8
50pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
51    use crate::dsl::FieldValue;
52
53    let stored: Vec<_> = doc
54        .field_values()
55        .iter()
56        .filter(|(field, _)| schema.get_field_entry(*field).is_some_and(|e| e.stored))
57        .collect();
58
59    let mut buf = Vec::with_capacity(256);
60    buf.write_u16::<LittleEndian>(stored.len() as u16)?;
61
62    for (field, value) in &stored {
63        buf.write_u16::<LittleEndian>(field.0 as u16)?;
64        match value {
65            FieldValue::Text(s) => {
66                buf.push(0);
67                let bytes = s.as_bytes();
68                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
69                buf.extend_from_slice(bytes);
70            }
71            FieldValue::U64(v) => {
72                buf.push(1);
73                buf.write_u64::<LittleEndian>(*v)?;
74            }
75            FieldValue::I64(v) => {
76                buf.push(2);
77                buf.write_i64::<LittleEndian>(*v)?;
78            }
79            FieldValue::F64(v) => {
80                buf.push(3);
81                buf.write_f64::<LittleEndian>(*v)?;
82            }
83            FieldValue::Bytes(b) => {
84                buf.push(4);
85                buf.write_u32::<LittleEndian>(b.len() as u32)?;
86                buf.extend_from_slice(b);
87            }
88            FieldValue::SparseVector(entries) => {
89                buf.push(5);
90                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
91                for (idx, val) in entries {
92                    buf.write_u32::<LittleEndian>(*idx)?;
93                    buf.write_f32::<LittleEndian>(*val)?;
94                }
95            }
96            FieldValue::DenseVector(values) => {
97                buf.push(6);
98                buf.write_u32::<LittleEndian>(values.len() as u32)?;
99                // Write raw f32 bytes directly
100                let byte_slice = unsafe {
101                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
102                };
103                buf.extend_from_slice(byte_slice);
104            }
105            FieldValue::Json(v) => {
106                buf.push(7);
107                let json_bytes = serde_json::to_vec(v)
108                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
109                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
110                buf.extend_from_slice(&json_bytes);
111            }
112        }
113    }
114
115    Ok(buf)
116}
117
118/// Compressed block result
119#[cfg(feature = "native")]
120struct CompressedBlock {
121    seq: usize,
122    first_doc_id: DocId,
123    num_docs: u32,
124    compressed: Vec<u8>,
125}
126
127/// Parallel document store writer - compresses blocks immediately when queued
128///
129/// Spawns compression tasks as soon as blocks are ready, overlapping document
130/// ingestion with compression to reduce total indexing time.
131///
132/// Uses background threads to compress blocks while the main thread continues
133/// accepting documents.
134#[cfg(feature = "native")]
135pub struct EagerParallelStoreWriter<'a> {
136    writer: &'a mut dyn Write,
137    block_buffer: Vec<u8>,
138    /// Compressed blocks ready to be written (may arrive out of order)
139    compressed_blocks: Vec<CompressedBlock>,
140    /// Handles for in-flight compression tasks
141    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
142    next_seq: usize,
143    next_doc_id: DocId,
144    block_first_doc: DocId,
145    dict: Option<Arc<CompressionDict>>,
146    compression_level: CompressionLevel,
147}
148
149#[cfg(feature = "native")]
150impl<'a> EagerParallelStoreWriter<'a> {
151    /// Create a new eager parallel store writer
152    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
153        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
154    }
155
156    /// Create with specific compression level
157    pub fn with_compression_level(
158        writer: &'a mut dyn Write,
159        _num_threads: usize,
160        compression_level: CompressionLevel,
161    ) -> Self {
162        Self {
163            writer,
164            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
165            compressed_blocks: Vec::new(),
166            pending_handles: Vec::new(),
167            next_seq: 0,
168            next_doc_id: 0,
169            block_first_doc: 0,
170            dict: None,
171            compression_level,
172        }
173    }
174
175    /// Create with dictionary
176    pub fn with_dict(
177        writer: &'a mut dyn Write,
178        dict: CompressionDict,
179        _num_threads: usize,
180    ) -> Self {
181        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
182    }
183
184    /// Create with dictionary and specific compression level
185    pub fn with_dict_and_level(
186        writer: &'a mut dyn Write,
187        dict: CompressionDict,
188        _num_threads: usize,
189        compression_level: CompressionLevel,
190    ) -> Self {
191        Self {
192            writer,
193            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
194            compressed_blocks: Vec::new(),
195            pending_handles: Vec::new(),
196            next_seq: 0,
197            next_doc_id: 0,
198            block_first_doc: 0,
199            dict: Some(Arc::new(dict)),
200            compression_level,
201        }
202    }
203
204    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
205        let doc_id = self.next_doc_id;
206        self.next_doc_id += 1;
207
208        let doc_bytes = serialize_document(doc, schema)?;
209
210        self.block_buffer
211            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
212        self.block_buffer.extend_from_slice(&doc_bytes);
213
214        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
215            self.spawn_compression();
216        }
217
218        Ok(doc_id)
219    }
220
221    /// Spawn compression for the current block immediately
222    fn spawn_compression(&mut self) {
223        if self.block_buffer.is_empty() {
224            return;
225        }
226
227        let num_docs = self.next_doc_id - self.block_first_doc;
228        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
229        let seq = self.next_seq;
230        let first_doc_id = self.block_first_doc;
231        let dict = self.dict.clone();
232
233        self.next_seq += 1;
234        self.block_first_doc = self.next_doc_id;
235
236        // Spawn compression task using thread
237        let level = self.compression_level;
238        let handle = std::thread::spawn(move || {
239            let compressed = if let Some(ref d) = dict {
240                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
241            } else {
242                crate::compression::compress(&data, level).expect("compression failed")
243            };
244
245            CompressedBlock {
246                seq,
247                first_doc_id,
248                num_docs,
249                compressed,
250            }
251        });
252
253        self.pending_handles.push(handle);
254    }
255
256    /// Collect any completed compression tasks
257    fn collect_completed(&mut self) {
258        let mut remaining = Vec::new();
259        for handle in self.pending_handles.drain(..) {
260            if handle.is_finished() {
261                if let Ok(block) = handle.join() {
262                    self.compressed_blocks.push(block);
263                }
264            } else {
265                remaining.push(handle);
266            }
267        }
268        self.pending_handles = remaining;
269    }
270
271    pub fn finish(mut self) -> io::Result<u32> {
272        // Spawn compression for any remaining data
273        self.spawn_compression();
274
275        // Collect any already-completed tasks
276        self.collect_completed();
277
278        // Wait for all remaining compression tasks
279        for handle in self.pending_handles.drain(..) {
280            if let Ok(block) = handle.join() {
281                self.compressed_blocks.push(block);
282            }
283        }
284
285        if self.compressed_blocks.is_empty() {
286            // Write empty store
287            let data_end_offset = 0u64;
288            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
289            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
290            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
291            self.writer.write_u32::<LittleEndian>(0)?; // num docs
292            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
293            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
294            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
295            return Ok(0);
296        }
297
298        // Sort by sequence to maintain order
299        self.compressed_blocks.sort_by_key(|b| b.seq);
300
301        // Write blocks in order and build index
302        let mut index = Vec::with_capacity(self.compressed_blocks.len());
303        let mut current_offset = 0u64;
304
305        for block in &self.compressed_blocks {
306            index.push(StoreBlockIndex {
307                first_doc_id: block.first_doc_id,
308                offset: current_offset,
309                length: block.compressed.len() as u32,
310                num_docs: block.num_docs,
311            });
312
313            self.writer.write_all(&block.compressed)?;
314            current_offset += block.compressed.len() as u64;
315        }
316
317        let data_end_offset = current_offset;
318
319        // Write dictionary if present
320        let dict_offset = if let Some(ref dict) = self.dict {
321            let offset = current_offset;
322            let dict_bytes = dict.as_bytes();
323            self.writer
324                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
325            self.writer.write_all(dict_bytes)?;
326            Some(offset)
327        } else {
328            None
329        };
330
331        // Write index
332        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
333        for entry in &index {
334            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
335            self.writer.write_u64::<LittleEndian>(entry.offset)?;
336            self.writer.write_u32::<LittleEndian>(entry.length)?;
337            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
338        }
339
340        // Write footer
341        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
342        self.writer
343            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
344        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
345        self.writer
346            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
347        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
348        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
349
350        Ok(self.next_doc_id)
351    }
352}
353
354/// Block index entry for document store
355#[derive(Debug, Clone)]
356struct StoreBlockIndex {
357    first_doc_id: DocId,
358    offset: u64,
359    length: u32,
360    num_docs: u32,
361}
362
363/// Async document store reader - loads blocks on demand
364pub struct AsyncStoreReader {
365    /// LazyFileSlice for the data portion - fetches ranges on demand
366    data_slice: LazyFileSlice,
367    /// Block index
368    index: Vec<StoreBlockIndex>,
369    num_docs: u32,
370    /// Optional compression dictionary
371    dict: Option<CompressionDict>,
372    /// Block cache
373    cache: RwLock<StoreBlockCache>,
374}
375
376struct StoreBlockCache {
377    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
378    access_order: Vec<DocId>,
379    max_blocks: usize,
380}
381
382impl StoreBlockCache {
383    fn new(max_blocks: usize) -> Self {
384        Self {
385            blocks: FxHashMap::default(),
386            access_order: Vec::new(),
387            max_blocks,
388        }
389    }
390
391    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
392        if let Some(block) = self.blocks.get(&first_doc_id) {
393            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
394                self.access_order.remove(pos);
395                self.access_order.push(first_doc_id);
396            }
397            Some(Arc::clone(block))
398        } else {
399            None
400        }
401    }
402
403    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
404        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
405            let evict = self.access_order.remove(0);
406            self.blocks.remove(&evict);
407        }
408        self.blocks.insert(first_doc_id, block);
409        self.access_order.push(first_doc_id);
410    }
411}
412
413impl AsyncStoreReader {
414    /// Open a document store from LazyFileHandle
415    /// Only loads footer and index into memory, data blocks are fetched on-demand
416    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
417        let file_len = file_handle.len();
418        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
419        if file_len < 32 {
420            return Err(io::Error::new(
421                io::ErrorKind::InvalidData,
422                "Store too small",
423            ));
424        }
425
426        // Read footer (32 bytes)
427        let footer = file_handle
428            .read_bytes_range(file_len - 32..file_len)
429            .await?;
430        let mut reader = footer.as_slice();
431        let data_end_offset = reader.read_u64::<LittleEndian>()?;
432        let dict_offset = reader.read_u64::<LittleEndian>()?;
433        let num_docs = reader.read_u32::<LittleEndian>()?;
434        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
435        let version = reader.read_u32::<LittleEndian>()?;
436        let magic = reader.read_u32::<LittleEndian>()?;
437
438        if magic != STORE_MAGIC {
439            return Err(io::Error::new(
440                io::ErrorKind::InvalidData,
441                "Invalid store magic",
442            ));
443        }
444        if version != STORE_VERSION {
445            return Err(io::Error::new(
446                io::ErrorKind::InvalidData,
447                format!("Unsupported store version: {}", version),
448            ));
449        }
450
451        // Load dictionary if present
452        let dict = if has_dict && dict_offset > 0 {
453            let dict_start = dict_offset;
454            let dict_len_bytes = file_handle
455                .read_bytes_range(dict_start..dict_start + 4)
456                .await?;
457            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
458            let dict_bytes = file_handle
459                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
460                .await?;
461            Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
462        } else {
463            None
464        };
465
466        // Calculate index location
467        let index_start = if has_dict && dict_offset > 0 {
468            let dict_start = dict_offset;
469            let dict_len_bytes = file_handle
470                .read_bytes_range(dict_start..dict_start + 4)
471                .await?;
472            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
473            dict_start + 4 + dict_len
474        } else {
475            data_end_offset
476        };
477        let index_end = file_len - 32;
478
479        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
480        let mut reader = index_bytes.as_slice();
481
482        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
483        let mut index = Vec::with_capacity(num_blocks);
484
485        for _ in 0..num_blocks {
486            let first_doc_id = reader.read_u32::<LittleEndian>()?;
487            let offset = reader.read_u64::<LittleEndian>()?;
488            let length = reader.read_u32::<LittleEndian>()?;
489            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
490
491            index.push(StoreBlockIndex {
492                first_doc_id,
493                offset,
494                length,
495                num_docs: num_docs_in_block,
496            });
497        }
498
499        // Create lazy slice for data portion only
500        let data_slice = file_handle.slice(0..data_end_offset);
501
502        Ok(Self {
503            data_slice,
504            index,
505            num_docs,
506            dict,
507            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
508        })
509    }
510
511    /// Number of documents
512    pub fn num_docs(&self) -> u32 {
513        self.num_docs
514    }
515
516    /// Number of blocks currently in the cache
517    pub fn cached_blocks(&self) -> usize {
518        self.cache.read().blocks.len()
519    }
520
521    /// Get a document by doc_id (async - may load block)
522    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
523        if doc_id >= self.num_docs {
524            return Ok(None);
525        }
526
527        // Find block containing this doc_id
528        let block_idx = self
529            .index
530            .binary_search_by(|entry| {
531                if doc_id < entry.first_doc_id {
532                    std::cmp::Ordering::Greater
533                } else if doc_id >= entry.first_doc_id + entry.num_docs {
534                    std::cmp::Ordering::Less
535                } else {
536                    std::cmp::Ordering::Equal
537                }
538            })
539            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
540
541        let entry = &self.index[block_idx];
542        let block_data = self.load_block(entry).await?;
543
544        // Find document within block
545        let doc_offset_in_block = doc_id - entry.first_doc_id;
546        let mut reader = &block_data[..];
547
548        for _ in 0..doc_offset_in_block {
549            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
550            if doc_len > reader.len() {
551                return Err(io::Error::new(
552                    io::ErrorKind::InvalidData,
553                    "Invalid doc length",
554                ));
555            }
556            reader = &reader[doc_len..];
557        }
558
559        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
560        let doc_bytes = &reader[..doc_len];
561
562        deserialize_document(doc_bytes, schema).map(Some)
563    }
564
565    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
566        // Check cache
567        {
568            let mut cache = self.cache.write();
569            if let Some(block) = cache.get(entry.first_doc_id) {
570                return Ok(block);
571            }
572        }
573
574        // Load from FileSlice
575        let start = entry.offset;
576        let end = start + entry.length as u64;
577        let compressed = self.data_slice.read_bytes_range(start..end).await?;
578
579        // Use dictionary decompression if available
580        let decompressed = if let Some(ref dict) = self.dict {
581            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
582        } else {
583            crate::compression::decompress(compressed.as_slice())?
584        };
585
586        let block = Arc::new(decompressed);
587
588        // Insert into cache
589        {
590            let mut cache = self.cache.write();
591            cache.insert(entry.first_doc_id, Arc::clone(&block));
592        }
593
594        Ok(block)
595    }
596}
597
598pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
599    use crate::dsl::Field;
600
601    let mut reader = data;
602    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
603    let mut doc = Document::new();
604
605    for _ in 0..num_fields {
606        let field_id = reader.read_u16::<LittleEndian>()?;
607        let field = Field(field_id as u32);
608        let type_tag = reader.read_u8()?;
609
610        match type_tag {
611            0 => {
612                // Text
613                let len = reader.read_u32::<LittleEndian>()? as usize;
614                let s = std::str::from_utf8(&reader[..len])
615                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
616                doc.add_text(field, s);
617                reader = &reader[len..];
618            }
619            1 => {
620                // U64
621                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
622            }
623            2 => {
624                // I64
625                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
626            }
627            3 => {
628                // F64
629                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
630            }
631            4 => {
632                // Bytes
633                let len = reader.read_u32::<LittleEndian>()? as usize;
634                doc.add_bytes(field, reader[..len].to_vec());
635                reader = &reader[len..];
636            }
637            5 => {
638                // SparseVector
639                let count = reader.read_u32::<LittleEndian>()? as usize;
640                let mut entries = Vec::with_capacity(count);
641                for _ in 0..count {
642                    let idx = reader.read_u32::<LittleEndian>()?;
643                    let val = reader.read_f32::<LittleEndian>()?;
644                    entries.push((idx, val));
645                }
646                doc.add_sparse_vector(field, entries);
647            }
648            6 => {
649                // DenseVector
650                let count = reader.read_u32::<LittleEndian>()? as usize;
651                let byte_len = count * 4;
652                let mut values = vec![0.0f32; count];
653                // Read raw f32 bytes directly
654                unsafe {
655                    std::ptr::copy_nonoverlapping(
656                        reader.as_ptr(),
657                        values.as_mut_ptr() as *mut u8,
658                        byte_len,
659                    );
660                }
661                reader = &reader[byte_len..];
662                doc.add_dense_vector(field, values);
663            }
664            7 => {
665                // Json
666                let len = reader.read_u32::<LittleEndian>()? as usize;
667                let v: serde_json::Value = serde_json::from_slice(&reader[..len])
668                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
669                doc.add_json(field, v);
670                reader = &reader[len..];
671            }
672            _ => {
673                return Err(io::Error::new(
674                    io::ErrorKind::InvalidData,
675                    format!("Unknown field type tag: {}", type_tag),
676                ));
677            }
678        }
679    }
680
681    Ok(doc)
682}
683
684/// Raw block info for store merging (without decompression)
685#[derive(Debug, Clone)]
686pub struct RawStoreBlock {
687    pub first_doc_id: DocId,
688    pub num_docs: u32,
689    pub offset: u64,
690    pub length: u32,
691}
692
693/// Store merger - concatenates compressed blocks from multiple stores without recompression
694///
695/// This is much faster than rebuilding stores since it avoids:
696/// - Decompressing blocks from source stores
697/// - Re-serializing documents
698/// - Re-compressing blocks at level 22
699///
700/// Limitations:
701/// - All source stores must NOT use dictionaries (or use the same dictionary)
702/// - Doc IDs are remapped sequentially
703pub struct StoreMerger<'a, W: Write> {
704    writer: &'a mut W,
705    index: Vec<StoreBlockIndex>,
706    current_offset: u64,
707    next_doc_id: DocId,
708}
709
710impl<'a, W: Write> StoreMerger<'a, W> {
711    pub fn new(writer: &'a mut W) -> Self {
712        Self {
713            writer,
714            index: Vec::new(),
715            current_offset: 0,
716            next_doc_id: 0,
717        }
718    }
719
720    /// Append raw compressed blocks from a store file
721    ///
722    /// `data_slice` should be the data portion of the store (before index/footer)
723    /// `blocks` contains the block metadata from the source store
724    pub async fn append_store<F: AsyncFileRead>(
725        &mut self,
726        data_slice: &F,
727        blocks: &[RawStoreBlock],
728    ) -> io::Result<()> {
729        for block in blocks {
730            // Read raw compressed block data
731            let start = block.offset;
732            let end = start + block.length as u64;
733            let compressed_data = data_slice.read_bytes_range(start..end).await?;
734
735            // Write to output
736            self.writer.write_all(compressed_data.as_slice())?;
737
738            // Add to index with remapped doc IDs
739            self.index.push(StoreBlockIndex {
740                first_doc_id: self.next_doc_id,
741                offset: self.current_offset,
742                length: block.length,
743                num_docs: block.num_docs,
744            });
745
746            self.current_offset += block.length as u64;
747            self.next_doc_id += block.num_docs;
748        }
749
750        Ok(())
751    }
752
753    /// Finish writing the merged store
754    pub fn finish(self) -> io::Result<u32> {
755        let data_end_offset = self.current_offset;
756
757        // No dictionary support for merged stores (would need same dict across all sources)
758        let dict_offset = 0u64;
759
760        // Write index
761        self.writer
762            .write_u32::<LittleEndian>(self.index.len() as u32)?;
763        for entry in &self.index {
764            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
765            self.writer.write_u64::<LittleEndian>(entry.offset)?;
766            self.writer.write_u32::<LittleEndian>(entry.length)?;
767            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
768        }
769
770        // Write footer
771        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
772        self.writer.write_u64::<LittleEndian>(dict_offset)?;
773        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
774        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
775        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
776        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
777
778        Ok(self.next_doc_id)
779    }
780}
781
782impl AsyncStoreReader {
783    /// Get raw block metadata for merging (without loading block data)
784    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
785        self.index
786            .iter()
787            .map(|entry| RawStoreBlock {
788                first_doc_id: entry.first_doc_id,
789                num_docs: entry.num_docs,
790                offset: entry.offset,
791                length: entry.length,
792            })
793            .collect()
794    }
795
796    /// Get the data slice for raw block access
797    pub fn data_slice(&self) -> &LazyFileSlice {
798        &self.data_slice
799    }
800
801    /// Check if this store uses a dictionary (incompatible with raw merging)
802    pub fn has_dict(&self) -> bool {
803        self.dict.is_some()
804    }
805}