Skip to main content

skeg_cli/
lib.rs

1#![deny(unsafe_code)]
2
3//! `skeg-cli` - operator tools for skeg.
4//!
5//! Three subcommands ship in v0.1:
6//!
7//! - `build`   - offline Vamana index builder. Reads a vector dataset
8//!   (`.npy` or `.fbin`) and writes a ready-to-serve skeg data directory.
9//!   See [`build_index`].
10//! - `inspect` - offline introspection of a data directory: VINDEX names,
11//!   dims, vector counts, file sizes per shard. See [`inspect`].
12//! - `stats`   - RESP3 client that fetches `SKEG.STATS`, `SKEG.SHARDS`,
13//!   and `SKEG.VINDEX.LIST` from a running server. See [`stats`].
14//!
15//! The build output layout mirrors a single-shard data directory:
16//!
17//! ```text
18//! <output>/shard-0/vindexes.registry
19//! <output>/shard-0/vindex-<name>/graph.vmn
20//! <output>/shard-0/vindex-<name>/vectors.bin
21//! ```
22
23pub mod inspect;
24pub mod stats;
25
26use std::io;
27use std::path::Path;
28
29use skeg_vector::{MmapVectorSource, VamanaConfig, VamanaIndex};
30
/// Name of the one shard directory an offline build populates. A server run
/// with `--mode serve` opens the output with a single shard, so this lines up
/// with the `shard-0/` directory of a regular data directory.
const SERVE_SHARD_DIR: &str = "shard-0";

/// File name of the VINDEX registry. Its bytes follow `skeg-server`'s
/// `shard::VINDEX_REGISTRY` layout (little-endian): a `u32` entry count, then
/// for each entry `[u16 name_len][name][u32 dim]`.
const VINDEX_REGISTRY: &str = "vindexes.registry";

/// File names written by `VamanaIndex::save`. `skeg-vector` does not export
/// them, so they are repeated here; they are a stable part of the on-disk
/// format.
const GRAPH_FILE: &str = "graph.vmn";
const VECTORS_FILE: &str = "vectors.bin";
46
/// Summary of a completed offline build.
///
/// Derives `Debug` so callers can log it, and `Clone`/`PartialEq`/`Eq` so it
/// can be stored and compared (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BuildStats {
    /// Number of vectors indexed.
    pub n: usize,
    /// Vector dimension.
    pub dim: usize,
    /// Size of `graph.vmn` in bytes.
    pub graph_bytes: u64,
    /// Size of `vectors.bin` in bytes.
    pub vectors_bytes: u64,
}
58
/// Build an [`io::ErrorKind::InvalidData`] error carrying `msg`; used when a
/// file's bytes do not match the format it claims to be.
fn bad_data(msg: impl Into<String>) -> io::Error {
    let text: String = msg.into();
    io::Error::new(io::ErrorKind::InvalidData, text)
}
62
63/// Read a row-major f32 dataset, returning `(data, n, dim)`.
64///
65/// The format is selected by extension: `.npy` (`NumPy` v1.0, little-endian
66/// f32, C order) or `.fbin`/`.bin` (`[u32 n][u32 dim][f32 data]`, the
67/// `big-ann-benchmarks` layout).
68///
69/// # Errors
70///
71/// Returns an error if the file is missing, the extension is unknown, or the
72/// contents do not parse as the expected format.
73pub fn read_vectors(path: &Path) -> io::Result<(Vec<f32>, usize, usize)> {
74    match path.extension().and_then(|e| e.to_str()) {
75        Some("npy") => read_npy(path),
76        Some("fbin" | "bin") => read_fbin(path),
77        other => Err(io::Error::new(
78            io::ErrorKind::InvalidInput,
79            format!(
80                "unsupported input extension {:?} (expected .npy or .fbin)",
81                other.unwrap_or("")
82            ),
83        )),
84    }
85}
86
87/// Parse the `(n, dim)` shape from a `NumPy` header dict string, rejecting
88/// anything that is not a 2-D little-endian float32 array.
89fn parse_npy_shape(header: &str) -> io::Result<(usize, usize)> {
90    if !header.contains("<f4") {
91        return Err(bad_data(
92            "only little-endian float32 (<f4) .npy is supported",
93        ));
94    }
95    let sh = header
96        .find("'shape':")
97        .ok_or_else(|| bad_data("no shape in .npy header"))?;
98    let lp = header[sh..]
99        .find('(')
100        .ok_or_else(|| bad_data("malformed .npy shape"))?
101        + sh
102        + 1;
103    let rp = header[lp..]
104        .find(')')
105        .ok_or_else(|| bad_data("malformed .npy shape"))?
106        + lp;
107    let dims: Vec<usize> = header[lp..rp]
108        .split(',')
109        .filter_map(|s| s.trim().parse().ok())
110        .collect();
111    if dims.len() != 2 {
112        return Err(bad_data("expected a 2-D .npy array"));
113    }
114    Ok((dims[0], dims[1]))
115}
116
117fn read_npy(path: &Path) -> io::Result<(Vec<f32>, usize, usize)> {
118    let bytes = std::fs::read(path)?;
119    if bytes.len() < 10 || &bytes[0..6] != b"\x93NUMPY" {
120        return Err(bad_data("not a .npy file (bad magic)"));
121    }
122    let header_len = u16::from_le_bytes([bytes[8], bytes[9]]) as usize;
123    if 10 + header_len > bytes.len() {
124        return Err(bad_data("truncated .npy header"));
125    }
126    let header = std::str::from_utf8(&bytes[10..10 + header_len])
127        .map_err(|_| bad_data("non-utf8 .npy header"))?;
128    let (n, dim) = parse_npy_shape(header)?;
129    let payload = &bytes[10 + header_len..];
130    let need = n.checked_mul(dim).and_then(|v| v.checked_mul(4));
131    match need {
132        Some(need) if payload.len() >= need => {
133            let data = payload[..need]
134                .chunks_exact(4)
135                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
136                .collect();
137            Ok((data, n, dim))
138        }
139        _ => Err(bad_data("truncated .npy payload")),
140    }
141}
142
143fn read_fbin(path: &Path) -> io::Result<(Vec<f32>, usize, usize)> {
144    let bytes = std::fs::read(path)?;
145    if bytes.len() < 8 {
146        return Err(bad_data("truncated .fbin header"));
147    }
148    let n = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
149    let dim = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
150    let need = n.checked_mul(dim).and_then(|v| v.checked_mul(4));
151    match need {
152        Some(need) if bytes.len() >= 8 + need => {
153            let data = bytes[8..8 + need]
154                .chunks_exact(4)
155                .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
156                .collect();
157            Ok((data, n, dim))
158        }
159        _ => Err(bad_data("truncated .fbin payload")),
160    }
161}
162
163/// Parse only the format header of `path`, returning `(byte_offset, n, dim)`:
164/// where the f32 payload begins and its shape. The payload itself is not
165/// read, so a large dataset is not pulled into the heap.
166///
167/// # Errors
168///
169/// Returns an error if the extension is unknown or the header does not parse.
170pub fn read_header(path: &Path) -> io::Result<(usize, usize, usize)> {
171    match path.extension().and_then(|e| e.to_str()) {
172        Some("npy") => read_npy_header(path),
173        Some("fbin" | "bin") => read_fbin_header(path),
174        other => Err(io::Error::new(
175            io::ErrorKind::InvalidInput,
176            format!(
177                "unsupported input extension {:?} (expected .npy or .fbin)",
178                other.unwrap_or("")
179            ),
180        )),
181    }
182}
183
184fn read_npy_header(path: &Path) -> io::Result<(usize, usize, usize)> {
185    use std::io::Read;
186    let mut f = std::fs::File::open(path)?;
187    let mut pre = [0u8; 10];
188    f.read_exact(&mut pre)
189        .map_err(|_| bad_data("truncated .npy header"))?;
190    if &pre[0..6] != b"\x93NUMPY" {
191        return Err(bad_data("not a .npy file (bad magic)"));
192    }
193    let header_len = u16::from_le_bytes([pre[8], pre[9]]) as usize;
194    let mut header_bytes = vec![0u8; header_len];
195    f.read_exact(&mut header_bytes)
196        .map_err(|_| bad_data("truncated .npy header"))?;
197    let header =
198        std::str::from_utf8(&header_bytes).map_err(|_| bad_data("non-utf8 .npy header"))?;
199    let (n, dim) = parse_npy_shape(header)?;
200    Ok((10 + header_len, n, dim))
201}
202
203fn read_fbin_header(path: &Path) -> io::Result<(usize, usize, usize)> {
204    use std::io::Read;
205    let mut f = std::fs::File::open(path)?;
206    let mut hdr = [0u8; 8];
207    f.read_exact(&mut hdr)
208        .map_err(|_| bad_data("truncated .fbin header"))?;
209    let n = u32::from_le_bytes(hdr[0..4].try_into().unwrap()) as usize;
210    let dim = u32::from_le_bytes(hdr[4..8].try_into().unwrap()) as usize;
211    Ok((8, n, dim))
212}
213
214/// Save a built index under `output` as a ready-to-serve data directory and
215/// report its on-disk sizes.
216fn finish_build(index: &VamanaIndex, output: &Path, name: &str) -> io::Result<BuildStats> {
217    let shard_dir = output.join(SERVE_SHARD_DIR);
218    let vindex_dir = shard_dir.join(format!("vindex-{name}"));
219    index.save(&vindex_dir)?;
220    write_registry(&shard_dir, name, index.dim())?;
221    let graph_bytes = std::fs::metadata(vindex_dir.join(GRAPH_FILE))?.len();
222    let vectors_bytes = std::fs::metadata(vindex_dir.join(VECTORS_FILE))?.len();
223    Ok(BuildStats {
224        n: index.len(),
225        dim: index.dim(),
226        graph_bytes,
227        vectors_bytes,
228    })
229}
230
231/// Build a Vamana index over an already-loaded dataset and write it under
232/// `output` as a ready-to-serve data directory.
233///
234/// # Errors
235///
236/// Returns an error if the dataset is empty, `vectors.len()` disagrees with
237/// `n * dim`, or any file cannot be written.
238pub fn build_index_from(
239    vectors: Vec<f32>,
240    n: usize,
241    dim: usize,
242    output: &Path,
243    name: &str,
244    config: &VamanaConfig,
245) -> io::Result<BuildStats> {
246    if n == 0 || dim == 0 {
247        return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty dataset"));
248    }
249    if vectors.len() != n * dim {
250        return Err(io::Error::new(
251            io::ErrorKind::InvalidInput,
252            format!("expected {} f32 values, got {}", n * dim, vectors.len()),
253        ));
254    }
255    let ids: Vec<u64> = (0..n as u64).collect();
256    let index = VamanaIndex::build(vectors, ids, dim, config);
257    finish_build(&index, output, name)
258}
259
260/// Read a dataset from `input` and build a ready-to-serve index under
261/// `output`.
262///
263/// The input file is memory-mapped: the build draws vectors from the mapping
264/// rather than copying the whole dataset into the heap, so a dataset close to
265/// or larger than RAM can still be indexed (post-Q10 Step 2, Build
266/// Foundation).
267///
268/// # Errors
269///
270/// Returns an error if the input cannot be read or the index cannot be built.
271pub fn build_index(
272    input: &Path,
273    output: &Path,
274    name: &str,
275    config: &VamanaConfig,
276) -> io::Result<BuildStats> {
277    let (byte_offset, n, dim) = read_header(input)?;
278    if n == 0 || dim == 0 {
279        return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty dataset"));
280    }
281    let source = MmapVectorSource::open(input, byte_offset, n, dim)?;
282    let ids: Vec<u64> = (0..n as u64).collect();
283    let index = VamanaIndex::build_from_source(Box::new(source), ids, config);
284    finish_build(&index, output, name)
285}
286
287/// Write the single-entry VINDEX registry the server reads on startup.
288fn write_registry(shard_dir: &Path, name: &str, dim: usize) -> io::Result<()> {
289    std::fs::create_dir_all(shard_dir)?;
290    let name_len = u16::try_from(name.len())
291        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "index name too long"))?;
292    let dim = u32::try_from(dim)
293        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "dim too large"))?;
294    let mut buf = Vec::new();
295    buf.extend_from_slice(&1u32.to_le_bytes());
296    buf.extend_from_slice(&name_len.to_le_bytes());
297    buf.extend_from_slice(name.as_bytes());
298    buf.extend_from_slice(&dim.to_le_bytes());
299    std::fs::write(shard_dir.join(VINDEX_REGISTRY), &buf)
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use skeg_vector::DiskVamanaIndex;
    use tempfile::TempDir;

    /// Encode a minimal valid `NumPy` v1.0 array of `n * dim` f32 values.
    fn make_npy(n: usize, dim: usize, data: &[f32]) -> Vec<u8> {
        let dict = format!("{{'descr': '<f4', 'fortran_order': False, 'shape': ({n}, {dim}), }}");
        // 10 fixed bytes + dict + trailing '\n', padded so the total is a
        // multiple of 64 (NumPy v1.0 alignment requirement).
        let unpadded = 10 + dict.len() + 1;
        let pad = (64 - unpadded % 64) % 64;
        let header_len = dict.len() + 1 + pad;
        let mut out = Vec::new();
        out.extend_from_slice(b"\x93NUMPY");
        out.push(1);
        out.push(0);
        out.extend_from_slice(&u16::try_from(header_len).unwrap().to_le_bytes());
        out.extend_from_slice(dict.as_bytes());
        out.extend(std::iter::repeat_n(b' ', pad));
        out.push(b'\n');
        for &x in data {
            out.extend_from_slice(&x.to_le_bytes());
        }
        out
    }

    /// Encode an `.fbin` file: `[u32 n][u32 dim]` followed by the f32 payload.
    fn make_fbin(n: usize, dim: usize, data: &[f32]) -> Vec<u8> {
        let mut out = Vec::new();
        out.extend_from_slice(&u32::try_from(n).unwrap().to_le_bytes());
        out.extend_from_slice(&u32::try_from(dim).unwrap().to_le_bytes());
        for &x in data {
            out.extend_from_slice(&x.to_le_bytes());
        }
        out
    }

    /// Deterministic 8-dim test vector, distinct per seed (xorshift-style
    /// generator).
    // Intentional: only the low 16 bits of `s` are converted, and every value
    // below 2^16 is exactly representable as an f32.
    #[allow(clippy::cast_precision_loss)]
    fn tvec(seed: u64) -> Vec<f32> {
        let mut s = (seed << 1) | 1;
        (0..8)
            .map(|_| {
                s ^= s << 13;
                s ^= s >> 7;
                s ^= s << 17;
                ((s & 0xFFFF) as f32 / 32768.0) - 1.0
            })
            .collect()
    }

    #[test]
    fn npy_roundtrip() {
        let dir = TempDir::new().unwrap();
        let data: Vec<f32> = (0..12u8).map(f32::from).collect();
        let path = dir.path().join("d.npy");
        std::fs::write(&path, make_npy(3, 4, &data)).unwrap();
        let (got, n, dim) = read_vectors(&path).unwrap();
        assert_eq!((n, dim), (3, 4));
        assert_eq!(got, data);
    }

    #[test]
    fn fbin_roundtrip() {
        let dir = TempDir::new().unwrap();
        let data: Vec<f32> = (0..10u8).map(|i| f32::from(i) * 0.5).collect();
        let path = dir.path().join("d.fbin");
        std::fs::write(&path, make_fbin(2, 5, &data)).unwrap();
        let (got, n, dim) = read_vectors(&path).unwrap();
        assert_eq!((n, dim), (2, 5));
        assert_eq!(got, data);
    }

    #[test]
    fn truncated_fbin_is_rejected() {
        let dir = TempDir::new().unwrap();
        let mut bytes = make_fbin(2, 5, &[0.0; 10]);
        bytes.truncate(bytes.len() - 8);
        let path = dir.path().join("bad.fbin");
        std::fs::write(&path, bytes).unwrap();
        assert!(read_vectors(&path).is_err());
    }

    #[test]
    fn truncated_npy_is_rejected() {
        let dir = TempDir::new().unwrap();
        let mut bytes = make_npy(3, 4, &[0.0; 12]);
        bytes.truncate(bytes.len() - 4);
        let path = dir.path().join("bad.npy");
        std::fs::write(&path, bytes).unwrap();
        assert!(read_vectors(&path).is_err());
    }

    #[test]
    fn unknown_extension_is_rejected() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("d.csv");
        std::fs::write(&path, b"1,2,3").unwrap();
        assert!(read_vectors(&path).is_err());
    }

    #[test]
    fn read_header_reports_payload_offset() {
        let dir = TempDir::new().unwrap();
        let data: Vec<f32> = (0..12u8).map(f32::from).collect();

        let npy = make_npy(3, 4, &data);
        let npy_path = dir.path().join("d.npy");
        std::fs::write(&npy_path, &npy).unwrap();
        // The payload is the trailing 12 f32s; everything before it is header.
        assert_eq!(
            read_header(&npy_path).unwrap(),
            (npy.len() - data.len() * 4, 3, 4)
        );

        let fbin_path = dir.path().join("d.fbin");
        std::fs::write(&fbin_path, make_fbin(3, 4, &data)).unwrap();
        assert_eq!(read_header(&fbin_path).unwrap(), (8, 3, 4));
    }

    #[test]
    fn build_produces_a_servable_index() {
        let n = 64;
        let dim = 8;
        let flat: Vec<f32> = (0..n).flat_map(|i| tvec(i as u64 + 1)).collect();
        let out = TempDir::new().unwrap();
        let stats =
            build_index_from(flat, n, dim, out.path(), "docs", &VamanaConfig::default()).unwrap();
        assert_eq!((stats.n, stats.dim), (n, dim));
        assert!(stats.graph_bytes > 0 && stats.vectors_bytes > 0);

        // The output must open as a DiskVamanaIndex and recover every vector.
        let vindex_dir = out.path().join("shard-0").join("vindex-docs");
        let index = DiskVamanaIndex::open(&vindex_dir).unwrap();
        assert_eq!(index.len(), n);
        assert_eq!(index.dim(), dim);
        let hits = index.search(&tvec(43), 5).unwrap();
        assert_eq!(hits[0].0, 42, "querying a stored vector returns its id");

        // The registry must name the index so the server recovers it:
        // [u32 count][u16 name_len][name][u32 dim], little-endian.
        let registry = out.path().join("shard-0").join("vindexes.registry");
        let mut want = 1u32.to_le_bytes().to_vec();
        want.extend_from_slice(&4u16.to_le_bytes());
        want.extend_from_slice(b"docs");
        want.extend_from_slice(&u32::try_from(dim).unwrap().to_le_bytes());
        assert_eq!(std::fs::read(&registry).unwrap(), want);
    }

    #[test]
    fn build_rejects_a_length_mismatch() {
        let out = TempDir::new().unwrap();
        let err = build_index_from(
            vec![0.0; 10],
            3,
            4,
            out.path(),
            "x",
            &VamanaConfig::default(),
        );
        assert!(err.is_err());
    }

    #[test]
    fn build_index_mmaps_input_and_serves() {
        // The file-reading `build_index` memory-maps the input rather than
        // slurping it; the result must still open and answer queries.
        let n = 64;
        let dim = 8;
        let flat: Vec<f32> = (0..n).flat_map(|i| tvec(i as u64 + 1)).collect();
        let dir = TempDir::new().unwrap();
        let input = dir.path().join("data.fbin");
        std::fs::write(&input, make_fbin(n, dim, &flat)).unwrap();

        let out = TempDir::new().unwrap();
        let stats = build_index(&input, out.path(), "docs", &VamanaConfig::default()).unwrap();
        assert_eq!((stats.n, stats.dim), (n, dim));

        let vindex_dir = out.path().join("shard-0").join("vindex-docs");
        let index = DiskVamanaIndex::open(&vindex_dir).unwrap();
        assert_eq!(index.len(), n);
        let hits = index.search(&tvec(43), 5).unwrap();
        assert_eq!(hits[0].0, 42, "querying a stored vector returns its id");
    }
}