hermes_core/segment/
store.rs

1//! Document store with Zstd compression and lazy loading
2//!
3//! Optimized for static indexes:
4//! - Maximum compression level (22) for best compression ratio
//! - Larger block sizes (256KB) for better compression efficiency
6//! - Optional trained dictionary support for even better compression
7//! - Parallel compression support for faster indexing
8//!
9//! Writer stores documents in compressed blocks.
10//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::{CompressionDict, CompressionLevel};
20use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
21use crate::dsl::{Document, Schema};
22
// Magic number written as the last four bytes of the footer; the hex value spells ASCII "STOR".
const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
// Format version written to the footer; the reader rejects any other value.
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries

/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
///
/// This is the threshold of buffered, uncompressed bytes at which the writers
/// close the current block, so a block can exceed it by up to one document.
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size in bytes (4KB)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Compression level for document store (maximum for static indexes)
const COMPRESSION_LEVEL: CompressionLevel = CompressionLevel::MAX;
35
36/// Document store writer with optional dictionary compression
37pub struct StoreWriter<'a> {
38    writer: &'a mut dyn Write,
39    block_buffer: Vec<u8>,
40    index: Vec<StoreBlockIndex>,
41    current_offset: u64,
42    next_doc_id: DocId,
43    block_first_doc: DocId,
44    dict: Option<CompressionDict>,
45}
46
47impl<'a> StoreWriter<'a> {
48    /// Create a new store writer without dictionary
49    pub fn new(writer: &'a mut dyn Write) -> Self {
50        Self {
51            writer,
52            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
53            index: Vec::new(),
54            current_offset: 0,
55            next_doc_id: 0,
56            block_first_doc: 0,
57            dict: None,
58        }
59    }
60
61    /// Create a new store writer with a trained dictionary
62    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
63        Self {
64            writer,
65            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
66            index: Vec::new(),
67            current_offset: 0,
68            next_doc_id: 0,
69            block_first_doc: 0,
70            dict: Some(dict),
71        }
72    }
73
74    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
75        let doc_id = self.next_doc_id;
76        self.next_doc_id += 1;
77
78        let doc_bytes = serialize_document(doc, schema)?;
79
80        self.block_buffer
81            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
82        self.block_buffer.extend_from_slice(&doc_bytes);
83
84        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
85            self.flush_block()?;
86        }
87
88        Ok(doc_id)
89    }
90
91    fn flush_block(&mut self) -> io::Result<()> {
92        if self.block_buffer.is_empty() {
93            return Ok(());
94        }
95
96        let num_docs = self.next_doc_id - self.block_first_doc;
97
98        // Use dictionary compression if available, otherwise standard compression
99        let compressed = if let Some(ref dict) = self.dict {
100            crate::compression::compress_with_dict(&self.block_buffer, COMPRESSION_LEVEL, dict)?
101        } else {
102            crate::compression::compress(&self.block_buffer, COMPRESSION_LEVEL)?
103        };
104
105        self.index.push(StoreBlockIndex {
106            first_doc_id: self.block_first_doc,
107            offset: self.current_offset,
108            length: compressed.len() as u32,
109            num_docs,
110        });
111
112        self.writer.write_all(&compressed)?;
113        self.current_offset += compressed.len() as u64;
114
115        self.block_buffer.clear();
116        self.block_first_doc = self.next_doc_id;
117
118        Ok(())
119    }
120
121    pub fn finish(mut self) -> io::Result<u32> {
122        self.flush_block()?;
123
124        let data_end_offset = self.current_offset;
125
126        // Write dictionary if present
127        let dict_offset = if let Some(ref dict) = self.dict {
128            let offset = self.current_offset;
129            let dict_bytes = dict.as_bytes();
130            self.writer
131                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
132            self.writer.write_all(dict_bytes)?;
133            self.current_offset += 4 + dict_bytes.len() as u64;
134            Some(offset)
135        } else {
136            None
137        };
138
139        // Write index
140        self.writer
141            .write_u32::<LittleEndian>(self.index.len() as u32)?;
142        for entry in &self.index {
143            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
144            self.writer.write_u64::<LittleEndian>(entry.offset)?;
145            self.writer.write_u32::<LittleEndian>(entry.length)?;
146            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
147        }
148
149        // Write footer
150        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
151        self.writer
152            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
153        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
154        self.writer
155            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?; // has_dict flag
156        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
157        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
158
159        Ok(self.next_doc_id)
160    }
161}
162
163fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
164    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
165}
166
/// Pending block for parallel compression
///
/// Holds one uncompressed block queued by `ParallelStoreWriter` until
/// `finish` compresses it.
struct PendingBlock {
    /// Sequence number assigned when the block was queued; defines output order.
    seq: usize,
    /// Doc id of the first document in the block.
    first_doc_id: DocId,
    /// Number of documents in the block.
    num_docs: u32,
    /// Uncompressed block bytes (length-prefixed serialized documents).
    data: Vec<u8>,
}
174
/// Compressed block result
///
/// Carries the metadata of the `PendingBlock` it was produced from together
/// with the compressed bytes.
struct CompressedBlock {
    /// Sequence number copied from the pending block; used to restore order.
    seq: usize,
    /// Doc id of the first document in the block.
    first_doc_id: DocId,
    /// Number of documents in the block.
    num_docs: u32,
    /// Compressed block bytes as written to the store.
    compressed: Vec<u8>,
}
182
/// Parallel document store writer - compresses blocks in parallel while maintaining order
///
/// Documents are buffered into uncompressed blocks while storing. All queued
/// blocks are kept in memory and compressed concurrently with rayon when
/// `finish` is called; they are then written in sequence order to maintain
/// correct document ordering in the output file.
///
/// NOTE(review): `num_threads` is stored but never read in this file;
/// compression runs on whichever rayon pool is current when `finish` is called.
#[cfg(feature = "native")]
pub struct ParallelStoreWriter<'a> {
    /// Destination for compressed blocks, dictionary, index and footer.
    writer: &'a mut dyn Write,
    /// Uncompressed bytes of the block currently being filled.
    block_buffer: Vec<u8>,
    /// Completed, still uncompressed blocks waiting for `finish`.
    pending_blocks: Vec<PendingBlock>,
    /// Sequence number for the next queued block.
    next_seq: usize,
    /// Doc id that the next stored document will receive.
    next_doc_id: DocId,
    /// Doc id of the first document in the current block.
    block_first_doc: DocId,
    /// Dictionary used for compression, if any.
    dict: Option<CompressionDict>,
    #[allow(dead_code)]
    num_threads: usize,
}

#[cfg(feature = "native")]
impl<'a> ParallelStoreWriter<'a> {
    /// Create a new parallel store writer
    pub fn new(writer: &'a mut dyn Write, num_threads: usize) -> Self {
        Self::with_optional_dict(writer, None, num_threads)
    }

    /// Create with dictionary
    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict, num_threads: usize) -> Self {
        Self::with_optional_dict(writer, Some(dict), num_threads)
    }

    /// Shared constructor behind `new` and `with_dict`; `num_threads` is clamped to at least 1.
    fn with_optional_dict(
        writer: &'a mut dyn Write,
        dict: Option<CompressionDict>,
        num_threads: usize,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            pending_blocks: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            num_threads: num_threads.max(1),
        }
    }

    /// Convert a length to the u32 used by the on-disk format, failing instead
    /// of silently truncating when it does not fit.
    fn checked_u32(len: usize, what: &str) -> io::Result<u32> {
        if len > u32::MAX as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("{} too large for store format: {} bytes", what, len),
            ));
        }
        Ok(len as u32)
    }

    /// Append a document to the store and return the doc id assigned to it.
    ///
    /// # Errors
    /// Fails if the document cannot be serialized, if its serialized form does
    /// not fit the u32 length prefix, or if the doc id space is exhausted. No
    /// doc id is consumed on failure.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        // Run every fallible step before touching writer state, so a failure
        // cannot leave the doc counter ahead of the buffered documents.
        let doc_bytes = serialize_document(doc, schema)?;
        let doc_len = Self::checked_u32(doc_bytes.len(), "serialized document")?;
        let doc_id = self.next_doc_id;
        let following_doc_id = doc_id
            .checked_add(1)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Doc id space exhausted"))?;

        self.block_buffer.write_u32::<LittleEndian>(doc_len)?;
        self.block_buffer.extend_from_slice(&doc_bytes);
        self.next_doc_id = following_doc_id;

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.queue_block();
        }

        Ok(doc_id)
    }

    /// Move the current block buffer into the pending queue and start a new block.
    /// Does nothing when the buffer is empty.
    fn queue_block(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));

        self.pending_blocks.push(PendingBlock {
            seq: self.next_seq,
            first_doc_id: self.block_first_doc,
            num_docs,
            data,
        });

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;
    }

    /// Compress all queued blocks and write blocks, dictionary, index and footer.
    ///
    /// Returns the number of documents stored.
    ///
    /// # Errors
    /// Returns the first compression or write error instead of panicking.
    pub fn finish(mut self) -> io::Result<u32> {
        // Queue any remaining data
        self.queue_block();

        if self.pending_blocks.is_empty() {
            // Write empty store: an empty index followed by the footer.
            // No dictionary is written in this case.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Compress all blocks in parallel using rayon. The dictionary is only
        // borrowed; a failed block aborts the whole operation with its error.
        let pending = std::mem::take(&mut self.pending_blocks);
        let dict = self.dict.as_ref();
        let mut compressed_blocks: Vec<CompressedBlock> = {
            use rayon::prelude::*;

            pending
                .into_par_iter()
                .map(|block| -> io::Result<CompressedBlock> {
                    let compressed = match dict {
                        Some(d) => crate::compression::compress_with_dict(
                            &block.data,
                            COMPRESSION_LEVEL,
                            d,
                        )?,
                        None => crate::compression::compress(&block.data, COMPRESSION_LEVEL)?,
                    };

                    Ok(CompressedBlock {
                        seq: block.seq,
                        first_doc_id: block.first_doc_id,
                        num_docs: block.num_docs,
                        compressed,
                    })
                })
                .collect::<io::Result<Vec<_>>>()?
        };

        // Sort by sequence to maintain order
        compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in compressed_blocks {
            let length = Self::checked_u32(block.compressed.len(), "compressed block")?;
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += u64::from(length);
        }

        let data_end_offset = current_offset;

        // Write dictionary if present: u32 length followed by the raw bytes.
        // The footer stores 0 as the dictionary offset when there is none.
        let dict_offset = match dict {
            Some(d) => {
                let dict_bytes = d.as_bytes();
                let dict_len = Self::checked_u32(dict_bytes.len(), "compression dictionary")?;
                self.writer.write_u32::<LittleEndian>(dict_len)?;
                self.writer.write_all(dict_bytes)?;
                data_end_offset
            }
            None => 0,
        };

        // Write index
        let num_blocks = Self::checked_u32(index.len(), "block index")?;
        self.writer.write_u32::<LittleEndian>(num_blocks)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if dict.is_some() { 1 } else { 0 })?; // has_dict flag
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}
366
/// Block index entry for document store
///
/// One entry exists per compressed block. On disk an entry is written as
/// `first_doc_id` (u32), `offset` (u64), `length` (u32) and `num_docs` (u32),
/// all little-endian.
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    /// Doc id of the first document stored in the block.
    first_doc_id: DocId,
    /// Byte offset of the compressed block, counted from the start of the store data.
    offset: u64,
    /// Length of the compressed block in bytes.
    length: u32,
    /// Number of documents stored in the block.
    num_docs: u32,
}
375
/// Async document store reader - loads blocks on demand
///
/// `open` reads only the footer, the optional dictionary and the block index.
/// Compressed blocks are fetched and decompressed when a document is requested
/// and kept in a bounded block cache.
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    /// Total number of documents, as recorded in the footer
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache (decompressed blocks keyed by their first doc id)
    cache: RwLock<StoreBlockCache>,
}
388
389struct StoreBlockCache {
390    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
391    access_order: Vec<DocId>,
392    max_blocks: usize,
393}
394
395impl StoreBlockCache {
396    fn new(max_blocks: usize) -> Self {
397        Self {
398            blocks: FxHashMap::default(),
399            access_order: Vec::new(),
400            max_blocks,
401        }
402    }
403
404    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
405        if let Some(block) = self.blocks.get(&first_doc_id) {
406            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
407                self.access_order.remove(pos);
408                self.access_order.push(first_doc_id);
409            }
410            Some(Arc::clone(block))
411        } else {
412            None
413        }
414    }
415
416    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
417        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
418            let evict = self.access_order.remove(0);
419            self.blocks.remove(&evict);
420        }
421        self.blocks.insert(first_doc_id, block);
422        self.access_order.push(first_doc_id);
423    }
424}
425
426impl AsyncStoreReader {
427    /// Open a document store from LazyFileHandle
428    /// Only loads footer and index into memory, data blocks are fetched on-demand
429    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
430        let file_len = file_handle.len();
431        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
432        if file_len < 32 {
433            return Err(io::Error::new(
434                io::ErrorKind::InvalidData,
435                "Store too small",
436            ));
437        }
438
439        // Read footer (32 bytes)
440        let footer = file_handle
441            .read_bytes_range(file_len - 32..file_len)
442            .await?;
443        let mut reader = footer.as_slice();
444        let data_end_offset = reader.read_u64::<LittleEndian>()?;
445        let dict_offset = reader.read_u64::<LittleEndian>()?;
446        let num_docs = reader.read_u32::<LittleEndian>()?;
447        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
448        let version = reader.read_u32::<LittleEndian>()?;
449        let magic = reader.read_u32::<LittleEndian>()?;
450
451        if magic != STORE_MAGIC {
452            return Err(io::Error::new(
453                io::ErrorKind::InvalidData,
454                "Invalid store magic",
455            ));
456        }
457        if version != STORE_VERSION {
458            return Err(io::Error::new(
459                io::ErrorKind::InvalidData,
460                format!("Unsupported store version: {}", version),
461            ));
462        }
463
464        // Load dictionary if present
465        let dict = if has_dict && dict_offset > 0 {
466            let dict_start = dict_offset as usize;
467            let dict_len_bytes = file_handle
468                .read_bytes_range(dict_start..dict_start + 4)
469                .await?;
470            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
471            let dict_bytes = file_handle
472                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
473                .await?;
474            Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
475        } else {
476            None
477        };
478
479        // Calculate index location
480        let index_start = if has_dict && dict_offset > 0 {
481            let dict_start = dict_offset as usize;
482            let dict_len_bytes = file_handle
483                .read_bytes_range(dict_start..dict_start + 4)
484                .await?;
485            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
486            dict_start + 4 + dict_len
487        } else {
488            data_end_offset as usize
489        };
490        let index_end = file_len - 32;
491
492        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
493        let mut reader = index_bytes.as_slice();
494
495        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
496        let mut index = Vec::with_capacity(num_blocks);
497
498        for _ in 0..num_blocks {
499            let first_doc_id = reader.read_u32::<LittleEndian>()?;
500            let offset = reader.read_u64::<LittleEndian>()?;
501            let length = reader.read_u32::<LittleEndian>()?;
502            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
503
504            index.push(StoreBlockIndex {
505                first_doc_id,
506                offset,
507                length,
508                num_docs: num_docs_in_block,
509            });
510        }
511
512        // Create lazy slice for data portion only
513        let data_slice = file_handle.slice(0..data_end_offset as usize);
514
515        Ok(Self {
516            data_slice,
517            index,
518            num_docs,
519            dict,
520            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
521        })
522    }
523
524    /// Number of documents
525    pub fn num_docs(&self) -> u32 {
526        self.num_docs
527    }
528
529    /// Get a document by doc_id (async - may load block)
530    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
531        if doc_id >= self.num_docs {
532            return Ok(None);
533        }
534
535        // Find block containing this doc_id
536        let block_idx = self
537            .index
538            .binary_search_by(|entry| {
539                if doc_id < entry.first_doc_id {
540                    std::cmp::Ordering::Greater
541                } else if doc_id >= entry.first_doc_id + entry.num_docs {
542                    std::cmp::Ordering::Less
543                } else {
544                    std::cmp::Ordering::Equal
545                }
546            })
547            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
548
549        let entry = &self.index[block_idx];
550        let block_data = self.load_block(entry).await?;
551
552        // Find document within block
553        let doc_offset_in_block = doc_id - entry.first_doc_id;
554        let mut reader = &block_data[..];
555
556        for _ in 0..doc_offset_in_block {
557            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
558            if doc_len > reader.len() {
559                return Err(io::Error::new(
560                    io::ErrorKind::InvalidData,
561                    "Invalid doc length",
562                ));
563            }
564            reader = &reader[doc_len..];
565        }
566
567        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
568        let doc_bytes = &reader[..doc_len];
569
570        deserialize_document(doc_bytes, schema).map(Some)
571    }
572
573    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
574        // Check cache
575        {
576            let mut cache = self.cache.write();
577            if let Some(block) = cache.get(entry.first_doc_id) {
578                return Ok(block);
579            }
580        }
581
582        // Load from FileSlice
583        let start = entry.offset as usize;
584        let end = start + entry.length as usize;
585        let compressed = self.data_slice.read_bytes_range(start..end).await?;
586
587        // Use dictionary decompression if available
588        let decompressed = if let Some(ref dict) = self.dict {
589            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
590        } else {
591            crate::compression::decompress(compressed.as_slice())?
592        };
593
594        let block = Arc::new(decompressed);
595
596        // Insert into cache
597        {
598            let mut cache = self.cache.write();
599            cache.insert(entry.first_doc_id, Arc::clone(&block));
600        }
601
602        Ok(block)
603    }
604}
605
606fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
607    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
608}
609
/// Raw block info for store merging (without decompression)
///
/// Mirrors one block index entry of a source store; produced by
/// `AsyncStoreReader::raw_blocks` and consumed by `StoreMerger::append_store`.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    /// Doc id of the first document in the block, in the source store.
    pub first_doc_id: DocId,
    /// Number of documents in the block.
    pub num_docs: u32,
    /// Byte offset of the compressed block within the source store's data.
    pub offset: u64,
    /// Length of the compressed block in bytes.
    pub length: u32,
}
618
619/// Store merger - concatenates compressed blocks from multiple stores without recompression
620///
621/// This is much faster than rebuilding stores since it avoids:
622/// - Decompressing blocks from source stores
623/// - Re-serializing documents  
624/// - Re-compressing blocks at level 22
625///
626/// Limitations:
627/// - All source stores must NOT use dictionaries (or use the same dictionary)
628/// - Doc IDs are remapped sequentially
629pub struct StoreMerger<'a, W: Write> {
630    writer: &'a mut W,
631    index: Vec<StoreBlockIndex>,
632    current_offset: u64,
633    next_doc_id: DocId,
634}
635
636impl<'a, W: Write> StoreMerger<'a, W> {
637    pub fn new(writer: &'a mut W) -> Self {
638        Self {
639            writer,
640            index: Vec::new(),
641            current_offset: 0,
642            next_doc_id: 0,
643        }
644    }
645
646    /// Append raw compressed blocks from a store file
647    ///
648    /// `data_slice` should be the data portion of the store (before index/footer)
649    /// `blocks` contains the block metadata from the source store
650    pub async fn append_store<F: AsyncFileRead>(
651        &mut self,
652        data_slice: &F,
653        blocks: &[RawStoreBlock],
654    ) -> io::Result<()> {
655        for block in blocks {
656            // Read raw compressed block data
657            let start = block.offset as usize;
658            let end = start + block.length as usize;
659            let compressed_data = data_slice.read_bytes_range(start..end).await?;
660
661            // Write to output
662            self.writer.write_all(compressed_data.as_slice())?;
663
664            // Add to index with remapped doc IDs
665            self.index.push(StoreBlockIndex {
666                first_doc_id: self.next_doc_id,
667                offset: self.current_offset,
668                length: block.length,
669                num_docs: block.num_docs,
670            });
671
672            self.current_offset += block.length as u64;
673            self.next_doc_id += block.num_docs;
674        }
675
676        Ok(())
677    }
678
679    /// Finish writing the merged store
680    pub fn finish(self) -> io::Result<u32> {
681        let data_end_offset = self.current_offset;
682
683        // No dictionary support for merged stores (would need same dict across all sources)
684        let dict_offset = 0u64;
685
686        // Write index
687        self.writer
688            .write_u32::<LittleEndian>(self.index.len() as u32)?;
689        for entry in &self.index {
690            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
691            self.writer.write_u64::<LittleEndian>(entry.offset)?;
692            self.writer.write_u32::<LittleEndian>(entry.length)?;
693            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
694        }
695
696        // Write footer
697        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
698        self.writer.write_u64::<LittleEndian>(dict_offset)?;
699        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
700        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
701        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
702        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
703
704        Ok(self.next_doc_id)
705    }
706}
707
708impl AsyncStoreReader {
709    /// Get raw block metadata for merging (without loading block data)
710    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
711        self.index
712            .iter()
713            .map(|entry| RawStoreBlock {
714                first_doc_id: entry.first_doc_id,
715                num_docs: entry.num_docs,
716                offset: entry.offset,
717                length: entry.length,
718            })
719            .collect()
720    }
721
722    /// Get the data slice for raw block access
723    pub fn data_slice(&self) -> &LazyFileSlice {
724        &self.data_slice
725    }
726
727    /// Check if this store uses a dictionary (incompatible with raw merging)
728    pub fn has_dict(&self) -> bool {
729        self.dict.is_some()
730    }
731}