hermes_core/segment/store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Maximum compression level (22) for best compression ratio
//! - Larger block sizes (256KB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! The writer stores documents in compressed blocks.
//! The reader only loads the index into memory; blocks are loaded on demand.

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::FileHandle;
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries

/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size (4KB is a good balance)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);

/// Write block index + footer to a store file.
///
/// Shared by `EagerParallelStoreWriter::finish` and `StoreMerger::finish`.
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}
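
// On-disk layout, as produced by `EagerParallelStoreWriter::finish` /
// `StoreMerger::finish` and parsed by `AsyncStoreReader::open` (a summary of the
// code in this file; all integers are little-endian):
//
//   [compressed block 0] .. [compressed block N-1]     data section
//   [dict_len: u32][dict bytes]                        only when a dictionary is used
//   [num_blocks: u32]
//   N * { first_doc_id: u32, offset: u64, length: u32, num_docs: u32 }
//   footer: data_end: u64, dict_offset: u64, num_docs: u32, has_dict: u32,
//           version: u32, magic: u32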

/// Binary document format:
///   num_fields: u16
///   per field: field_id: u16, type_tag: u8, value data
///     0=Text:         len:u32 + utf8
///     1=U64:          u64 LE
///     2=I64:          i64 LE
///     3=F64:          f64 LE
///     4=Bytes:        len:u32 + raw
///     5=SparseVector: count:u32 + count*(u32+f32)
///     6=DenseVector:  count:u32 + count*f32
///     7=Json:         len:u32 + json utf8
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(256);
    serialize_document_into(doc, schema, &mut buf)?;
    Ok(buf)
}

/// Serialize a document into a reusable buffer (clears it first).
/// Avoids per-document allocation when called in a loop.
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;

    buf.clear();

    // Two-pass approach avoids allocating a Vec just to count + iterate stored fields.
    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
        if matches!(value, FieldValue::DenseVector(_)) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };

    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();

    buf.write_u16::<LittleEndian>(stored_count as u16)?;

    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // Write raw f32 bytes directly
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(())
}
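
// A minimal round-trip sketch (illustrative only): serialize a document with the
// format above and decode it back. It assumes a caller-provided `Schema` in which
// field 0 is a stored text field and field 1 a stored u64 field; the schema-building
// API itself is not part of this file.
#[allow(dead_code)]
fn serialize_roundtrip_example(schema: &Schema) -> io::Result<()> {
    use crate::dsl::Field;

    let mut doc = Document::new();
    doc.add_text(Field(0), "hello world");
    doc.add_u64(Field(1), 42);

    // Length-prefixed binary encoding of the stored fields only.
    let bytes = serialize_document(&doc, schema)?;
    // Full decode; `deserialize_document_fields` would decode a subset instead.
    let decoded = deserialize_document(&bytes, schema)?;
    // With both fields marked `stored`, both survive the round trip.
    assert_eq!(decoded.field_values().iter().count(), 2);
    Ok(())
}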

/// Compressed block result
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    /// Reusable buffer for document serialization (avoids per-doc allocation)
    serialize_buf: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

    /// Store pre-serialized document bytes directly (avoids deserialize+reserialize roundtrip).
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed();

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index + footer
        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}
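
// Minimal usage sketch for the writer, assuming `docs` and `schema` come from the
// caller and `out` is any `Write` sink (a file, a `Vec<u8>`, ...). The thread-count
// argument is currently unused by this writer.
#[cfg(feature = "native")]
#[allow(dead_code)]
fn write_store_example(
    out: &mut dyn Write,
    docs: &[Document],
    schema: &Schema,
) -> io::Result<u32> {
    let mut writer = EagerParallelStoreWriter::new(out, 4);
    for doc in docs {
        // Doc ids are assigned sequentially starting from 0.
        let _doc_id = writer.store(doc, schema)?;
    }
    // Compresses the final partial block, joins in-flight compression threads,
    // and appends the block index + footer. Returns the number of documents.
    writer.finish()
}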

/// Block index entry for document store
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

/// Async document store reader - loads blocks on demand
pub struct AsyncStoreReader {
    /// FileHandle for the data portion - fetches ranges on demand
    data_slice: FileHandle,
    /// Block index
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}

/// Decompressed block with pre-built doc offset table.
///
/// The offset table is built once on decompression: `offsets[i]` is the byte
/// position in `data` where doc `i`'s length prefix starts. This turns the
/// O(n) linear scan per `get()` into O(1) direct indexing.
struct CachedBlock {
    data: Vec<u8>,
    /// Byte offset of each doc's length prefix within `data`.
    /// `offsets.len()` == number of docs in the block.
    offsets: Vec<u32>,
}

impl CachedBlock {
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            if pos + 4 > data.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                ));
            }
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
                    as usize;
            pos += 4 + doc_len;
        }
        Ok(Self { data, offsets })
    }

    /// Get doc bytes by index within the block (O(1))
    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        if idx >= self.offsets.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc offset out of range",
            ));
        }
        let start = self.offsets[idx] as usize;
        if start + 4 > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "truncated doc length",
            ));
        }
        let doc_len = u32::from_le_bytes([
            self.data[start],
            self.data[start + 1],
            self.data[start + 2],
            self.data[start + 3],
        ]) as usize;
        let data_start = start + 4;
        if data_start + doc_len > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc data overflow",
            ));
        }
        Ok(&self.data[data_start..data_start + doc_len])
    }
}

/// LRU block cache — O(1) lookup/insert, O(n) promotion (linear scan of the LRU queue).
///
/// On `get()`, promotes the accessed entry to the MRU position.
/// For typical cache sizes (16-64 blocks), the linear promote scan is negligible.
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    lru_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    /// Check cache without LRU promotion (safe for read-lock fast path)
    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
        self.promote(first_doc_id);
        Some(block)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.lru_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }

    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}
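
// Illustrative check of the eviction policy (a sketch, not exhaustive): with a
// capacity of one block, inserting a second block evicts the least recently used one.
#[cfg(test)]
mod store_block_cache_sketch {
    use super::*;

    #[test]
    fn evicts_least_recently_used_block() {
        let empty_block = || Arc::new(CachedBlock::build(Vec::new(), 0).unwrap());
        let mut cache = StoreBlockCache::new(1);
        cache.insert(0, empty_block());
        cache.insert(100, empty_block());
        assert!(cache.peek(0).is_none());
        assert!(cache.peek(100).is_some());
    }
}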

impl AsyncStoreReader {
    /// Open a document store from FileHandle
    /// Only loads footer and index into memory, data blocks are fetched on-demand
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes)
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load dictionary if present, and compute index_start in one pass
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Number of blocks currently in the cache
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Get a document by doc_id (async - may load block)
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Get specific fields of a document by doc_id (async - may load block)
    ///
    /// Only deserializes the requested fields, skipping over unwanted data.
    /// Much faster than `get()` when documents have large fields (text bodies,
    /// vectors) that aren't needed for the response.
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

    /// Find the block index entry and load/cache the block for a given doc_id
    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        // Fast path: read lock to check cache (allows concurrent readers)
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        // Slow path: write lock for LRU promotion or insert
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        // Build offset table for O(1) doc lookup within the block
        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}
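
// Minimal async usage sketch, assuming the caller already has a `FileHandle` for the
// store file and the matching `Schema`; `32` is an arbitrary block-cache capacity
// picked for the example.
#[allow(dead_code)]
async fn read_store_example(handle: FileHandle, schema: &Schema) -> io::Result<()> {
    // Reads only the footer + block index up front; data blocks are fetched lazily.
    let reader = AsyncStoreReader::open(handle, 32).await?;

    // Full document: every stored field of doc 0 is decoded.
    let _full = reader.get(0, schema).await?;

    // Partial document: only field id 1 is materialized; other fields are skipped
    // while walking the serialized bytes.
    let _partial = reader.get_fields(0, schema, &[1]).await?;

    Ok(())
}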

/// Deserialize only specific fields from document bytes.
///
/// Skips over unwanted fields without allocating their values — just advances
/// the reader past their length-prefixed data. For large documents with many
/// fields (e.g., full text body), this avoids allocating/copying data that
/// the caller doesn't need.
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}

/// Deserialize all fields from document bytes.
///
/// Delegates to the shared field-parsing core with no field filter.
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}

/// Shared deserialization core. `field_filter = None` means all fields wanted.
fn deserialize_document_inner(
    data: &[u8],
    _schema: &Schema,
    field_filter: Option<&[u32]>,
) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let type_tag = reader.read_u8()?;

        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));

        match type_tag {
            0 => {
                // Text
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "text field truncated",
                    ));
                }
                if wanted {
                    let s = std::str::from_utf8(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_text(Field(field_id as u32), s);
                }
                reader = &reader[len..];
            }
            1 => {
                // U64
                let v = reader.read_u64::<LittleEndian>()?;
                if wanted {
                    doc.add_u64(Field(field_id as u32), v);
                }
            }
            2 => {
                // I64
                let v = reader.read_i64::<LittleEndian>()?;
                if wanted {
                    doc.add_i64(Field(field_id as u32), v);
                }
            }
            3 => {
                // F64
                let v = reader.read_f64::<LittleEndian>()?;
                if wanted {
                    doc.add_f64(Field(field_id as u32), v);
                }
            }
            4 => {
                // Bytes
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "bytes field truncated",
                    ));
                }
                if wanted {
                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            5 => {
                // SparseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    let mut entries = Vec::with_capacity(count);
                    for _ in 0..count {
                        let idx = reader.read_u32::<LittleEndian>()?;
                        let val = reader.read_f32::<LittleEndian>()?;
                        entries.push((idx, val));
                    }
                    doc.add_sparse_vector(Field(field_id as u32), entries);
                } else {
                    let skip = count * 8;
                    if skip > reader.len() {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "sparse vector skip overflow",
                        ));
                    }
                    reader = &reader[skip..];
                }
            }
            6 => {
                // DenseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if byte_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "dense vector truncated",
                    ));
                }
                if wanted {
                    let mut values = vec![0.0f32; count];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            reader.as_ptr(),
                            values.as_mut_ptr() as *mut u8,
                            byte_len,
                        );
                    }
                    doc.add_dense_vector(Field(field_id as u32), values);
                }
                reader = &reader[byte_len..];
            }
            7 => {
                // Json
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "json field truncated",
                    ));
                }
                if wanted {
                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_json(Field(field_id as u32), v);
                }
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}
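
// Sketch of field-filtered decoding, assuming `bytes` came from `serialize_document`
// with the same schema: only field id 2 is materialized; every other field is
// skipped in place without allocating its value.
#[allow(dead_code)]
fn partial_decode_example(bytes: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_fields(bytes, schema, &[2])
}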

/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks at level 22
///
/// Limitations:
/// - All source stores must NOT use dictionaries (or use the same dictionary)
/// - Doc IDs are remapped sequentially
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Append raw compressed blocks from a store file
    ///
    /// `data_slice` should be the data portion of the store (before index/footer)
    /// `blocks` contains the block metadata from the source store
    pub async fn append_store(
        &mut self,
        data_slice: &FileHandle,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            // Read raw compressed block data
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            // Write to output
            self.writer.write_all(compressed_data.as_slice())?;

            // Add to index with remapped doc IDs
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Append blocks from a dict-compressed store by decompressing and recompressing.
    ///
    /// For stores that use dictionary compression, raw blocks can't be stacked
    /// directly because the decompressor needs the original dictionary.
    /// This method decompresses each block with the source dict, then
    /// recompresses without a dictionary so the merged output is self-contained.
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            // Decompress with source dict (or without if no dict)
            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            // Recompress without dictionary
            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Finish writing the merged store
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // No dictionary support for merged stores (would need same dict across all sources)
        let dict_offset = 0u64;

        // Write index + footer
        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}
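
// Sketch of a raw (no-recompression) merge of two stores, assuming neither source
// uses a dictionary (check `has_dict()` first); dictionary-compressed sources would
// go through `append_store_recompressing` instead.
#[allow(dead_code)]
async fn merge_stores_example(
    out: &mut Vec<u8>,
    a: &AsyncStoreReader,
    b: &AsyncStoreReader,
) -> io::Result<u32> {
    let mut merger = StoreMerger::new(out);
    // Raw compressed blocks are copied byte-for-byte; doc ids are renumbered
    // sequentially across the two sources.
    merger.append_store(a.data_slice(), &a.raw_blocks()).await?;
    merger.append_store(b.data_slice(), &b.raw_blocks()).await?;
    merger.finish()
}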

impl AsyncStoreReader {
    /// Get raw block metadata for merging (without loading block data)
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Get the data slice for raw block access
    pub fn data_slice(&self) -> &FileHandle {
        &self.data_slice
    }

    /// Check if this store uses a dictionary (incompatible with raw merging)
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    /// Get the decompression dictionary (if any)
    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    /// Get block index for iteration
    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}