
hermes_core/segment/store.rs

//! Document store with Zstd compression and lazy loading
//!
//! Optimized for static indexes:
//! - Maximum compression level (22) for best compression ratio
//! - Larger block sizes (256KB) for better compression efficiency
//! - Optional trained dictionary support for even better compression
//! - Parallel compression support for faster indexing
//!
//! The writer stores documents in compressed blocks. The reader loads only the
//! block index into memory; blocks are fetched on demand.

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Write};
use std::sync::Arc;

use crate::DocId;
use crate::compression::CompressionDict;
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
use crate::directories::{AsyncFileRead, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Schema};

const STORE_MAGIC: u32 = 0x53544F52; // "STOR"
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries

/// Block size for document store (256KB for better compression)
/// Larger blocks = better compression ratio but more memory per block load
pub const STORE_BLOCK_SIZE: usize = 256 * 1024;

/// Default dictionary size (4KB is a good balance)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);

/// Write block index + footer to a store file.
///
/// Shared by `EagerParallelStoreWriter::finish` and `StoreMerger::finish`.
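///
/// Resulting file layout (the data blocks and optional dictionary are written
/// by the callers before invoking this; all integers little-endian):
///
/// ```text
/// [ compressed block 0 | compressed block 1 | ... ]                              data
/// [ dict_len: u32 | dict bytes ]                                                 only if has_dict
/// [ num_blocks: u32 ]
/// [ per block: first_doc_id: u32 | offset: u64 | length: u32 | num_docs: u32 ]   index
/// [ data_end: u64 | dict_offset: u64 | num_docs: u32 | has_dict: u32 | version: u32 | magic: u32 ]
/// ```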
fn write_store_index_and_footer(
    writer: &mut (impl Write + ?Sized),
    index: &[StoreBlockIndex],
    data_end_offset: u64,
    dict_offset: u64,
    num_docs: u32,
    has_dict: bool,
) -> io::Result<()> {
    writer.write_u32::<LittleEndian>(index.len() as u32)?;
    for entry in index {
        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
        writer.write_u64::<LittleEndian>(entry.offset)?;
        writer.write_u32::<LittleEndian>(entry.length)?;
        writer.write_u32::<LittleEndian>(entry.num_docs)?;
    }
    writer.write_u64::<LittleEndian>(data_end_offset)?;
    writer.write_u64::<LittleEndian>(dict_offset)?;
    writer.write_u32::<LittleEndian>(num_docs)?;
    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
    Ok(())
}

/// Binary document format:
///   num_fields: u16
///   per field: field_id: u16, type_tag: u8, value data
///     0=Text:         len:u32 + utf8
///     1=U64:          u64 LE
///     2=I64:          i64 LE
///     3=F64:          f64 LE
///     4=Bytes:        len:u32 + raw
///     5=SparseVector: count:u32 + count*(u32+f32)
///     6=DenseVector:  count:u32 + count*f32
///     7=Json:         len:u32 + json utf8
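///
/// For example, a document whose only stored field is `Text("hi")` on field 0
/// would serialize to (little-endian, illustrative):
///
/// ```text
/// 01 00        num_fields = 1
/// 00 00        field_id   = 0
/// 00           type_tag   = 0 (Text)
/// 02 00 00 00  len        = 2
/// 68 69        "hi"
/// ```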
pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
    let mut buf = Vec::with_capacity(256);
    serialize_document_into(doc, schema, &mut buf)?;
    Ok(buf)
}

/// Serialize a document into a reusable buffer (clears it first).
/// Avoids per-document allocation when called in a loop.
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;

    buf.clear();

    // Two-pass approach avoids allocating a Vec just to count + iterate stored fields.
    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        // Dense vectors live in .vectors (LazyFlatVectorData), not in .store
        if matches!(value, FieldValue::DenseVector(_)) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };

    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();

    buf.write_u16::<LittleEndian>(stored_count as u16)?;

    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // Write raw f32 bytes directly
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
        }
    }

    Ok(())
}

/// Compressed block result
#[cfg(feature = "native")]
struct CompressedBlock {
    seq: usize,
    first_doc_id: DocId,
    num_docs: u32,
    compressed: Vec<u8>,
}

/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents.
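///
/// A minimal usage sketch (not compiled here; how `schema` and the documents
/// are built depends on `crate::dsl` and is assumed):
///
/// ```ignore
/// let mut file = std::fs::File::create("segment.store")?;
/// let mut writer = EagerParallelStoreWriter::new(&mut file, 4);
/// for doc in &docs {
///     writer.store(doc, &schema)?;
/// }
/// let num_docs = writer.finish()?;
/// ```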
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    writer: &'a mut dyn Write,
    block_buffer: Vec<u8>,
    /// Reusable buffer for document serialization (avoids per-doc allocation)
    serialize_buf: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    next_seq: usize,
    next_doc_id: DocId,
    block_first_doc: DocId,
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}

#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Create a new eager parallel store writer
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: None,
            compression_level,
        }
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict: Some(Arc::new(dict)),
            compression_level,
        }
    }

    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

    /// Store pre-serialized document bytes directly (avoids deserialize+reserialize roundtrip).
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                if let Ok(block) = handle.join() {
                    self.compressed_blocks.push(block);
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed();

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            if let Ok(block) = handle.join() {
                self.compressed_blocks.push(block);
            }
        }

        if self.compressed_blocks.is_empty() {
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index + footer
        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}

/// Block index entry for document store
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    pub(crate) first_doc_id: DocId,
    pub(crate) offset: u64,
    pub(crate) length: u32,
    pub(crate) num_docs: u32,
}

/// Async document store reader - loads blocks on demand
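///
/// A minimal usage sketch (not compiled here; obtaining a `LazyFileHandle` and
/// a `Schema` is directory/DSL-specific and assumed):
///
/// ```ignore
/// let reader = AsyncStoreReader::open(handle, 32).await?;
/// if let Some(doc) = reader.get(0, &schema).await? {
///     // use the retrieved Document
/// }
/// ```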
pub struct AsyncStoreReader {
    /// LazyFileSlice for the data portion - fetches ranges on demand
    data_slice: LazyFileSlice,
    /// Block index
    index: Vec<StoreBlockIndex>,
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}

/// Decompressed block with pre-built doc offset table.
///
/// The offset table is built once on decompression: `offsets[i]` is the byte
/// position in `data` where doc `i`'s length prefix starts. This turns the
/// O(n) linear scan per `get()` into O(1) direct indexing.
struct CachedBlock {
    data: Vec<u8>,
    /// Byte offset of each doc's length prefix within `data`.
    /// `offsets.len()` == number of docs in the block.
    offsets: Vec<u32>,
}

impl CachedBlock {
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            if pos + 4 > data.len() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                ));
            }
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]])
                    as usize;
            pos += 4 + doc_len;
        }
        Ok(Self { data, offsets })
    }

    /// Get doc bytes by index within the block (O(1))
    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        if idx >= self.offsets.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc offset out of range",
            ));
        }
        let start = self.offsets[idx] as usize;
        if start + 4 > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "truncated doc length",
            ));
        }
        let doc_len = u32::from_le_bytes([
            self.data[start],
            self.data[start + 1],
            self.data[start + 2],
            self.data[start + 3],
        ]) as usize;
        let data_start = start + 4;
        if data_start + doc_len > self.data.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "doc data overflow",
            ));
        }
        Ok(&self.data[data_start..data_start + doc_len])
    }
}

/// LRU block cache — O(1) lookup/insert; promotion is an O(n) scan of the LRU queue.
///
/// On `get()`, the accessed entry is promoted to the MRU position.
/// For typical cache sizes (16-64 blocks), the linear promotion scan is negligible.
struct StoreBlockCache {
    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
    lru_order: std::collections::VecDeque<DocId>,
    max_blocks: usize,
}

impl StoreBlockCache {
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    /// Check cache without LRU promotion (safe for read-lock fast path)
    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        self.blocks.get(&first_doc_id).map(Arc::clone)
    }

    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
        self.promote(first_doc_id);
        Some(block)
    }

    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
        if self.blocks.contains_key(&first_doc_id) {
            self.promote(first_doc_id);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            if let Some(evict) = self.lru_order.pop_front() {
                self.blocks.remove(&evict);
            } else {
                break;
            }
        }
        self.blocks.insert(first_doc_id, block);
        self.lru_order.push_back(first_doc_id);
    }

    fn promote(&mut self, key: DocId) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(key);
        }
    }
}

impl AsyncStoreReader {
    /// Open a document store from a `LazyFileHandle`.
    /// Only the footer and block index are loaded into memory; data blocks are fetched on demand.
    pub async fn open(file_handle: LazyFileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes)
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load dictionary if present, and compute index_start in one pass
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Number of blocks currently in the cache
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Get a document by doc_id (async - may load block)
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Get specific fields of a document by doc_id (async - may load block)
    ///
    /// Only deserializes the requested fields, skipping over unwanted data.
    /// Much faster than `get()` when documents have large fields (text bodies,
    /// vectors) that aren't needed for the response.
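    ///
    /// Sketch (field ids are schema-specific; `0` here is a hypothetical id):
    ///
    /// ```ignore
    /// // Fetch only field 0 of doc 42, skipping everything else in the record.
    /// let partial = reader.get_fields(42, &schema, &[0]).await?;
    /// ```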
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

    /// Find the block index entry and load/cache the block for a given doc_id
    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        // Fast path: read lock to check cache (allows concurrent readers)
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        // Slow path: write lock for LRU promotion or insert
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        // Build offset table for O(1) doc lookup within the block
        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}

/// Deserialize only specific fields from document bytes.
///
/// Skips over unwanted fields without allocating their values — just advances
/// the reader past their length-prefixed data. For large documents with many
/// fields (e.g., full text body), this avoids allocating/copying data that
/// the caller doesn't need.
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}

/// Deserialize all fields from document bytes.
///
/// Delegates to the shared field-parsing core with no field filter.
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}

/// Shared deserialization core. `field_filter = None` means all fields wanted.
fn deserialize_document_inner(
    data: &[u8],
    _schema: &Schema,
    field_filter: Option<&[u32]>,
) -> io::Result<Document> {
    use crate::dsl::Field;

    let mut reader = data;
    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
    let mut doc = Document::new();

    for _ in 0..num_fields {
        let field_id = reader.read_u16::<LittleEndian>()?;
        let type_tag = reader.read_u8()?;

        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));

        match type_tag {
            0 => {
                // Text
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "text field truncated",
                    ));
                }
                if wanted {
                    let s = std::str::from_utf8(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_text(Field(field_id as u32), s);
                }
                reader = &reader[len..];
            }
            1 => {
                // U64
                let v = reader.read_u64::<LittleEndian>()?;
                if wanted {
                    doc.add_u64(Field(field_id as u32), v);
                }
            }
            2 => {
                // I64
                let v = reader.read_i64::<LittleEndian>()?;
                if wanted {
                    doc.add_i64(Field(field_id as u32), v);
                }
            }
            3 => {
                // F64
                let v = reader.read_f64::<LittleEndian>()?;
                if wanted {
                    doc.add_f64(Field(field_id as u32), v);
                }
            }
            4 => {
                // Bytes
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "bytes field truncated",
                    ));
                }
                if wanted {
                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
                }
                reader = &reader[len..];
            }
            5 => {
                // SparseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                if wanted {
                    let mut entries = Vec::with_capacity(count);
                    for _ in 0..count {
                        let idx = reader.read_u32::<LittleEndian>()?;
                        let val = reader.read_f32::<LittleEndian>()?;
                        entries.push((idx, val));
                    }
                    doc.add_sparse_vector(Field(field_id as u32), entries);
                } else {
                    let skip = count * 8;
                    if skip > reader.len() {
                        return Err(io::Error::new(
                            io::ErrorKind::UnexpectedEof,
                            "sparse vector skip overflow",
                        ));
                    }
                    reader = &reader[skip..];
                }
            }
            6 => {
                // DenseVector
                let count = reader.read_u32::<LittleEndian>()? as usize;
                let byte_len = count * 4;
                if byte_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "dense vector truncated",
                    ));
                }
                if wanted {
                    let mut values = vec![0.0f32; count];
                    unsafe {
                        std::ptr::copy_nonoverlapping(
                            reader.as_ptr(),
                            values.as_mut_ptr() as *mut u8,
                            byte_len,
                        );
                    }
                    doc.add_dense_vector(Field(field_id as u32), values);
                }
                reader = &reader[byte_len..];
            }
            7 => {
                // Json
                let len = reader.read_u32::<LittleEndian>()? as usize;
                if len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "json field truncated",
                    ));
                }
                if wanted {
                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                    doc.add_json(Field(field_id as u32), v);
                }
                reader = &reader[len..];
            }
            _ => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Unknown field type tag: {}", type_tag),
                ));
            }
        }
    }

    Ok(doc)
}

/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    pub first_doc_id: DocId,
    pub num_docs: u32,
    pub offset: u64,
    pub length: u32,
}

/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks at level 22
///
/// Limitations:
/// - All source stores must NOT use dictionaries (or use the same dictionary)
/// - Doc IDs are remapped sequentially
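///
/// Usage sketch (not compiled here; assumes two already-opened `AsyncStoreReader`s
/// `store_a` and `store_b`, neither of which uses a dictionary):
///
/// ```ignore
/// let mut out = std::fs::File::create("merged.store")?;
/// let mut merger = StoreMerger::new(&mut out);
/// merger.append_store(store_a.data_slice(), &store_a.raw_blocks()).await?;
/// merger.append_store(store_b.data_slice(), &store_b.raw_blocks()).await?;
/// let total_docs = merger.finish()?;
/// ```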
pub struct StoreMerger<'a, W: Write> {
    writer: &'a mut W,
    index: Vec<StoreBlockIndex>,
    current_offset: u64,
    next_doc_id: DocId,
}

impl<'a, W: Write> StoreMerger<'a, W> {
    pub fn new(writer: &'a mut W) -> Self {
        Self {
            writer,
            index: Vec::new(),
            current_offset: 0,
            next_doc_id: 0,
        }
    }

    /// Append raw compressed blocks from a store file
    ///
    /// `data_slice` should be the data portion of the store (before index/footer)
    /// `blocks` contains the block metadata from the source store
    pub async fn append_store<F: AsyncFileRead>(
        &mut self,
        data_slice: &F,
        blocks: &[RawStoreBlock],
    ) -> io::Result<()> {
        for block in blocks {
            // Read raw compressed block data
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed_data = data_slice.read_bytes_range(start..end).await?;

            // Write to output
            self.writer.write_all(compressed_data.as_slice())?;

            // Add to index with remapped doc IDs
            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: block.length,
                num_docs: block.num_docs,
            });

            self.current_offset += block.length as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Append blocks from a dict-compressed store by decompressing and recompressing.
    ///
    /// For stores that use dictionary compression, raw blocks can't be stacked
    /// directly because the decompressor needs the original dictionary.
    /// This method decompresses each block with the source dict, then
    /// recompresses without a dictionary so the merged output is self-contained.
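    ///
    /// Sketch (`dict_store` is an assumed dictionary-compressed `AsyncStoreReader`):
    ///
    /// ```ignore
    /// merger.append_store_recompressing(&dict_store).await?;
    /// ```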
    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
        let dict = store.dict();
        let data_slice = store.data_slice();
        let blocks = store.block_index();

        for block in blocks {
            let start = block.offset;
            let end = start + block.length as u64;
            let compressed = data_slice.read_bytes_range(start..end).await?;

            // Decompress with source dict (or without if no dict)
            let decompressed = if let Some(d) = dict {
                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
            } else {
                crate::compression::decompress(compressed.as_slice())?
            };

            // Recompress without dictionary
            let recompressed = crate::compression::compress(
                &decompressed,
                crate::compression::CompressionLevel::default(),
            )?;

            self.writer.write_all(&recompressed)?;

            self.index.push(StoreBlockIndex {
                first_doc_id: self.next_doc_id,
                offset: self.current_offset,
                length: recompressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.current_offset += recompressed.len() as u64;
            self.next_doc_id += block.num_docs;
        }

        Ok(())
    }

    /// Finish writing the merged store
    pub fn finish(self) -> io::Result<u32> {
        let data_end_offset = self.current_offset;

        // No dictionary support for merged stores (would need same dict across all sources)
        let dict_offset = 0u64;

        // Write index + footer
        write_store_index_and_footer(
            self.writer,
            &self.index,
            data_end_offset,
            dict_offset,
            self.next_doc_id,
            false,
        )?;

        Ok(self.next_doc_id)
    }
}

impl AsyncStoreReader {
    /// Get raw block metadata for merging (without loading block data)
    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.index
            .iter()
            .map(|entry| RawStoreBlock {
                first_doc_id: entry.first_doc_id,
                num_docs: entry.num_docs,
                offset: entry.offset,
                length: entry.length,
            })
            .collect()
    }

    /// Get the data slice for raw block access
    pub fn data_slice(&self) -> &LazyFileSlice {
        &self.data_slice
    }

    /// Check if this store uses a dictionary (incompatible with raw merging)
    pub fn has_dict(&self) -> bool {
        self.dict.is_some()
    }

    /// Get the decompression dictionary (if any)
    pub fn dict(&self) -> Option<&CompressionDict> {
        self.dict.as_ref()
    }

    /// Get block index for iteration
    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
        &self.index
    }
}