Skip to main content

ailake_file/
reader.rs

1use ailake_core::{AilakeError, AilakeResult, Centroid, VectorMetric};
2use ailake_index::{AnyIndex, HnswIndex, IvfPqSerializer, MmapLoader};
3use ailake_parquet::ParquetVectorReader;
4use arrow_array::RecordBatch;
5use bytes::Bytes;
6
7use crate::footer::{AilakeHeader, DistanceMetric, FLAG_INDEX_IVF_PQ, HEADER_SIZE};
8
9pub struct AilakeFileReader {
10    bytes: Bytes,
11    vector_column: String,
12    #[allow(dead_code)]
13    dim: u32,
14}
15
16impl AilakeFileReader {
17    pub fn new(bytes: Bytes, vector_column: &str, dim: u32) -> Self {
18        Self {
19            bytes,
20            vector_column: vector_column.to_string(),
21            dim,
22        }
23    }
24
25    /// Returns the absolute byte offset of the primary AILK section.
26    /// Reads `ailake.footer_offset` from the Parquet footer key-value metadata.
27    pub fn ailk_offset(&self) -> AilakeResult<u64> {
28        let reader = ParquetVectorReader::new(self.bytes.clone(), &self.vector_column);
29        let val = reader
30            .kv_metadata("ailake.footer_offset")?
31            .ok_or(AilakeError::NotAnAilakeFile)?;
32        val.parse::<u64>().map_err(|_| AilakeError::NotAnAilakeFile)
33    }
34
35    /// Returns the absolute byte offset of the AILK section for a named vector column.
36    ///
37    /// For additional columns tries `ailake.{column}.footer_offset` first,
38    /// then falls back to `ailake.footer_offset` (primary / single-column files).
39    pub fn ailk_offset_for_column(&self, column: &str) -> AilakeResult<u64> {
40        let reader = ParquetVectorReader::new(self.bytes.clone(), column);
41        let col_key = format!("ailake.{column}.footer_offset");
42        if let Some(val) = reader.kv_metadata(&col_key)? {
43            return val.parse::<u64>().map_err(|_| AilakeError::NotAnAilakeFile);
44        }
45        let val = reader
46            .kv_metadata("ailake.footer_offset")?
47            .ok_or(AilakeError::NotAnAilakeFile)?;
48        val.parse::<u64>().map_err(|_| AilakeError::NotAnAilakeFile)
49    }
50
51    /// Returns true if the file contains an embedded AILK section.
52    pub fn is_ailake_file(&self) -> bool {
53        self.ailk_offset().is_ok()
54    }
55
56    /// Parse the 64-byte AI-Lake header from the embedded AILK section.
57    pub fn read_header(&self) -> AilakeResult<AilakeHeader> {
58        let offset = self.ailk_offset()? as usize;
59        if offset + HEADER_SIZE > self.bytes.len() {
60            return Err(AilakeError::NotAnAilakeFile);
61        }
62        let header_bytes: &[u8; HEADER_SIZE] = self.bytes[offset..offset + HEADER_SIZE]
63            .try_into()
64            .map_err(|_| AilakeError::NotAnAilakeFile)?;
65        AilakeHeader::from_bytes(header_bytes)
66    }
67
68    /// Read centroid + radius from the AILK section.
69    pub fn get_centroid(&self) -> AilakeResult<Centroid> {
70        let ailk_start = self.ailk_offset()? as usize;
71        let header = self.read_header()?;
72        let centroid_start = ailk_start + header.centroid_offset as usize;
73        let centroid_end = centroid_start + header.centroid_len as usize;
74
75        if centroid_end > self.bytes.len() {
76            return Err(AilakeError::NotAnAilakeFile);
77        }
78
79        let centroid_data = &self.bytes[centroid_start..centroid_end];
80        let dim = header.dim as usize;
81        let expected_len = dim * 4 + 4;
82        if centroid_data.len() != expected_len {
83            return Err(AilakeError::InvalidCentroidLength {
84                expected_dim: header.dim,
85                actual: centroid_data.len(),
86            });
87        }
88
89        let values: Vec<f32> = centroid_data[..dim * 4]
90            .chunks_exact(4)
91            .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
92            .collect();
93        let radius = f32::from_le_bytes(centroid_data[dim * 4..].try_into().unwrap());
94        let metric = distance_metric_to_vector_metric(header.distance_metric);
95
96        Ok(Centroid {
97            values,
98            radius,
99            metric,
100        })
101    }
102
103    /// Load the HNSW index from the primary AILK section.
104    pub fn load_index(&self) -> AilakeResult<HnswIndex> {
105        self.load_index_for_column(&self.vector_column.clone())
106    }
107
108    /// Load the HNSW index for a specific vector column.
109    ///
110    /// Works for both single-column files (falls back to primary AILK) and
111    /// multi-column files written with `AilakeFileWriter::write_multi`.
112    pub fn load_index_for_column(&self, column: &str) -> AilakeResult<HnswIndex> {
113        let ailk_start = self.ailk_offset_for_column(column)? as usize;
114
115        if ailk_start + HEADER_SIZE > self.bytes.len() {
116            return Err(AilakeError::NotAnAilakeFile);
117        }
118        let header_bytes: &[u8; HEADER_SIZE] = self.bytes[ailk_start..ailk_start + HEADER_SIZE]
119            .try_into()
120            .map_err(|_| AilakeError::NotAnAilakeFile)?;
121        let header = AilakeHeader::from_bytes(header_bytes)?;
122
123        let hnsw_start = ailk_start + header.hnsw_offset as usize;
124        let hnsw_end = hnsw_start + header.hnsw_len as usize;
125
126        if hnsw_end > self.bytes.len() {
127            return Err(AilakeError::NotAnAilakeFile);
128        }
129        MmapLoader::from_bytes(&self.bytes[hnsw_start..hnsw_end])
130    }
131
132    /// Load primary index as `AnyIndex`, dispatching on header flags.
133    pub fn load_any_index(&self) -> AilakeResult<AnyIndex> {
134        self.load_any_index_for_column(&self.vector_column.clone())
135    }
136
137    /// Load index for a specific vector column as `AnyIndex`.
138    pub fn load_any_index_for_column(&self, column: &str) -> AilakeResult<AnyIndex> {
139        let ailk_start = self.ailk_offset_for_column(column)? as usize;
140
141        if ailk_start + HEADER_SIZE > self.bytes.len() {
142            return Err(AilakeError::NotAnAilakeFile);
143        }
144        let header_bytes: &[u8; HEADER_SIZE] = self.bytes[ailk_start..ailk_start + HEADER_SIZE]
145            .try_into()
146            .map_err(|_| AilakeError::NotAnAilakeFile)?;
147        let header = AilakeHeader::from_bytes(header_bytes)?;
148
149        let index_start = ailk_start + header.hnsw_offset as usize;
150        let index_end = index_start + header.hnsw_len as usize;
151
152        if index_end > self.bytes.len() {
153            return Err(AilakeError::NotAnAilakeFile);
154        }
155        let index_bytes = &self.bytes[index_start..index_end];
156
157        if header.flags & FLAG_INDEX_IVF_PQ != 0 {
158            let idx = IvfPqSerializer::from_bytes(index_bytes)?;
159            Ok(AnyIndex::IvfPq(idx))
160        } else {
161            let idx = MmapLoader::from_bytes(index_bytes)?;
162            Ok(AnyIndex::Hnsw(idx))
163        }
164    }
165
166    /// Read the Parquet section (tabular data + decoded embeddings).
167    /// The full file is valid Parquet; the AILK section is invisible to standard readers.
168    pub fn read_parquet(&self) -> AilakeResult<(RecordBatch, Vec<Vec<f32>>)> {
169        let reader = ParquetVectorReader::new(self.bytes.clone(), &self.vector_column);
170        reader.read_all()
171    }
172
173    /// Verify the positional invariant: Parquet record_count == HNSW node_count.
174    pub fn verify_integrity(&self) -> AilakeResult<()> {
175        let header = self.read_header()?;
176        let index = self.load_index()?;
177        let reader = ParquetVectorReader::new(self.bytes.clone(), &self.vector_column);
178        let parquet_count = reader.record_count()?;
179
180        if parquet_count != index.node_count() {
181            return Err(AilakeError::RowCountMismatch {
182                parquet: parquet_count,
183                hnsw: index.node_count(),
184            });
185        }
186        if parquet_count != header.record_count {
187            return Err(AilakeError::RowCountMismatch {
188                parquet: parquet_count,
189                hnsw: header.record_count,
190            });
191        }
192        Ok(())
193    }
194}
195
196fn distance_metric_to_vector_metric(dm: DistanceMetric) -> VectorMetric {
197    match dm {
198        DistanceMetric::Cosine => VectorMetric::Cosine,
199        DistanceMetric::Euclidean => VectorMetric::Euclidean,
200        DistanceMetric::DotProduct => VectorMetric::DotProduct,
201    }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::writer::AilakeFileWriter;
    use ailake_core::{VectorMetric, VectorPrecision, VectorStoragePolicy};
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Single-column policy for an `embedding` column of the given dimension.
    fn make_policy(dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: "embedding".to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        }
    }

    /// Writes `rows` rows whose embeddings are one-hot vectors cycling through
    /// the `dim` axes, so row `i` is closest to axis `i % dim`.
    fn write_file(rows: usize, dim: u32) -> Bytes {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let ids: Vec<i32> = (0..rows as i32).collect();
        let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(ids))]).unwrap();
        let embs: Vec<Vec<f32>> = (0..rows)
            .map(|i| {
                let mut v = vec![0.0f32; dim as usize];
                v[i % dim as usize] = 1.0;
                v
            })
            .collect();
        AilakeFileWriter::new(make_policy(dim))
            .write(&batch, &embs)
            .unwrap()
    }

    #[test]
    fn is_ailake_file() {
        let file = write_file(3, 4);
        let reader = AilakeFileReader::new(file, "embedding", 4);
        assert!(reader.is_ailake_file());
    }

    #[test]
    fn integrity_check_passes() {
        let file = write_file(10, 8);
        let reader = AilakeFileReader::new(file, "embedding", 8);
        reader.verify_integrity().unwrap();
    }

    #[test]
    fn centroid_has_correct_dim() {
        let file = write_file(5, 4);
        let reader = AilakeFileReader::new(file, "embedding", 4);
        let centroid = reader.get_centroid().unwrap();
        assert_eq!(centroid.values.len(), 4);
    }

    #[test]
    fn search_finds_nearest() {
        let dim = 4u32;
        let file = write_file(4, dim);
        let reader = AilakeFileReader::new(file, "embedding", dim);
        let index = reader.load_index().unwrap();
        let query = vec![1.0f32, 0.0, 0.0, 0.0];
        let results = index.search(&query, 1, 50);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, ailake_core::RowId::new(0));
    }

    #[test]
    fn parquet_read_returns_tabular_data() {
        let file = write_file(3, 4);
        let reader = AilakeFileReader::new(file, "embedding", 4);
        let (batch, embs) = reader.read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(embs.len(), 3);
    }

    /// A single-column file has exactly one AILK section, so the per-column
    /// lookup must resolve to the same offset as the primary lookup.
    #[test]
    fn column_offset_matches_primary_for_single_column_file() {
        let file = write_file(3, 4);
        let reader = AilakeFileReader::new(file, "embedding", 4);
        assert_eq!(
            reader.ailk_offset_for_column("embedding").unwrap(),
            reader.ailk_offset().unwrap()
        );
    }

    /// Without a PQ configuration the writer is expected to emit HNSW, so the
    /// flag-dispatching loader should yield the HNSW variant.
    #[test]
    fn any_index_defaults_to_hnsw() {
        let file = write_file(4, 4);
        let reader = AilakeFileReader::new(file, "embedding", 4);
        assert!(matches!(reader.load_any_index().unwrap(), AnyIndex::Hnsw(_)));
    }
}