hermes_core/segment/store.rs

//! Document store with Zstd compression and lazy loading.
//!
//! Optimized for static indexes:
//! - 16KB blocks (`STORE_BLOCK_SIZE`) to limit read amplification on single-doc fetches
//! - Optional trained dictionary support to recover the compression ratio lost to small blocks
//! - Parallel block compression to overlap compression with document ingestion
//!
//! The writer stores documents in compressed blocks.
//! The reader only loads the block index into memory; blocks are loaded on demand.
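//!
//! A minimal round-trip sketch (assumes an existing `Schema`, a collection of
//! `Document`s, and a `FileHandle` for the finished file; error handling elided):
//!
//! ```ignore
//! // Write: documents are buffered into ~16KB blocks, compressed on background
//! // threads, and the block index + footer are appended by `finish()`.
//! let mut file = Vec::new();
//! let mut writer = EagerParallelStoreWriter::new(&mut file, 4);
//! for doc in &docs {
//!     writer.store(doc, &schema)?;
//! }
//! let num_docs = writer.finish()?;
//!
//! // Read: only the footer and block index are loaded eagerly; blocks are
//! // fetched and decompressed on demand, then kept in a small LRU cache.
//! let reader = AsyncStoreReader::open(file_handle, 32).await?;
//! let doc = reader.get(0, &schema).await?;
//! ```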

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::FileHandle;
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries

/// Block size for document store (16KB).
/// Smaller blocks reduce read amplification for single-doc fetches at the
/// cost of slightly worse compression ratio. Zstd dictionary training
/// recovers most of the compression loss.
pub const STORE_BLOCK_SIZE: usize = 16 * 1024;

/// Default dictionary size (4KB is a good balance)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);

/// Write block index + footer to a store file.
///
/// Shared by `EagerParallelStoreWriter::finish` and `StoreMerger::finish`.
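///
/// On-disk layout appended after the data blocks (all integers little-endian),
/// mirrored by `AsyncStoreReader::open`:
///
/// ```text
/// num_blocks: u32
/// per block:  first_doc_id: u32, offset: u64, length: u32, num_docs: u32
/// footer (32 bytes): data_end: u64, dict_offset: u64, num_docs: u32,
///                    has_dict: u32, version: u32, magic: u32
/// ```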
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}

/// Binary document format:
///   num_fields: u16
///   per field: field_id: u16, type_tag: u8, value data
///     0=Text:         len:u32 + utf8
///     1=U64:          u64 LE
///     2=I64:          i64 LE
///     3=F64:          f64 LE
///     4=Bytes:        len:u32 + raw
///     5=SparseVector: count:u32 + count*(u32+f32)
///     6=DenseVector:  count:u32 + count*f32
///     7=Json:         len:u32 + json utf8
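///
/// For illustration, a document whose only stored field is field 0 holding the
/// text "hi" serializes to 11 bytes (hex, little-endian):
///
/// ```text
/// 01 00        num_fields = 1
/// 00 00        field_id = 0
/// 00           type_tag = 0 (Text)
/// 02 00 00 00  len = 2
/// 68 69        "hi"
/// ```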
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(256);
    serialize_document_into(doc, schema, &mut buf)?;
    Ok(buf)
}

/// Serialize a document into a reusable buffer (clears it first).
/// Avoids per-document allocation when called in a loop.
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;

    buf.clear();

    // Two-pass approach avoids allocating a Vec just to count + iterate stored fields.
    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
        if matches!(value, FieldValue::DenseVector(_)) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };

    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();

    buf.write_u16::<LittleEndian>(stored_count as u16)?;

    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // Write raw f32 bytes directly
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(())
}

/// Compressed block result
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents.
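///
/// A sketch of the dictionary variant (assumes a previously trained
/// `CompressionDict` named `dict`; error handling elided):
///
/// ```ignore
/// let mut out = Vec::new();
/// // Blocks are compressed with `dict`; the dictionary bytes are appended to the
/// // file in `finish()` so the reader can decompress without any external state.
/// let mut writer =
///     EagerParallelStoreWriter::with_dict_and_level(&mut out, dict, 4, CompressionLevel(3));
/// for doc in &docs {
///     writer.store(doc, &schema)?;
/// }
/// writer.finish()?;
/// ```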
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    /// Reusable buffer for document serialization (avoids per-doc allocation)
    serialize_buf: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

    /// Store pre-serialized document bytes directly (avoids deserialize+reserialize roundtrip).
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed();

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index + footer
        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}

/// Block index entry for document store
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

/// Async document store reader - loads blocks on demand
pub struct AsyncStoreReader {
    /// FileHandle for the data portion - fetches ranges on demand
    data_slice: FileHandle,
    /// Block index
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}

/// Decompressed block with pre-built doc offset table.
///
/// The offset table is built once on decompression: `offsets[i]` is the byte
/// position in `data` where doc `i`'s length prefix starts. This turns the
/// O(n) linear scan per `get()` into O(1) direct indexing.
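///
/// For example, a block holding two docs of 10 and 20 bytes is laid out as
/// `[len=10][doc0][len=20][doc1]`, giving `offsets = [0, 14]` (each entry skips
/// the 4-byte length prefix plus the previous doc's payload).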
struct CachedBlock {
    data: Vec<u8>,
    /// Byte offset of each doc's length prefix within `data`.
    /// `offsets.len()` == number of docs in the block.
    offsets: Vec<u32>,
}

impl CachedBlock {
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            if pos + 4 > data.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                ));
            }
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
                    as usize;
            pos += 4 + doc_len;
        }
        Ok(Self { data, offsets })
    }

    /// Get doc bytes by index within the block (O(1))
    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        if idx >= self.offsets.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc offset out of range",
            ));
        }
        let start = self.offsets[idx] as usize;
        if start + 4 > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "truncated doc length",
            ));
        }
        let doc_len = u32::from_le_bytes([
            self.data[start],
            self.data[start + 1],
            self.data[start + 2],
            self.data[start + 3],
        ]) as usize;
        let data_start = start + 4;
        if data_start + doc_len > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc data overflow",
            ));
        }
        Ok(&self.data[data_start..data_start + doc_len])
    }
}

/// LRU block cache — O(1) lookup/insert, amortized O(n) promotion.
///
/// On `get()`, promotes accessed entry to MRU position.
/// For typical cache sizes (16-64 blocks), the linear promote scan is negligible.
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    lru_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    /// Check cache without LRU promotion (safe for read-lock fast path)
    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
        self.promote(first_doc_id);
        Some(block)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.lru_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }

    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}

impl AsyncStoreReader {
    /// Open a document store from FileHandle
    /// Only loads footer and index into memory, data blocks are fetched on-demand
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes)
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load dictionary if present, and compute index_start in one pass
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Number of blocks currently in the cache
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Get a document by doc_id (async - may load block)
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Get specific fields of a document by doc_id (async - may load block)
    ///
    /// Only deserializes the requested fields, skipping over unwanted data.
    /// Much faster than `get()` when documents have large fields (text bodies,
    /// vectors) that aren't needed for the response.
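    ///
    /// A sketch (assumes a hypothetical `title_field: Field` looked up from the
    /// schema; error handling elided):
    ///
    /// ```ignore
    /// // Fetch only the title, skipping the large body text and any vectors.
    /// let title_only = reader.get_fields(doc_id, &schema, &[title_field.0]).await?;
    /// ```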
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

    /// Find the block index entry and load/cache the block for a given doc_id
    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        // Fast path: read lock to check cache (allows concurrent readers)
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        // Slow path: write lock for LRU promotion or insert
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        // Build offset table for O(1) doc lookup within the block
        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

/// Deserialize only specific fields from document bytes.
///
/// Skips over unwanted fields without allocating their values — just advances
/// the reader past their length-prefixed data. For large documents with many
/// fields (e.g., full text body), this avoids allocating/copying data that
/// the caller doesn't need.
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}

/// Deserialize all fields from document bytes.
///
/// Delegates to the shared field-parsing core with no field filter.
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}

/// Shared deserialization core. `field_filter = None` means all fields wanted.
fn deserialize_document_inner(
    data: &[u8],
    _schema: &Schema,
    field_filter: Option<&[u32]>,
) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let type_tag = reader.read_u8()?;

        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));

        match type_tag {
            0 => {
                // Text
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    let s = std::str::from_utf8(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_text(Field(field_id as u32), s);
                }
                reader = &reader[len..];
            }
            1 => {
                // U64
                let v = reader.read_u64::<LittleEndian>()?;
                if wanted {
                    doc.add_u64(Field(field_id as u32), v);
                }
            }
            2 => {
                // I64
                let v = reader.read_i64::<LittleEndian>()?;
                if wanted {
                    doc.add_i64(Field(field_id as u32), v);
                }
            }
            3 => {
                // F64
                let v = reader.read_f64::<LittleEndian>()?;
                if wanted {
                    doc.add_f64(Field(field_id as u32), v);
                }
            }
            4 => {
                // Bytes
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            5 => {
                // SparseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    let mut entries = Vec::with_capacity(count);
                    for _ in 0..count {
                        let idx = reader.read_u32::<LittleEndian>()?;
                        let val = reader.read_f32::<LittleEndian>()?;
                        entries.push((idx, val));
                    }
                    doc.add_sparse_vector(Field(field_id as u32), entries);
                } else {
                    let skip = count * 8;
                    if skip > reader.len() {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "sparse vector skip overflow",
                        ));
                    }
                    reader = &reader[skip..];
                }
            }
            6 => {
                // DenseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if byte_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "dense vector truncated",
                    ));
                }
                if wanted {
                    let mut values = vec![0.0f32; count];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            reader.as_ptr(),
                            values.as_mut_ptr() as *mut u8,
                            byte_len,
                        );
                    }
                    doc.add_dense_vector(Field(field_id as u32), values);
                }
                reader = &reader[byte_len..];
            }
            7 => {
                // Json
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "json field truncated",
                    ));
                }
                if wanted {
                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_json(Field(field_id as u32), v);
                }
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks
///
/// Limitations:
/// - Raw appending (`append_store`) requires dictionary-free source stores;
///   dict-compressed stores must go through `append_store_recompressing`
/// - Doc IDs are remapped sequentially
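///
/// A merge sketch over two dictionary-free source readers (`store_a` and `store_b`
/// are assumed `AsyncStoreReader`s; error handling elided):
///
/// ```ignore
/// let mut out = Vec::new();
/// let mut merger = StoreMerger::new(&mut out);
/// for reader in [&store_a, &store_b] {
///     // Raw path: copies compressed blocks verbatim and remaps doc ids.
///     merger.append_store(reader.data_slice(), &reader.raw_blocks()).await?;
/// }
/// let total_docs = merger.finish()?;
/// ```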
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Append raw compressed blocks from a store file
    ///
    /// `data_slice` should be the data portion of the store (before index/footer)
    /// `blocks` contains the block metadata from the source store
    pub async fn append_store(
        &mut self,
        data_slice: &FileHandle,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            // Read raw compressed block data
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            // Write to output
            self.writer.write_all(compressed_data.as_slice())?;

            // Add to index with remapped doc IDs
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Append blocks from a dict-compressed store by decompressing and recompressing.
    ///
    /// For stores that use dictionary compression, raw blocks can't be stacked
    /// directly because the decompressor needs the original dictionary.
    /// This method decompresses each block with the source dict, then
    /// recompresses without a dictionary so the merged output is self-contained.
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            // Decompress with source dict (or without if no dict)
            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            // Recompress without dictionary
            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Finish writing the merged store
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // No dictionary support for merged stores (would need same dict across all sources)
        let dict_offset = 0u64;

        // Write index + footer
        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Get raw block metadata for merging (without loading block data)
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Get the data slice for raw block access
    pub fn data_slice(&self) -> &FileHandle {
        &self.data_slice
    }

    /// Check if this store uses a dictionary (incompatible with raw merging)
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    /// Get the decompression dictionary (if any)
    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    /// Get block index for iteration
    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}