hermes_core/segment/reader.rs

//! Async segment reader with lazy loading

use std::sync::Arc;

use rustc_hash::FxHashMap;

use crate::directories::{AsyncFileRead, Directory, LazyFileHandle, LazyFileSlice};
use crate::dsl::{Document, Field, Schema};
use crate::structures::{
    AsyncSSTableReader, BlockPostingList, BlockSparsePostingList, CoarseCentroids, IVFPQIndex,
    IVFRaBitQIndex, PQCodebook, RaBitQCodebook, RaBitQIndex, SSTableStats, TermInfo,
};
use crate::{DocId, Error, Result};

use super::store::{AsyncStoreReader, RawStoreBlock};
use super::types::{SegmentFiles, SegmentId, SegmentMeta};

/// Vector index type - RaBitQ, IVF-RaBitQ, or ScaNN (IVF-PQ)
#[derive(Clone)]
#[allow(clippy::upper_case_acronyms)]
pub enum VectorIndex {
    /// RaBitQ - binary quantization, good for small datasets
    RaBitQ(Arc<RaBitQIndex>),
    /// IVF-RaBitQ - inverted file with RaBitQ, good for medium datasets
    IVF {
        index: Arc<IVFRaBitQIndex>,
        codebook: Arc<RaBitQCodebook>,
    },
    /// ScaNN (IVF-PQ) - product quantization with OPQ, best for large datasets
    ScaNN {
        index: Arc<IVFPQIndex>,
        codebook: Arc<PQCodebook>,
    },
}

/// Sparse vector index for a field: direct-indexed by dimension ID
#[derive(Clone)]
pub struct SparseIndex {
    /// Posting lists indexed directly by dimension ID (O(1) lookup)
    /// None means dimension not present in index
    pub postings: Vec<Option<Arc<BlockSparsePostingList>>>,
    /// Total document count in this segment (for IDF computation)
    pub total_docs: u32,
}

impl SparseIndex {
    /// Compute IDF (inverse document frequency) for a dimension
    ///
    /// IDF = ln(N / df), where N = total docs and df = docs containing the dimension.
    /// Returns 0.0 if the dimension is not present.
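    ///
    /// # Example
    ///
    /// A worked case: with `total_docs = 100` and a dimension appearing in 10
    /// documents, the IDF is `ln(100 / 10) ≈ 2.303`. A minimal sketch, assuming
    /// `pl` is a `BlockSparsePostingList` covering those 10 documents:
    ///
    /// ```ignore
    /// let index = SparseIndex { postings: vec![Some(Arc::new(pl))], total_docs: 100 };
    /// assert!((index.idf(0) - 2.3026).abs() < 1e-3);
    /// assert_eq!(index.idf(7), 0.0); // dimension 7 not present
    /// ```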
    #[inline]
    pub fn idf(&self, dim_id: u32) -> f32 {
        if let Some(Some(pl)) = self.postings.get(dim_id as usize) {
            let df = pl.doc_count() as f32;
            if df > 0.0 {
                (self.total_docs as f32 / df).ln()
            } else {
                0.0
            }
        } else {
            0.0
        }
    }

    /// Get IDF weights for multiple dimensions
    pub fn idf_weights(&self, dim_ids: &[u32]) -> Vec<f32> {
        dim_ids.iter().map(|&d| self.idf(d)).collect()
    }
}

/// Async segment reader with lazy loading
///
/// - Term dictionary: only index loaded, blocks loaded on-demand
/// - Postings: loaded on-demand per term via HTTP range requests
/// - Document store: only index loaded, blocks loaded on-demand via HTTP range requests
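///
/// # Example
///
/// A minimal sketch of opening a segment and fetching a posting list; assumes
/// `dir` is some `Directory` implementation and `segment_id`/`schema` describe
/// an existing segment (bindings here are illustrative):
///
/// ```ignore
/// let reader = AsyncSegmentReader::open(&dir, segment_id, schema.clone(), 0, 256).await?;
/// if let Some(postings) = reader.get_postings(Field(0), b"hello").await? {
///     // Score or iterate the posting list here.
/// }
/// ```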
pub struct AsyncSegmentReader {
    meta: SegmentMeta,
    /// Term dictionary with lazy block loading
    term_dict: Arc<AsyncSSTableReader<TermInfo>>,
    /// Postings file handle - fetches ranges on demand
    postings_handle: LazyFileHandle,
    /// Document store with lazy block loading
    store: Arc<AsyncStoreReader>,
    schema: Arc<Schema>,
    /// Base doc_id offset for this segment
    doc_id_offset: DocId,
    /// Dense vector indexes per field (RaBitQ, IVF-RaBitQ, or ScaNN)
    vector_indexes: FxHashMap<u32, VectorIndex>,
    /// Shared coarse centroids for IVF search (loaded once)
    coarse_centroids: Option<Arc<CoarseCentroids>>,
    /// Sparse vector indexes per field
    sparse_indexes: FxHashMap<u32, SparseIndex>,
}

impl AsyncSegmentReader {
    /// Open a segment with lazy loading
    pub async fn open<D: Directory>(
        dir: &D,
        segment_id: SegmentId,
        schema: Arc<Schema>,
        doc_id_offset: DocId,
        cache_blocks: usize,
    ) -> Result<Self> {
        let files = SegmentFiles::new(segment_id.0);

        // Read metadata (small, always loaded)
        let meta_slice = dir.open_read(&files.meta).await?;
        let meta_bytes = meta_slice.read_bytes().await?;
        let meta = SegmentMeta::deserialize(meta_bytes.as_slice())?;
        debug_assert_eq!(meta.id, segment_id.0);

        // Open term dictionary with lazy loading (fetches ranges on demand)
        let term_dict_handle = dir.open_lazy(&files.term_dict).await?;
        let term_dict = AsyncSSTableReader::open(term_dict_handle, cache_blocks).await?;

        // Get postings file handle (lazy - fetches ranges on demand)
        let postings_handle = dir.open_lazy(&files.postings).await?;

        // Open store with lazy loading
        let store_handle = dir.open_lazy(&files.store).await?;
        let store = AsyncStoreReader::open(store_handle, cache_blocks).await?;

        // Load dense vector indexes from unified .vectors file
        let (vector_indexes, coarse_centroids) =
            Self::load_vectors_file(dir, &files, &schema).await?;

        // Load sparse vector indexes from .sparse file
        let sparse_indexes = Self::load_sparse_file(dir, &files, meta.num_docs).await?;

        Ok(Self {
            meta,
            term_dict: Arc::new(term_dict),
            postings_handle,
            store: Arc::new(store),
            schema,
            doc_id_offset,
            vector_indexes,
            coarse_centroids,
            sparse_indexes,
        })
    }

    pub fn meta(&self) -> &SegmentMeta {
        &self.meta
    }

    pub fn num_docs(&self) -> u32 {
        self.meta.num_docs
    }

    /// Get average field length for BM25F scoring
    pub fn avg_field_len(&self, field: Field) -> f32 {
        self.meta.avg_field_len(field)
    }

    pub fn doc_id_offset(&self) -> DocId {
        self.doc_id_offset
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Get sparse indexes for all fields
    pub fn sparse_indexes(&self) -> &FxHashMap<u32, SparseIndex> {
        &self.sparse_indexes
    }

    /// Get term dictionary stats for debugging
    pub fn term_dict_stats(&self) -> SSTableStats {
        self.term_dict.stats()
    }

    /// Get posting list for a term (async - loads on demand)
    ///
    /// For small posting lists (1-3 docs), the data is inlined in the term dictionary
    /// and no additional I/O is needed. For larger lists, it reads from the .post file.
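    ///
    /// # Example
    ///
    /// Callers see the same `BlockPostingList` either way; a sketch assuming
    /// an open `reader` (field and term are illustrative):
    ///
    /// ```ignore
    /// match reader.get_postings(Field(1), b"rust").await? {
    ///     Some(postings) => { /* iterate or score */ }
    ///     None => { /* term absent from this segment */ }
    /// }
    /// ```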
    pub async fn get_postings(
        &self,
        field: Field,
        term: &[u8],
    ) -> Result<Option<BlockPostingList>> {
        log::debug!(
            "SegmentReader::get_postings field={} term_len={}",
            field.0,
            term.len()
        );

        // Build key: field_id + term
        let mut key = Vec::with_capacity(4 + term.len());
        key.extend_from_slice(&field.0.to_le_bytes());
        key.extend_from_slice(term);

        // Look up in term dictionary
        let term_info = match self.term_dict.get(&key).await? {
            Some(info) => {
                log::debug!("SegmentReader::get_postings found term_info");
                info
            }
            None => {
                log::debug!("SegmentReader::get_postings term not found");
                return Ok(None);
            }
        };

        // Check if posting list is inlined
        if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
            // Build BlockPostingList from inline data (no I/O needed!)
            let mut posting_list = crate::structures::PostingList::with_capacity(doc_ids.len());
            for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                posting_list.push(doc_id, tf);
            }
            let block_list = BlockPostingList::from_posting_list(&posting_list)?;
            return Ok(Some(block_list));
        }

        // External posting list - read from postings file handle (lazy - HTTP range request)
        let (posting_offset, posting_len) = term_info.external_info().ok_or_else(|| {
            Error::Corruption("TermInfo has neither inline nor external data".to_string())
        })?;

        let start = posting_offset;
        let end = start + posting_len as u64;

        if end > self.postings_handle.len() {
            return Err(Error::Corruption(
                "Posting offset out of bounds".to_string(),
            ));
        }

        let posting_bytes = self.postings_handle.read_bytes_range(start..end).await?;
        let block_list = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        Ok(Some(block_list))
    }

    /// Get document by local doc_id (async - loads on demand)
    pub async fn doc(&self, local_doc_id: DocId) -> Result<Option<Document>> {
        self.store
            .get(local_doc_id, &self.schema)
            .await
            .map_err(Error::from)
    }

    /// Prefetch term dictionary blocks for a key range
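    ///
    /// A sketch, assuming an open `reader`; this warms the dictionary blocks
    /// covering the given term range before a scan (terms are illustrative):
    ///
    /// ```ignore
    /// reader.prefetch_terms(Field(0), b"app", b"apz").await?;
    /// ```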
    pub async fn prefetch_terms(
        &self,
        field: Field,
        start_term: &[u8],
        end_term: &[u8],
    ) -> Result<()> {
        let mut start_key = Vec::with_capacity(4 + start_term.len());
        start_key.extend_from_slice(&field.0.to_le_bytes());
        start_key.extend_from_slice(start_term);

        let mut end_key = Vec::with_capacity(4 + end_term.len());
        end_key.extend_from_slice(&field.0.to_le_bytes());
        end_key.extend_from_slice(end_term);

        self.term_dict.prefetch_range(&start_key, &end_key).await?;
        Ok(())
    }

    /// Check if store uses dictionary compression (incompatible with raw merging)
    pub fn store_has_dict(&self) -> bool {
        self.store.has_dict()
    }

    /// Get raw store blocks for optimized merging
    pub fn store_raw_blocks(&self) -> Vec<RawStoreBlock> {
        self.store.raw_blocks()
    }

    /// Get store data slice for raw block access
    pub fn store_data_slice(&self) -> &LazyFileSlice {
        self.store.data_slice()
    }

    /// Get all terms from this segment (for merge)
    pub async fn all_terms(&self) -> Result<Vec<(Vec<u8>, TermInfo)>> {
        self.term_dict.all_entries().await.map_err(Error::from)
    }

    /// Get all terms with parsed field and term string (for statistics aggregation)
    ///
    /// Returns (field, term_string, doc_freq) for each term in the dictionary.
    /// Skips terms that aren't valid UTF-8.
    pub async fn all_terms_with_stats(&self) -> Result<Vec<(Field, String, u32)>> {
        let entries = self.term_dict.all_entries().await?;
        let mut result = Vec::with_capacity(entries.len());

        for (key, term_info) in entries {
            // Key format: field_id (4 bytes little-endian) + term bytes
            if key.len() > 4 {
                let field_id = u32::from_le_bytes([key[0], key[1], key[2], key[3]]);
                let term_bytes = &key[4..];
                if let Ok(term_str) = std::str::from_utf8(term_bytes) {
                    result.push((Field(field_id), term_str.to_string(), term_info.doc_freq()));
                }
            }
        }

        Ok(result)
    }

    /// Get streaming iterator over term dictionary (for memory-efficient merge)
    pub fn term_dict_iter(&self) -> crate::structures::AsyncSSTableIterator<'_, TermInfo> {
        self.term_dict.iter()
    }

    /// Read raw posting bytes at offset
    pub async fn read_postings(&self, offset: u64, len: u32) -> Result<Vec<u8>> {
        let start = offset;
        let end = start + len as u64;
        let bytes = self.postings_handle.read_bytes_range(start..end).await?;
        Ok(bytes.to_vec())
    }

    /// Search dense vectors using the field's index (RaBitQ, IVF-RaBitQ, or ScaNN)
    ///
    /// Returns (doc_id, distance) pairs sorted by distance (ascending).
    /// The doc_ids are adjusted by doc_id_offset for this segment.
    /// If mrl_dim is configured, the query vector is automatically trimmed.
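    ///
    /// # Example
    ///
    /// A sketch, assuming an open `reader` with a dense vector index on the
    /// field and a query of matching dimensionality:
    ///
    /// ```ignore
    /// let hits = reader.search_dense_vector(Field(2), &query, 10, 4)?;
    /// for (doc_id, dist) in hits {
    ///     println!("doc {doc_id}: distance {dist}");
    /// }
    /// ```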
    pub fn search_dense_vector(
        &self,
        field: Field,
        query: &[f32],
        k: usize,
        rerank_factor: usize,
    ) -> Result<Vec<(DocId, f32)>> {
        let index = self
            .vector_indexes
            .get(&field.0)
            .ok_or_else(|| Error::Schema(format!("No dense vector index for field {}", field.0)))?;

        // Get mrl_dim from config to trim query vector if needed
        let mrl_dim = self
            .schema
            .get_field_entry(field)
            .and_then(|e| e.dense_vector_config.as_ref())
            .and_then(|c| c.mrl_dim);

        // Trim query vector if mrl_dim is set
        let query_vec: Vec<f32>;
        let effective_query = if let Some(trim_dim) = mrl_dim {
            if trim_dim < query.len() {
                query_vec = query[..trim_dim].to_vec();
                query_vec.as_slice()
            } else {
                query
            }
        } else {
            query
        };

        let results: Vec<(u32, f32)> = match index {
            VectorIndex::RaBitQ(rabitq) => rabitq
                .search(effective_query, k, rerank_factor)
                .into_iter()
                .map(|(idx, dist)| (idx as u32, dist))
                .collect(),
            VectorIndex::IVF { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("IVF index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32); // Use rerank_factor as nprobe hint
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
            VectorIndex::ScaNN { index, codebook } => {
                let centroids = self.coarse_centroids.as_ref().ok_or_else(|| {
                    Error::Schema("ScaNN index requires coarse centroids".to_string())
                })?;
                let nprobe = rerank_factor.max(32);
                index.search(centroids, codebook, effective_query, k, Some(nprobe))
            }
        };

        // Adjust doc_ids by segment offset
        Ok(results
            .into_iter()
            .map(|(idx, dist)| (idx as DocId + self.doc_id_offset, dist))
            .collect())
    }

    /// Check if this segment has a dense vector index for the given field
    pub fn has_dense_vector_index(&self, field: Field) -> bool {
        self.vector_indexes.contains_key(&field.0)
    }

    /// Get the RaBitQ vector index for a field (if available)
    pub fn get_dense_vector_index(&self, field: Field) -> Option<Arc<RaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::RaBitQ(idx)) => Some(idx.clone()),
            _ => None,
        }
    }

    /// Get the IVF vector index for a field (if available)
    pub fn get_ivf_vector_index(&self, field: Field) -> Option<Arc<IVFRaBitQIndex>> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::IVF { index, .. }) => Some(index.clone()),
            _ => None,
        }
    }

    /// Get the ScaNN vector index for a field (if available)
    pub fn get_scann_vector_index(
        &self,
        field: Field,
    ) -> Option<(Arc<IVFPQIndex>, Arc<PQCodebook>)> {
        match self.vector_indexes.get(&field.0) {
            Some(VectorIndex::ScaNN { index, codebook }) => Some((index.clone(), codebook.clone())),
            _ => None,
        }
    }

    /// Get the vector index type for a field
    pub fn get_vector_index(&self, field: Field) -> Option<&VectorIndex> {
        self.vector_indexes.get(&field.0)
    }

    /// Search for similar sparse vectors using dedicated sparse posting lists
    ///
    /// Uses shared `WandExecutor` with `SparseTermScorer` for efficient top-k retrieval.
    /// Optimizations (via WandExecutor):
    /// 1. **MaxScore pruning**: Dimensions sorted by max contribution
    /// 2. **Block-Max WAND**: Skips blocks where max contribution < threshold
    /// 3. **Top-K heap**: Efficient score collection
    ///
    /// Returns (doc_id, score) pairs sorted by score descending.
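    ///
    /// # Example
    ///
    /// A sketch, assuming an open `reader`; the query is `(dimension_id, weight)`
    /// pairs, which can be pre-weighted with [`SparseIndex::idf_weights`]:
    ///
    /// ```ignore
    /// let query = vec![(17, 0.8), (3042, 0.5)];
    /// let hits = reader.search_sparse_vector(Field(3), &query, 10).await?;
    /// ```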
    pub async fn search_sparse_vector(
        &self,
        field: Field,
        vector: &[(u32, f32)],
        limit: usize,
    ) -> Result<Vec<(u32, f32)>> {
        use crate::query::{SparseTermScorer, WandExecutor};

        // Get sparse index for this field
        let sparse_index = match self.sparse_indexes.get(&field.0) {
            Some(idx) => idx,
            None => return Ok(Vec::new()),
        };

        // Build scorers for each dimension that exists in the index
        let scorers: Vec<SparseTermScorer> = vector
            .iter()
            .filter_map(|&(dim_id, query_weight)| {
                // Direct indexing: O(1) lookup
                sparse_index
                    .postings
                    .get(dim_id as usize)
                    .and_then(|opt| opt.as_ref())
                    .map(|pl| SparseTermScorer::from_arc(pl, query_weight))
            })
            .collect();

        if scorers.is_empty() {
            return Ok(Vec::new());
        }

        // Use shared WandExecutor for top-k retrieval
        let results = WandExecutor::new(scorers, limit).execute();

        Ok(results.into_iter().map(|r| (r.doc_id, r.score)).collect())
    }

    /// Load dense vector indexes from unified .vectors file
    ///
    /// Supports RaBitQ (type 0), IVF-RaBitQ (type 1), and ScaNN (type 2).
    /// Also loads coarse centroids and PQ codebook as needed.
    ///
    /// Memory optimization: Uses lazy range reads to load each index separately,
    /// avoiding loading the entire vectors file into memory at once.
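    ///
    /// Layout, as parsed below:
    /// - Header: num_fields (u32)
    /// - Per field: field_id (u32), index_type (u8), offset (u64), length (u64) = 21 bytes
    /// - Data: per-index payloads referenced by (offset, length)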
    async fn load_vectors_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        schema: &Schema,
    ) -> Result<(FxHashMap<u32, VectorIndex>, Option<Arc<CoarseCentroids>>)> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();
        let mut coarse_centroids: Option<Arc<CoarseCentroids>> = None;

        // Skip loading vectors file if schema has no dense vector fields
        let has_dense_vectors = schema
            .fields()
            .any(|(_, entry)| entry.dense_vector_config.is_some());
        if !has_dense_vectors {
            return Ok((indexes, None));
        }

        // Try to open vectors file (may not exist if no vectors were indexed)
        let handle = match dir.open_lazy(&files.vectors).await {
            Ok(h) => h,
            Err(_) => return Ok((indexes, None)),
        };

        // Read only the header first (4 bytes for num_fields)
        let header_bytes = match handle.read_bytes_range(0..4).await {
            Ok(b) => b,
            Err(_) => return Ok((indexes, None)),
        };

        if header_bytes.is_empty() {
            return Ok((indexes, None));
        }

        let mut cursor = Cursor::new(header_bytes.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok((indexes, None));
        }

        // Read field entries header: (field_id: 4, index_type: 1, offset: 8, length: 8) = 21 bytes per field
        let entries_size = num_fields as u64 * 21;
        let entries_bytes = handle.read_bytes_range(4..4 + entries_size).await?;
        let mut cursor = Cursor::new(entries_bytes.as_slice());

        // Read field entries (field_id, index_type, offset, length)
        let mut entries = Vec::with_capacity(num_fields as usize);
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            // Index type byte; if the entry is truncated (legacy format without
            // a type byte), fall back to 255 = unknown/legacy and auto-detect below
            let index_type = cursor.read_u8().unwrap_or(255);
            let offset = cursor.read_u64::<LittleEndian>()?;
            let length = cursor.read_u64::<LittleEndian>()?;
            entries.push((field_id, index_type, offset, length));
        }

        // Load each index on-demand using range reads (memory efficient)
        for (field_id, index_type, offset, length) in entries {
            // Read only this index's data
            let data = handle.read_bytes_range(offset..offset + length).await?;
            let field = crate::dsl::Field(field_id);

            match index_type {
                2 => {
                    // ScaNN (IVF-PQ)
                    if let Ok(ivfpq_index) = IVFPQIndex::from_bytes(data.as_slice()) {
                        // Load coarse centroids if not already loaded
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }

                        // Load PQ codebook
                        if let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.pq_codebook_path
                            && let Ok(codebook) = PQCodebook::load(std::path::Path::new(path))
                        {
                            indexes.insert(
                                field_id,
                                VectorIndex::ScaNN {
                                    index: Arc::new(ivfpq_index),
                                    codebook: Arc::new(codebook),
                                },
                            );
                        }
                    }
                }
                1 => {
                    // IVF-RaBitQ
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        // Create a codebook for the IVF index
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    }
                }
                0 => {
                    // RaBitQ
                    if let Ok(rabitq_index) = serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
                _ => {
                    // Legacy format without type - try to auto-detect
                    if let Ok(ivf_index) = serde_json::from_slice::<IVFRaBitQIndex>(data.as_slice())
                    {
                        if coarse_centroids.is_none()
                            && let Some(entry) = schema.get_field_entry(field)
                            && let Some(ref config) = entry.dense_vector_config
                            && let Some(ref path) = config.coarse_centroids_path
                            && let Ok(c) = CoarseCentroids::load(std::path::Path::new(path))
                        {
                            coarse_centroids = Some(Arc::new(c));
                        }
                        // Create a codebook for the IVF index
                        let codebook = Arc::new(RaBitQCodebook::new(
                            crate::structures::RaBitQConfig::new(ivf_index.config.dim),
                        ));
                        indexes.insert(
                            field_id,
                            VectorIndex::IVF {
                                index: Arc::new(ivf_index),
                                codebook,
                            },
                        );
                    } else if let Ok(rabitq_index) =
                        serde_json::from_slice::<RaBitQIndex>(data.as_slice())
                    {
                        indexes.insert(field_id, VectorIndex::RaBitQ(Arc::new(rabitq_index)));
                    }
                }
            }
        }

        Ok((indexes, coarse_centroids))
    }

    /// Load sparse vector indexes from .sparse file
    ///
    /// File format (direct-indexed table for O(1) dimension lookup):
    /// - Header: num_fields (u32)
    /// - For each field:
    ///   - field_id (u32)
    ///   - quantization (u8)
    ///   - max_dim_id (u32)          ← table size
    ///   - table: [(offset: u64, length: u32)] × max_dim_id  ← direct indexed
    ///     (offset=0, length=0 means dimension not present)
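    ///     (each table entry is 12 bytes: offset u64 + length u32, so the
    ///     table occupies 12 × max_dim_id bytes)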
    /// - Data: concatenated serialized BlockSparsePostingList
    async fn load_sparse_file<D: Directory>(
        dir: &D,
        files: &SegmentFiles,
        total_docs: u32,
    ) -> Result<FxHashMap<u32, SparseIndex>> {
        use byteorder::{LittleEndian, ReadBytesExt};
        use std::io::Cursor;

        let mut indexes = FxHashMap::default();

        // Try to open sparse file (may not exist if no sparse vectors were indexed)
        let handle = match dir.open_lazy(&files.sparse).await {
            Ok(h) => h,
            Err(_) => return Ok(indexes),
        };

        // Read the entire file (sparse files are typically small enough)
        let data = match handle.read_bytes().await {
            Ok(d) => d,
            Err(_) => return Ok(indexes),
        };

        if data.len() < 4 {
            return Ok(indexes);
        }

        let mut cursor = Cursor::new(data.as_slice());
        let num_fields = cursor.read_u32::<LittleEndian>()?;

        if num_fields == 0 {
            return Ok(indexes);
        }

        // Read field entries and build indexes
        for _ in 0..num_fields {
            let field_id = cursor.read_u32::<LittleEndian>()?;
            let _quantization = cursor.read_u8()?; // Already stored in each BlockSparsePostingList
            let max_dim_id = cursor.read_u32::<LittleEndian>()?;

            // Read direct-indexed table
            let mut postings: Vec<Option<Arc<BlockSparsePostingList>>> =
                vec![None; max_dim_id as usize];

            for dim_id in 0..max_dim_id {
                let offset = cursor.read_u64::<LittleEndian>()?;
                let length = cursor.read_u32::<LittleEndian>()?;

                // offset=0, length=0 means dimension not present
                if length > 0 {
                    let start = offset as usize;
                    let end = start + length as usize;
                    if end <= data.len() {
                        let posting_data = &data.as_slice()[start..end];
                        if let Ok(posting_list) =
                            BlockSparsePostingList::deserialize(&mut Cursor::new(posting_data))
                        {
                            postings[dim_id as usize] = Some(Arc::new(posting_list));
                        }
                    }
                }
            }

            indexes.insert(
                field_id,
                SparseIndex {
                    postings,
                    total_docs,
                },
            );
        }

        Ok(indexes)
    }
}

/// Alias for AsyncSegmentReader
pub type SegmentReader = AsyncSegmentReader;