Skip to main content

hermes_core/segment/
store.rs

1//! Document store with Zstd compression and lazy loading
2//!
3//! Optimized for static indexes:
//! - Configurable Zstd compression level (default: 7)
//! - Large block sizes (256KB) for better compression efficiency
6//! - Optional trained dictionary support for even better compression
7//! - Parallel compression support for faster indexing
8//!
9//! Writer stores documents in compressed blocks.
10//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
23use crate::dsl::{Document, Schema};
24
/// Magic number written as the last footer field and checked when a store is opened.
/// The hex digits read as the ASCII string "STOR".
const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
/// Format version written to the footer; `AsyncStoreReader::open` rejects any other value.
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries
27
28/// Write block index + footer to a store file.
29///
30/// Shared by `StoreWriter::finish`, `StoreWriter::finish` (empty), and `StoreMerger::finish`.
31fn write_store_index_and_footer(
32    writer: &mut (impl Write + ?Sized),
33    index: &[StoreBlockIndex],
34    data_end_offset: u64,
35    dict_offset: u64,
36    num_docs: u32,
37    has_dict: bool,
38) -> io::Result<()> {
39    writer.write_u32::<LittleEndian>(index.len() as u32)?;
40    for entry in index {
41        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
42        writer.write_u64::<LittleEndian>(entry.offset)?;
43        writer.write_u32::<LittleEndian>(entry.length)?;
44        writer.write_u32::<LittleEndian>(entry.num_docs)?;
45    }
46    writer.write_u64::<LittleEndian>(data_end_offset)?;
47    writer.write_u64::<LittleEndian>(dict_offset)?;
48    writer.write_u32::<LittleEndian>(num_docs)?;
49    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
50    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
51    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
52    Ok(())
53}
54
/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size in bytes (4KB)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store (level 7)
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(7);
65
66/// Binary document format:
67///   num_fields: u16
68///   per field: field_id: u16, type_tag: u8, value data
69///     0=Text:         len:u32 + utf8
70///     1=U64:          u64 LE
71///     2=I64:          i64 LE
72///     3=F64:          f64 LE
73///     4=Bytes:        len:u32 + raw
74///     5=SparseVector: count:u32 + count*(u32+f32)
75///     6=DenseVector:  count:u32 + count*f32
76///     7=Json:         len:u32 + json utf8
77pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
78    let mut buf = Vec::with_capacity(256);
79    serialize_document_into(doc, schema, &mut buf)?;
80    Ok(buf)
81}
82
83/// Serialize a document into a reusable buffer (clears it first).
84/// Avoids per-document allocation when called in a loop.
85pub fn serialize_document_into(
86    doc: &Document,
87    schema: &Schema,
88    buf: &mut Vec<u8>,
89) -> io::Result<()> {
90    use crate::dsl::FieldValue;
91
92    buf.clear();
93
94    // Two-pass approach avoids allocating a Vec just to count + iterate stored fields.
95    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
96        // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
97        if matches!(value, FieldValue::DenseVector(_)) {
98            return false;
99        }
100        schema.get_field_entry(*field).is_some_and(|e| e.stored)
101    };
102
103    let stored_count = doc
104        .field_values()
105        .iter()
106        .filter(|(field, value)| is_stored(field, value))
107        .count();
108
109    buf.write_u16::<LittleEndian>(stored_count as u16)?;
110
111    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
112        buf.write_u16::<LittleEndian>(field.0 as u16)?;
113        match value {
114            FieldValue::Text(s) => {
115                buf.push(0);
116                let bytes = s.as_bytes();
117                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
118                buf.extend_from_slice(bytes);
119            }
120            FieldValue::U64(v) => {
121                buf.push(1);
122                buf.write_u64::<LittleEndian>(*v)?;
123            }
124            FieldValue::I64(v) => {
125                buf.push(2);
126                buf.write_i64::<LittleEndian>(*v)?;
127            }
128            FieldValue::F64(v) => {
129                buf.push(3);
130                buf.write_f64::<LittleEndian>(*v)?;
131            }
132            FieldValue::Bytes(b) => {
133                buf.push(4);
134                buf.write_u32::<LittleEndian>(b.len() as u32)?;
135                buf.extend_from_slice(b);
136            }
137            FieldValue::SparseVector(entries) => {
138                buf.push(5);
139                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
140                for (idx, val) in entries {
141                    buf.write_u32::<LittleEndian>(*idx)?;
142                    buf.write_f32::<LittleEndian>(*val)?;
143                }
144            }
145            FieldValue::DenseVector(values) => {
146                buf.push(6);
147                buf.write_u32::<LittleEndian>(values.len() as u32)?;
148                // Write raw f32 bytes directly
149                let byte_slice = unsafe {
150                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
151                };
152                buf.extend_from_slice(byte_slice);
153            }
154            FieldValue::Json(v) => {
155                buf.push(7);
156                let json_bytes = serde_json::to_vec(v)
157                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
158                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
159                buf.extend_from_slice(&json_bytes);
160            }
161        }
162    }
163
164    Ok(())
165}
166
/// Result of compressing one block on a background thread.
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Position of the block in submission order; used to restore order before writing.
    seq: usize,
    /// Doc id of the first document in the block.
    first_doc_id: DocId,
    /// Number of documents contained in the block.
    num_docs: u32,
    /// Compressed block payload.
    compressed: Vec<u8>,
}
175
/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents. Nothing is written to `writer` until `finish` is called.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    /// Destination for blocks, dictionary, index and footer.
    writer: &'a mut dyn Write,
    /// Uncompressed bytes of the block currently being filled
    /// (each document is a u32 length prefix followed by its bytes).
    block_buffer: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    /// Sequence number assigned to the next block handed to a compression thread.
    next_seq: usize,
    /// Doc id that the next stored document will receive.
    next_doc_id: DocId,
    /// Doc id of the first document in the block currently being filled.
    block_first_doc: DocId,
    /// Optional compression dictionary shared with the compression threads.
    dict: Option<Arc<CompressionDict>>,
    /// Compression level passed to every compression call.
    compression_level: CompressionLevel,
}
197
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer using the default compression level.
    ///
    /// `_num_threads` is currently unused: one thread is spawned per full block.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, None, compression_level)
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::from_parts(writer, Some(Arc::new(dict)), compression_level)
    }

    /// Shared constructor body: an empty writer with the given dictionary and level.
    fn from_parts(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Serialize `doc` and append it to the store, returning its doc id.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        let doc_bytes = serialize_document(doc, schema)?;
        self.store_raw(&doc_bytes)
    }

    /// Store pre-serialized document bytes directly (avoids deserialize+reserialize roundtrip).
    ///
    /// # Errors
    /// Returns `InvalidInput` if the document is longer than `u32::MAX` bytes; the
    /// per-document length prefix is a `u32` and cannot represent it. No doc id is
    /// consumed in that case.
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_len = u32::try_from(doc_bytes.len()).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("document of {} bytes is too large to store", doc_bytes.len()),
            )
        })?;

        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer.write_u32::<LittleEndian>(doc_len)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        // Hand the block to a compression thread as soon as it reaches the target size.
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    ///
    /// Does nothing when the current block is empty. A compression failure panics
    /// inside the spawned thread; `finish` reports that panic as an `io::Error`.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Join one compression thread, turning a panic in the thread into an `io::Error`.
    fn join_block(
        handle: std::thread::JoinHandle<CompressedBlock>,
    ) -> io::Result<CompressedBlock> {
        handle.join().map_err(|_| {
            io::Error::new(
                io::ErrorKind::Other,
                "store block compression thread panicked",
            )
        })
    }

    /// Collect any completed compression tasks
    ///
    /// Unfinished tasks stay in `pending_handles`. Returns an error if a finished
    /// task panicked, instead of silently dropping its block.
    fn collect_completed(&mut self) -> io::Result<()> {
        let mut still_running = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                self.compressed_blocks.push(Self::join_block(handle)?);
            } else {
                still_running.push(handle);
            }
        }
        self.pending_handles = still_running;
        Ok(())
    }

    /// Flush the last block, wait for all compression threads, and write the
    /// blocks, optional dictionary, index and footer. Returns the number of
    /// documents stored.
    ///
    /// # Errors
    /// Returns an error if any compression thread panicked (a missing block would
    /// otherwise leave the index inconsistent with the reported document count),
    /// if a block or the dictionary is too large for its `u32` length field, or
    /// if writing fails.
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed()?;

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            self.compressed_blocks.push(Self::join_block(handle)?);
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            let length = u32::try_from(block.compressed.len()).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "compressed store block exceeds u32::MAX bytes",
                )
            })?;
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += u64::from(length);
        }

        let data_end_offset = current_offset;

        // Write dictionary if present: u32 length prefix followed by the raw bytes.
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            let dict_len = u32::try_from(dict_bytes.len()).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "compression dictionary exceeds u32::MAX bytes",
                )
            })?;
            self.writer.write_u32::<LittleEndian>(dict_len)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index + footer
        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}
389
/// Block index entry for document store
///
/// One entry per compressed block; serialized as
/// `first_doc_id: u32`, `offset: u64`, `length: u32`, `num_docs: u32`.
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    /// Doc id of the first document in the block.
    pub(crate) first_doc_id: DocId,
    /// Byte offset of the compressed block from the start of the data section.
    pub(crate) offset: u64,
    /// Length of the compressed block in bytes.
    pub(crate) length: u32,
    /// Number of documents in the block.
    pub(crate) num_docs: u32,
}
398
/// Async document store reader - loads blocks on demand
///
/// Only the footer, block index and optional dictionary are read when the
/// store is opened; compressed blocks are fetched and decompressed on first
/// access and kept in a bounded cache.
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    /// Total number of documents, as recorded in the footer
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}
411
/// LRU block cache keyed by each block's first doc id.
///
/// Lookups go through a hash map; promoting an entry to the MRU position is a
/// linear scan of `lru_order`.
/// For typical cache sizes (16-64 blocks), the linear promote scan is negligible.
struct StoreBlockCache {
    /// Decompressed blocks, keyed by the block's first doc id.
    blocks: FxHashMap<DocId, Arc<[u8]>>,
    /// Keys ordered from least recently used (front) to most recently used (back).
    lru_order: std::collections::VecDeque<DocId>,
    /// Eviction threshold: entries are evicted while `blocks.len() >= max_blocks` before an insert.
    max_blocks: usize,
}
421
422impl StoreBlockCache {
423    fn new(max_blocks: usize) -> Self {
424        Self {
425            blocks: FxHashMap::default(),
426            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
427            max_blocks,
428        }
429    }
430
431    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<[u8]>> {
432        if self.blocks.contains_key(&first_doc_id) {
433            self.promote(first_doc_id);
434            self.blocks.get(&first_doc_id).map(Arc::clone)
435        } else {
436            None
437        }
438    }
439
440    fn insert(&mut self, first_doc_id: DocId, block: Arc<[u8]>) {
441        if self.blocks.contains_key(&first_doc_id) {
442            self.promote(first_doc_id);
443            return;
444        }
445        while self.blocks.len() >= self.max_blocks {
446            if let Some(evict) = self.lru_order.pop_front() {
447                self.blocks.remove(&evict);
448            } else {
449                break;
450            }
451        }
452        self.blocks.insert(first_doc_id, block);
453        self.lru_order.push_back(first_doc_id);
454    }
455
456    fn promote(&mut self, key: DocId) {
457        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
458            self.lru_order.remove(pos);
459            self.lru_order.push_back(key);
460        }
461    }
462}
463
464impl AsyncStoreReader {
465    /// Open a document store from LazyFileHandle
466    /// Only loads footer and index into memory, data blocks are fetched on-demand
467    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
468        let file_len = file_handle.len();
469        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
470        if file_len < 32 {
471            return Err(io::Error::new(
472                io::ErrorKind::InvalidData,
473                "Store too small",
474            ));
475        }
476
477        // Read footer (32 bytes)
478        let footer = file_handle
479            .read_bytes_range(file_len - 32..file_len)
480            .await?;
481        let mut reader = footer.as_slice();
482        let data_end_offset = reader.read_u64::<LittleEndian>()?;
483        let dict_offset = reader.read_u64::<LittleEndian>()?;
484        let num_docs = reader.read_u32::<LittleEndian>()?;
485        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
486        let version = reader.read_u32::<LittleEndian>()?;
487        let magic = reader.read_u32::<LittleEndian>()?;
488
489        if magic != STORE_MAGIC {
490            return Err(io::Error::new(
491                io::ErrorKind::InvalidData,
492                "Invalid store magic",
493            ));
494        }
495        if version != STORE_VERSION {
496            return Err(io::Error::new(
497                io::ErrorKind::InvalidData,
498                format!("Unsupported store version: {}", version),
499            ));
500        }
501
502        // Load dictionary if present, and compute index_start in one pass
503        let (dict, index_start) = if has_dict && dict_offset > 0 {
504            let dict_start = dict_offset;
505            let dict_len_bytes = file_handle
506                .read_bytes_range(dict_start..dict_start + 4)
507                .await?;
508            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
509            let dict_bytes = file_handle
510                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
511                .await?;
512            let idx_start = dict_start + 4 + dict_len;
513            (
514                Some(CompressionDict::from_bytes(dict_bytes.to_vec())),
515                idx_start,
516            )
517        } else {
518            (None, data_end_offset)
519        };
520        let index_end = file_len - 32;
521
522        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
523        let mut reader = index_bytes.as_slice();
524
525        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
526        let mut index = Vec::with_capacity(num_blocks);
527
528        for _ in 0..num_blocks {
529            let first_doc_id = reader.read_u32::<LittleEndian>()?;
530            let offset = reader.read_u64::<LittleEndian>()?;
531            let length = reader.read_u32::<LittleEndian>()?;
532            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;
533
534            index.push(StoreBlockIndex {
535                first_doc_id,
536                offset,
537                length,
538                num_docs: num_docs_in_block,
539            });
540        }
541
542        // Create lazy slice for data portion only
543        let data_slice = file_handle.slice(0..data_end_offset);
544
545        Ok(Self {
546            data_slice,
547            index,
548            num_docs,
549            dict,
550            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
551        })
552    }
553
554    /// Number of documents
555    pub fn num_docs(&self) -> u32 {
556        self.num_docs
557    }
558
559    /// Number of blocks currently in the cache
560    pub fn cached_blocks(&self) -> usize {
561        self.cache.read().blocks.len()
562    }
563
564    /// Get a document by doc_id (async - may load block)
565    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
566        if doc_id >= self.num_docs {
567            return Ok(None);
568        }
569
570        // Find block containing this doc_id
571        let block_idx = self
572            .index
573            .binary_search_by(|entry| {
574                if doc_id < entry.first_doc_id {
575                    std::cmp::Ordering::Greater
576                } else if doc_id >= entry.first_doc_id + entry.num_docs {
577                    std::cmp::Ordering::Less
578                } else {
579                    std::cmp::Ordering::Equal
580                }
581            })
582            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;
583
584        let entry = &self.index[block_idx];
585        let block_data = self.load_block(entry).await?;
586
587        // Find document within block
588        let doc_offset_in_block = doc_id - entry.first_doc_id;
589        let mut reader = &block_data[..];
590
591        for _ in 0..doc_offset_in_block {
592            let doc_len = reader.read_u32::<LittleEndian>()? as usize;
593            if doc_len > reader.len() {
594                return Err(io::Error::new(
595                    io::ErrorKind::InvalidData,
596                    "Invalid doc length",
597                ));
598            }
599            reader = &reader[doc_len..];
600        }
601
602        let doc_len = reader.read_u32::<LittleEndian>()? as usize;
603        let doc_bytes = &reader[..doc_len];
604
605        deserialize_document(doc_bytes, schema).map(Some)
606    }
607
608    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<[u8]>> {
609        // Check cache (write lock for LRU promotion on hit)
610        {
611            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
612                return Ok(block);
613            }
614        }
615
616        // Load from FileSlice
617        let start = entry.offset;
618        let end = start + entry.length as u64;
619        let compressed = self.data_slice.read_bytes_range(start..end).await?;
620
621        // Use dictionary decompression if available
622        let decompressed = if let Some(ref dict) = self.dict {
623            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
624        } else {
625            crate::compression::decompress(compressed.as_slice())?
626        };
627
628        let block: Arc<[u8]> = Arc::from(decompressed);
629
630        // Insert into cache
631        {
632            let mut cache = self.cache.write();
633            cache.insert(entry.first_doc_id, Arc::clone(&block));
634        }
635
636        Ok(block)
637    }
638}
639
640pub fn deserialize_document(data: &[u8], _schema: &Schema) -> io::Result<Document> {
641    use crate::dsl::Field;
642
643    let mut reader = data;
644    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
645    let mut doc = Document::new();
646
647    for _ in 0..num_fields {
648        let field_id = reader.read_u16::<LittleEndian>()?;
649        let field = Field(field_id as u32);
650        let type_tag = reader.read_u8()?;
651
652        match type_tag {
653            0 => {
654                // Text
655                let len = reader.read_u32::<LittleEndian>()? as usize;
656                let s = std::str::from_utf8(&reader[..len])
657                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
658                doc.add_text(field, s);
659                reader = &reader[len..];
660            }
661            1 => {
662                // U64
663                doc.add_u64(field, reader.read_u64::<LittleEndian>()?);
664            }
665            2 => {
666                // I64
667                doc.add_i64(field, reader.read_i64::<LittleEndian>()?);
668            }
669            3 => {
670                // F64
671                doc.add_f64(field, reader.read_f64::<LittleEndian>()?);
672            }
673            4 => {
674                // Bytes
675                let len = reader.read_u32::<LittleEndian>()? as usize;
676                doc.add_bytes(field, reader[..len].to_vec());
677                reader = &reader[len..];
678            }
679            5 => {
680                // SparseVector
681                let count = reader.read_u32::<LittleEndian>()? as usize;
682                let mut entries = Vec::with_capacity(count);
683                for _ in 0..count {
684                    let idx = reader.read_u32::<LittleEndian>()?;
685                    let val = reader.read_f32::<LittleEndian>()?;
686                    entries.push((idx, val));
687                }
688                doc.add_sparse_vector(field, entries);
689            }
690            6 => {
691                // DenseVector
692                let count = reader.read_u32::<LittleEndian>()? as usize;
693                let byte_len = count * 4;
694                if reader.len() < byte_len {
695                    return Err(io::Error::new(
696                        io::ErrorKind::UnexpectedEof,
697                        format!(
698                            "dense vector field {}: need {} bytes but only {} remain",
699                            field.0,
700                            byte_len,
701                            reader.len()
702                        ),
703                    ));
704                }
705                let mut values = vec![0.0f32; count];
706                // Read raw f32 bytes directly
707                unsafe {
708                    std::ptr::copy_nonoverlapping(
709                        reader.as_ptr(),
710                        values.as_mut_ptr() as *mut u8,
711                        byte_len,
712                    );
713                }
714                reader = &reader[byte_len..];
715                doc.add_dense_vector(field, values);
716            }
717            7 => {
718                // Json
719                let len = reader.read_u32::<LittleEndian>()? as usize;
720                let v: serde_json::Value = serde_json::from_slice(&reader[..len])
721                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
722                doc.add_json(field, v);
723                reader = &reader[len..];
724            }
725            _ => {
726                return Err(io::Error::new(
727                    io::ErrorKind::InvalidData,
728                    format!("Unknown field type tag: {}", type_tag),
729                ));
730            }
731        }
732    }
733
734    Ok(doc)
735}
736
/// Raw block info for store merging (without decompression)
///
/// Mirrors the fields of an index entry of the source store.
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    /// Doc id of the first document in the block, as recorded in the source store.
    pub first_doc_id: DocId,
    /// Number of documents in the block.
    pub num_docs: u32,
    /// Byte offset of the compressed block within the source data section.
    pub offset: u64,
    /// Length of the compressed block in bytes.
    pub length: u32,
}
745
/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks
///
/// Limitations:
/// - Raw appending (`append_store`) only suits sources without a dictionary, because
///   the merged store is written without one; dictionary-compressed sources go
///   through `append_store_recompressing`
/// - Doc IDs are remapped sequentially
pub struct StoreMerger<'a, W: Write> {
    /// Destination for the merged blocks, index and footer.
    writer: &'a mut W,
    /// Index entries of the blocks written so far, with remapped doc ids.
    index: Vec<StoreBlockIndex>,
    /// Number of block bytes written so far; offset of the next block.
    current_offset: u64,
    /// Doc id assigned to the first document of the next appended block.
    next_doc_id: DocId,
}
762
763impl<'a, W: Write> StoreMerger<'a, W> {
764    pub fn new(writer: &'a mut W) -> Self {
765        Self {
766            writer,
767            index: Vec::new(),
768            current_offset: 0,
769            next_doc_id: 0,
770        }
771    }
772
773    /// Append raw compressed blocks from a store file
774    ///
775    /// `data_slice` should be the data portion of the store (before index/footer)
776    /// `blocks` contains the block metadata from the source store
777    pub async fn append_store<F: AsyncFileRead>(
778        &mut self,
779        data_slice: &F,
780        blocks: &[RawStoreBlock],
781    ) -> io::Result<()> {
782        for block in blocks {
783            // Read raw compressed block data
784            let start = block.offset;
785            let end = start + block.length as u64;
786            let compressed_data = data_slice.read_bytes_range(start..end).await?;
787
788            // Write to output
789            self.writer.write_all(compressed_data.as_slice())?;
790
791            // Add to index with remapped doc IDs
792            self.index.push(StoreBlockIndex {
793                first_doc_id: self.next_doc_id,
794                offset: self.current_offset,
795                length: block.length,
796                num_docs: block.num_docs,
797            });
798
799            self.current_offset += block.length as u64;
800            self.next_doc_id += block.num_docs;
801        }
802
803        Ok(())
804    }
805
806    /// Append blocks from a dict-compressed store by decompressing and recompressing.
807    ///
808    /// For stores that use dictionary compression, raw blocks can't be stacked
809    /// directly because the decompressor needs the original dictionary.
810    /// This method decompresses each block with the source dict, then
811    /// recompresses without a dictionary so the merged output is self-contained.
812    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
813        let dict = store.dict();
814        let data_slice = store.data_slice();
815        let blocks = store.block_index();
816
817        for block in blocks {
818            let start = block.offset;
819            let end = start + block.length as u64;
820            let compressed = data_slice.read_bytes_range(start..end).await?;
821
822            // Decompress with source dict (or without if no dict)
823            let decompressed = if let Some(d) = dict {
824                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
825            } else {
826                crate::compression::decompress(compressed.as_slice())?
827            };
828
829            // Recompress without dictionary
830            let recompressed = crate::compression::compress(
831                &decompressed,
832                crate::compression::CompressionLevel::default(),
833            )?;
834
835            self.writer.write_all(&recompressed)?;
836
837            self.index.push(StoreBlockIndex {
838                first_doc_id: self.next_doc_id,
839                offset: self.current_offset,
840                length: recompressed.len() as u32,
841                num_docs: block.num_docs,
842            });
843
844            self.current_offset += recompressed.len() as u64;
845            self.next_doc_id += block.num_docs;
846        }
847
848        Ok(())
849    }
850
851    /// Finish writing the merged store
852    pub fn finish(self) -> io::Result<u32> {
853        let data_end_offset = self.current_offset;
854
855        // No dictionary support for merged stores (would need same dict across all sources)
856        let dict_offset = 0u64;
857
858        // Write index + footer
859        write_store_index_and_footer(
860            self.writer,
861            &self.index,
862            data_end_offset,
863            dict_offset,
864            self.next_doc_id,
865            false,
866        )?;
867
868        Ok(self.next_doc_id)
869    }
870}
871
872impl AsyncStoreReader {
873    /// Get raw block metadata for merging (without loading block data)
874    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
875        self.index
876            .iter()
877            .map(|entry| RawStoreBlock {
878                first_doc_id: entry.first_doc_id,
879                num_docs: entry.num_docs,
880                offset: entry.offset,
881                length: entry.length,
882            })
883            .collect()
884    }
885
886    /// Get the data slice for raw block access
887    pub fn data_slice(&self) -> &LazyFileSlice {
888        &self.data_slice
889    }
890
891    /// Check if this store uses a dictionary (incompatible with raw merging)
892    pub fn has_dict(&self) -> bool {
893        self.dict.is_some()
894    }
895
896    /// Get the decompression dictionary (if any)
897    pub fn dict(&self) -> Option<&CompressionDict> {
898        self.dict.as_ref()
899    }
900
901    /// Get block index for iteration
902    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
903        &self.index
904    }
905}