hermes_core/segment/
store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Maximum compression level (22) for best compression ratio
//! - Larger block sizes (256KB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! The writer stores documents in compressed blocks.
//! The reader only loads the footer, the block index and the optional
//! dictionary into memory; blocks are loaded on demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::{CompressionDict, CompressionLevel};
20use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
21use crate::dsl::{Document, Schema};
22
/// Magic number written as the last four bytes of a store: ASCII "STOR".
const STORE_MAGIC: u32 = 0x5354_4F52;
/// On-disk format version; version 2 supports dictionaries.
const STORE_VERSION: u32 = 2;

/// Uncompressed block size threshold for the document store (256 KiB).
///
/// Larger blocks give a better compression ratio but need more memory per
/// block load.
pub const STORE_BLOCK_SIZE: usize = 256 << 10;

/// Default dictionary size in bytes (4 KiB).
pub const DEFAULT_DICT_SIZE: usize = 4 << 10;
32
/// Compression level used for every block written by this module.
///
/// Fixed to `CompressionLevel::MAX`: stores are treated as static indexes,
/// so the maximum level is used.
const COMPRESSION_LEVEL: CompressionLevel = CompressionLevel::MAX;
35
36/// Document store writer with optional dictionary compression
37pub struct StoreWriter<'a> {
38    writer: &'a mut dyn Write,
39    block_buffer: Vec<u8>,
40    index: Vec<StoreBlockIndex>,
41    current_offset: u64,
42    next_doc_id: DocId,
43    block_first_doc: DocId,
44    dict: Option<CompressionDict>,
45}
46
47impl<'a> StoreWriter<'a> {
48    /// Create a new store writer without dictionary
49    pub fn new(writer: &'a mut dyn Write) -> Self {
50        Self {
51            writer,
52            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
53            index: Vec::new(),
54            current_offset: 0,
55            next_doc_id: 0,
56            block_first_doc: 0,
57            dict: None,
58        }
59    }
60
61    /// Create a new store writer with a trained dictionary
62    pub fn with_dict(writer: &'a mut dyn Write, dict: CompressionDict) -> Self {
63        Self {
64            writer,
65            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
66            index: Vec::new(),
67            current_offset: 0,
68            next_doc_id: 0,
69            block_first_doc: 0,
70            dict: Some(dict),
71        }
72    }
73
74    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
75        let doc_id = self.next_doc_id;
76        self.next_doc_id += 1;
77
78        let doc_bytes = serialize_document(doc, schema)?;
79
80        self.block_buffer
81            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
82        self.block_buffer.extend_from_slice(&doc_bytes);
83
84        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
85            self.flush_block()?;
86        }
87
88        Ok(doc_id)
89    }
90
91    fn flush_block(&mut self) -> io::Result<()> {
92        if self.block_buffer.is_empty() {
93            return Ok(());
94        }
95
96        let num_docs = self.next_doc_id - self.block_first_doc;
97
98        // Use dictionary compression if available, otherwise standard compression
99        let compressed = if let Some(ref dict) = self.dict {
100            crate::compression::compress_with_dict(&self.block_buffer, COMPRESSION_LEVEL, dict)?
101        } else {
102            crate::compression::compress(&self.block_buffer, COMPRESSION_LEVEL)?
103        };
104
105        self.index.push(StoreBlockIndex {
106            first_doc_id: self.block_first_doc,
107            offset: self.current_offset,
108            length: compressed.len() as u32,
109            num_docs,
110        });
111
112        self.writer.write_all(&compressed)?;
113        self.current_offset += compressed.len() as u64;
114
115        self.block_buffer.clear();
116        self.block_first_doc = self.next_doc_id;
117
118        Ok(())
119    }
120
121    pub fn finish(mut self) -> io::Result<u32> {
122        self.flush_block()?;
123
124        let data_end_offset = self.current_offset;
125
126        // Write dictionary if present
127        let dict_offset = if let Some(ref dict) = self.dict {
128            let offset = self.current_offset;
129            let dict_bytes = dict.as_bytes();
130            self.writer
131                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
132            self.writer.write_all(dict_bytes)?;
133            self.current_offset += 4 + dict_bytes.len() as u64;
134            Some(offset)
135        } else {
136            None
137        };
138
139        // Write index
140        self.writer
141            .write_u32::<LittleEndian>(self.index.len() as u32)?;
142        for entry in &self.index {
143            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
144            self.writer.write_u64::<LittleEndian>(entry.offset)?;
145            self.writer.write_u32::<LittleEndian>(entry.length)?;
146            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
147        }
148
149        // Write footer
150        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
151        self.writer
152            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
153        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
154        self.writer
155            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?; // has_dict flag
156        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
157        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
158
159        Ok(self.next_doc_id)
160    }
161}
162
163fn serialize_document(doc: &Document, _schema: &Schema) -> io::Result<Vec<u8>> {
164    serde_json::to_vec(doc).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
165}
166
/// Compressed block result produced by a background compression thread
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Submission order of the block; blocks are sorted by this before writing
    seq: usize,
    /// Doc id of the first document in the block
    first_doc_id: DocId,
    /// Number of documents in the block
    num_docs: u32,
    /// Compressed bytes of the block
    compressed: Vec<u8>,
}
175
/// Parallel document store writer - compresses blocks immediately when queued
///
/// As soon as a block is full it is handed to a freshly spawned OS thread
/// (`std::thread::spawn`) for compression, so document ingestion overlaps
/// with compression. Compressed blocks are kept in memory and written to the
/// underlying writer, in submission order, only when `finish` is called.
///
/// The `num_threads` constructor argument is currently ignored: one thread is
/// spawned per block.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    /// Destination of blocks, dictionary, index and footer (written in `finish`)
    writer: &'a mut dyn Write,
    /// Uncompressed, length-prefixed documents of the block being built
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<io::Result<CompressedBlock>>>,
    /// Sequence number given to the next spawned block
    next_seq: usize,
    /// Doc id that the next stored document will receive
    next_doc_id: DocId,
    /// Doc id of the first document in the block being built
    block_first_doc: DocId,
    /// Dictionary shared with the compression threads, if any
    dict: Option<Arc<CompressionDict>>,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::build(writer, None)
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::build(writer, Some(Arc::new(dict)))
    }

    /// Shared constructor: an empty writer with the given optional dictionary.
    fn build(writer: &'a mut dyn Write, dict: Option<Arc<CompressionDict>>) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
        }
    }

    /// Error for a length or count that cannot be encoded as `u32` on disk.
    fn too_large(what: &'static str) -> io::Error {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("{} does not fit in u32", what),
        )
    }

    /// Append a document and return the doc id assigned to it.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization fails or when the serialized
    /// document is too large for the `u32` length prefix. A failed document
    /// does not consume a doc id. Compression errors surface in `finish`.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        // Serialize and validate first so a failure cannot leave a doc id
        // allocated without matching bytes in the block.
        let doc_bytes = serialize_document(doc, schema)?;
        let doc_len =
            u32::try_from(doc_bytes.len()).map_err(|_| Self::too_large("document length"))?;

        self.block_buffer.write_u32::<LittleEndian>(doc_len)?;
        self.block_buffer.extend_from_slice(&doc_bytes);

        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    ///
    /// Does nothing when the block buffer is empty.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        // Hand the filled buffer to the thread and start a fresh one.
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // One dedicated OS thread per block; the compression result (or its
        // error) is handed back through the join handle.
        let handle = std::thread::spawn(move || -> io::Result<CompressedBlock> {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, COMPRESSION_LEVEL, d)?
            } else {
                crate::compression::compress(&data, COMPRESSION_LEVEL)?
            };

            Ok(CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            })
        });

        self.pending_handles.push(handle);
    }

    /// Wait for one compression task and surface its failure as an I/O error.
    ///
    /// A panic inside the thread is reported as an error instead of being
    /// dropped, because a missing block would silently corrupt the store.
    fn join_block(
        handle: std::thread::JoinHandle<io::Result<CompressedBlock>>,
    ) -> io::Result<CompressedBlock> {
        handle
            .join()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "compression thread panicked"))?
    }

    /// Collect any completed compression tasks
    ///
    /// Finished tasks are moved to `compressed_blocks`; unfinished ones stay
    /// pending. Fails on the first task that reported an error or panicked.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut remaining = Vec::with_capacity(self.pending_handles.len());
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                let block = Self::join_block(handle)?;
                self.compressed_blocks.push(block);
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
        Ok(())
    }

    /// Compress the last block, wait for all tasks and write the whole store.
    ///
    /// Returns the number of documents stored.
    ///
    /// # Errors
    ///
    /// Fails when any compression task failed or panicked, when a length does
    /// not fit the on-disk `u32` fields, or when writing fails.
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            let block = Self::join_block(handle)?;
            self.compressed_blocks.push(block);
        }

        if self.compressed_blocks.is_empty() {
            // Write empty store: empty index followed by a footer without
            // dictionary.
            let data_end_offset = 0u64;
            self.writer.write_u32::<LittleEndian>(0)?; // num blocks
            self.writer.write_u64::<LittleEndian>(data_end_offset)?;
            self.writer.write_u64::<LittleEndian>(0)?; // dict offset
            self.writer.write_u32::<LittleEndian>(0)?; // num docs
            self.writer.write_u32::<LittleEndian>(0)?; // has_dict
            self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
            self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            let length = u32::try_from(block.compressed.len())
                .map_err(|_| Self::too_large("compressed block length"))?;
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += u64::from(length);
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            let dict_len = u32::try_from(dict_bytes.len())
                .map_err(|_| Self::too_large("dictionary length"))?;
            self.writer.write_u32::<LittleEndian>(dict_len)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index
        let num_blocks = u32::try_from(index.len()).map_err(|_| Self::too_large("block count"))?;
        self.writer.write_u32::<LittleEndian>(num_blocks)?;
        for entry in &index {
            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
            self.writer.write_u64::<LittleEndian>(entry.offset)?;
            self.writer.write_u32::<LittleEndian>(entry.length)?;
            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
        }

        // Write footer
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer
            .write_u64::<LittleEndian>(dict_offset.unwrap_or(0))?;
        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
        self.writer
            .write_u32::<LittleEndian>(if self.dict.is_some() { 1 } else { 0 })?;
        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;

        Ok(self.next_doc_id)
    }
}
380
/// Block index entry for document store
///
/// Serialized in the index section as `first_doc_id` (u32), `offset` (u64),
/// `length` (u32), `num_docs` (u32), all little endian.
#[derive(Debug, Clone)]
struct StoreBlockIndex {
    /// Doc id of the first document in the block
    first_doc_id: DocId,
    /// Byte offset of the compressed block, counted from the start of the data section
    offset: u64,
    /// Length of the compressed block in bytes
    length: u32,
    /// Number of documents in the block
    num_docs: u32,
}
389
/// Async document store reader - loads blocks on demand
///
/// Holds the block index, the document count and the optional dictionary in
/// memory; compressed blocks are fetched from `data_slice` when a document is
/// requested and kept decompressed in a small cache.
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    /// Total number of documents, as recorded in the footer
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache of decompressed blocks, keyed by each block's first doc id
    cache: RwLock<StoreBlockCache>,
}
402
403struct StoreBlockCache {
404    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
405    access_order: Vec<DocId>,
406    max_blocks: usize,
407}
408
409impl StoreBlockCache {
410    fn new(max_blocks: usize) -> Self {
411        Self {
412            blocks: FxHashMap::default(),
413            access_order: Vec::new(),
414            max_blocks,
415        }
416    }
417
418    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
419        if let Some(block) = self.blocks.get(&first_doc_id) {
420            if let Some(pos) = self.access_order.iter().position(|&d| d == first_doc_id) {
421                self.access_order.remove(pos);
422                self.access_order.push(first_doc_id);
423            }
424            Some(Arc::clone(block))
425        } else {
426            None
427        }
428    }
429
430    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
431        while self.blocks.len() >= self.max_blocks && !self.access_order.is_empty() {
432            let evict = self.access_order.remove(0);
433            self.blocks.remove(&evict);
434        }
435        self.blocks.insert(first_doc_id, block);
436        self.access_order.push(first_doc_id);
437    }
438}
439
440impl AsyncStoreReader {
441    /// Open a document store from LazyFileHandle
442    /// Only loads footer and index into memory, data blocks are fetched on-demand
443    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
444        let file_len = file_handle.len();
445        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
446        if file_len < 32 {
447            return Err(io::Error::new(
448                io::ErrorKind::InvalidData,
449                "Store too small",
450            ));
451        }
452
453        // Read footer (32 bytes)
454        let footer = file_handle
455            .read_bytes_range(file_len - 32..file_len)
456            .await?;
457        let mut reader = footer.as_slice();
458        let data_end_offset = reader.read_u64::<LittleEndian>()?;
459        let dict_offset = reader.read_u64::<LittleEndian>()?;
460        let num_docs = reader.read_u32::<LittleEndian>()?;
461        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
462        let version = reader.read_u32::<LittleEndian>()?;
463        let magic = reader.read_u32::<LittleEndian>()?;
464
465        if magic != STORE_MAGIC {
466            return Err(io::Error::new(
467                io::ErrorKind::InvalidData,
468                "Invalid store magic",
469            ));
470        }
471        if version != STORE_VERSION {
472            return Err(io::Error::new(
473                io::ErrorKind::InvalidData,
474                format!("Unsupported store version: {}", version),
475            ));
476        }
477
478        // Load dictionary if present
479        let dict = if has_dict && dict_offset > 0 {
480            let dict_start = dict_offset as usize;
481            let dict_len_bytes = file_handle
482                .read_bytes_range(dict_start..dict_start + 4)
483                .await?;
484            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
485            let dict_bytes = file_handle
486                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
487                .await?;
488            Some(CompressionDict::from_bytes(dict_bytes.to_vec()))
489        } else {
490            None
491        };
492
493        // Calculate index location
494        let index_start = if has_dict && dict_offset > 0 {
495            let dict_start = dict_offset as usize;
496            let dict_len_bytes = file_handle
497                .read_bytes_range(dict_start..dict_start + 4)
498                .await?;
499            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as usize;
500            dict_start + 4 + dict_len
501        } else {
502            data_end_offset as usize
503        };
504        let index_end = file_len - 32;
505
506        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
507        let mut reader = index_bytes.as_slice();
508
509        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
510        let mut index = Vec::with_capacity(num_blocks);
511
512        for _ in 0..num_blocks {
513            let first_doc_id = reader.read_u32::<LittleEndian>()?;
514            let offset = reader.read_u64::<LittleEndian>()?;
515            let length = reader.read_u32::<LittleEndian>()?;
516            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
517
518            index.push(StoreBlockIndex {
519                first_doc_id,
520                offset,
521                length,
522                num_docs: num_docs_in_block,
523            });
524        }
525
526        // Create lazy slice for data portion only
527        let data_slice = file_handle.slice(0..data_end_offset as usize);
528
529        Ok(Self {
530            data_slice,
531            index,
532            num_docs,
533            dict,
534            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
535        })
536    }
537
538    /// Number of documents
539    pub fn num_docs(&self) -> u32 {
540        self.num_docs
541    }
542
543    /// Get a document by doc_id (async - may load block)
544    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
545        if doc_id >= self.num_docs {
546            return Ok(None);
547        }
548
549        // Find block containing this doc_id
550        let block_idx = self
551            .index
552            .binary_search_by(|entry| {
553                if doc_id < entry.first_doc_id {
554                    std::cmp::Ordering::Greater
555                } else if doc_id >= entry.first_doc_id + entry.num_docs {
556                    std::cmp::Ordering::Less
557                } else {
558                    std::cmp::Ordering::Equal
559                }
560            })
561            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
562
563        let entry = &self.index[block_idx];
564        let block_data = self.load_block(entry).await?;
565
566        // Find document within block
567        let doc_offset_in_block = doc_id - entry.first_doc_id;
568        let mut reader = &block_data[..];
569
570        for _ in 0..doc_offset_in_block {
571            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
572            if doc_len > reader.len() {
573                return Err(io::Error::new(
574                    io::ErrorKind::InvalidData,
575                    "Invalid doc length",
576                ));
577            }
578            reader = &reader[doc_len..];
579        }
580
581        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
582        let doc_bytes = &reader[..doc_len];
583
584        deserialize_document(doc_bytes, schema).map(Some)
585    }
586
587    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
588        // Check cache
589        {
590            let mut cache = self.cache.write();
591            if let Some(block) = cache.get(entry.first_doc_id) {
592                return Ok(block);
593            }
594        }
595
596        // Load from FileSlice
597        let start = entry.offset as usize;
598        let end = start + entry.length as usize;
599        let compressed = self.data_slice.read_bytes_range(start..end).await?;
600
601        // Use dictionary decompression if available
602        let decompressed = if let Some(ref dict) = self.dict {
603            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
604        } else {
605            crate::compression::decompress(compressed.as_slice())?
606        };
607
608        let block = Arc::new(decompressed);
609
610        // Insert into cache
611        {
612            let mut cache = self.cache.write();
613            cache.insert(entry.first_doc_id, Arc::clone(&block));
614        }
615
616        Ok(block)
617    }
618}
619
620fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
621    serde_json::from_slice(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
622}
623
/// Raw block info for store merging (without decompression)
///
/// Mirrors one entry of a store's block index.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    /// Doc id of the first document in the block, in the source store
    pub first_doc_id: DocId,
    /// Number of documents in the block
    pub num_docs: u32,
    /// Byte offset of the compressed block within the source data section
    pub offset: u64,
    /// Length of the compressed block in bytes
    pub length: u32,
}
632
633/// Store merger - concatenates compressed blocks from multiple stores without recompression
634///
635/// This is much faster than rebuilding stores since it avoids:
636/// - Decompressing blocks from source stores
637/// - Re-serializing documents
638/// - Re-compressing blocks at level 22
639///
640/// Limitations:
641/// - All source stores must NOT use dictionaries (or use the same dictionary)
642/// - Doc IDs are remapped sequentially
643pub struct StoreMerger<'a, W: Write> {
644    writer: &'a mut W,
645    index: Vec<StoreBlockIndex>,
646    current_offset: u64,
647    next_doc_id: DocId,
648}
649
650impl<'a, W: Write> StoreMerger<'a, W> {
651    pub fn new(writer: &'a mut W) -> Self {
652        Self {
653            writer,
654            index: Vec::new(),
655            current_offset: 0,
656            next_doc_id: 0,
657        }
658    }
659
660    /// Append raw compressed blocks from a store file
661    ///
662    /// `data_slice` should be the data portion of the store (before index/footer)
663    /// `blocks` contains the block metadata from the source store
664    pub async fn append_store<F: AsyncFileRead>(
665        &mut self,
666        data_slice: &F,
667        blocks: &[RawStoreBlock],
668    ) -> io::Result<()> {
669        for block in blocks {
670            // Read raw compressed block data
671            let start = block.offset as usize;
672            let end = start + block.length as usize;
673            let compressed_data = data_slice.read_bytes_range(start..end).await?;
674
675            // Write to output
676            self.writer.write_all(compressed_data.as_slice())?;
677
678            // Add to index with remapped doc IDs
679            self.index.push(StoreBlockIndex {
680                first_doc_id: self.next_doc_id,
681                offset: self.current_offset,
682                length: block.length,
683                num_docs: block.num_docs,
684            });
685
686            self.current_offset += block.length as u64;
687            self.next_doc_id += block.num_docs;
688        }
689
690        Ok(())
691    }
692
693    /// Finish writing the merged store
694    pub fn finish(self) -> io::Result<u32> {
695        let data_end_offset = self.current_offset;
696
697        // No dictionary support for merged stores (would need same dict across all sources)
698        let dict_offset = 0u64;
699
700        // Write index
701        self.writer
702            .write_u32::<LittleEndian>(self.index.len() as u32)?;
703        for entry in &self.index {
704            self.writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
705            self.writer.write_u64::<LittleEndian>(entry.offset)?;
706            self.writer.write_u32::<LittleEndian>(entry.length)?;
707            self.writer.write_u32::<LittleEndian>(entry.num_docs)?;
708        }
709
710        // Write footer
711        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
712        self.writer.write_u64::<LittleEndian>(dict_offset)?;
713        self.writer.write_u32::<LittleEndian>(self.next_doc_id)?;
714        self.writer.write_u32::<LittleEndian>(0)?; // has_dict = false
715        self.writer.write_u32::<LittleEndian>(STORE_VERSION)?;
716        self.writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
717
718        Ok(self.next_doc_id)
719    }
720}
721
722impl AsyncStoreReader {
723    /// Get raw block metadata for merging (without loading block data)
724    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
725        self.index
726            .iter()
727            .map(|entry| RawStoreBlock {
728                first_doc_id: entry.first_doc_id,
729                num_docs: entry.num_docs,
730                offset: entry.offset,
731                length: entry.length,
732            })
733            .collect()
734    }
735
736    /// Get the data slice for raw block access
737    pub fn data_slice(&self) -> &LazyFileSlice {
738        &self.data_slice
739    }
740
741    /// Check if this store uses a dictionary (incompatible with raw merging)
742    pub fn has_dict(&self) -> bool {
743        self.dict.is_some()
744    }
745}