1#![deny(unsafe_code)]
2
3pub mod inspect;
24pub mod stats;
25
26use std::io;
27use std::path::Path;
28
29use skeg_vector::{MmapVectorSource, VamanaConfig, VamanaIndex};
30
/// Name of the shard subdirectory created under the build output directory;
/// the per-index directory and the registry file are both placed inside it.
const SERVE_SHARD_DIR: &str = "shard-0";

/// File name of the registry written into the shard directory by `write_registry`.
const VINDEX_REGISTRY: &str = "vindexes.registry";

/// File inside the `vindex-<name>` directory whose size is reported as
/// `BuildStats::graph_bytes` (presumably produced by `VamanaIndex::save` — confirm).
const GRAPH_FILE: &str = "graph.vmn";
/// File inside the `vindex-<name>` directory whose size is reported as
/// `BuildStats::vectors_bytes` (presumably produced by `VamanaIndex::save` — confirm).
const VECTORS_FILE: &str = "vectors.bin";
46
/// Summary of a finished index build, returned by [`build_index`] and
/// [`build_index_from`].
///
/// Derives `Debug`/`Clone`/`PartialEq`/`Eq` so callers can log, copy and
/// compare results (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BuildStats {
    /// Number of vectors in the built index.
    pub n: usize,
    /// Dimensionality of every vector in the index.
    pub dim: usize,
    /// Size in bytes of the graph file on disk after the build.
    pub graph_bytes: u64,
    /// Size in bytes of the vectors file on disk after the build.
    pub vectors_bytes: u64,
}
58
/// Builds an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] carrying `msg`.
///
/// Used by the file readers to report malformed or truncated input.
fn bad_data(msg: impl Into<String>) -> io::Error {
    let text: String = msg.into();
    io::Error::new(io::ErrorKind::InvalidData, text)
}
62
63pub fn read_vectors(path: &Path) -> io::Result<(Vec<f32>, usize, usize)> {
74 match path.extension().and_then(|e| e.to_str()) {
75 Some("npy") => read_npy(path),
76 Some("fbin" | "bin") => read_fbin(path),
77 other => Err(io::Error::new(
78 io::ErrorKind::InvalidInput,
79 format!(
80 "unsupported input extension {:?} (expected .npy or .fbin)",
81 other.unwrap_or("")
82 ),
83 )),
84 }
85}
86
87fn parse_npy_shape(header: &str) -> io::Result<(usize, usize)> {
90 if !header.contains("<f4") {
91 return Err(bad_data(
92 "only little-endian float32 (<f4) .npy is supported",
93 ));
94 }
95 let sh = header
96 .find("'shape':")
97 .ok_or_else(|| bad_data("no shape in .npy header"))?;
98 let lp = header[sh..]
99 .find('(')
100 .ok_or_else(|| bad_data("malformed .npy shape"))?
101 + sh
102 + 1;
103 let rp = header[lp..]
104 .find(')')
105 .ok_or_else(|| bad_data("malformed .npy shape"))?
106 + lp;
107 let dims: Vec<usize> = header[lp..rp]
108 .split(',')
109 .filter_map(|s| s.trim().parse().ok())
110 .collect();
111 if dims.len() != 2 {
112 return Err(bad_data("expected a 2-D .npy array"));
113 }
114 Ok((dims[0], dims[1]))
115}
116
117fn read_npy(path: &Path) -> io::Result<(Vec<f32>, usize, usize)> {
118 let bytes = std::fs::read(path)?;
119 if bytes.len() < 10 || &bytes[0..6] != b"\x93NUMPY" {
120 return Err(bad_data("not a .npy file (bad magic)"));
121 }
122 let header_len = u16::from_le_bytes([bytes[8], bytes[9]]) as usize;
123 if 10 + header_len > bytes.len() {
124 return Err(bad_data("truncated .npy header"));
125 }
126 let header = std::str::from_utf8(&bytes[10..10 + header_len])
127 .map_err(|_| bad_data("non-utf8 .npy header"))?;
128 let (n, dim) = parse_npy_shape(header)?;
129 let payload = &bytes[10 + header_len..];
130 let need = n.checked_mul(dim).and_then(|v| v.checked_mul(4));
131 match need {
132 Some(need) if payload.len() >= need => {
133 let data = payload[..need]
134 .chunks_exact(4)
135 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
136 .collect();
137 Ok((data, n, dim))
138 }
139 _ => Err(bad_data("truncated .npy payload")),
140 }
141}
142
143fn read_fbin(path: &Path) -> io::Result<(Vec<f32>, usize, usize)> {
144 let bytes = std::fs::read(path)?;
145 if bytes.len() < 8 {
146 return Err(bad_data("truncated .fbin header"));
147 }
148 let n = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
149 let dim = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
150 let need = n.checked_mul(dim).and_then(|v| v.checked_mul(4));
151 match need {
152 Some(need) if bytes.len() >= 8 + need => {
153 let data = bytes[8..8 + need]
154 .chunks_exact(4)
155 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
156 .collect();
157 Ok((data, n, dim))
158 }
159 _ => Err(bad_data("truncated .fbin payload")),
160 }
161}
162
163pub fn read_header(path: &Path) -> io::Result<(usize, usize, usize)> {
171 match path.extension().and_then(|e| e.to_str()) {
172 Some("npy") => read_npy_header(path),
173 Some("fbin" | "bin") => read_fbin_header(path),
174 other => Err(io::Error::new(
175 io::ErrorKind::InvalidInput,
176 format!(
177 "unsupported input extension {:?} (expected .npy or .fbin)",
178 other.unwrap_or("")
179 ),
180 )),
181 }
182}
183
184fn read_npy_header(path: &Path) -> io::Result<(usize, usize, usize)> {
185 use std::io::Read;
186 let mut f = std::fs::File::open(path)?;
187 let mut pre = [0u8; 10];
188 f.read_exact(&mut pre)
189 .map_err(|_| bad_data("truncated .npy header"))?;
190 if &pre[0..6] != b"\x93NUMPY" {
191 return Err(bad_data("not a .npy file (bad magic)"));
192 }
193 let header_len = u16::from_le_bytes([pre[8], pre[9]]) as usize;
194 let mut header_bytes = vec![0u8; header_len];
195 f.read_exact(&mut header_bytes)
196 .map_err(|_| bad_data("truncated .npy header"))?;
197 let header =
198 std::str::from_utf8(&header_bytes).map_err(|_| bad_data("non-utf8 .npy header"))?;
199 let (n, dim) = parse_npy_shape(header)?;
200 Ok((10 + header_len, n, dim))
201}
202
203fn read_fbin_header(path: &Path) -> io::Result<(usize, usize, usize)> {
204 use std::io::Read;
205 let mut f = std::fs::File::open(path)?;
206 let mut hdr = [0u8; 8];
207 f.read_exact(&mut hdr)
208 .map_err(|_| bad_data("truncated .fbin header"))?;
209 let n = u32::from_le_bytes(hdr[0..4].try_into().unwrap()) as usize;
210 let dim = u32::from_le_bytes(hdr[4..8].try_into().unwrap()) as usize;
211 Ok((8, n, dim))
212}
213
214fn finish_build(index: &VamanaIndex, output: &Path, name: &str) -> io::Result<BuildStats> {
217 let shard_dir = output.join(SERVE_SHARD_DIR);
218 let vindex_dir = shard_dir.join(format!("vindex-{name}"));
219 index.save(&vindex_dir)?;
220 write_registry(&shard_dir, name, index.dim())?;
221 let graph_bytes = std::fs::metadata(vindex_dir.join(GRAPH_FILE))?.len();
222 let vectors_bytes = std::fs::metadata(vindex_dir.join(VECTORS_FILE))?.len();
223 Ok(BuildStats {
224 n: index.len(),
225 dim: index.dim(),
226 graph_bytes,
227 vectors_bytes,
228 })
229}
230
231pub fn build_index_from(
239 vectors: Vec<f32>,
240 n: usize,
241 dim: usize,
242 output: &Path,
243 name: &str,
244 config: &VamanaConfig,
245) -> io::Result<BuildStats> {
246 if n == 0 || dim == 0 {
247 return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty dataset"));
248 }
249 if vectors.len() != n * dim {
250 return Err(io::Error::new(
251 io::ErrorKind::InvalidInput,
252 format!("expected {} f32 values, got {}", n * dim, vectors.len()),
253 ));
254 }
255 let ids: Vec<u64> = (0..n as u64).collect();
256 let index = VamanaIndex::build(vectors, ids, dim, config);
257 finish_build(&index, output, name)
258}
259
260pub fn build_index(
272 input: &Path,
273 output: &Path,
274 name: &str,
275 config: &VamanaConfig,
276) -> io::Result<BuildStats> {
277 let (byte_offset, n, dim) = read_header(input)?;
278 if n == 0 || dim == 0 {
279 return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty dataset"));
280 }
281 let source = MmapVectorSource::open(input, byte_offset, n, dim)?;
282 let ids: Vec<u64> = (0..n as u64).collect();
283 let index = VamanaIndex::build_from_source(Box::new(source), ids, config);
284 finish_build(&index, output, name)
285}
286
287fn write_registry(shard_dir: &Path, name: &str, dim: usize) -> io::Result<()> {
289 std::fs::create_dir_all(shard_dir)?;
290 let name_len = u16::try_from(name.len())
291 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "index name too long"))?;
292 let dim = u32::try_from(dim)
293 .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "dim too large"))?;
294 let mut buf = Vec::new();
295 buf.extend_from_slice(&1u32.to_le_bytes());
296 buf.extend_from_slice(&name_len.to_le_bytes());
297 buf.extend_from_slice(name.as_bytes());
298 buf.extend_from_slice(&dim.to_le_bytes());
299 std::fs::write(shard_dir.join(VINDEX_REGISTRY), &buf)
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use skeg_vector::DiskVamanaIndex;
    use tempfile::TempDir;

    /// Serializes `data` as a version-1.0 `.npy` file with shape `(n, dim)`.
    fn make_npy(n: usize, dim: usize, data: &[f32]) -> Vec<u8> {
        let dict = format!("{{'descr': '<f4', 'fortran_order': False, 'shape': ({n}, {dim}), }}");
        // Pad with spaces so that preamble + dict + trailing newline ends on a
        // 64-byte boundary.
        let unpadded = 10 + dict.len() + 1;
        let pad = (64 - unpadded % 64) % 64;
        let header_len = u16::try_from(dict.len() + 1 + pad).unwrap();

        let mut out = b"\x93NUMPY\x01\x00".to_vec();
        out.extend_from_slice(&header_len.to_le_bytes());
        out.extend_from_slice(dict.as_bytes());
        out.resize(out.len() + pad, b' ');
        out.push(b'\n');
        out.extend(data.iter().flat_map(|x| x.to_le_bytes()));
        out
    }

    /// Serializes `data` as a `.fbin` file: `n`, `dim` as `u32`, then the floats.
    fn make_fbin(n: usize, dim: usize, data: &[f32]) -> Vec<u8> {
        let mut out = Vec::with_capacity(8 + 4 * data.len());
        for count in [n, dim] {
            out.extend_from_slice(&u32::try_from(count).unwrap().to_le_bytes());
        }
        out.extend(data.iter().flat_map(|x| x.to_le_bytes()));
        out
    }

    /// Deterministic 8-dimensional pseudo-random vector derived from `seed`.
    // The masked value fits in 16 bits, so the `as f32` conversion is exact.
    #[allow(clippy::cast_precision_loss)]
    fn tvec(seed: u64) -> Vec<f32> {
        let mut state = (seed << 1) | 1;
        let mut out = Vec::with_capacity(8);
        for _ in 0..8 {
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
            out.push(((state & 0xFFFF) as f32 / 32768.0) - 1.0);
        }
        out
    }

    /// Flat dataset of `n` rows where row `i` is `tvec(i + 1)`.
    fn dataset(n: usize) -> Vec<f32> {
        (0..n).flat_map(|i| tvec(i as u64 + 1)).collect()
    }

    #[test]
    fn npy_roundtrip() {
        let dir = TempDir::new().unwrap();
        let expected: Vec<f32> = (0..12u8).map(f32::from).collect();
        let file = dir.path().join("d.npy");
        std::fs::write(&file, make_npy(3, 4, &expected)).unwrap();

        let (values, rows, cols) = read_vectors(&file).unwrap();
        assert_eq!((rows, cols), (3, 4));
        assert_eq!(values, expected);
    }

    #[test]
    fn fbin_roundtrip() {
        let dir = TempDir::new().unwrap();
        let expected: Vec<f32> = (0..10u8).map(|i| f32::from(i) * 0.5).collect();
        let file = dir.path().join("d.fbin");
        std::fs::write(&file, make_fbin(2, 5, &expected)).unwrap();

        let (values, rows, cols) = read_vectors(&file).unwrap();
        assert_eq!((rows, cols), (2, 5));
        assert_eq!(values, expected);
    }

    #[test]
    fn truncated_fbin_is_rejected() {
        let dir = TempDir::new().unwrap();
        let full = make_fbin(2, 5, &[0.0; 10]);
        let cut = &full[..full.len() - 8];
        let file = dir.path().join("bad.fbin");
        std::fs::write(&file, cut).unwrap();

        assert!(read_vectors(&file).is_err());
    }

    #[test]
    fn unknown_extension_is_rejected() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("d.csv");
        std::fs::write(&file, b"1,2,3").unwrap();

        assert!(read_vectors(&file).is_err());
    }

    #[test]
    fn build_produces_a_servable_index() {
        let n = 64;
        let dim = 8;
        let out = TempDir::new().unwrap();

        let stats = build_index_from(
            dataset(n),
            n,
            dim,
            out.path(),
            "docs",
            &VamanaConfig::default(),
        )
        .unwrap();
        assert_eq!((stats.n, stats.dim), (n, dim));
        assert!(stats.graph_bytes > 0 && stats.vectors_bytes > 0);

        let shard = out.path().join("shard-0");
        let index = DiskVamanaIndex::open(&shard.join("vindex-docs")).unwrap();
        assert_eq!(index.len(), n);
        assert_eq!(index.dim(), dim);
        // Row 42 was generated from seed 43.
        let hits = index.search(&tvec(43), 5).unwrap();
        assert_eq!(hits[0].0, 42, "querying a stored vector returns its id");

        assert!(shard.join("vindexes.registry").exists());
    }

    #[test]
    fn build_rejects_a_length_mismatch() {
        let out = TempDir::new().unwrap();
        let config = VamanaConfig::default();

        let result = build_index_from(vec![0.0; 10], 3, 4, out.path(), "x", &config);
        assert!(result.is_err());
    }

    #[test]
    fn build_index_mmaps_input_and_serves() {
        let n = 64;
        let dim = 8;
        let src_dir = TempDir::new().unwrap();
        let input = src_dir.path().join("data.fbin");
        std::fs::write(&input, make_fbin(n, dim, &dataset(n))).unwrap();

        let out = TempDir::new().unwrap();
        let stats = build_index(&input, out.path(), "docs", &VamanaConfig::default()).unwrap();
        assert_eq!((stats.n, stats.dim), (n, dim));

        let vindex_dir = out.path().join("shard-0").join("vindex-docs");
        let index = DiskVamanaIndex::open(&vindex_dir).unwrap();
        assert_eq!(index.len(), n);
        // Row 42 was generated from seed 43.
        let hits = index.search(&tvec(43), 5).unwrap();
        assert_eq!(hits[0].0, 42, "querying a stored vector returns its id");
    }
}