Skip to main content

hermes_core/segment/
store.rs

1//! Document store with Zstd compression and lazy loading
2//!
3//! Optimized for static indexes:
4//! - Maximum compression level (22) for best compression ratio
5//! - Larger block sizes (64KB) for better compression efficiency
6//! - Optional trained dictionary support for even better compression
7//! - Parallel compression support for faster indexing
8//!
9//! Writer stores documents in compressed blocks.
10//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
23use crate::dsl::{Document, Schema};
24
25const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
26const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries
27
28/// Write block index + footer to a store file.
29///
30/// Shared by `StoreWriter::finish`, `StoreWriter::finish` (empty), and `StoreMerger::finish`.
31fn write_store_index_and_footer(
32    writer: &mut (impl Write + ?Sized),
33    index: &[StoreBlockIndex],
34    data_end_offset: u64,
35    dict_offset: u64,
36    num_docs: u32,
37    has_dict: bool,
38) -> io::Result<()> {
39    writer.write_u32::<LittleEndian>(index.len() as u32)?;
40    for entry in index {
41        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
42        writer.write_u64::<LittleEndian>(entry.offset)?;
43        writer.write_u32::<LittleEndian>(entry.length)?;
44        writer.write_u32::<LittleEndian>(entry.num_docs)?;
45    }
46    writer.write_u64::<LittleEndian>(data_end_offset)?;
47    writer.write_u64::<LittleEndian>(dict_offset)?;
48    writer.write_u32::<LittleEndian>(num_docs)?;
49    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
50    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
51    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
52    Ok(())
53}
54
55/// Block size for document store (256KB for better compression)
56/// Larger blocks = better compression ratio but more memory per block load
57pub const STORE_BLOCK_SIZE: usize = 256 * 1024;
58
59/// Default dictionary size (64KB is a good balance)
60pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;
61
62/// Default compression level for document store
63#[cfg(feature = "native")]
64const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
65
66/// Binary document format:
67///   num_fields: u16
68///   per field: field_id: u16, type_tag: u8, value data
69///     0=Text:         len:u32 + utf8
70///     1=U64:          u64 LE
71///     2=I64:          i64 LE
72///     3=F64:          f64 LE
73///     4=Bytes:        len:u32 + raw
74///     5=SparseVector: count:u32 + count*(u32+f32)
75///     6=DenseVector:  count:u32 + count*f32
76///     7=Json:         len:u32 + json utf8
77pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
78    let mut buf = Vec::with_capacity(256);
79    serialize_document_into(doc, schema, &mut buf)?;
80    Ok(buf)
81}
82
83/// Serialize a document into a reusable buffer (clears it first).
84/// Avoids per-document allocation when called in a loop.
85pub fn serialize_document_into(
86    doc: &Document,
87    schema: &Schema,
88    buf: &mut Vec<u8>,
89) -> io::Result<()> {
90    use crate::dsl::FieldValue;
91
92    buf.clear();
93
94    // Two-pass approach avoids allocating a Vec just to count + iterate stored fields.
95    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
96        // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
97        if matches!(value, FieldValue::DenseVector(_)) {
98            return false;
99        }
100        schema.get_field_entry(*field).is_some_and(|e| e.stored)
101    };
102
103    let stored_count = doc
104        .field_values()
105        .iter()
106        .filter(|(field, value)| is_stored(field, value))
107        .count();
108
109    buf.write_u16::<LittleEndian>(stored_count as u16)?;
110
111    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
112        buf.write_u16::<LittleEndian>(field.0 as u16)?;
113        match value {
114            FieldValue::Text(s) => {
115                buf.push(0);
116                let bytes = s.as_bytes();
117                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
118                buf.extend_from_slice(bytes);
119            }
120            FieldValue::U64(v) => {
121                buf.push(1);
122                buf.write_u64::<LittleEndian>(*v)?;
123            }
124            FieldValue::I64(v) => {
125                buf.push(2);
126                buf.write_i64::<LittleEndian>(*v)?;
127            }
128            FieldValue::F64(v) => {
129                buf.push(3);
130                buf.write_f64::<LittleEndian>(*v)?;
131            }
132            FieldValue::Bytes(b) => {
133                buf.push(4);
134                buf.write_u32::<LittleEndian>(b.len() as u32)?;
135                buf.extend_from_slice(b);
136            }
137            FieldValue::SparseVector(entries) => {
138                buf.push(5);
139                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
140                for (idx, val) in entries {
141                    buf.write_u32::<LittleEndian>(*idx)?;
142                    buf.write_f32::<LittleEndian>(*val)?;
143                }
144            }
145            FieldValue::DenseVector(values) => {
146                buf.push(6);
147                buf.write_u32::<LittleEndian>(values.len() as u32)?;
148                // Write raw f32 bytes directly
149                let byte_slice = unsafe {
150                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
151                };
152                buf.extend_from_slice(byte_slice);
153            }
154            FieldValue::Json(v) => {
155                buf.push(7);
156                let json_bytes = serde_json::to_vec(v)
157                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
158                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
159                buf.extend_from_slice(&json_bytes);
160            }
161        }
162    }
163
164    Ok(())
165}
166
167/// Compressed block result
168#[cfg(feature = "native")]
169struct CompressedBlock {
170    seq: usize,
171    first_doc_id: DocId,
172    num_docs: u32,
173    compressed: Vec<u8>,
174}
175
176/// Parallel document store writer - compresses blocks immediately when queued
177///
178/// Spawns compression tasks as soon as blocks are ready, overlapping document
179/// ingestion with compression to reduce total indexing time.
180///
181/// Uses background threads to compress blocks while the main thread continues
182/// accepting documents.
183#[cfg(feature = "native")]
184pub struct EagerParallelStoreWriter<'a> {
185    writer: &'a mut dyn Write,
186    block_buffer: Vec<u8>,
187    /// Compressed blocks ready to be written (may arrive out of order)
188    compressed_blocks: Vec<CompressedBlock>,
189    /// Handles for in-flight compression tasks
190    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
191    next_seq: usize,
192    next_doc_id: DocId,
193    block_first_doc: DocId,
194    dict: Option<Arc<CompressionDict>>,
195    compression_level: CompressionLevel,
196}
197
198#[cfg(feature = "native")]
199impl<'a> EagerParallelStoreWriter<'a> {
200    /// Create a new eager parallel store writer
201    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
202        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
203    }
204
205    /// Create with specific compression level
206    pub fn with_compression_level(
207        writer: &'a mut dyn Write,
208        _num_threads: usize,
209        compression_level: CompressionLevel,
210    ) -> Self {
211        Self {
212            writer,
213            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
214            compressed_blocks: Vec::new(),
215            pending_handles: Vec::new(),
216            next_seq: 0,
217            next_doc_id: 0,
218            block_first_doc: 0,
219            dict: None,
220            compression_level,
221        }
222    }
223
224    /// Create with dictionary
225    pub fn with_dict(
226        writer: &'a mut dyn Write,
227        dict: CompressionDict,
228        _num_threads: usize,
229    ) -> Self {
230        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
231    }
232
233    /// Create with dictionary and specific compression level
234    pub fn with_dict_and_level(
235        writer: &'a mut dyn Write,
236        dict: CompressionDict,
237        _num_threads: usize,
238        compression_level: CompressionLevel,
239    ) -> Self {
240        Self {
241            writer,
242            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
243            compressed_blocks: Vec::new(),
244            pending_handles: Vec::new(),
245            next_seq: 0,
246            next_doc_id: 0,
247            block_first_doc: 0,
248            dict: Some(Arc::new(dict)),
249            compression_level,
250        }
251    }
252
253    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
254        let doc_bytes = serialize_document(doc, schema)?;
255        self.store_raw(&doc_bytes)
256    }
257
258    /// Store pre-serialized document bytes directly (avoids deserialize+reserialize roundtrip).
259    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
260        let doc_id = self.next_doc_id;
261        self.next_doc_id += 1;
262
263        self.block_buffer
264            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
265        self.block_buffer.extend_from_slice(doc_bytes);
266
267        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
268            self.spawn_compression();
269        }
270
271        Ok(doc_id)
272    }
273
274    /// Spawn compression for the current block immediately
275    fn spawn_compression(&mut self) {
276        if self.block_buffer.is_empty() {
277            return;
278        }
279
280        let num_docs = self.next_doc_id - self.block_first_doc;
281        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
282        let seq = self.next_seq;
283        let first_doc_id = self.block_first_doc;
284        let dict = self.dict.clone();
285
286        self.next_seq += 1;
287        self.block_first_doc = self.next_doc_id;
288
289        // Spawn compression task using thread
290        let level = self.compression_level;
291        let handle = std::thread::spawn(move || {
292            let compressed = if let Some(ref d) = dict {
293                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
294            } else {
295                crate::compression::compress(&data, level).expect("compression failed")
296            };
297
298            CompressedBlock {
299                seq,
300                first_doc_id,
301                num_docs,
302                compressed,
303            }
304        });
305
306        self.pending_handles.push(handle);
307    }
308
309    /// Collect any completed compression tasks
310    fn collect_completed(&mut self) {
311        let mut remaining = Vec::new();
312        for handle in self.pending_handles.drain(..) {
313            if handle.is_finished() {
314                if let Ok(block) = handle.join() {
315                    self.compressed_blocks.push(block);
316                }
317            } else {
318                remaining.push(handle);
319            }
320        }
321        self.pending_handles = remaining;
322    }
323
324    pub fn finish(mut self) -> io::Result<u32> {
325        // Spawn compression for any remaining data
326        self.spawn_compression();
327
328        // Collect any already-completed tasks
329        self.collect_completed();
330
331        // Wait for all remaining compression tasks
332        for handle in self.pending_handles.drain(..) {
333            if let Ok(block) = handle.join() {
334                self.compressed_blocks.push(block);
335            }
336        }
337
338        if self.compressed_blocks.is_empty() {
339            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
340            return Ok(0);
341        }
342
343        // Sort by sequence to maintain order
344        self.compressed_blocks.sort_by_key(|b| b.seq);
345
346        // Write blocks in order and build index
347        let mut index = Vec::with_capacity(self.compressed_blocks.len());
348        let mut current_offset = 0u64;
349
350        for block in &self.compressed_blocks {
351            index.push(StoreBlockIndex {
352                first_doc_id: block.first_doc_id,
353                offset: current_offset,
354                length: block.compressed.len() as u32,
355                num_docs: block.num_docs,
356            });
357
358            self.writer.write_all(&block.compressed)?;
359            current_offset += block.compressed.len() as u64;
360        }
361
362        let data_end_offset = current_offset;
363
364        // Write dictionary if present
365        let dict_offset = if let Some(ref dict) = self.dict {
366            let offset = current_offset;
367            let dict_bytes = dict.as_bytes();
368            self.writer
369                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
370            self.writer.write_all(dict_bytes)?;
371            Some(offset)
372        } else {
373            None
374        };
375
376        // Write index + footer
377        write_store_index_and_footer(
378            &mut self.writer,
379            &index,
380            data_end_offset,
381            dict_offset.unwrap_or(0),
382            self.next_doc_id,
383            self.dict.is_some(),
384        )?;
385
386        Ok(self.next_doc_id)
387    }
388}
389
390/// Block index entry for document store
391#[derive(Debug, Clone)]
392pub(crate) struct StoreBlockIndex {
393    pub(crate) first_doc_id: DocId,
394    pub(crate) offset: u64,
395    pub(crate) length: u32,
396    pub(crate) num_docs: u32,
397}
398
399/// Async document store reader - loads blocks on demand
400pub struct AsyncStoreReader {
401    /// LazyFileSlice for the data portion - fetches ranges on demand
402    data_slice: LazyFileSlice,
403    /// Block index
404    index: Vec<StoreBlockIndex>,
405    num_docs: u32,
406    /// Optional compression dictionary
407    dict: Option<CompressionDict>,
408    /// Block cache
409    cache: RwLock<StoreBlockCache>,
410}
411
412/// FIFO block cache — O(1) lookup, insert, and eviction.
413///
414/// Uses VecDeque for eviction order (pop_front = O(1)) instead of
415/// Vec::remove(0) which is O(n). get() is &self for read-lock compatibility.
416struct StoreBlockCache {
417    blocks: FxHashMap<DocId, Arc<Vec<u8>>>,
418    insert_order: std::collections::VecDeque<DocId>,
419    max_blocks: usize,
420}
421
422impl StoreBlockCache {
423    fn new(max_blocks: usize) -> Self {
424        Self {
425            blocks: FxHashMap::default(),
426            insert_order: std::collections::VecDeque::with_capacity(max_blocks),
427            max_blocks,
428        }
429    }
430
431    fn get(&self, first_doc_id: DocId) -> Option<Arc<Vec<u8>>> {
432        self.blocks.get(&first_doc_id).map(Arc::clone)
433    }
434
435    fn insert(&mut self, first_doc_id: DocId, block: Arc<Vec<u8>>) {
436        if self.blocks.contains_key(&first_doc_id) {
437            return; // already cached
438        }
439        while self.blocks.len() >= self.max_blocks {
440            if let Some(evict) = self.insert_order.pop_front() {
441                self.blocks.remove(&evict);
442            } else {
443                break;
444            }
445        }
446        self.blocks.insert(first_doc_id, block);
447        self.insert_order.push_back(first_doc_id);
448    }
449}
450
451impl AsyncStoreReader {
452    /// Open a document store from LazyFileHandle
453    /// Only loads footer and index into memory, data blocks are fetched on-demand
454    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
455        let file_len = file_handle.len();
456        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
457        if file_len < 32 {
458            return Err(io::Error::new(
459                io::ErrorKind::InvalidData,
460                "Store too small",
461            ));
462        }
463
464        // Read footer (32 bytes)
465        let footer = file_handle
466            .read_bytes_range(file_len - 32..file_len)
467            .await?;
468        let mut reader = footer.as_slice();
469        let data_end_offset = reader.read_u64::<LittleEndian>()?;
470        let dict_offset = reader.read_u64::<LittleEndian>()?;
471        let num_docs = reader.read_u32::<LittleEndian>()?;
472        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
473        let version = reader.read_u32::<LittleEndian>()?;
474        let magic = reader.read_u32::<LittleEndian>()?;
475
476        if magic != STORE_MAGIC {
477            return Err(io::Error::new(
478                io::ErrorKind::InvalidData,
479                "Invalid store magic",
480            ));
481        }
482        if version != STORE_VERSION {
483            return Err(io::Error::new(
484                io::ErrorKind::InvalidData,
485                format!("Unsupported store version: {}", version),
486            ));
487        }
488
489        // Load dictionary if present, and compute index_start in one pass
490        let (dict, index_start) = if has_dict && dict_offset > 0 {
491            let dict_start = dict_offset;
492            let dict_len_bytes = file_handle
493                .read_bytes_range(dict_start..dict_start + 4)
494                .await?;
495            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
496            let dict_bytes = file_handle
497                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
498                .await?;
499            let idx_start = dict_start + 4 + dict_len;
500            (
501                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
502                idx_start,
503            )
504        } else {
505            (None, data_end_offset)
506        };
507        let index_end = file_len - 32;
508
509        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
510        let mut reader = index_bytes.as_slice();
511
512        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
513        let mut index = Vec::with_capacity(num_blocks);
514
515        for _ in 0..num_blocks {
516            let first_doc_id = reader.read_u32::<LittleEndian>()?;
517            let offset = reader.read_u64::<LittleEndian>()?;
518            let length = reader.read_u32::<LittleEndian>()?;
519            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
520
521            index.push(StoreBlockIndex {
522                first_doc_id,
523                offset,
524                length,
525                num_docs: num_docs_in_block,
526            });
527        }
528
529        // Create lazy slice for data portion only
530        let data_slice = file_handle.slice(0..data_end_offset);
531
532        Ok(Self {
533            data_slice,
534            index,
535            num_docs,
536            dict,
537            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
538        })
539    }
540
541    /// Number of documents
542    pub fn num_docs(&self) -> u32 {
543        self.num_docs
544    }
545
546    /// Number of blocks currently in the cache
547    pub fn cached_blocks(&self) -> usize {
548        self.cache.read().blocks.len()
549    }
550
551    /// Get a document by doc_id (async - may load block)
552    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
553        if doc_id >= self.num_docs {
554            return Ok(None);
555        }
556
557        // Find block containing this doc_id
558        let block_idx = self
559            .index
560            .binary_search_by(|entry| {
561                if doc_id < entry.first_doc_id {
562                    std::cmp::Ordering::Greater
563                } else if doc_id >= entry.first_doc_id + entry.num_docs {
564                    std::cmp::Ordering::Less
565                } else {
566                    std::cmp::Ordering::Equal
567                }
568            })
569            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
570
571        let entry = &self.index[block_idx];
572        let block_data = self.load_block(entry).await?;
573
574        // Find document within block
575        let doc_offset_in_block = doc_id - entry.first_doc_id;
576        let mut reader = &block_data[..];
577
578        for _ in 0..doc_offset_in_block {
579            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
580            if doc_len > reader.len() {
581                return Err(io::Error::new(
582                    io::ErrorKind::InvalidData,
583                    "Invalid doc length",
584                ));
585            }
586            reader = &reader[doc_len..];
587        }
588
589        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
590        let doc_bytes = &reader[..doc_len];
591
592        deserialize_document(doc_bytes, schema).map(Some)
593    }
594
595    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<Vec<u8>>> {
596        // Check cache first (read lock — concurrent cache hits don't serialize)
597        {
598            if let Some(block) = self.cache.read().get(entry.first_doc_id) {
599                return Ok(block);
600            }
601        }
602
603        // Load from FileSlice
604        let start = entry.offset;
605        let end = start + entry.length as u64;
606        let compressed = self.data_slice.read_bytes_range(start..end).await?;
607
608        // Use dictionary decompression if available
609        let decompressed = if let Some(ref dict) = self.dict {
610            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
611        } else {
612            crate::compression::decompress(compressed.as_slice())?
613        };
614
615        let block = Arc::new(decompressed);
616
617        // Insert into cache
618        {
619            let mut cache = self.cache.write();
620            cache.insert(entry.first_doc_id, Arc::clone(&block));
621        }
622
623        Ok(block)
624    }
625}
626
627pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
628    use crate::dsl::Field;
629
630    let mut reader = data;
631    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
632    let mut doc = Document::new();
633
634    for _ in 0..num_fields {
635        let field_id = reader.read_u16::<LittleEndian>()?;
636        let field = Field(field_id as u32);
637        let type_tag = reader.read_u8()?;
638
639        match type_tag {
640            0 => {
641                // Text
642                let len = reader.read_u32::<LittleEndian>()? as usize;
643                let s = std::str::from_utf8(&reader[..len])
644                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
645                doc.add_text(field, s);
646                reader = &reader[len..];
647            }
648            1 => {
649                // U64
650                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
651            }
652            2 => {
653                // I64
654                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
655            }
656            3 => {
657                // F64
658                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
659            }
660            4 => {
661                // Bytes
662                let len = reader.read_u32::<LittleEndian>()? as usize;
663                doc.add_bytes(field, reader[..len].to_vec());
664                reader = &reader[len..];
665            }
666            5 => {
667                // SparseVector
668                let count = reader.read_u32::<LittleEndian>()? as usize;
669                let mut entries = Vec::with_capacity(count);
670                for _ in 0..count {
671                    let idx = reader.read_u32::<LittleEndian>()?;
672                    let val = reader.read_f32::<LittleEndian>()?;
673                    entries.push((idx, val));
674                }
675                doc.add_sparse_vector(field, entries);
676            }
677            6 => {
678                // DenseVector
679                let count = reader.read_u32::<LittleEndian>()? as usize;
680                let byte_len = count * 4;
681                if reader.len() < byte_len {
682                    return Err(io::Error::new(
683                        io::ErrorKind::UnexpectedEof,
684                        format!(
685                            "dense vector field {}: need {} bytes but only {} remain",
686                            field.0,
687                            byte_len,
688                            reader.len()
689                        ),
690                    ));
691                }
692                let mut values = vec![0.0f32; count];
693                // Read raw f32 bytes directly
694                unsafe {
695                    std::ptr::copy_nonoverlapping(
696                        reader.as_ptr(),
697                        values.as_mut_ptr() as *mut u8,
698                        byte_len,
699                    );
700                }
701                reader = &reader[byte_len..];
702                doc.add_dense_vector(field, values);
703            }
704            7 => {
705                // Json
706                let len = reader.read_u32::<LittleEndian>()? as usize;
707                let v: serde_json::Value = serde_json::from_slice(&reader[..len])
708                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
709                doc.add_json(field, v);
710                reader = &reader[len..];
711            }
712            _ => {
713                return Err(io::Error::new(
714                    io::ErrorKind::InvalidData,
715                    format!("Unknown field type tag: {}", type_tag),
716                ));
717            }
718        }
719    }
720
721    Ok(doc)
722}
723
724/// Raw block info for store merging (without decompression)
725#[derive(Debug, Clone)]
726pub struct RawStoreBlock {
727    pub first_doc_id: DocId,
728    pub num_docs: u32,
729    pub offset: u64,
730    pub length: u32,
731}
732
733/// Store merger - concatenates compressed blocks from multiple stores without recompression
734///
735/// This is much faster than rebuilding stores since it avoids:
736/// - Decompressing blocks from source stores
737/// - Re-serializing documents
738/// - Re-compressing blocks at level 22
739///
740/// Limitations:
741/// - All source stores must NOT use dictionaries (or use the same dictionary)
742/// - Doc IDs are remapped sequentially
743pub struct StoreMerger<'a, W: Write> {
744    writer: &'a mut W,
745    index: Vec<StoreBlockIndex>,
746    current_offset: u64,
747    next_doc_id: DocId,
748}
749
750impl<'a, W: Write> StoreMerger<'a, W> {
751    pub fn new(writer: &'a mut W) -> Self {
752        Self {
753            writer,
754            index: Vec::new(),
755            current_offset: 0,
756            next_doc_id: 0,
757        }
758    }
759
760    /// Append raw compressed blocks from a store file
761    ///
762    /// `data_slice` should be the data portion of the store (before index/footer)
763    /// `blocks` contains the block metadata from the source store
764    pub async fn append_store<F: AsyncFileRead>(
765        &mut self,
766        data_slice: &F,
767        blocks: &[RawStoreBlock],
768    ) -> io::Result<()> {
769        for block in blocks {
770            // Read raw compressed block data
771            let start = block.offset;
772            let end = start + block.length as u64;
773            let compressed_data = data_slice.read_bytes_range(start..end).await?;
774
775            // Write to output
776            self.writer.write_all(compressed_data.as_slice())?;
777
778            // Add to index with remapped doc IDs
779            self.index.push(StoreBlockIndex {
780                first_doc_id: self.next_doc_id,
781                offset: self.current_offset,
782                length: block.length,
783                num_docs: block.num_docs,
784            });
785
786            self.current_offset += block.length as u64;
787            self.next_doc_id += block.num_docs;
788        }
789
790        Ok(())
791    }
792
793    /// Append blocks from a dict-compressed store by decompressing and recompressing.
794    ///
795    /// For stores that use dictionary compression, raw blocks can't be stacked
796    /// directly because the decompressor needs the original dictionary.
797    /// This method decompresses each block with the source dict, then
798    /// recompresses without a dictionary so the merged output is self-contained.
799    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
800        let dict = store.dict();
801        let data_slice = store.data_slice();
802        let blocks = store.block_index();
803
804        for block in blocks {
805            let start = block.offset;
806            let end = start + block.length as u64;
807            let compressed = data_slice.read_bytes_range(start..end).await?;
808
809            // Decompress with source dict (or without if no dict)
810            let decompressed = if let Some(d) = dict {
811                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
812            } else {
813                crate::compression::decompress(compressed.as_slice())?
814            };
815
816            // Recompress without dictionary
817            let recompressed = crate::compression::compress(
818                &decompressed,
819                crate::compression::CompressionLevel::default(),
820            )?;
821
822            self.writer.write_all(&recompressed)?;
823
824            self.index.push(StoreBlockIndex {
825                first_doc_id: self.next_doc_id,
826                offset: self.current_offset,
827                length: recompressed.len() as u32,
828                num_docs: block.num_docs,
829            });
830
831            self.current_offset += recompressed.len() as u64;
832            self.next_doc_id += block.num_docs;
833        }
834
835        Ok(())
836    }
837
838    /// Finish writing the merged store
839    pub fn finish(self) -> io::Result<u32> {
840        let data_end_offset = self.current_offset;
841
842        // No dictionary support for merged stores (would need same dict across all sources)
843        let dict_offset = 0u64;
844
845        // Write index + footer
846        write_store_index_and_footer(
847            self.writer,
848            &self.index,
849            data_end_offset,
850            dict_offset,
851            self.next_doc_id,
852            false,
853        )?;
854
855        Ok(self.next_doc_id)
856    }
857}
858
859impl AsyncStoreReader {
860    /// Get raw block metadata for merging (without loading block data)
861    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
862        self.index
863            .iter()
864            .map(|entry| RawStoreBlock {
865                first_doc_id: entry.first_doc_id,
866                num_docs: entry.num_docs,
867                offset: entry.offset,
868                length: entry.length,
869            })
870            .collect()
871    }
872
873    /// Get the data slice for raw block access
874    pub fn data_slice(&self) -> &LazyFileSlice {
875        &self.data_slice
876    }
877
878    /// Check if this store uses a dictionary (incompatible with raw merging)
879    pub fn has_dict(&self) -> bool {
880        self.dict.is_some()
881    }
882
883    /// Get the decompression dictionary (if any)
884    pub fn dict(&self) -> Option<&CompressionDict> {
885        self.dict.as_ref()
886    }
887
888    /// Get block index for iteration
889    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
890        &self.index
891    }
892}