hermes_core/segment/
store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Configurable compression level (`DEFAULT_COMPRESSION_LEVEL` unless overridden)
//! - Large blocks (`STORE_BLOCK_SIZE`, 256 KiB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! Writer stores documents in compressed blocks.
//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::{CompressionDict, CompressionLevel};
20use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
21use crate::dsl::{Document, Schema};
22
/// Magic number written as the last four footer bytes: the ASCII bytes "STOR".
const STORE_MAGIC: u32 = u32::from_be_bytes(*b"STOR");
/// On-disk format version; version 2 supports dictionaries.
const STORE_VERSION: u32 = 2;

/// Uncompressed block size threshold for the document store (256 KiB).
/// Larger blocks = better compression ratio but more memory per block load.
pub const STORE_BLOCK_SIZE: usize = 256 << 10;

/// Default dictionary size in bytes (4 KiB).
pub const DEFAULT_DICT_SIZE: usize = 4 << 10;
32
33/// Default compression level for document store
34const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
35
36/// Document store writer with optional dictionary compression
37pub struct StoreWriter<'a> {
38    writer: &'a mut dyn Write,
39    block_buffer: Vec<u8>,
40    index: Vec<StoreBlockIndex>,
41    current_offset: u64,
42    next_doc_id: DocId,
43    block_first_doc: DocId,
44    dict: Option<CompressionDict>,
45    compression_level: CompressionLevel,
46}
47
48impl<'a> StoreWriter<'a> {
49    /// Create a new store writer without dictionary
50    pub fn new(writer: &'a mut dyn Write) -> Self {
51        Self::with_compression_level(writer, DEFAULT_COMPRESSION_LEVEL)
52    }
53
54    /// Create a new store writer with a specific compression level
55    pub fn with_compression_level(
56        writer: &'a mut dyn Write,
57        compression_level: CompressionLevel,
58    ) -> Self {
59        Self {
60            writer,
61            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
62            index: Vec::new(),
63            current_offset: 0,
64            next_doc_id: 0,
65            block_first_doc: 0,
66            dict: None,
67            compression_level,
68        }
69    }
70
71    /// Create a new store writer with a trained dictionary
72    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
73        Self::with_dict_and_level(writer, dict, DEFAULT_COMPRESSION_LEVEL)
74    }
75
76    /// Create a new store writer with a trained dictionary and specific compression level
77    pub fn with_dict_and_level(
78        writer: &'a mut dyn Write,
79        dict: CompressionDict,
80        compression_level: CompressionLevel,
81    ) -> Self {
82        Self {
83            writer,
84            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
85            index: Vec::new(),
86            current_offset: 0,
87            next_doc_id: 0,
88            block_first_doc: 0,
89            dict: Some(dict),
90            compression_level,
91        }
92    }
93
94    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
95        let doc_id = self.next_doc_id;
96        self.next_doc_id += 1;
97
98        let doc_bytes = serialize_document(doc, schema)?;
99
100        self.block_buffer
101            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
102        self.block_buffer.extend_from_slice(&doc_bytes);
103
104        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
105            self.flush_block()?;
106        }
107
108        Ok(doc_id)
109    }
110
111    fn flush_block(&mut self) -> io::Result<()> {
112        if self.block_buffer.is_empty() {
113            return Ok(());
114        }
115
116        let num_docs = self.next_doc_id - self.block_first_doc;
117
118        // Use dictionary compression if available, otherwise standard compression
119        let compressed = if let Some(ref dict) = self.dict {
120            crate::compression::compress_with_dict(
121                &self.block_buffer,
122                self.compression_level,
123                dict,
124            )?
125        } else {
126            crate::compression::compress(&self.block_buffer, self.compression_level)?
127        };
128
129        self.index.push(StoreBlockIndex {
130            first_doc_id: self.block_first_doc,
131            offset: self.current_offset,
132            length: compressed.len() as u32,
133            num_docs,
134        });
135
136        self.writer.write_all(&compressed)?;
137        self.current_offset += compressed.len() as u64;
138
139        self.block_buffer.clear();
140        self.block_first_doc = self.next_doc_id;
141
142        Ok(())
143    }
144
145    pub fn finish(mut self) -> io::Result<u32> {
146        self.flush_block()?;
147
148        let data_end_offset = self.current_offset;
149
150        // Write dictionary if present
151        let dict_offset = if let Some(ref dict) = self.dict {
152            let offset = self.current_offset;
153            let dict_bytes = dict.as_bytes();
154            self.writer
155                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
156            self.writer.write_all(dict_bytes)?;
157            self.current_offset += 4 + dict_bytes.len() as u64;
158            Some(offset)
159        } else {
160            None
161        };
162
163        // Write index
164        self.writer
165            .write_u32::<LittleEndian>(self.index.len() as u32)?;
166        for entry in &self.index {
167            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
168            self.writer.write_u64::<LittleEndian>(entry.offset)?;
169            self.writer.write_u32::<LittleEndian>(entry.length)?;
170            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
171        }
172
173        // Write footer
174        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
175        self.writer
176            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
177        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
178        self.writer
179            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?; // has_dict flag
180        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
181        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
182
183        Ok(self.next_doc_id)
184    }
185}
186
187pub fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
188    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
189}
190
/// Output of one background compression task.
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Position of the block in submission order; used to restore ordering.
    seq: usize,
    /// Id of the first document contained in the block.
    first_doc_id: DocId,
    /// Number of documents contained in the block.
    num_docs: u32,
    /// The compressed block bytes.
    compressed: Vec<u8>,
}
199
/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns a compression thread as soon as a block is full, overlapping document
/// ingestion with compression to reduce total indexing time. Nothing is written to
/// the underlying writer until `finish`, which joins all threads, restores block
/// order and writes blocks, dictionary, index and footer.
///
/// Compression failures (including a panicking compression thread) are reported as
/// errors from `finish` instead of silently dropping the affected block.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    /// Destination for the finished store.
    writer: &'a mut dyn Write,
    /// Uncompressed, length-prefixed documents of the block being built.
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<io::Result<CompressedBlock>>>,
    /// Sequence number assigned to the next spawned block.
    next_seq: usize,
    /// Id that the next successfully stored document will receive.
    next_doc_id: DocId,
    /// Id of the first document in the block being built.
    block_first_doc: DocId,
    /// Optional dictionary shared with the compression threads.
    dict: Option<Arc<CompressionDict>>,
    /// Compression level used for every block.
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    ///
    /// `_num_threads` is currently unused: one thread is spawned per full block.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::build(writer, None, compression_level)
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::build(writer, Some(Arc::new(dict)), compression_level)
    }

    /// Shared constructor: every public constructor differs only in `dict` and level.
    fn build(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Convert a byte length to the `u32` used by the on-disk format, failing instead
    /// of silently truncating when it does not fit.
    fn len_as_u32(len: usize, what: &str) -> io::Result<u32> {
        if len > u32::MAX as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("{} too large for store: {} bytes", what, len),
            ));
        }
        Ok(len as u32)
    }

    /// Join one compression thread, turning a panic into an `io::Error`.
    fn join_block(
        handle: std::thread::JoinHandle<io::Result<CompressedBlock>>,
    ) -> io::Result<CompressedBlock> {
        handle
            .join()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "compression thread panicked"))?
    }

    /// Serialize `doc`, append it to the current block and return its doc id.
    ///
    /// When the block reaches `STORE_BLOCK_SIZE` bytes a compression thread is spawned.
    ///
    /// # Errors
    /// Returns an error if serialization fails or the serialized document does not
    /// fit a `u32` length prefix. Compression errors surface from `finish`.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        // Do all fallible preparation before consuming a doc id; otherwise a failed
        // store would leave the block's doc count larger than its contents.
        let doc_bytes = serialize_document(doc, schema)?;
        let doc_len = Self::len_as_u32(doc_bytes.len(), "document")?;

        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer.write_u32::<LittleEndian>(doc_len)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        // Hand the filled buffer to the thread and start a fresh one for new documents.
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread; errors are returned, not panicked on,
        // so that `finish` can report them.
        let level = self.compression_level;
        let handle = std::thread::spawn(move || -> io::Result<CompressedBlock> {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d)?
            } else {
                crate::compression::compress(&data, level)?
            };

            Ok(CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            })
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks
    ///
    /// Unfinished tasks stay in `pending_handles`. If any finished task failed, the
    /// first such error is returned after all finished tasks have been joined.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::new();
        let mut first_error: Option<io::Error> = None;
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match Self::join_block(handle) {
                    Ok(block) => self.compressed_blocks.push(block),
                    Err(err) => {
                        if first_error.is_none() {
                            first_error = Some(err);
                        }
                    }
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        match first_error {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Compress the last block, wait for all compression threads and write the store.
    ///
    /// Returns the number of documents stored.
    ///
    /// # Errors
    /// Returns an error if any block failed to compress, a compression thread
    /// panicked, a length does not fit the on-disk `u32` fields, or writing fails.
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            self.compressed_blocks.push(Self::join_block(handle)?);
        }

        if self.compressed_blocks.is_empty() {
            // Write empty store
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            let length = Self::len_as_u32(block.compressed.len(), "compressed block")?;
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += u64::from(length);
        }

        let data_end_offset = current_offset;

        // Write dictionary if present: u32 length prefix followed by the raw bytes.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            let dict_len = Self::len_as_u32(dict_bytes.len(), "dictionary")?;
            self.writer.write_u32::<LittleEndian>(dict_len)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index
        let num_blocks = Self::len_as_u32(index.len(), "block index")?;
        self.writer.write_u32::<LittleEndian>(num_blocks)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}
426
427/// Block index entry for document store
428#[derive(Debug, Clone)]
429struct StoreBlockIndex {
430    first_doc_id: DocId,
431    offset: u64,
432    length: u32,
433    num_docs: u32,
434}
435
436/// Async document store reader - loads blocks on demand
437pub struct AsyncStoreReader {
438    /// LazyFileSlice for the data portion - fetches ranges on demand
439    data_slice: LazyFileSlice,
440    /// Block index
441    index: Vec<StoreBlockIndex>,
442    num_docs: u32,
443    /// Optional compression dictionary
444    dict: Option<CompressionDict>,
445    /// Block cache
446    cache: RwLock<StoreBlockCache>,
447}
448
449struct StoreBlockCache {
450    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
451    access_order: Vec<DocId>,
452    max_blocks: usize,
453}
454
455impl StoreBlockCache {
456    fn new(max_blocks: usize) -> Self {
457        Self {
458            blocks: FxHashMap::default(),
459            access_order: Vec::new(),
460            max_blocks,
461        }
462    }
463
464    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
465        if let Some(block) = self.blocks.get(&first_doc_id) {
466            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
467                self.access_order.remove(pos);
468                self.access_order.push(first_doc_id);
469            }
470            Some(Arc::clone(block))
471        } else {
472            None
473        }
474    }
475
476    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
477        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
478            let evict = self.access_order.remove(0);
479            self.blocks.remove(&evict);
480        }
481        self.blocks.insert(first_doc_id, block);
482        self.access_order.push(first_doc_id);
483    }
484}
485
486impl AsyncStoreReader {
487    /// Open a document store from LazyFileHandle
488    /// Only loads footer and index into memory, data blocks are fetched on-demand
489    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
490        let file_len = file_handle.len();
491        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
492        if file_len < 32 {
493            return Err(io::Error::new(
494                io::ErrorKind::InvalidData,
495                "Store too small",
496            ));
497        }
498
499        // Read footer (32 bytes)
500        let footer = file_handle
501            .read_bytes_range(file_len - 32..file_len)
502            .await?;
503        let mut reader = footer.as_slice();
504        let data_end_offset = reader.read_u64::<LittleEndian>()?;
505        let dict_offset = reader.read_u64::<LittleEndian>()?;
506        let num_docs = reader.read_u32::<LittleEndian>()?;
507        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
508        let version = reader.read_u32::<LittleEndian>()?;
509        let magic = reader.read_u32::<LittleEndian>()?;
510
511        if magic != STORE_MAGIC {
512            return Err(io::Error::new(
513                io::ErrorKind::InvalidData,
514                "Invalid store magic",
515            ));
516        }
517        if version != STORE_VERSION {
518            return Err(io::Error::new(
519                io::ErrorKind::InvalidData,
520                format!("Unsupported store version: {}", version),
521            ));
522        }
523
524        // Load dictionary if present
525        let dict = if has_dict && dict_offset > 0 {
526            let dict_start = dict_offset as usize;
527            let dict_len_bytes = file_handle
528                .read_bytes_range(dict_start..dict_start + 4)
529                .await?;
530            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
531            let dict_bytes = file_handle
532                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
533                .await?;
534            Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
535        } else {
536            None
537        };
538
539        // Calculate index location
540        let index_start = if has_dict && dict_offset > 0 {
541            let dict_start = dict_offset as usize;
542            let dict_len_bytes = file_handle
543                .read_bytes_range(dict_start..dict_start + 4)
544                .await?;
545            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
546            dict_start + 4 + dict_len
547        } else {
548            data_end_offset as usize
549        };
550        let index_end = file_len - 32;
551
552        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
553        let mut reader = index_bytes.as_slice();
554
555        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
556        let mut index = Vec::with_capacity(num_blocks);
557
558        for _ in 0..num_blocks {
559            let first_doc_id = reader.read_u32::<LittleEndian>()?;
560            let offset = reader.read_u64::<LittleEndian>()?;
561            let length = reader.read_u32::<LittleEndian>()?;
562            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
563
564            index.push(StoreBlockIndex {
565                first_doc_id,
566                offset,
567                length,
568                num_docs: num_docs_in_block,
569            });
570        }
571
572        // Create lazy slice for data portion only
573        let data_slice = file_handle.slice(0..data_end_offset as usize);
574
575        Ok(Self {
576            data_slice,
577            index,
578            num_docs,
579            dict,
580            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
581        })
582    }
583
584    /// Number of documents
585    pub fn num_docs(&self) -> u32 {
586        self.num_docs
587    }
588
589    /// Get a document by doc_id (async - may load block)
590    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
591        if doc_id >= self.num_docs {
592            return Ok(None);
593        }
594
595        // Find block containing this doc_id
596        let block_idx = self
597            .index
598            .binary_search_by(|entry| {
599                if doc_id < entry.first_doc_id {
600                    std::cmp::Ordering::Greater
601                } else if doc_id >= entry.first_doc_id + entry.num_docs {
602                    std::cmp::Ordering::Less
603                } else {
604                    std::cmp::Ordering::Equal
605                }
606            })
607            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
608
609        let entry = &self.index[block_idx];
610        let block_data = self.load_block(entry).await?;
611
612        // Find document within block
613        let doc_offset_in_block = doc_id - entry.first_doc_id;
614        let mut reader = &block_data[..];
615
616        for _ in 0..doc_offset_in_block {
617            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
618            if doc_len > reader.len() {
619                return Err(io::Error::new(
620                    io::ErrorKind::InvalidData,
621                    "Invalid doc length",
622                ));
623            }
624            reader = &reader[doc_len..];
625        }
626
627        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
628        let doc_bytes = &reader[..doc_len];
629
630        deserialize_document(doc_bytes, schema).map(Some)
631    }
632
633    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
634        // Check cache
635        {
636            let mut cache = self.cache.write();
637            if let Some(block) = cache.get(entry.first_doc_id) {
638                return Ok(block);
639            }
640        }
641
642        // Load from FileSlice
643        let start = entry.offset as usize;
644        let end = start + entry.length as usize;
645        let compressed = self.data_slice.read_bytes_range(start..end).await?;
646
647        // Use dictionary decompression if available
648        let decompressed = if let Some(ref dict) = self.dict {
649            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
650        } else {
651            crate::compression::decompress(compressed.as_slice())?
652        };
653
654        let block = Arc::new(decompressed);
655
656        // Insert into cache
657        {
658            let mut cache = self.cache.write();
659            cache.insert(entry.first_doc_id, Arc::clone(&block));
660        }
661
662        Ok(block)
663    }
664}
665
666pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
667    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
668}
669
670/// Raw block info for store merging (without decompression)
671#[derive(Debug, Clone)]
672pub struct RawStoreBlock {
673    pub first_doc_id: DocId,
674    pub num_docs: u32,
675    pub offset: u64,
676    pub length: u32,
677}
678
679/// Store merger - concatenates compressed blocks from multiple stores without recompression
680///
681/// This is much faster than rebuilding stores since it avoids:
682/// - Decompressing blocks from source stores
683/// - Re-serializing documents
684/// - Re-compressing blocks at level 22
685///
686/// Limitations:
687/// - All source stores must NOT use dictionaries (or use the same dictionary)
688/// - Doc IDs are remapped sequentially
689pub struct StoreMerger<'a, W: Write> {
690    writer: &'a mut W,
691    index: Vec<StoreBlockIndex>,
692    current_offset: u64,
693    next_doc_id: DocId,
694}
695
696impl<'a, W: Write> StoreMerger<'a, W> {
697    pub fn new(writer: &'a mut W) -> Self {
698        Self {
699            writer,
700            index: Vec::new(),
701            current_offset: 0,
702            next_doc_id: 0,
703        }
704    }
705
706    /// Append raw compressed blocks from a store file
707    ///
708    /// `data_slice` should be the data portion of the store (before index/footer)
709    /// `blocks` contains the block metadata from the source store
710    pub async fn append_store<F: AsyncFileRead>(
711        &mut self,
712        data_slice: &F,
713        blocks: &[RawStoreBlock],
714    ) -> io::Result<()> {
715        for block in blocks {
716            // Read raw compressed block data
717            let start = block.offset as usize;
718            let end = start + block.length as usize;
719            let compressed_data = data_slice.read_bytes_range(start..end).await?;
720
721            // Write to output
722            self.writer.write_all(compressed_data.as_slice())?;
723
724            // Add to index with remapped doc IDs
725            self.index.push(StoreBlockIndex {
726                first_doc_id: self.next_doc_id,
727                offset: self.current_offset,
728                length: block.length,
729                num_docs: block.num_docs,
730            });
731
732            self.current_offset += block.length as u64;
733            self.next_doc_id += block.num_docs;
734        }
735
736        Ok(())
737    }
738
739    /// Finish writing the merged store
740    pub fn finish(self) -> io::Result<u32> {
741        let data_end_offset = self.current_offset;
742
743        // No dictionary support for merged stores (would need same dict across all sources)
744        let dict_offset = 0u64;
745
746        // Write index
747        self.writer
748            .write_u32::<LittleEndian>(self.index.len() as u32)?;
749        for entry in &self.index {
750            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
751            self.writer.write_u64::<LittleEndian>(entry.offset)?;
752            self.writer.write_u32::<LittleEndian>(entry.length)?;
753            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
754        }
755
756        // Write footer
757        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
758        self.writer.write_u64::<LittleEndian>(dict_offset)?;
759        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
760        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
761        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
762        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
763
764        Ok(self.next_doc_id)
765    }
766}
767
768impl AsyncStoreReader {
769    /// Get raw block metadata for merging (without loading block data)
770    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
771        self.index
772            .iter()
773            .map(|entry| RawStoreBlock {
774                first_doc_id: entry.first_doc_id,
775                num_docs: entry.num_docs,
776                offset: entry.offset,
777                length: entry.length,
778            })
779            .collect()
780    }
781
782    /// Get the data slice for raw block access
783    pub fn data_slice(&self) -> &LazyFileSlice {
784        &self.data_slice
785    }
786
787    /// Check if this store uses a dictionary (incompatible with raw merging)
788    pub fn has_dict(&self) -> bool {
789        self.dict.is_some()
790    }
791}