hermes_core/segment/store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Maximum compression level (zstd level 22) for the best compression ratio
//! - Large block sizes (256KB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! The writer stores documents in compressed blocks.
//! The reader loads only the block index into memory; blocks are fetched on demand.
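//!
//! A minimal round-trip sketch (illustrative only; `Document`/`Schema`
//! construction and the async runtime are elided, and `handle` is assumed to
//! be a `LazyFileHandle` over the written bytes):
//!
//! ```ignore
//! let mut buf: Vec<u8> = Vec::new();
//! let mut writer = StoreWriter::new(&mut buf);
//! let doc_id = writer.store(&doc, &schema)?;
//! writer.finish()?;
//!
//! let reader = AsyncStoreReader::open(handle, 32).await?;
//! let doc = reader.get(doc_id, &schema).await?;
//! ```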

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries
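
// On-disk layout (as produced by the writers below):
//
//   [compressed block 0][compressed block 1]...                <- data section
//   [dict_len: u32][dict bytes]                                <- only if has_dict
//   [num_blocks: u32]
//   per block: [first_doc_id: u32][offset: u64][length: u32][num_docs: u32]
//   footer (32 bytes):
//     [data_end_offset: u64][dict_offset: u64][num_docs: u32]
//     [has_dict: u32][version: u32][magic: u32]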

/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size (4KB)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Compression level for document store (maximum for static indexes)
const COMPRESSION_LEVEL: CompressionLevel = CompressionLevel::MAX;

/// Document store writer with optional dictionary compression
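///
/// A minimal writing sketch (illustrative; `doc` and `schema` construction via
/// the `dsl` module is elided):
///
/// ```ignore
/// let mut buf: Vec<u8> = Vec::new();
/// let mut writer = StoreWriter::new(&mut buf); // or StoreWriter::with_dict(&mut buf, dict)
/// let doc_id = writer.store(&doc, &schema)?;
/// let num_docs = writer.finish()?;
/// ```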
pub struct StoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
}

impl<'a> StoreWriter<'a> {
    /// Create a new store writer without dictionary
    pub fn new(writer: &'a mut dyn Write) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
        }
    }

    /// Create a new store writer with a trained dictionary
    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.flush_block()?;
        }

        Ok(doc_id)
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }

        let num_docs = self.next_doc_id - self.block_first_doc;

        // Use dictionary compression if available, otherwise standard compression
        let compressed = if let Some(ref dict) = self.dict {
            crate::compression::compress_with_dict(&self.block_buffer, COMPRESSION_LEVEL, dict)?
        } else {
            crate::compression::compress(&self.block_buffer, COMPRESSION_LEVEL)?
        };

        self.index.push(StoreBlockIndex {
            first_doc_id: self.block_first_doc,
            offset: self.current_offset,
            length: compressed.len() as u32,
            num_docs,
        });

        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;

        self.block_buffer.clear();
        self.block_first_doc = self.next_doc_id;

        Ok(())
    }

    pub fn finish(mut self) -> io::Result<u32> {
        self.flush_block()?;

        let data_end_offset = self.current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = self.current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            Some(offset)
        } else {
            None
        };

        // Write index
        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?; // has_dict flag
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Pending block for parallel compression
#[cfg(feature = "native")]
struct PendingBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    data: Vec<u8>,
}

/// Compressed block result
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Parallel document store writer - compresses blocks in parallel while maintaining order
///
/// Uses a thread pool to compress blocks concurrently. Blocks are written in sequence
/// order to maintain correct document ordering in the output file.
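///
/// A minimal usage sketch (assumes the `native` feature; `docs`/`schema`
/// construction elided). Note that `num_threads` is advisory here, since
/// compression runs on rayon's global thread pool:
///
/// ```ignore
/// let mut buf: Vec<u8> = Vec::new();
/// let mut writer = ParallelStoreWriter::new(&mut buf, 8);
/// for doc in &docs {
///     writer.store(doc, &schema)?;
/// }
/// let num_docs = writer.finish()?; // queued blocks are compressed in parallel here
/// ```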
#[cfg(feature = "native")]
pub struct ParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    pending_blocks: Vec<PendingBlock>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<CompressionDict>,
    #[allow(dead_code)]
    num_threads: usize,
}

#[cfg(feature = "native")]
impl<'a> ParallelStoreWriter<'a> {
    /// Create a new parallel store writer
    pub fn new(writer: &'a mut dyn Write, num_threads: usize) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            pending_blocks: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            num_threads: num_threads.max(1),
        }
    }

    /// Create with dictionary
    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict, num_threads: usize) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            pending_blocks: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(dict),
            num_threads: num_threads.max(1),
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let doc_bytes = serialize_document(doc, schema)?;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.queue_block();
        }

        Ok(doc_id)
    }

    fn queue_block(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));

        self.pending_blocks.push(PendingBlock {
            seq: self.next_seq,
            first_doc_id: self.block_first_doc,
            num_docs,
            data,
        });

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;
    }

    pub fn finish(mut self) -> io::Result<u32> {
        // Queue any remaining data
        self.queue_block();

        if self.pending_blocks.is_empty() {
            // Write empty store
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Compress all blocks in parallel using rayon
        let dict = self.dict.clone();
        let compressed_blocks: Vec<CompressedBlock> = {
            use rayon::prelude::*;

            self.pending_blocks
                .into_par_iter()
                .map(|block| {
                    let compressed = if let Some(ref d) = dict {
                        crate::compression::compress_with_dict(&block.data, COMPRESSION_LEVEL, d)
                            .expect("compression failed")
                    } else {
                        crate::compression::compress(&block.data, COMPRESSION_LEVEL)
                            .expect("compression failed")
                    };

                    CompressedBlock {
                        seq: block.seq,
                        first_doc_id: block.first_doc_id,
                        num_docs: block.num_docs,
                        compressed,
                    }
                })
                .collect()
        };

        // Sort by sequence to maintain order
        let mut sorted_blocks = compressed_blocks;
        sorted_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(sorted_blocks.len());
        let mut current_offset = 0u64;

        for block in sorted_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            // No further offset tracking is needed; the index and footer follow immediately.
            Some(offset)
        } else {
            None
        };

        // Write index
        self.writer.write_u32::<LittleEndian>(index.len() as u32)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

/// Block index entry for document store
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    first_doc_id: DocId,
    offset: u64,
    length: u32,
    num_docs: u32,
}

/// Async document store reader - loads blocks on demand
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}
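
/// A simple LRU cache of decompressed blocks, keyed by each block's first doc id.
///
/// `access_order` is a plain `Vec`, so a cache hit or eviction is O(n) in the
/// number of cached blocks; that is acceptable for the small block counts
/// typically kept resident here.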
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
    access_order: Vec<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            access_order: Vec::new(),
            max_blocks,
        }
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
        if let Some(block) = self.blocks.get(&first_doc_id) {
            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
                self.access_order.remove(pos);
                self.access_order.push(first_doc_id);
            }
            Some(Arc::clone(block))
        } else {
            None
        }
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
            let evict = self.access_order.remove(0);
            self.blocks.remove(&evict);
        }
        self.blocks.insert(first_doc_id, block);
        self.access_order.push(first_doc_id);
    }
}

impl AsyncStoreReader {
    /// Open a document store from a `LazyFileHandle`.
    /// Only the footer and index are loaded into memory; data blocks are fetched on demand.
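    ///
    /// A lookup sketch (handle construction and the async runtime elided; the
    /// second argument caps how many decompressed blocks are cached):
    ///
    /// ```ignore
    /// let reader = AsyncStoreReader::open(handle, 32).await?;
    /// if let Some(doc) = reader.get(42, &schema).await? {
    ///     // use `doc`
    /// }
    /// ```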
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes)
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load the dictionary if present and compute where the index starts,
        // reading the dictionary length prefix only once.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset as usize;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            (
                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
                dict_start + 4 + dict_len,
            )
        } else {
            (None, data_end_offset as usize)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset as usize);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Get a document by doc_id (async - may load a block)
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        // Find the block containing this doc_id
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block_data = self.load_block(entry).await?;

        // Skip over the documents preceding the target within the block
        let doc_offset_in_block = doc_id - entry.first_doc_id;
        let mut reader = &block_data[..];

        for _ in 0..doc_offset_in_block {
            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
            if doc_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid doc length",
                ));
            }
            reader = &reader[doc_len..];
        }

        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
        if doc_len > reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid doc length",
            ));
        }
        let doc_bytes = &reader[..doc_len];

        deserialize_document(doc_bytes, schema).map(Some)
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
        // Check cache
        {
            let mut cache = self.cache.write();
            if let Some(block) = cache.get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset as usize;
        let end = start + entry.length as usize;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        let block = Arc::new(decompressed);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks at level 22
///
/// Limitations:
/// - All source stores must NOT use dictionaries (or use the same dictionary)
/// - Doc IDs are remapped sequentially
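///
/// A merging sketch (async context assumed; sources are open `AsyncStoreReader`s):
///
/// ```ignore
/// let mut out: Vec<u8> = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// for reader in &readers {
///     assert!(!reader.has_dict()); // raw merging requires dictionary-free stores
///     merger.append_store(reader.data_slice(), &reader.raw_blocks()).await?;
/// }
/// let total_docs = merger.finish()?;
/// ```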
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Append raw compressed blocks from a store file
    ///
    /// `data_slice` should be the data portion of the store (before index/footer)
    /// `blocks` contains the block metadata from the source store
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            // Read raw compressed block data
            let start = block.offset as usize;
            let end = start + block.length as usize;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            // Write to output
            self.writer.write_all(compressed_data.as_slice())?;

            // Add to index with remapped doc IDs
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Finish writing the merged store
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // No dictionary support for merged stores (would need same dict across all sources)
        let dict_offset = 0u64;

        // Write index
        self.writer
            .write_u32::<LittleEndian>(self.index.len() as u32)?;
        for entry in &self.index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Get raw block metadata for merging (without loading block data)
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Get the data slice for raw block access
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// Check if this store uses a dictionary (incompatible with raw merging)
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }
}