1use ailake_core::{AilakeError, AilakeResult, Centroid, VectorMetric};
2use ailake_index::{AnyIndex, HnswIndex, IvfPqSerializer, MmapLoader};
3use ailake_parquet::ParquetVectorReader;
4use arrow_array::RecordBatch;
5use bytes::Bytes;
6
7use crate::footer::{AilakeHeader, DistanceMetric, FLAG_INDEX_IVF_PQ, HEADER_SIZE};
8
9pub struct AilakeFileReader {
10 bytes: Bytes,
11 vector_column: String,
12 #[allow(dead_code)]
13 dim: u32,
14}
15
16impl AilakeFileReader {
17 pub fn new(bytes: Bytes, vector_column: &str, dim: u32) -> Self {
18 Self {
19 bytes,
20 vector_column: vector_column.to_string(),
21 dim,
22 }
23 }
24
25 pub fn ailk_offset(&self) -> AilakeResult<u64> {
28 let reader = ParquetVectorReader::new(self.bytes.clone(), &self.vector_column);
29 let val = reader
30 .kv_metadata("ailake.footer_offset")?
31 .ok_or(AilakeError::NotAnAilakeFile)?;
32 val.parse::<u64>().map_err(|_| AilakeError::NotAnAilakeFile)
33 }
34
35 pub fn ailk_offset_for_column(&self, column: &str) -> AilakeResult<u64> {
40 let reader = ParquetVectorReader::new(self.bytes.clone(), column);
41 let col_key = format!("ailake.{column}.footer_offset");
42 if let Some(val) = reader.kv_metadata(&col_key)? {
43 return val.parse::<u64>().map_err(|_| AilakeError::NotAnAilakeFile);
44 }
45 let val = reader
46 .kv_metadata("ailake.footer_offset")?
47 .ok_or(AilakeError::NotAnAilakeFile)?;
48 val.parse::<u64>().map_err(|_| AilakeError::NotAnAilakeFile)
49 }
50
51 pub fn is_ailake_file(&self) -> bool {
53 self.ailk_offset().is_ok()
54 }
55
56 pub fn read_header(&self) -> AilakeResult<AilakeHeader> {
58 let offset = self.ailk_offset()? as usize;
59 if offset + HEADER_SIZE > self.bytes.len() {
60 return Err(AilakeError::NotAnAilakeFile);
61 }
62 let header_bytes: &[u8; HEADER_SIZE] = self.bytes[offset..offset + HEADER_SIZE]
63 .try_into()
64 .map_err(|_| AilakeError::NotAnAilakeFile)?;
65 AilakeHeader::from_bytes(header_bytes)
66 }
67
68 pub fn get_centroid(&self) -> AilakeResult<Centroid> {
70 let ailk_start = self.ailk_offset()? as usize;
71 let header = self.read_header()?;
72 let centroid_start = ailk_start + header.centroid_offset as usize;
73 let centroid_end = centroid_start + header.centroid_len as usize;
74
75 if centroid_end > self.bytes.len() {
76 return Err(AilakeError::NotAnAilakeFile);
77 }
78
79 let centroid_data = &self.bytes[centroid_start..centroid_end];
80 let dim = header.dim as usize;
81 let expected_len = dim * 4 + 4;
82 if centroid_data.len() != expected_len {
83 return Err(AilakeError::InvalidCentroidLength {
84 expected_dim: header.dim,
85 actual: centroid_data.len(),
86 });
87 }
88
89 let values: Vec<f32> = centroid_data[..dim * 4]
90 .chunks_exact(4)
91 .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
92 .collect();
93 let radius = f32::from_le_bytes(centroid_data[dim * 4..].try_into().unwrap());
94 let metric = distance_metric_to_vector_metric(header.distance_metric);
95
96 Ok(Centroid {
97 values,
98 radius,
99 metric,
100 })
101 }
102
103 pub fn load_index(&self) -> AilakeResult<HnswIndex> {
105 self.load_index_for_column(&self.vector_column.clone())
106 }
107
108 pub fn load_index_for_column(&self, column: &str) -> AilakeResult<HnswIndex> {
113 let ailk_start = self.ailk_offset_for_column(column)? as usize;
114
115 if ailk_start + HEADER_SIZE > self.bytes.len() {
116 return Err(AilakeError::NotAnAilakeFile);
117 }
118 let header_bytes: &[u8; HEADER_SIZE] = self.bytes[ailk_start..ailk_start + HEADER_SIZE]
119 .try_into()
120 .map_err(|_| AilakeError::NotAnAilakeFile)?;
121 let header = AilakeHeader::from_bytes(header_bytes)?;
122
123 let hnsw_start = ailk_start + header.hnsw_offset as usize;
124 let hnsw_end = hnsw_start + header.hnsw_len as usize;
125
126 if hnsw_end > self.bytes.len() {
127 return Err(AilakeError::NotAnAilakeFile);
128 }
129 MmapLoader::from_bytes(&self.bytes[hnsw_start..hnsw_end])
130 }
131
132 pub fn load_any_index(&self) -> AilakeResult<AnyIndex> {
134 self.load_any_index_for_column(&self.vector_column.clone())
135 }
136
137 pub fn load_any_index_for_column(&self, column: &str) -> AilakeResult<AnyIndex> {
139 let ailk_start = self.ailk_offset_for_column(column)? as usize;
140
141 if ailk_start + HEADER_SIZE > self.bytes.len() {
142 return Err(AilakeError::NotAnAilakeFile);
143 }
144 let header_bytes: &[u8; HEADER_SIZE] = self.bytes[ailk_start..ailk_start + HEADER_SIZE]
145 .try_into()
146 .map_err(|_| AilakeError::NotAnAilakeFile)?;
147 let header = AilakeHeader::from_bytes(header_bytes)?;
148
149 let index_start = ailk_start + header.hnsw_offset as usize;
150 let index_end = index_start + header.hnsw_len as usize;
151
152 if index_end > self.bytes.len() {
153 return Err(AilakeError::NotAnAilakeFile);
154 }
155 let index_bytes = &self.bytes[index_start..index_end];
156
157 if header.flags & FLAG_INDEX_IVF_PQ != 0 {
158 let idx = IvfPqSerializer::from_bytes(index_bytes)?;
159 Ok(AnyIndex::IvfPq(idx))
160 } else {
161 let idx = MmapLoader::from_bytes(index_bytes)?;
162 Ok(AnyIndex::Hnsw(idx))
163 }
164 }
165
166 pub fn read_parquet(&self) -> AilakeResult<(RecordBatch, Vec<Vec<f32>>)> {
169 let reader = ParquetVectorReader::new(self.bytes.clone(), &self.vector_column);
170 reader.read_all()
171 }
172
173 pub fn verify_integrity(&self) -> AilakeResult<()> {
175 let header = self.read_header()?;
176 let index = self.load_index()?;
177 let reader = ParquetVectorReader::new(self.bytes.clone(), &self.vector_column);
178 let parquet_count = reader.record_count()?;
179
180 if parquet_count != index.node_count() {
181 return Err(AilakeError::RowCountMismatch {
182 parquet: parquet_count,
183 hnsw: index.node_count(),
184 });
185 }
186 if parquet_count != header.record_count {
187 return Err(AilakeError::RowCountMismatch {
188 parquet: parquet_count,
189 hnsw: header.record_count,
190 });
191 }
192 Ok(())
193 }
194}
195
196fn distance_metric_to_vector_metric(dm: DistanceMetric) -> VectorMetric {
197 match dm {
198 DistanceMetric::Cosine => VectorMetric::Cosine,
199 DistanceMetric::Euclidean => VectorMetric::Euclidean,
200 DistanceMetric::DotProduct => VectorMetric::DotProduct,
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::writer::AilakeFileWriter;
    use ailake_core::{VectorMetric, VectorPrecision, VectorStoragePolicy};
    use arrow_array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Policy for an `embedding` column: cosine metric, F16 storage, no PQ,
    /// and no raw vectors kept for re-ranking.
    fn make_policy(dim: u32) -> VectorStoragePolicy {
        VectorStoragePolicy {
            column_name: "embedding".to_string(),
            dim,
            metric: VectorMetric::Cosine,
            precision: VectorPrecision::F16,
            pq: None,
            keep_raw_for_reranking: false,
        }
    }

    /// Writes a file with `rows` rows and an `id` column holding `0..rows`.
    /// Row `r` gets a one-hot vector whose 1.0 is at position `r % dim`.
    fn write_file(rows: usize, dim: u32) -> Bytes {
        let width = dim as usize;
        let vectors: Vec<Vec<f32>> = (0..rows)
            .map(|row| {
                let mut one_hot = vec![0.0f32; width];
                one_hot[row % width] = 1.0;
                one_hot
            })
            .collect();

        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
        let row_ids = (0..rows as i32).collect::<Vec<i32>>();
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(row_ids))]).unwrap();

        let writer = AilakeFileWriter::new(make_policy(dim));
        writer.write(&batch, &vectors).unwrap()
    }

    /// Builds a reader over a freshly written file using the `embedding` column.
    fn open_reader(rows: usize, dim: u32) -> AilakeFileReader {
        AilakeFileReader::new(write_file(rows, dim), "embedding", dim)
    }

    #[test]
    fn is_ailake_file() {
        assert!(open_reader(3, 4).is_ailake_file());
    }

    #[test]
    fn integrity_check_passes() {
        open_reader(10, 8).verify_integrity().unwrap();
    }

    #[test]
    fn centroid_has_correct_dim() {
        let centroid = open_reader(5, 4).get_centroid().unwrap();
        assert_eq!(centroid.values.len(), 4);
    }

    #[test]
    fn search_finds_nearest() {
        let dim = 4u32;
        let index = open_reader(4, dim).load_index().unwrap();
        let probe = vec![1.0f32, 0.0, 0.0, 0.0];
        let hits = index.search(&probe, 1, 50);
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].0, ailake_core::RowId::new(0));
    }

    #[test]
    fn parquet_read_returns_tabular_data() {
        let (batch, vectors) = open_reader(3, 4).read_parquet().unwrap();
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(vectors.len(), 3);
    }
}