Skip to main content

hermes_core/segment/
store.rs

1//! Document store with Zstd compression and lazy loading
2//!
3//! Optimized for static indexes:
//! - Configurable Zstd compression level (default: level 3)
//! - 16KB blocks to limit read amplification for single-document fetches
6//! - Optional trained dictionary support for even better compression
7//! - Parallel compression support for faster indexing
8//!
9//! Writer stores documents in compressed blocks.
10//! Reader only loads index into memory, blocks are loaded on-demand.
11
12use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
13use parking_lot::RwLock;
14use rustc_hash::FxHashMap;
15use std::io::{self, Write};
16use std::sync::Arc;
17
18use crate::DocId;
19use crate::compression::CompressionDict;
20#[cfg(feature = "native")]
21use crate::compression::CompressionLevel;
22use crate::directories::FileHandle;
23use crate::dsl::{Document, Schema};
24
// Magic number written last in the footer; readers verify it on open.
const STORE_MAGIC: u32 = 0x53544F52; // "STOR" in ASCII
const STORE_VERSION: u32 = 2; // Version 2 supports dictionaries

/// Block size for document store (16KB).
/// Smaller blocks reduce read amplification for single-doc fetches at the
/// cost of slightly worse compression ratio. Zstd dictionary training
/// recovers most of the compression loss.
pub const STORE_BLOCK_SIZE: usize = 16 * 1024;

/// Default dictionary size (4KB is a good balance)
pub const DEFAULT_DICT_SIZE: usize = 4 * 1024;

/// Default compression level for document store
#[cfg(feature = "native")]
const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel(3);
40
41/// Write block index + footer to a store file.
42///
43/// Shared by `EagerParallelStoreWriter::finish` and `StoreMerger::finish`.
44fn write_store_index_and_footer(
45    writer: &mut (impl Write + ?Sized),
46    index: &[StoreBlockIndex],
47    data_end_offset: u64,
48    dict_offset: u64,
49    num_docs: u32,
50    has_dict: bool,
51) -> io::Result<()> {
52    writer.write_u32::<LittleEndian>(index.len() as u32)?;
53    for entry in index {
54        writer.write_u32::<LittleEndian>(entry.first_doc_id)?;
55        writer.write_u64::<LittleEndian>(entry.offset)?;
56        writer.write_u32::<LittleEndian>(entry.length)?;
57        writer.write_u32::<LittleEndian>(entry.num_docs)?;
58    }
59    writer.write_u64::<LittleEndian>(data_end_offset)?;
60    writer.write_u64::<LittleEndian>(dict_offset)?;
61    writer.write_u32::<LittleEndian>(num_docs)?;
62    writer.write_u32::<LittleEndian>(if has_dict { 1 } else { 0 })?;
63    writer.write_u32::<LittleEndian>(STORE_VERSION)?;
64    writer.write_u32::<LittleEndian>(STORE_MAGIC)?;
65    Ok(())
66}
67
68/// Binary document format:
69///   num_fields: u16
70///   per field: field_id: u16, type_tag: u8, value data
71///     0=Text:         len:u32 + utf8
72///     1=U64:          u64 LE
73///     2=I64:          i64 LE
74///     3=F64:          f64 LE
75///     4=Bytes:        len:u32 + raw
76///     5=SparseVector: count:u32 + count*(u32+f32)
77///     6=DenseVector:  count:u32 + count*f32
78///     7=Json:         len:u32 + json utf8
79pub fn serialize_document(doc: &Document, schema: &Schema) -> io::Result<Vec<u8>> {
80    let mut buf = Vec::with_capacity(256);
81    serialize_document_into(doc, schema, &mut buf)?;
82    Ok(buf)
83}
84
/// Serialize a document into a reusable buffer (clears it first).
/// Avoids per-document allocation when called in a loop.
///
/// Only fields marked `stored` in the schema are written; dense/binary
/// vectors are excluded entirely (they live in the separate vectors file).
/// See the format description on `serialize_document`.
pub fn serialize_document_into(
    doc: &Document,
    schema: &Schema,
    buf: &mut Vec<u8>,
) -> io::Result<()> {
    use crate::dsl::FieldValue;

    buf.clear();

    // Two-pass approach avoids allocating a Vec just to count + iterate stored fields.
    let is_stored = |field: &crate::dsl::Field, value: &FieldValue| -> bool {
        // Dense/binary vectors live in .vectors (LazyFlatVectorData), not in .store
        if matches!(
            value,
            FieldValue::DenseVector(_) | FieldValue::BinaryDenseVector(_)
        ) {
            return false;
        }
        schema.get_field_entry(*field).is_some_and(|e| e.stored)
    };

    // Pass 1: count stored fields (the format needs the count up front).
    let stored_count = doc
        .field_values()
        .iter()
        .filter(|(field, value)| is_stored(field, value))
        .count();

    // NOTE(review): silently truncates if a document has > 65535 stored
    // fields — presumably never happens in practice; confirm upstream limits.
    buf.write_u16::<LittleEndian>(stored_count as u16)?;

    // Pass 2: write each stored field as (field_id, type_tag, payload).
    for (field, value) in doc.field_values().iter().filter(|(f, v)| is_stored(f, v)) {
        buf.write_u16::<LittleEndian>(field.0 as u16)?;
        match value {
            FieldValue::Text(s) => {
                buf.push(0);
                let bytes = s.as_bytes();
                buf.write_u32::<LittleEndian>(bytes.len() as u32)?;
                buf.extend_from_slice(bytes);
            }
            FieldValue::U64(v) => {
                buf.push(1);
                buf.write_u64::<LittleEndian>(*v)?;
            }
            FieldValue::I64(v) => {
                buf.push(2);
                buf.write_i64::<LittleEndian>(*v)?;
            }
            FieldValue::F64(v) => {
                buf.push(3);
                buf.write_f64::<LittleEndian>(*v)?;
            }
            FieldValue::Bytes(b) => {
                buf.push(4);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
            FieldValue::SparseVector(entries) => {
                buf.push(5);
                buf.write_u32::<LittleEndian>(entries.len() as u32)?;
                for (idx, val) in entries {
                    buf.write_u32::<LittleEndian>(*idx)?;
                    buf.write_f32::<LittleEndian>(*val)?;
                }
            }
            FieldValue::DenseVector(values) => {
                buf.push(6);
                buf.write_u32::<LittleEndian>(values.len() as u32)?;
                // Write raw f32 bytes directly.
                // SAFETY: `values` is a contiguous `&[f32]`, so reinterpreting
                // its backing memory as `len * 4` bytes is in-bounds; `u8` has
                // alignment 1, so the cast is always valid.
                // NOTE(review): this copies f32s in *native* byte order, unlike
                // the explicit little-endian writes elsewhere. The matching
                // reader does the same native-order copy, so round-trips work
                // on one machine, but files are not portable across endianness
                // — confirm whether BE targets matter.
                let byte_slice = unsafe {
                    std::slice::from_raw_parts(values.as_ptr() as *const u8, values.len() * 4)
                };
                buf.extend_from_slice(byte_slice);
            }
            FieldValue::Json(v) => {
                buf.push(7);
                let json_bytes = serde_json::to_vec(v)
                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
                buf.write_u32::<LittleEndian>(json_bytes.len() as u32)?;
                buf.extend_from_slice(&json_bytes);
            }
            FieldValue::BinaryDenseVector(b) => {
                buf.push(8);
                buf.write_u32::<LittleEndian>(b.len() as u32)?;
                buf.extend_from_slice(b);
            }
        }
    }

    Ok(())
}
176
/// Compressed block result produced by a background compression thread.
#[cfg(feature = "native")]
struct CompressedBlock {
    /// Sequence number assigned at spawn time; used to restore write order.
    seq: usize,
    /// Doc id of the first document in this block.
    first_doc_id: DocId,
    /// Number of documents packed into this block.
    num_docs: u32,
    /// Zstd-compressed block payload.
    compressed: Vec<u8>,
}
185
/// Parallel document store writer - compresses blocks immediately when queued
///
/// Spawns compression tasks as soon as blocks are ready, overlapping document
/// ingestion with compression to reduce total indexing time.
///
/// Uses background threads to compress blocks while the main thread continues
/// accepting documents.
#[cfg(feature = "native")]
pub struct EagerParallelStoreWriter<'a> {
    /// Destination for compressed blocks, dictionary, index and footer.
    writer: &'a mut dyn Write,
    /// Uncompressed documents accumulated for the current block.
    block_buffer: Vec<u8>,
    /// Reusable buffer for document serialization (avoids per-doc allocation)
    serialize_buf: Vec<u8>,
    /// Compressed blocks ready to be written (may arrive out of order)
    compressed_blocks: Vec<CompressedBlock>,
    /// Handles for in-flight compression tasks
    pending_handles: Vec<std::thread::JoinHandle<CompressedBlock>>,
    /// Next block sequence number (restores write order after parallel compression).
    next_seq: usize,
    /// Doc id that will be assigned to the next stored document.
    next_doc_id: DocId,
    /// Doc id of the first document in the current (unflushed) block.
    block_first_doc: DocId,
    /// Optional shared compression dictionary (cloned into worker threads).
    dict: Option<Arc<CompressionDict>>,
    compression_level: CompressionLevel,
}
209
#[cfg(feature = "native")]
impl<'a> EagerParallelStoreWriter<'a> {
    /// Upper bound on in-flight background compression threads.
    ///
    /// `spawn_compression` launches one thread per full block; without a cap,
    /// a large ingest would spawn one thread per `STORE_BLOCK_SIZE` chunk of
    /// input. When the cap is reached, finished tasks are harvested and, if
    /// necessary, the oldest outstanding task is joined before spawning a new
    /// one — which also applies backpressure to document ingestion.
    const MAX_IN_FLIGHT: usize = 32;

    /// Create a new eager parallel store writer.
    ///
    /// `_num_threads` is accepted for API compatibility but currently unused;
    /// concurrency is bounded by `MAX_IN_FLIGHT`.
    pub fn new(writer: &'a mut dyn Write, _num_threads: usize) -> Self {
        Self::with_compression_level(writer, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with specific compression level
    pub fn with_compression_level(
        writer: &'a mut dyn Write,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::make(writer, None, compression_level)
    }

    /// Create with dictionary
    pub fn with_dict(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
    ) -> Self {
        Self::with_dict_and_level(writer, dict, _num_threads, DEFAULT_COMPRESSION_LEVEL)
    }

    /// Create with dictionary and specific compression level
    pub fn with_dict_and_level(
        writer: &'a mut dyn Write,
        dict: CompressionDict,
        _num_threads: usize,
        compression_level: CompressionLevel,
    ) -> Self {
        Self::make(writer, Some(Arc::new(dict)), compression_level)
    }

    /// Shared constructor body for all public constructors.
    fn make(
        writer: &'a mut dyn Write,
        dict: Option<Arc<CompressionDict>>,
        compression_level: CompressionLevel,
    ) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(STORE_BLOCK_SIZE),
            serialize_buf: Vec::with_capacity(512),
            compressed_blocks: Vec::new(),
            pending_handles: Vec::new(),
            next_seq: 0,
            next_doc_id: 0,
            block_first_doc: 0,
            dict,
            compression_level,
        }
    }

    /// Serialize `doc` and append it to the current block, returning its doc id.
    ///
    /// When the block buffer reaches `STORE_BLOCK_SIZE`, the block is handed
    /// off to a background compression thread.
    pub fn store(&mut self, doc: &Document, schema: &Schema) -> io::Result<DocId> {
        serialize_document_into(doc, schema, &mut self.serialize_buf)?;
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;
        self.block_buffer
            .write_u32::<LittleEndian>(self.serialize_buf.len() as u32)?;
        self.block_buffer.extend_from_slice(&self.serialize_buf);
        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }
        Ok(doc_id)
    }

    /// Store pre-serialized document bytes directly (avoids deserialize+reserialize roundtrip).
    pub fn store_raw(&mut self, doc_bytes: &[u8]) -> io::Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        self.block_buffer
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.block_buffer.extend_from_slice(doc_bytes);

        if self.block_buffer.len() >= STORE_BLOCK_SIZE {
            self.spawn_compression();
        }

        Ok(doc_id)
    }

    /// Spawn compression for the current block immediately.
    ///
    /// No-op when the block buffer is empty. Keeps the number of live worker
    /// threads at or below `MAX_IN_FLIGHT`.
    fn spawn_compression(&mut self) {
        if self.block_buffer.is_empty() {
            return;
        }

        // Enforce the in-flight thread cap before spawning another task:
        // harvest finished tasks first, then block on the oldest if needed.
        if self.pending_handles.len() >= Self::MAX_IN_FLIGHT {
            self.collect_completed();
            while self.pending_handles.len() >= Self::MAX_IN_FLIGHT {
                let handle = self.pending_handles.remove(0);
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            }
        }

        let num_docs = self.next_doc_id - self.block_first_doc;
        let data = std::mem::replace(&mut self.block_buffer, Vec::with_capacity(STORE_BLOCK_SIZE));
        let seq = self.next_seq;
        let first_doc_id = self.block_first_doc;
        let dict = self.dict.clone();

        self.next_seq += 1;
        self.block_first_doc = self.next_doc_id;

        // Spawn compression task using thread
        let level = self.compression_level;
        let handle = std::thread::spawn(move || {
            let compressed = if let Some(ref d) = dict {
                crate::compression::compress_with_dict(&data, level, d).expect("compression failed")
            } else {
                crate::compression::compress(&data, level).expect("compression failed")
            };

            CompressedBlock {
                seq,
                first_doc_id,
                num_docs,
                compressed,
            }
        });

        self.pending_handles.push(handle);
    }

    /// Collect any completed compression tasks without blocking on running ones.
    fn collect_completed(&mut self) {
        let mut remaining = Vec::new();
        for handle in self.pending_handles.drain(..) {
            if handle.is_finished() {
                match handle.join() {
                    Ok(block) => self.compressed_blocks.push(block),
                    // Propagate panics from worker threads to the caller.
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            } else {
                remaining.push(handle);
            }
        }
        self.pending_handles = remaining;
    }

    /// Flush the final partial block, wait for all compression tasks, then
    /// write blocks (in doc-id order), the optional dictionary, the block
    /// index and the footer.
    ///
    /// Returns the total number of documents written.
    pub fn finish(mut self) -> io::Result<u32> {
        // Spawn compression for any remaining data
        self.spawn_compression();

        // Collect any already-completed tasks
        self.collect_completed();

        // Wait for all remaining compression tasks
        for handle in self.pending_handles.drain(..) {
            match handle.join() {
                Ok(block) => self.compressed_blocks.push(block),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }

        if self.compressed_blocks.is_empty() {
            // Empty store: still emit a valid (empty) index + footer.
            write_store_index_and_footer(&mut self.writer, &[], 0, 0, 0, false)?;
            return Ok(0);
        }

        // Sort by sequence to maintain order
        self.compressed_blocks.sort_by_key(|b| b.seq);

        // Write blocks in order and build index
        let mut index = Vec::with_capacity(self.compressed_blocks.len());
        let mut current_offset = 0u64;

        for block in &self.compressed_blocks {
            index.push(StoreBlockIndex {
                first_doc_id: block.first_doc_id,
                offset: current_offset,
                length: block.compressed.len() as u32,
                num_docs: block.num_docs,
            });

            self.writer.write_all(&block.compressed)?;
            current_offset += block.compressed.len() as u64;
        }

        let data_end_offset = current_offset;

        // Write dictionary if present: [dict_len: u32][dict bytes]
        let dict_offset = if let Some(ref dict) = self.dict {
            let offset = current_offset;
            let dict_bytes = dict.as_bytes();
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            Some(offset)
        } else {
            None
        };

        // Write index + footer
        write_store_index_and_footer(
            &mut self.writer,
            &index,
            data_end_offset,
            dict_offset.unwrap_or(0),
            self.next_doc_id,
            self.dict.is_some(),
        )?;

        Ok(self.next_doc_id)
    }
}
413
/// Block index entry for document store
#[derive(Debug, Clone)]
pub(crate) struct StoreBlockIndex {
    /// Doc id of the first document in the block.
    pub(crate) first_doc_id: DocId,
    /// Byte offset of the compressed block within the data section.
    pub(crate) offset: u64,
    /// Length of the compressed block in bytes.
    pub(crate) length: u32,
    /// Number of documents packed into the block.
    pub(crate) num_docs: u32,
}
422
/// Async document store reader - loads blocks on demand
///
/// Only the block index (and optional dictionary) is held in memory;
/// compressed blocks are fetched, decompressed and cached lazily.
pub struct AsyncStoreReader {
    /// FileHandle for the data portion - fetches ranges on demand
    data_slice: FileHandle,
    /// Block index
    index: Vec<StoreBlockIndex>,
    /// Total number of documents in the store (from the footer).
    num_docs: u32,
    /// Optional compression dictionary
    dict: Option<CompressionDict>,
    /// Block cache
    cache: RwLock<StoreBlockCache>,
}
435
/// Decompressed block with pre-built doc offset table.
///
/// The offset table is built once on decompression: `offsets[i]` is the byte
/// position in `data` where doc `i`'s length prefix starts. This turns the
/// O(n) linear scan per `get()` into O(1) direct indexing.
struct CachedBlock {
    data: Vec<u8>,
    /// Byte offset of each doc's length prefix within `data`.
    /// `offsets.len()` == number of docs in the block.
    offsets: Vec<u32>,
}

impl CachedBlock {
    /// Scan the block once, recording the byte offset of every doc's length
    /// prefix so later lookups are O(1).
    ///
    /// Validates both each 4-byte length prefix *and* its payload against
    /// `data`'s bounds, so corrupt or truncated blocks are rejected here
    /// (fail-fast) instead of surfacing later in `doc_bytes`.
    fn build(data: Vec<u8>, num_docs: u32) -> io::Result<Self> {
        let mut offsets = Vec::with_capacity(num_docs as usize);
        let mut pos = 0usize;
        for _ in 0..num_docs {
            let prefix = data.get(pos..pos + 4).ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "truncated block while building offset table",
                )
            })?;
            offsets.push(pos as u32);
            let doc_len =
                u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
            pos += 4;
            // Reject a length prefix whose payload runs past the block end.
            if doc_len > data.len() - pos {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "doc payload extends past end of block",
                ));
            }
            pos += doc_len;
        }
        Ok(Self { data, offsets })
    }

    /// Get doc bytes by index within the block (O(1))
    fn doc_bytes(&self, doc_offset_in_block: u32) -> io::Result<&[u8]> {
        let idx = doc_offset_in_block as usize;
        let start = *self.offsets.get(idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "doc offset out of range")
        })? as usize;
        // `build` already validated prefix + payload bounds; re-check with
        // `get` so this method stays panic-free even if invariants break.
        let prefix = self
            .data
            .get(start..start + 4)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "truncated doc length"))?;
        let doc_len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        let body_start = start + 4;
        self.data
            .get(body_start..body_start + doc_len)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "doc data overflow"))
    }
}
500
501/// LRU block cache — O(1) lookup/insert, amortized O(n) promotion.
502///
503/// On `get()`, promotes accessed entry to MRU position.
504/// For typical cache sizes (16-64 blocks), the linear promote scan is negligible.
505struct StoreBlockCache {
506    blocks: FxHashMap<DocId, Arc<CachedBlock>>,
507    lru_order: std::collections::VecDeque<DocId>,
508    max_blocks: usize,
509}
510
511impl StoreBlockCache {
512    fn new(max_blocks: usize) -> Self {
513        Self {
514            blocks: FxHashMap::default(),
515            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
516            max_blocks,
517        }
518    }
519
520    /// Check cache without LRU promotion (safe for read-lock fast path)
521    fn peek(&self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
522        self.blocks.get(&first_doc_id).map(Arc::clone)
523    }
524
525    fn get(&mut self, first_doc_id: DocId) -> Option<Arc<CachedBlock>> {
526        let block = self.blocks.get(&first_doc_id).map(Arc::clone)?;
527        self.promote(first_doc_id);
528        Some(block)
529    }
530
531    fn insert(&mut self, first_doc_id: DocId, block: Arc<CachedBlock>) {
532        if self.blocks.contains_key(&first_doc_id) {
533            self.promote(first_doc_id);
534            return;
535        }
536        while self.blocks.len() >= self.max_blocks {
537            if let Some(evict) = self.lru_order.pop_front() {
538                self.blocks.remove(&evict);
539            } else {
540                break;
541            }
542        }
543        self.blocks.insert(first_doc_id, block);
544        self.lru_order.push_back(first_doc_id);
545    }
546
547    fn promote(&mut self, key: DocId) {
548        if let Some(pos) = self.lru_order.iter().position(|&k| k == key) {
549            self.lru_order.remove(pos);
550            self.lru_order.push_back(key);
551        }
552    }
553}
554
impl AsyncStoreReader {
    /// Open a document store from FileHandle
    /// Only loads footer and index into memory, data blocks are fetched on-demand
    ///
    /// `cache_blocks` caps how many decompressed blocks the LRU cache keeps.
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        // Footer: data_end(8) + dict_offset(8) + num_docs(4) + has_dict(4) + version(4) + magic(4) = 32 bytes
        if file_len < 32 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Store too small",
            ));
        }

        // Read footer (32 bytes) — fields in the order the writer emits them.
        let footer = file_handle
            .read_bytes_range(file_len - 32..file_len)
            .await?;
        let mut reader = footer.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let num_docs = reader.read_u32::<LittleEndian>()?;
        let has_dict = reader.read_u32::<LittleEndian>()? != 0;
        let version = reader.read_u32::<LittleEndian>()?;
        let magic = reader.read_u32::<LittleEndian>()?;

        if magic != STORE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid store magic",
            ));
        }
        if version != STORE_VERSION {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unsupported store version: {}", version),
            ));
        }

        // Load dictionary if present, and compute index_start in one pass.
        // Layout after the data section: [dict_len: u32][dict bytes] (only
        // when has_dict), then the block index, then the 32-byte footer.
        let (dict, index_start) = if has_dict && dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = (&dict_len_bytes[..]).read_u32::<LittleEndian>()? as u64;
            let dict_bytes = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            let idx_start = dict_start + 4 + dict_len;
            (
                Some(CompressionDict::from_owned_bytes(dict_bytes)),
                idx_start,
            )
        } else {
            // No dictionary: the index begins where the data section ends.
            (None, data_end_offset)
        };
        let index_end = file_len - 32;

        // Parse the block index: count, then fixed-width entries.
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut reader = index_bytes.as_slice();

        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
        let mut index = Vec::with_capacity(num_blocks);

        for _ in 0..num_blocks {
            let first_doc_id = reader.read_u32::<LittleEndian>()?;
            let offset = reader.read_u64::<LittleEndian>()?;
            let length = reader.read_u32::<LittleEndian>()?;
            let num_docs_in_block = reader.read_u32::<LittleEndian>()?;

            index.push(StoreBlockIndex {
                first_doc_id,
                offset,
                length,
                num_docs: num_docs_in_block,
            });
        }

        // Create lazy slice for data portion only
        let data_slice = file_handle.slice(0..data_end_offset);

        Ok(Self {
            data_slice,
            index,
            num_docs,
            dict,
            cache: RwLock::new(StoreBlockCache::new(cache_blocks)),
        })
    }

    /// Number of documents
    pub fn num_docs(&self) -> u32 {
        self.num_docs
    }

    /// Number of blocks currently in the cache
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Get a document by doc_id (async - may load block)
    ///
    /// Returns `Ok(None)` for doc ids at or beyond `num_docs`.
    pub async fn get(&self, doc_id: DocId, schema: &Schema) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document(doc_bytes, schema).map(Some)
    }

    /// Get specific fields of a document by doc_id (async - may load block)
    ///
    /// Only deserializes the requested fields, skipping over unwanted data.
    /// Much faster than `get()` when documents have large fields (text bodies,
    /// vectors) that aren't needed for the response.
    pub async fn get_fields(
        &self,
        doc_id: DocId,
        schema: &Schema,
        field_ids: &[u32],
    ) -> io::Result<Option<Document>> {
        if doc_id >= self.num_docs {
            return Ok(None);
        }

        let (entry, block) = self.find_and_load_block(doc_id).await?;
        let doc_bytes = block.doc_bytes(doc_id - entry.first_doc_id)?;
        deserialize_document_fields(doc_bytes, schema, field_ids).map(Some)
    }

    /// Find the block index entry and load/cache the block for a given doc_id
    async fn find_and_load_block(
        &self,
        doc_id: DocId,
    ) -> io::Result<(&StoreBlockIndex, Arc<CachedBlock>)> {
        // Binary search over [first_doc_id, first_doc_id + num_docs) ranges.
        let block_idx = self
            .index
            .binary_search_by(|entry| {
                if doc_id < entry.first_doc_id {
                    std::cmp::Ordering::Greater
                } else if doc_id >= entry.first_doc_id + entry.num_docs {
                    std::cmp::Ordering::Less
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Doc not found in index"))?;

        let entry = &self.index[block_idx];
        let block = self.load_block(entry).await?;
        Ok((entry, block))
    }

    /// Fetch, decompress and cache the block described by `entry`.
    ///
    /// Note: two tasks missing the cache concurrently may both decompress the
    /// same block; the second insert is a no-op, so this is wasted work but
    /// not incorrect.
    async fn load_block(&self, entry: &StoreBlockIndex) -> io::Result<Arc<CachedBlock>> {
        // Fast path: read lock to check cache (allows concurrent readers)
        {
            let cache = self.cache.read();
            if let Some(block) = cache.peek(entry.first_doc_id) {
                return Ok(block);
            }
        }
        // Slow path: write lock for LRU promotion or insert
        {
            if let Some(block) = self.cache.write().get(entry.first_doc_id) {
                return Ok(block);
            }
        }

        // Load from FileSlice
        let start = entry.offset;
        let end = start + entry.length as u64;
        let compressed = self.data_slice.read_bytes_range(start..end).await?;

        // Use dictionary decompression if available
        let decompressed = if let Some(ref dict) = self.dict {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };

        // Build offset table for O(1) doc lookup within the block
        let cached = CachedBlock::build(decompressed, entry.num_docs)?;
        let block = Arc::new(cached);

        // Insert into cache
        {
            let mut cache = self.cache.write();
            cache.insert(entry.first_doc_id, Arc::clone(&block));
        }

        Ok(block)
    }
}
749
/// Deserialize only specific fields from document bytes.
///
/// Skips over unwanted fields without allocating their values — just advances
/// the reader past their length-prefixed data. For large documents with many
/// fields (e.g., full text body), this avoids allocating/copying data that
/// the caller doesn't need.
///
/// `field_ids` lists the schema field ids to keep; everything else is skipped.
/// Returns a `Document` containing only the requested fields that were present.
pub fn deserialize_document_fields(
    data: &[u8],
    schema: &Schema,
    field_ids: &[u32],
) -> io::Result<Document> {
    deserialize_document_inner(data, schema, Some(field_ids))
}
763
/// Deserialize all fields from document bytes.
///
/// Delegates to the shared field-parsing core with no field filter.
/// Counterpart to `serialize_document` / `serialize_document_into`.
pub fn deserialize_document(data: &[u8], schema: &Schema) -> io::Result<Document> {
    deserialize_document_inner(data, schema, None)
}
770
771/// Shared deserialization core. `field_filter = None` means all fields wanted.
772fn deserialize_document_inner(
773    data: &[u8],
774    _schema: &Schema,
775    field_filter: Option<&[u32]>,
776) -> io::Result<Document> {
777    use crate::dsl::Field;
778
779    let mut reader = data;
780    let num_fields = reader.read_u16::<LittleEndian>()? as usize;
781    let mut doc = Document::new();
782
783    for _ in 0..num_fields {
784        let field_id = reader.read_u16::<LittleEndian>()?;
785        let type_tag = reader.read_u8()?;
786
787        let wanted = field_filter.is_none_or(|ids| ids.contains(&(field_id as u32)));
788
789        match type_tag {
790            0 => {
791                // Text
792                let len = reader.read_u32::<LittleEndian>()? as usize;
793                if wanted {
794                    let s = std::str::from_utf8(&reader[..len])
795                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
796                    doc.add_text(Field(field_id as u32), s);
797                }
798                reader = &reader[len..];
799            }
800            1 => {
801                // U64
802                let v = reader.read_u64::<LittleEndian>()?;
803                if wanted {
804                    doc.add_u64(Field(field_id as u32), v);
805                }
806            }
807            2 => {
808                // I64
809                let v = reader.read_i64::<LittleEndian>()?;
810                if wanted {
811                    doc.add_i64(Field(field_id as u32), v);
812                }
813            }
814            3 => {
815                // F64
816                let v = reader.read_f64::<LittleEndian>()?;
817                if wanted {
818                    doc.add_f64(Field(field_id as u32), v);
819                }
820            }
821            4 => {
822                // Bytes
823                let len = reader.read_u32::<LittleEndian>()? as usize;
824                if wanted {
825                    doc.add_bytes(Field(field_id as u32), reader[..len].to_vec());
826                }
827                reader = &reader[len..];
828            }
829            5 => {
830                // SparseVector
831                let count = reader.read_u32::<LittleEndian>()? as usize;
832                if wanted {
833                    let mut entries = Vec::with_capacity(count);
834                    for _ in 0..count {
835                        let idx = reader.read_u32::<LittleEndian>()?;
836                        let val = reader.read_f32::<LittleEndian>()?;
837                        entries.push((idx, val));
838                    }
839                    doc.add_sparse_vector(Field(field_id as u32), entries);
840                } else {
841                    let skip = count * 8;
842                    if skip > reader.len() {
843                        return Err(io::Error::new(
844                            io::ErrorKind::UnexpectedEof,
845                            "sparse vector skip overflow",
846                        ));
847                    }
848                    reader = &reader[skip..];
849                }
850            }
851            6 => {
852                // DenseVector
853                let count = reader.read_u32::<LittleEndian>()? as usize;
854                let byte_len = count * 4;
855                if byte_len > reader.len() {
856                    return Err(io::Error::new(
857                        io::ErrorKind::UnexpectedEof,
858                        "dense vector truncated",
859                    ));
860                }
861                if wanted {
862                    let mut values = vec![0.0f32; count];
863                    unsafe {
864                        std::ptr::copy_nonoverlapping(
865                            reader.as_ptr(),
866                            values.as_mut_ptr() as *mut u8,
867                            byte_len,
868                        );
869                    }
870                    doc.add_dense_vector(Field(field_id as u32), values);
871                }
872                reader = &reader[byte_len..];
873            }
874            7 => {
875                // Json
876                let len = reader.read_u32::<LittleEndian>()? as usize;
877                if len > reader.len() {
878                    return Err(io::Error::new(
879                        io::ErrorKind::UnexpectedEof,
880                        "json field truncated",
881                    ));
882                }
883                if wanted {
884                    let v: serde_json::Value = serde_json::from_slice(&reader[..len])
885                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
886                    doc.add_json(Field(field_id as u32), v);
887                }
888                reader = &reader[len..];
889            }
890            8 => {
891                // BinaryDenseVector
892                let len = reader.read_u32::<LittleEndian>()? as usize;
893                if len > reader.len() {
894                    return Err(io::Error::new(
895                        io::ErrorKind::UnexpectedEof,
896                        "binary dense vector truncated",
897                    ));
898                }
899                if wanted {
900                    doc.add_binary_dense_vector(Field(field_id as u32), reader[..len].to_vec());
901                }
902                reader = &reader[len..];
903            }
904            _ => {
905                return Err(io::Error::new(
906                    io::ErrorKind::InvalidData,
907                    format!("Unknown field type tag: {}", type_tag),
908                ));
909            }
910        }
911    }
912
913    Ok(doc)
914}
915
/// Raw block info for store merging (without decompression)
#[derive(Debug, Clone)]
pub struct RawStoreBlock {
    /// Doc ID of the first document stored in this block (source store's numbering)
    pub first_doc_id: DocId,
    /// Number of documents packed into this block
    pub num_docs: u32,
    /// Byte offset of the compressed block within the store's data section
    pub offset: u64,
    /// Length of the compressed block in bytes
    pub length: u32,
}
924
/// Store merger - concatenates compressed blocks from multiple stores without recompression
///
/// This is much faster than rebuilding stores since it avoids:
/// - Decompressing blocks from source stores
/// - Re-serializing documents
/// - Re-compressing blocks at level 22
///
/// Limitations:
/// - All source stores must NOT use dictionaries (or use the same dictionary)
/// - Doc IDs are remapped sequentially
pub struct StoreMerger<'a, W: Write> {
    /// Destination for the merged store's data, index, and footer
    writer: &'a mut W,
    /// Block index entries accumulated for the footer written by `finish`
    index: Vec<StoreBlockIndex>,
    /// Byte offset (within the data section) where the next block will land
    current_offset: u64,
    /// Doc ID to assign to the first document of the next appended block
    next_doc_id: DocId,
}
941
942impl<'a, W: Write> StoreMerger<'a, W> {
943    pub fn new(writer: &'a mut W) -> Self {
944        Self {
945            writer,
946            index: Vec::new(),
947            current_offset: 0,
948            next_doc_id: 0,
949        }
950    }
951
952    /// Append raw compressed blocks from a store file
953    ///
954    /// `data_slice` should be the data portion of the store (before index/footer)
955    /// `blocks` contains the block metadata from the source store
956    pub async fn append_store(
957        &mut self,
958        data_slice: &FileHandle,
959        blocks: &[RawStoreBlock],
960    ) -> io::Result<()> {
961        for block in blocks {
962            // Read raw compressed block data
963            let start = block.offset;
964            let end = start + block.length as u64;
965            let compressed_data = data_slice.read_bytes_range(start..end).await?;
966
967            // Write to output
968            self.writer.write_all(compressed_data.as_slice())?;
969
970            // Add to index with remapped doc IDs
971            self.index.push(StoreBlockIndex {
972                first_doc_id: self.next_doc_id,
973                offset: self.current_offset,
974                length: block.length,
975                num_docs: block.num_docs,
976            });
977
978            self.current_offset += block.length as u64;
979            self.next_doc_id += block.num_docs;
980        }
981
982        Ok(())
983    }
984
985    /// Append blocks from a dict-compressed store by decompressing and recompressing.
986    ///
987    /// For stores that use dictionary compression, raw blocks can't be stacked
988    /// directly because the decompressor needs the original dictionary.
989    /// This method decompresses each block with the source dict, then
990    /// recompresses without a dictionary so the merged output is self-contained.
991    pub async fn append_store_recompressing(&mut self, store: &AsyncStoreReader) -> io::Result<()> {
992        let dict = store.dict();
993        let data_slice = store.data_slice();
994        let blocks = store.block_index();
995
996        for block in blocks {
997            let start = block.offset;
998            let end = start + block.length as u64;
999            let compressed = data_slice.read_bytes_range(start..end).await?;
1000
1001            // Decompress with source dict (or without if no dict)
1002            let decompressed = if let Some(d) = dict {
1003                crate::compression::decompress_with_dict(compressed.as_slice(), d)?
1004            } else {
1005                crate::compression::decompress(compressed.as_slice())?
1006            };
1007
1008            // Recompress without dictionary
1009            let recompressed = crate::compression::compress(
1010                &decompressed,
1011                crate::compression::CompressionLevel::default(),
1012            )?;
1013
1014            self.writer.write_all(&recompressed)?;
1015
1016            self.index.push(StoreBlockIndex {
1017                first_doc_id: self.next_doc_id,
1018                offset: self.current_offset,
1019                length: recompressed.len() as u32,
1020                num_docs: block.num_docs,
1021            });
1022
1023            self.current_offset += recompressed.len() as u64;
1024            self.next_doc_id += block.num_docs;
1025        }
1026
1027        Ok(())
1028    }
1029
1030    /// Finish writing the merged store
1031    pub fn finish(self) -> io::Result<u32> {
1032        let data_end_offset = self.current_offset;
1033
1034        // No dictionary support for merged stores (would need same dict across all sources)
1035        let dict_offset = 0u64;
1036
1037        // Write index + footer
1038        write_store_index_and_footer(
1039            self.writer,
1040            &self.index,
1041            data_end_offset,
1042            dict_offset,
1043            self.next_doc_id,
1044            false,
1045        )?;
1046
1047        Ok(self.next_doc_id)
1048    }
1049}
1050
1051impl AsyncStoreReader {
1052    /// Get raw block metadata for merging (without loading block data)
1053    pub fn raw_blocks(&self) -> Vec<RawStoreBlock> {
1054        self.index
1055            .iter()
1056            .map(|entry| RawStoreBlock {
1057                first_doc_id: entry.first_doc_id,
1058                num_docs: entry.num_docs,
1059                offset: entry.offset,
1060                length: entry.length,
1061            })
1062            .collect()
1063    }
1064
1065    /// Get the data slice for raw block access
1066    pub fn data_slice(&self) -> &FileHandle {
1067        &self.data_slice
1068    }
1069
1070    /// Check if this store uses a dictionary (incompatible with raw merging)
1071    pub fn has_dict(&self) -> bool {
1072        self.dict.is_some()
1073    }
1074
1075    /// Get the decompression dictionary (if any)
1076    pub fn dict(&self) -> Option<&CompressionDict> {
1077        self.dict.as_ref()
1078    }
1079
1080    /// Get block index for iteration
1081    pub(crate) fn block_index(&self) -> &[StoreBlockIndex] {
1082        &self.index
1083    }
1084}