use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
use byteorder::{LittleEndian, ReadBytesExt};
use fs2::FileExt;
use memmap2::{Mmap, MmapMut};
use ndarray::{Array1, Array2, ArrayView1, ArrayView2};
use crate::error::{Error, Result};
/// RAII guard for an exclusive advisory file lock (via the `fs2` crate).
///
/// Holding a `FileLockGuard` serializes merge operations across processes;
/// the lock is released on drop, and the OS also releases it when the file
/// handle closes (e.g. if the process dies).
struct FileLockGuard {
    // Held only to keep the locked file open for the guard's lifetime.
    _file: File,
}
impl FileLockGuard {
    /// Opens (creating if necessary, never truncating) `lock_path` and
    /// acquires an exclusive lock on it.
    ///
    /// NOTE(review): fs2's `lock_exclusive` presumably blocks until the
    /// lock is available — confirm no timeout/try-lock is needed here.
    fn acquire(lock_path: &Path) -> Result<Self> {
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(lock_path)
            .map_err(|e| {
                Error::IndexLoad(format!("Failed to open lock file {:?}: {}", lock_path, e))
            })?;
        file.lock_exclusive().map_err(|e| {
            Error::IndexLoad(format!("Failed to acquire lock on {:?}: {}", lock_path, e))
        })?;
        Ok(Self { _file: file })
    }
}
impl Drop for FileLockGuard {
    fn drop(&mut self) {
        // Best-effort unlock; errors are ignored because closing the file
        // handle releases the lock anyway.
        let _ = self._file.unlock();
    }
}
pub struct MmapArray2F32 {
_mmap: Mmap,
shape: (usize, usize),
data_offset: usize,
}
impl MmapArray2F32 {
pub fn from_raw_file(path: &Path) -> Result<Self> {
let file = File::open(path)
.map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
let mmap = unsafe {
Mmap::map(&file)
.map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
};
if mmap.len() < 16 {
return Err(Error::IndexLoad("File too small for header".into()));
}
let mut cursor = std::io::Cursor::new(&mmap[..16]);
let nrows = cursor
.read_i64::<LittleEndian>()
.map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
as usize;
let ncols = cursor
.read_i64::<LittleEndian>()
.map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
as usize;
let expected_size = 16 + nrows * ncols * 4;
if mmap.len() < expected_size {
return Err(Error::IndexLoad(format!(
"File size {} too small for shape ({}, {})",
mmap.len(),
nrows,
ncols
)));
}
Ok(Self {
_mmap: mmap,
shape: (nrows, ncols),
data_offset: 16,
})
}
pub fn shape(&self) -> (usize, usize) {
self.shape
}
pub fn nrows(&self) -> usize {
self.shape.0
}
pub fn ncols(&self) -> usize {
self.shape.1
}
pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
let start = self.data_offset + idx * self.shape.1 * 4;
let bytes = &self._mmap[start..start + self.shape.1 * 4];
let data =
unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
ArrayView1::from_shape(self.shape.1, data).unwrap()
}
pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
let nrows = end - start;
let byte_start = self.data_offset + start * self.shape.1 * 4;
let byte_end = self.data_offset + end * self.shape.1 * 4;
let bytes = &self._mmap[byte_start..byte_end];
let data = unsafe {
std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
};
Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
}
pub fn to_owned(&self) -> Array2<f32> {
self.load_rows(0, self.shape.0)
}
}
/// Read-only u8 matrix backed by a memory-mapped raw file.
///
/// File layout: 16-byte header (`nrows: i64` LE, `ncols: i64` LE) followed
/// by `nrows * ncols` bytes in row-major order.
pub struct MmapArray2U8 {
    // Mapping kept alive for as long as any view borrows from it.
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}
impl MmapArray2U8 {
    /// Maps `path` and validates the declared shape against the file size.
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
        // SAFETY: read-only mapping; the file must not be modified while
        // mapped (standard mmap caveat).
        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };
        if mmap.len() < 16 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }
        let mut cursor = std::io::Cursor::new(&mmap[..16]);
        let nrows = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
            as usize;
        let ncols = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
            as usize;
        // NOTE(review): `nrows * ncols` can wrap on a corrupt header in
        // release builds, bypassing this check; consider checked
        // arithmetic.
        let expected_size = 16 + nrows * ncols;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for shape ({}, {})",
                mmap.len(),
                nrows,
                ncols
            )));
        }
        Ok(Self {
            _mmap: mmap,
            shape: (nrows, ncols),
            data_offset: 16,
        })
    }
    /// (rows, cols).
    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }
    /// Zero-copy view of the whole matrix.
    pub fn view(&self) -> ArrayView2<'_, u8> {
        let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
        ArrayView2::from_shape(self.shape, bytes).unwrap()
    }
    /// Copies rows `start..end` into an owned array. Panics if the range
    /// is out of bounds.
    pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1;
        let byte_end = self.data_offset + end * self.shape.1;
        let bytes = &self._mmap[byte_start..byte_end];
        Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
    }
    /// Copies the entire matrix into an owned array.
    pub fn to_owned(&self) -> Array2<u8> {
        self.load_rows(0, self.shape.0)
    }
}
/// Read-only i64 vector backed by a memory-mapped raw file.
///
/// File layout: 8-byte header (`len: i64` LE) followed by `len`
/// little-endian i64 values.
pub struct MmapArray1I64 {
    // Mapping kept alive for as long as the struct is used.
    _mmap: Mmap,
    len: usize,
    data_offset: usize,
}
impl MmapArray1I64 {
    /// Maps `path` and validates the declared length against the file size.
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
        // SAFETY: read-only mapping; the file must not be modified while
        // mapped (standard mmap caveat).
        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };
        if mmap.len() < 8 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }
        let mut cursor = std::io::Cursor::new(&mmap[..8]);
        let len = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
            as usize;
        // NOTE(review): `len * 8` can wrap on a corrupt header in release
        // builds; consider checked arithmetic.
        let expected_size = 8 + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for length {}",
                mmap.len(),
                len
            )));
        }
        Ok(Self {
            _mmap: mmap,
            len,
            data_offset: 8,
        })
    }
    /// Number of elements.
    pub fn len(&self) -> usize {
        self.len
    }
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
    /// Reads element `idx`. Panics (slice bounds) if `idx >= len`.
    /// Uses `from_le_bytes`, so no alignment requirement.
    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }
    /// Copies the whole vector into an owned array.
    pub fn to_owned(&self) -> Array1<i64> {
        let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];
        // SAFETY: the slice lies within the validated mapping, and
        // data_offset (8) keeps the page-aligned base 8-byte aligned for
        // i64 reads.
        let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };
        Array1::from_vec(data.to_vec())
    }
}
pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
use std::io::Write;
let file = File::create(path)
.map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
let mut writer = std::io::BufWriter::new(file);
let nrows = array.nrows() as i64;
let ncols = array.ncols() as i64;
writer
.write_all(&nrows.to_le_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
writer
.write_all(&ncols.to_le_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
for val in array.iter() {
writer
.write_all(&val.to_le_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
}
writer
.flush()
.map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
Ok(())
}
pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
use std::io::Write;
let file = File::create(path)
.map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
let mut writer = std::io::BufWriter::new(file);
let nrows = array.nrows() as i64;
let ncols = array.ncols() as i64;
writer
.write_all(&nrows.to_le_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
writer
.write_all(&ncols.to_le_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
for row in array.rows() {
writer
.write_all(row.as_slice().unwrap())
.map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
}
writer
.flush()
.map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
Ok(())
}
pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
use std::io::Write;
let file = File::create(path)
.map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
let mut writer = std::io::BufWriter::new(file);
let len = array.len() as i64;
writer
.write_all(&len.to_le_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;
for val in array.iter() {
writer
.write_all(&val.to_le_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
}
writer
.flush()
.map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
Ok(())
}
/// Magic bytes at the start of every NPY file ("\x93NUMPY").
const NPY_MAGIC: &[u8] = b"\x93NUMPY";
fn parse_dtype_from_header(header: &str) -> Result<String> {
let descr_start = header
.find("'descr':")
.ok_or_else(|| Error::IndexLoad("No descr in NPY header".into()))?;
let after_descr = &header[descr_start + 8..];
let quote_start = after_descr
.find('\'')
.ok_or_else(|| Error::IndexLoad("No dtype quote in NPY header".into()))?;
let rest = &after_descr[quote_start + 1..];
let quote_end = rest
.find('\'')
.ok_or_else(|| Error::IndexLoad("Unclosed dtype quote in NPY header".into()))?;
Ok(rest[..quote_end].to_string())
}
pub fn detect_npy_dtype(path: &Path) -> Result<String> {
let file = File::open(path)
.map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
let mmap = unsafe {
Mmap::map(&file)
.map_err(|e| Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e)))?
};
if mmap.len() < 10 {
return Err(Error::IndexLoad("NPY file too small".into()));
}
if &mmap[..6] != NPY_MAGIC {
return Err(Error::IndexLoad("Invalid NPY magic".into()));
}
let major_version = mmap[6];
let header_len = if major_version == 1 {
u16::from_le_bytes([mmap[8], mmap[9]]) as usize
} else if major_version == 2 {
if mmap.len() < 12 {
return Err(Error::IndexLoad("NPY v2 file too small".into()));
}
u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
} else {
return Err(Error::IndexLoad(format!(
"Unsupported NPY version: {}",
major_version
)));
};
let header_start = if major_version == 1 { 10 } else { 12 };
let header_end = header_start + header_len;
if mmap.len() < header_end {
return Err(Error::IndexLoad("NPY header exceeds file size".into()));
}
let header_str = std::str::from_utf8(&mmap[header_start..header_end])
.map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
parse_dtype_from_header(header_str)
}
/// Converts an NPY file of f16 values to f32 **in place** (the file is
/// re-created at the same path with a "<f4" header).
///
/// NOTE(review): the rewrite is not atomic — the original bytes are held
/// only in memory once `File::create` truncates the file, so a crash
/// mid-write loses the data. Consider the temp-file + rename pattern used
/// elsewhere in this module.
pub fn convert_f16_to_f32_npy(path: &Path) -> Result<()> {
    use half::f16;
    use std::io::Read;
    let mut file = File::open(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
    let mut data = Vec::new();
    file.read_to_end(&mut data)
        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
    if data.len() < 10 || &data[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY file".into()));
    }
    let major_version = data[6];
    // NOTE(review): any version other than 1 is assumed to use the v2
    // 4-byte HEADER_LEN field; unsupported versions are not rejected here.
    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_len = if major_version == 1 {
        u16::from_le_bytes([data[8], data[9]]) as usize
    } else {
        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
    };
    let header_end = header_start + header_len;
    // Slicing below panics (rather than erroring) if the header or data
    // section is truncated.
    let header_str = std::str::from_utf8(&data[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
    let shape = parse_shape_from_header(header_str)?;
    let total_elements: usize = shape.iter().product();
    let f16_data = &data[header_end..header_end + total_elements * 2];
    // Decode each 2-byte f16 and re-encode as 4-byte little-endian f32.
    let mut f32_data = Vec::with_capacity(total_elements * 4);
    for chunk in f16_data.chunks(2) {
        let f16_val = f16::from_le_bytes([chunk[0], chunk[1]]);
        let f32_val: f32 = f16_val.to_f32();
        f32_data.extend_from_slice(&f32_val.to_le_bytes());
    }
    // Truncate and rewrite the same path with an f32 header + data.
    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(file);
    if shape.len() == 1 {
        write_npy_header_1d(&mut writer, shape[0], "<f4")?;
    } else if shape.len() == 2 {
        write_npy_header_2d(&mut writer, shape[0], shape[1], "<f4")?;
    } else {
        return Err(Error::IndexLoad("Unsupported shape dimensions".into()));
    }
    writer
        .write_all(&f32_data)
        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    writer.flush()?;
    Ok(())
}
/// Converts a 1-D NPY file of i64 values to i32 **in place**.
///
/// NOTE(review): values outside i32's range are silently truncated by the
/// `as i32` cast below — confirm callers guarantee the value range.
/// The rewrite is also not atomic (same caveat as the f16 converter).
pub fn convert_i64_to_i32_npy(path: &Path) -> Result<()> {
    use std::io::Read;
    let mut file = File::open(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
    let mut data = Vec::new();
    file.read_to_end(&mut data)
        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
    if data.len() < 10 || &data[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY file".into()));
    }
    let major_version = data[6];
    // NOTE(review): versions other than 1 are assumed to be v2; unsupported
    // versions are not rejected here.
    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_len = if major_version == 1 {
        u16::from_le_bytes([data[8], data[9]]) as usize
    } else {
        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
    };
    let header_end = header_start + header_len;
    let header_str = std::str::from_utf8(&data[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
    let shape = parse_shape_from_header(header_str)?;
    if shape.len() != 1 {
        return Err(Error::IndexLoad("Expected 1D array for i64->i32".into()));
    }
    let len = shape[0];
    // Slicing panics if the data section is truncated.
    let i64_data = &data[header_end..header_end + len * 8];
    let mut i32_data = Vec::with_capacity(len * 4);
    for chunk in i64_data.chunks(8) {
        let i64_val = i64::from_le_bytes(chunk.try_into().unwrap());
        // Truncating cast (see NOTE above).
        let i32_val = i64_val as i32;
        i32_data.extend_from_slice(&i32_val.to_le_bytes());
    }
    // Truncate and rewrite the same path with an i32 header + data.
    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(file);
    write_npy_header_1d(&mut writer, len, "<i4")?;
    writer
        .write_all(&i32_data)
        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    writer.flush()?;
    Ok(())
}
/// Rewrites a 2-D u8 NPY file **in place** with a freshly generated
/// "|u1" header; the data bytes themselves are copied unchanged. Useful
/// for normalizing headers written by other tools.
///
/// NOTE(review): not atomic — `File::create` truncates before the new
/// contents are written (same caveat as the other in-place converters).
pub fn normalize_u8_npy(path: &Path) -> Result<()> {
    use std::io::Read;
    let mut file = File::open(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
    let mut data = Vec::new();
    file.read_to_end(&mut data)
        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
    if data.len() < 10 || &data[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY file".into()));
    }
    let major_version = data[6];
    // NOTE(review): versions other than 1 are assumed to be v2; unsupported
    // versions are not rejected here.
    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_len = if major_version == 1 {
        u16::from_le_bytes([data[8], data[9]]) as usize
    } else {
        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
    };
    let header_end = header_start + header_len;
    let header_str = std::str::from_utf8(&data[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
    let shape = parse_shape_from_header(header_str)?;
    if shape.len() != 2 {
        return Err(Error::IndexLoad(
            "Expected 2D array for u8 normalization".into(),
        ));
    }
    let nrows = shape[0];
    let ncols = shape[1];
    // Slicing panics if the data section is truncated.
    let u8_data = &data[header_end..header_end + nrows * ncols];
    let new_file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(new_file);
    write_npy_header_2d(&mut writer, nrows, ncols, "|u1")?;
    writer
        .write_all(u8_data)
        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    writer.flush()?;
    Ok(())
}
fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
if mmap.len() < 10 {
return Err(Error::IndexLoad("NPY file too small".into()));
}
if &mmap[..6] != NPY_MAGIC {
return Err(Error::IndexLoad("Invalid NPY magic".into()));
}
let major_version = mmap[6];
let _minor_version = mmap[7];
let header_len = if major_version == 1 {
u16::from_le_bytes([mmap[8], mmap[9]]) as usize
} else if major_version == 2 {
if mmap.len() < 12 {
return Err(Error::IndexLoad("NPY v2 file too small".into()));
}
u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
} else {
return Err(Error::IndexLoad(format!(
"Unsupported NPY version: {}",
major_version
)));
};
let header_start = if major_version == 1 { 10 } else { 12 };
let header_end = header_start + header_len;
if mmap.len() < header_end {
return Err(Error::IndexLoad("NPY header exceeds file size".into()));
}
let header_str = std::str::from_utf8(&mmap[header_start..header_end])
.map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
let shape = parse_shape_from_header(header_str)?;
let fortran_order = header_str.contains("'fortran_order': True");
Ok((shape, header_end, fortran_order))
}
fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
let shape_start = header
.find("'shape':")
.ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;
let after_shape = &header[shape_start + 8..];
let paren_start = after_shape
.find('(')
.ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
let paren_end = after_shape
.find(')')
.ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;
let shape_content = &after_shape[paren_start + 1..paren_end];
let mut shape = Vec::new();
for part in shape_content.split(',') {
let trimmed = part.trim();
if !trimmed.is_empty() {
let dim: usize = trimmed.parse().map_err(|e| {
Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
})?;
shape.push(dim);
}
}
Ok(shape)
}
/// Read-only i64 vector backed by a memory-mapped `.npy` file.
///
/// NOTE(review): the header's dtype is not validated here; callers must
/// ensure the file really contains little-endian i64 ("<i8") data.
pub struct MmapNpyArray1I64 {
    // Mapping kept alive for as long as the struct is used.
    _mmap: Mmap,
    len: usize,
    data_offset: usize,
}
impl MmapNpyArray1I64 {
    /// A zero-length placeholder backed by a 1-byte anonymous mapping
    /// (never read because `len` is 0).
    pub fn empty() -> Self {
        let mmap = MmapMut::map_anon(1)
            .expect("failed to create anonymous mmap")
            .make_read_only()
            .expect("failed to make anonymous mmap read-only");
        Self {
            _mmap: mmap,
            len: 0,
            data_offset: 0,
        }
    }
    /// Maps an NPY file, taking the element count from the first shape
    /// dimension.
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
        // SAFETY: read-only mapping; file must stay unmodified while mapped.
        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };
        let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
        if shape.is_empty() {
            return Err(Error::IndexLoad("Empty shape in NPY file".into()));
        }
        // Only the first dimension is used; higher dimensions are ignored.
        let len = shape[0];
        let expected_size = data_offset + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for {} elements",
                mmap.len(),
                len
            )));
        }
        Ok(Self {
            _mmap: mmap,
            len,
            data_offset,
        })
    }
    /// Number of elements.
    pub fn len(&self) -> usize {
        self.len
    }
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
    /// Copies elements `start..end` into a Vec. Panics if out of range.
    pub fn slice(&self, start: usize, end: usize) -> Vec<i64> {
        let count = end - start;
        let mut result = Vec::with_capacity(count);
        for i in start..end {
            result.push(self.get(i));
        }
        result
    }
    /// Reads element `idx` via `from_le_bytes` (alignment-agnostic).
    /// Panics (slice bounds) if `idx >= len`.
    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }
}
/// Read-only f32 matrix backed by a memory-mapped `.npy` file.
///
/// NOTE(review): neither the dtype ("<f4") nor fortran_order is validated
/// here; callers must guarantee a C-order little-endian f32 file.
pub struct MmapNpyArray2F32 {
    // Mapping kept alive for as long as any view borrows from it.
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}
impl MmapNpyArray2F32 {
    /// Maps an NPY file and validates the 2-D shape against the file size.
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
        // SAFETY: read-only mapping; file must stay unmodified while mapped.
        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };
        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
        if shape_vec.len() != 2 {
            return Err(Error::IndexLoad(format!(
                "Expected 2D array, got {}D",
                shape_vec.len()
            )));
        }
        let shape = (shape_vec[0], shape_vec[1]);
        let expected_size = data_offset + shape.0 * shape.1 * 4;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for shape {:?}",
                mmap.len(),
                shape
            )));
        }
        Ok(Self {
            _mmap: mmap,
            shape,
            data_offset,
        })
    }
    /// (rows, cols).
    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }
    /// Number of rows.
    pub fn nrows(&self) -> usize {
        self.shape.0
    }
    /// Number of columns.
    pub fn ncols(&self) -> usize {
        self.shape.1
    }
    /// Zero-copy view of the whole matrix.
    pub fn view(&self) -> ArrayView2<'_, f32> {
        let byte_start = self.data_offset;
        let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];
        // SAFETY: range bounds-checked by the slice above. The f32 cast
        // assumes data_offset is 4-byte aligned — true for spec-conforming
        // NPY headers (total header size is a multiple of 64). TODO
        // confirm every producer of these files pads the header correctly;
        // a misaligned offset would make this UB.
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
        };
        ArrayView2::from_shape(self.shape, data).unwrap()
    }
    /// Zero-copy view of row `idx`. Panics (slice bounds) if out of range.
    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
        let byte_start = self.data_offset + idx * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];
        // SAFETY: see `view` — checked range, alignment assumed from the
        // NPY header padding.
        let data =
            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
        ArrayView1::from_shape(self.shape.1, data).unwrap()
    }
    /// Zero-copy view of rows `start..end`. Panics if out of range.
    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1 * 4;
        let byte_end = self.data_offset + end * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];
        // SAFETY: see `view`.
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
        };
        ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
    }
    /// Copies the entire matrix into an owned array.
    pub fn to_owned(&self) -> Array2<f32> {
        self.view().to_owned()
    }
}
/// Read-only u8 matrix backed by a memory-mapped `.npy` file.
///
/// NOTE(review): neither the dtype ("|u1") nor fortran_order is validated
/// here; callers must guarantee a C-order u8 file.
pub struct MmapNpyArray2U8 {
    // Mapping kept alive for as long as any view borrows from it.
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}
impl MmapNpyArray2U8 {
    /// A (0, 0) placeholder backed by a 1-byte anonymous mapping (never
    /// read because the shape is empty).
    pub fn empty() -> Self {
        let mmap = MmapMut::map_anon(1)
            .expect("failed to create anonymous mmap")
            .make_read_only()
            .expect("failed to make anonymous mmap read-only");
        Self {
            _mmap: mmap,
            shape: (0, 0),
            data_offset: 0,
        }
    }
    /// Maps an NPY file and validates the 2-D shape against the file size.
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
        // SAFETY: read-only mapping; file must stay unmodified while mapped.
        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };
        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
        if shape_vec.len() != 2 {
            return Err(Error::IndexLoad(format!(
                "Expected 2D array, got {}D",
                shape_vec.len()
            )));
        }
        let shape = (shape_vec[0], shape_vec[1]);
        let expected_size = data_offset + shape.0 * shape.1;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for shape {:?}",
                mmap.len(),
                shape
            )));
        }
        Ok(Self {
            _mmap: mmap,
            shape,
            data_offset,
        })
    }
    /// (rows, cols).
    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }
    /// Number of rows.
    pub fn nrows(&self) -> usize {
        self.shape.0
    }
    /// Number of columns.
    pub fn ncols(&self) -> usize {
        self.shape.1
    }
    /// Zero-copy view of rows `start..end`. Panics (slice bounds) if the
    /// range is out of bounds.
    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1;
        let byte_end = self.data_offset + end * self.shape.1;
        let bytes = &self._mmap[byte_start..byte_end];
        ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
    }
    /// Zero-copy view of the whole matrix.
    pub fn view(&self) -> ArrayView2<'_, u8> {
        self.slice_rows(0, self.shape.0)
    }
    /// Raw bytes of row `idx`. Panics if out of range.
    pub fn row(&self, idx: usize) -> &[u8] {
        let byte_start = self.data_offset + idx * self.shape.1;
        let byte_end = byte_start + self.shape.1;
        &self._mmap[byte_start..byte_end]
    }
}
/// Per-chunk bookkeeping recorded in a merge manifest, used to detect
/// whether a chunk changed since the last merge.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkManifestEntry {
    /// Number of rows this chunk contributed to the merged file.
    pub rows: usize,
    /// Chunk file's modification time (seconds since Unix epoch) at merge
    /// time.
    pub mtime: f64,
}
/// Cached state of a chunk merge, persisted as JSON next to the merged
/// file so an up-to-date merge can be detected and reused.
///
/// All fields except `chunks` use `#[serde(default)]` so manifests written
/// by older versions still deserialize.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MergeManifest {
    /// Chunk filename -> (rows, mtime) bookkeeping.
    pub chunks: HashMap<String, ChunkManifestEntry>,
    /// Zero rows appended after the chunk data.
    #[serde(default)]
    pub padding_rows: usize,
    /// Number of chunk files requested at merge time.
    #[serde(default)]
    pub num_chunks: usize,
    /// mtime of metadata.json when the merge ran (acts as a version stamp).
    #[serde(default)]
    pub metadata_mtime: f64,
    /// Total rows in the merged file, including padding.
    #[serde(default)]
    pub total_rows: usize,
    /// Column count for 2-D merges (0 when unused, e.g. for 1-D codes).
    #[serde(default)]
    pub ncols: usize,
}
/// Legacy manifest format: a bare map of chunk filename -> entry.
pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;
/// Loads a merge manifest from disk, accepting both the current
/// `MergeManifest` format and the legacy bare chunk-map format.
/// Returns `None` on any error (missing file, unreadable, bad JSON).
fn load_merge_manifest(manifest_path: &Path) -> Option<MergeManifest> {
    if !manifest_path.exists() {
        return None;
    }
    let file = File::open(manifest_path).ok()?;
    if let Ok(manifest) = serde_json::from_reader::<_, MergeManifest>(BufReader::new(file)) {
        return Some(manifest);
    }
    // Fall back to the legacy format: reopen (the first reader consumed
    // the handle) and wrap the bare map with default metadata.
    let file = File::open(manifest_path).ok()?;
    let chunks = serde_json::from_reader::<_, ChunkManifest>(BufReader::new(file)).ok()?;
    Some(MergeManifest {
        chunks,
        padding_rows: 0,
        total_rows: 0,
        ncols: 0,
        num_chunks: 0,
        metadata_mtime: 0.0,
    })
}
/// Atomically persists `manifest` as JSON: write to a temp file, fsync,
/// then rename over the destination.
fn save_merge_manifest(manifest_path: &Path, manifest: &MergeManifest) -> Result<()> {
    // NOTE(review): `with_extension` replaces only the final extension, so
    // "merged_codes.manifest.json" becomes
    // "merged_codes.manifest.manifest.json.tmp" — functional, but confirm
    // the doubled name is intended.
    let temp_path = manifest_path.with_extension("manifest.json.tmp");
    let file = File::create(&temp_path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create temp manifest: {}", e)))?;
    let mut writer = BufWriter::new(file);
    serde_json::to_writer(&mut writer, manifest)
        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush manifest: {}", e)))?;
    // Recover the File and fsync so the rename below publishes durable
    // contents.
    writer
        .into_inner()
        .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?
        .sync_all()
        .map_err(|e| Error::IndexLoad(format!("Failed to sync manifest: {}", e)))?;
    fs::rename(&temp_path, manifest_path)
        .map_err(|e| Error::IndexLoad(format!("Failed to rename manifest: {}", e)))?;
    Ok(())
}
fn get_mtime(path: &Path) -> Result<f64> {
let metadata = fs::metadata(path)
.map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?;
let mtime = metadata
.modified()
.map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
let duration = mtime
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
Ok(duration.as_secs_f64())
}
/// Computes `(space_padding, total_header_size)` for an NPY v1 header
/// whose dict literal is `header_dict`.
///
/// The total covers: magic (6) + version (2) + u16 HEADER_LEN (2) + dict
/// + space padding + mandatory trailing '\n'. The NPY format requires
/// this total to be a multiple of 64 so the array data that follows is
/// aligned.
///
/// Fix: the padding previously ignored the trailing newline, making the
/// total ≡ 1 (mod 64) and leaving the data section at an odd, misaligned
/// offset — undefined behavior for readers that cast the mapped data to
/// `*const f32`/`*const i64`. The newline is now included in the padding
/// computation, so the total is always 64-byte aligned.
fn npy_header_layout(header_dict: &str) -> (usize, usize) {
    let header_len = header_dict.len();
    // 10 fixed prefix bytes + dict + '\n', before space padding.
    let unpadded = 10 + header_len + 1;
    let padding = (64 - (unpadded % 64)) % 64;
    (padding, unpadded + padding)
}
/// Renders the Python-literal header dict for a 1-D, C-order NPY array
/// of `len` elements with dtype descriptor `dtype`.
fn npy_header_dict_1d(len: usize, dtype: &str) -> String {
    format!("{{'descr': '{dtype}', 'fortran_order': False, 'shape': ({len},), }}")
}
/// Renders the Python-literal header dict for a 2-D, C-order NPY array
/// of shape `(nrows, ncols)` with dtype descriptor `dtype`.
fn npy_header_dict_2d(nrows: usize, ncols: usize, dtype: &str) -> String {
    format!("{{'descr': '{dtype}', 'fortran_order': False, 'shape': ({nrows}, {ncols}), }}")
}
/// Total NPY v1 header size, in bytes, for a 1-D array of `len` `dtype`
/// elements.
fn npy_header_size_1d(len: usize, dtype: &str) -> usize {
    let (_padding, total) = npy_header_layout(&npy_header_dict_1d(len, dtype));
    total
}
/// Total NPY v1 header size, in bytes, for an `nrows` x `ncols` `dtype`
/// array.
fn npy_header_size_2d(nrows: usize, ncols: usize, dtype: &str) -> usize {
    let (_padding, total) = npy_header_layout(&npy_header_dict_2d(nrows, ncols, dtype));
    total
}
fn write_npy_header(writer: &mut impl Write, header_dict: &str) -> Result<usize> {
let (padding, total) = npy_header_layout(header_dict);
let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));
writer
.write_all(NPY_MAGIC)
.map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
writer
.write_all(&[1, 0])
.map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;
let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
writer
.write_all(&header_len_bytes)
.map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;
writer
.write_all(padded_header.as_bytes())
.map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;
Ok(total)
}
/// Writes an NPY v1 header for a 1-D array; returns total header bytes.
fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
    write_npy_header(writer, &npy_header_dict_1d(len, dtype))
}
/// Writes an NPY v1 header for a 2-D array; returns total header bytes.
fn write_npy_header_2d(
    writer: &mut impl Write,
    nrows: usize,
    ncols: usize,
    dtype: &str,
) -> Result<usize> {
    write_npy_header(writer, &npy_header_dict_2d(nrows, ncols, dtype))
}
/// In-memory description of one on-disk chunk considered for merging.
struct ChunkInfo {
    path: std::path::PathBuf,
    // Bare filename, used as the manifest key.
    filename: String,
    rows: usize,
    mtime: f64,
}
/// Merges the per-chunk `<i>.codes.npy` files under `index_path` into a
/// single `merged_codes.npy` (dtype "<i8"), appending `padding_rows`
/// zero rows at the end. A JSON manifest caches each chunk's mtime and
/// row count so an up-to-date merge can be reused without rewriting.
///
/// Concurrency: a lock file serializes merges across processes; a
/// lock-free fast path returns early when the manifest and file size
/// already match the request.
///
/// Returns the path to the merged file.
pub fn merge_codes_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;
    let merged_path = index_path.join("merged_codes.npy");
    let manifest_path = index_path.join("merged_codes.manifest.json");
    let temp_path = index_path.join("merged_codes.npy.tmp");
    let lock_path = index_path.join("merged_codes.lock");
    let metadata_json_path = index_path.join("metadata.json");
    // metadata.json's mtime acts as a version stamp for the whole index;
    // 0.0 (missing file) disables the mtime-based fast path below.
    let current_metadata_mtime = get_mtime(&metadata_json_path).unwrap_or(0.0);
    // Fast path (no lock): reuse the merged file when the manifest matches
    // the request exactly and the file has the expected byte size.
    if let Some(ref manifest) = load_merge_manifest(&manifest_path) {
        let mtime_matches = manifest.metadata_mtime > 0.0
            && (manifest.metadata_mtime - current_metadata_mtime).abs() < 0.001;
        if manifest.num_chunks == num_chunks
            && manifest.padding_rows == padding_rows
            && manifest.chunks.len() == num_chunks
            && manifest.total_rows > 0
            && mtime_matches
            && merged_path.exists()
        {
            if let Ok(meta) = std::fs::metadata(&merged_path) {
                let expected_size = npy_header_size_1d(manifest.total_rows, "<i8")
                    + manifest.total_rows * std::mem::size_of::<i64>();
                if meta.len() == expected_size as u64 {
                    return Ok(merged_path);
                }
            }
        }
    }
    // Serialize the merge across processes.
    let _lock = FileLockGuard::acquire(&lock_path)?;
    // Re-read the manifest under the lock; another process may have just
    // finished a merge.
    let old_manifest = load_merge_manifest(&manifest_path);
    // Scan chunk files in index order, collecting non-empty ones and
    // detecting whether any chunk changed since the recorded manifest.
    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut chain_broken = false;
    for i in 0..num_chunks {
        let filename = format!("{}.codes.npy", i);
        let path = index_path.join(&filename);
        if path.exists() {
            let mtime = get_mtime(&path)?;
            let file = File::open(&path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            let rows = arr.len();
            if rows > 0 {
                total_rows += rows;
                // A chunk is "clean" iff the manifest recorded the exact
                // same mtime and row count. (Exact f64 equality is intended
                // here: both values come from the same mtime source.)
                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .chunks
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };
                if !is_clean {
                    chain_broken = true;
                }
                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }
    if total_rows == 0 {
        return Err(Error::IndexLoad("No data to merge".into()));
    }
    let final_rows = total_rows + padding_rows;
    // Any manifest mismatch (or no manifest at all) forces a full rewrite.
    let padding_changed = old_manifest
        .as_ref()
        .map(|m| m.padding_rows != padding_rows)
        .unwrap_or(true);
    let total_rows_mismatch = old_manifest
        .as_ref()
        .map(|m| m.total_rows != final_rows)
        .unwrap_or(true);
    let needs_full_rewrite =
        !merged_path.exists() || chain_broken || padding_changed || total_rows_mismatch;
    if needs_full_rewrite {
        // Write to a temp file, verify its size, then atomically rename.
        let file = File::create(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
        let mut writer = BufWriter::new(file);
        let header_size = write_npy_header_1d(&mut writer, final_rows, "<i8")?;
        let mut written_rows = 0usize;
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            for &val in arr.iter() {
                writer.write_all(&val.to_le_bytes())?;
            }
            written_rows += arr.len();
        }
        // Zero-filled padding rows at the tail.
        for _ in 0..padding_rows {
            writer.write_all(&0i64.to_le_bytes())?;
        }
        written_rows += padding_rows;
        writer
            .flush()
            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged file: {}", e)))?;
        let file = writer
            .into_inner()
            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
        file.sync_all()
            .map_err(|e| Error::IndexLoad(format!("Failed to sync merged file to disk: {}", e)))?;
        // Size check guards against partial writes before publishing.
        let expected_size = header_size + written_rows * 8;
        let actual_size = fs::metadata(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
            .len() as usize;
        if actual_size != expected_size {
            let _ = fs::remove_file(&temp_path);
            return Err(Error::IndexLoad(format!(
                "Merged codes file size mismatch: expected {} bytes, got {} bytes",
                expected_size, actual_size
            )));
        }
        fs::rename(&temp_path, &merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged file: {}", e)))?;
    } else {
        // Reuse path: sanity-check the existing merged file's size.
        if merged_path.exists() {
            let file_size = fs::metadata(&merged_path)
                .map_err(|e| {
                    Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
                })?
                .len() as usize;
            // NOTE(review): 64 hardcodes a minimum NPY header size; this is
            // only a conservative lower bound (the real size comes from
            // npy_header_size_1d).
            let min_expected_size = 64 + final_rows * 8;
            if file_size < min_expected_size {
                // Corrupt merged file: remove it and the manifest, release
                // the lock, and retry. The retry takes the full-rewrite
                // path because the merged file no longer exists.
                let _ = fs::remove_file(&merged_path);
                let _ = fs::remove_file(&manifest_path);
                drop(_lock);
                return merge_codes_chunks(index_path, num_chunks, padding_rows);
            }
        }
    }
    // Record the state that produced (or validated) the merged file.
    let mut chunk_map = HashMap::new();
    for chunk in &chunks {
        chunk_map.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    let new_manifest = MergeManifest {
        chunks: chunk_map,
        padding_rows,
        total_rows: final_rows,
        // codes are 1-D, so ncols is unused here.
        ncols: 0,
        num_chunks,
        metadata_mtime: current_metadata_mtime,
    };
    save_merge_manifest(&manifest_path, &new_manifest)?;
    Ok(merged_path)
}
pub fn merge_residuals_chunks(
index_path: &Path,
num_chunks: usize,
padding_rows: usize,
) -> Result<std::path::PathBuf> {
use ndarray_npy::ReadNpyExt;
let merged_path = index_path.join("merged_residuals.npy");
let manifest_path = index_path.join("merged_residuals.manifest.json");
let temp_path = index_path.join("merged_residuals.npy.tmp");
let lock_path = index_path.join("merged_residuals.lock");
let metadata_json_path = index_path.join("metadata.json");
let current_metadata_mtime = get_mtime(&metadata_json_path).unwrap_or(0.0);
if let Some(ref manifest) = load_merge_manifest(&manifest_path) {
if manifest.num_chunks == num_chunks
&& manifest.padding_rows == padding_rows
&& manifest.chunks.len() == num_chunks
&& manifest.total_rows > 0
&& manifest.ncols > 0
&& manifest.metadata_mtime > 0.0
&& (manifest.metadata_mtime - current_metadata_mtime).abs() < 0.001
&& merged_path.exists()
{
if let Ok(meta) = std::fs::metadata(&merged_path) {
let expected_size = npy_header_size_2d(manifest.total_rows, manifest.ncols, "|u1")
+ manifest.total_rows * manifest.ncols;
if meta.len() == expected_size as u64 {
return Ok(merged_path);
}
}
}
}
let _lock = FileLockGuard::acquire(&lock_path)?;
let old_manifest = load_merge_manifest(&manifest_path);
let mut chunks: Vec<ChunkInfo> = Vec::new();
let mut total_rows = 0usize;
let mut ncols = 0usize;
let mut chain_broken = false;
for i in 0..num_chunks {
let filename = format!("{}.residuals.npy", i);
let path = index_path.join(&filename);
if path.exists() {
let mtime = get_mtime(&path)?;
let file = File::open(&path)?;
let arr: Array2<u8> = Array2::read_npy(file)?;
let rows = arr.nrows();
ncols = arr.ncols();
if rows > 0 {
total_rows += rows;
let is_clean = if let Some(ref manifest) = old_manifest {
manifest
.chunks
.get(&filename)
.is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
} else {
false
};
if !is_clean {
chain_broken = true;
}
chunks.push(ChunkInfo {
path,
filename,
rows,
mtime,
});
}
}
}
if total_rows == 0 || ncols == 0 {
return Err(Error::IndexLoad("No residual data to merge".into()));
}
let final_rows = total_rows + padding_rows;
let padding_changed = old_manifest
.as_ref()
.map(|m| m.padding_rows != padding_rows)
.unwrap_or(true);
let total_rows_mismatch = old_manifest
.as_ref()
.map(|m| m.total_rows != final_rows)
.unwrap_or(true);
let ncols_mismatch = old_manifest
.as_ref()
.map(|m| m.ncols != ncols && m.ncols != 0)
.unwrap_or(false);
let needs_full_rewrite = !merged_path.exists()
|| chain_broken
|| padding_changed
|| total_rows_mismatch
|| ncols_mismatch;
if needs_full_rewrite {
let file = File::create(&temp_path)
.map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
let mut writer = BufWriter::new(file);
let header_size = write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;
let mut written_rows = 0usize;
for chunk in &chunks {
let file = File::open(&chunk.path)?;
let arr: Array2<u8> = Array2::read_npy(file)?;
for row in arr.rows() {
writer.write_all(row.as_slice().unwrap())?;
}
written_rows += arr.nrows();
}
let zero_row = vec![0u8; ncols];
for _ in 0..padding_rows {
writer.write_all(&zero_row)?;
}
written_rows += padding_rows;
writer
.flush()
.map_err(|e| Error::IndexLoad(format!("Failed to flush merged residuals: {}", e)))?;
let file = writer
.into_inner()
.map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
file.sync_all().map_err(|e| {
Error::IndexLoad(format!("Failed to sync merged residuals to disk: {}", e))
})?;
let expected_size = header_size + written_rows * ncols;
let actual_size = fs::metadata(&temp_path)
.map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
.len() as usize;
if actual_size != expected_size {
let _ = fs::remove_file(&temp_path);
return Err(Error::IndexLoad(format!(
"Merged residuals file size mismatch: expected {} bytes, got {} bytes",
expected_size, actual_size
)));
}
fs::rename(&temp_path, &merged_path)
.map_err(|e| Error::IndexLoad(format!("Failed to rename merged residuals: {}", e)))?;
} else {
if merged_path.exists() {
let file_size = fs::metadata(&merged_path)
.map_err(|e| {
Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
})?
.len() as usize;
let min_expected_size = 64 + final_rows * ncols;
if file_size < min_expected_size {
let _ = fs::remove_file(&merged_path);
let _ = fs::remove_file(&manifest_path);
drop(_lock);
return merge_residuals_chunks(index_path, num_chunks, padding_rows);
}
}
}
let mut chunk_map = HashMap::new();
for chunk in &chunks {
chunk_map.insert(
chunk.filename.clone(),
ChunkManifestEntry {
rows: chunk.rows,
mtime: chunk.mtime,
},
);
}
let new_manifest = MergeManifest {
chunks: chunk_map,
padding_rows,
total_rows: final_rows,
ncols,
num_chunks,
metadata_mtime: current_metadata_mtime,
};
save_merge_manifest(&manifest_path, &new_manifest)?;
Ok(merged_path)
}
pub fn clear_merged_files(index_path: &Path) -> Result<()> {
let codes_lock_path = index_path.join("merged_codes.lock");
let residuals_lock_path = index_path.join("merged_residuals.lock");
let _codes_lock = FileLockGuard::acquire(&codes_lock_path)?;
let _residuals_lock = FileLockGuard::acquire(&residuals_lock_path)?;
let files_to_remove = [
"merged_codes.npy",
"merged_codes.npy.tmp",
"merged_codes.manifest.json",
"merged_codes.manifest.json.tmp",
"merged_residuals.npy",
"merged_residuals.npy.tmp",
"merged_residuals.manifest.json",
"merged_residuals.manifest.json.tmp",
];
for filename in files_to_remove {
let path = index_path.join(filename);
if path.exists() {
fs::remove_file(&path)
.map_err(|e| Error::IndexLoad(format!("Failed to remove {}: {}", filename, e)))?;
}
}
Ok(())
}
/// Rewrite files in `index_path` from the fast-plaid on-disk dtypes to the
/// ones this crate expects: `<f2` (float16) arrays become float32,
/// `ivf_lengths.npy` goes from int64 to int32, and residual chunks whose
/// dtype descriptor is `<u1` are normalized to `|u1`.
///
/// Returns `Ok(true)` if at least one file was rewritten, `Ok(false)` if
/// everything was already in the expected format.
pub fn convert_fastplaid_to_nextplaid(index_path: &Path) -> Result<bool> {
    let mut changed = false;
    // Small float arrays that may be stored as half precision.
    for name in [
        "centroids.npy",
        "avg_residual.npy",
        "bucket_cutoffs.npy",
        "bucket_weights.npy",
    ] {
        let path = index_path.join(name);
        if !path.exists() {
            continue;
        }
        if detect_npy_dtype(&path)? == "<f2" {
            eprintln!(" Converting {} from float16 to float32", name);
            convert_f16_to_f32_npy(&path)?;
            changed = true;
        }
    }
    // ivf_lengths.npy: int64 -> int32.
    let ivf_lengths_path = index_path.join("ivf_lengths.npy");
    if ivf_lengths_path.exists() && detect_npy_dtype(&ivf_lengths_path)? == "<i8" {
        eprintln!(" Converting ivf_lengths.npy from int64 to int32");
        convert_i64_to_i32_npy(&ivf_lengths_path)?;
        changed = true;
    }
    // Residual chunk files: normalize the byte dtype descriptor.
    for entry in fs::read_dir(index_path)? {
        let entry = entry?;
        let name = entry.file_name().to_string_lossy().to_string();
        if !name.ends_with(".residuals.npy") {
            continue;
        }
        let path = entry.path();
        if detect_npy_dtype(&path)? == "<u1" {
            eprintln!(
                " Normalizing {} dtype descriptor from <u1 to |u1",
                name
            );
            normalize_u8_npy(&path)?;
            changed = true;
        }
    }
    Ok(changed)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Build a raw `[nrows: i64][ncols: i64][f32 data…]` file by hand and
    /// verify shape, row access, and the owned-copy conversion.
    #[test]
    fn test_mmap_array2_f32() {
        let mut tmp = NamedTempFile::new().unwrap();
        let mut raw = Vec::new();
        raw.extend_from_slice(&3i64.to_le_bytes()); // nrows
        raw.extend_from_slice(&2i64.to_le_bytes()); // ncols
        for v in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
            raw.extend_from_slice(&v.to_le_bytes());
        }
        tmp.write_all(&raw).unwrap();
        tmp.flush().unwrap();
        let arr = MmapArray2F32::from_raw_file(tmp.path()).unwrap();
        assert_eq!(arr.shape(), (3, 2));
        let first_row = arr.row(0);
        assert_eq!(first_row[0], 1.0);
        assert_eq!(first_row[1], 2.0);
        let materialized = arr.to_owned();
        assert_eq!(materialized[[2, 0]], 5.0);
        assert_eq!(materialized[[2, 1]], 6.0);
    }

    /// Build a raw `[len: i64][i64 data…]` file by hand and verify length,
    /// element access, and the owned-copy conversion.
    #[test]
    fn test_mmap_array1_i64() {
        let mut tmp = NamedTempFile::new().unwrap();
        let mut raw = Vec::new();
        raw.extend_from_slice(&4i64.to_le_bytes()); // length header
        for v in [10i64, 20, 30, 40] {
            raw.extend_from_slice(&v.to_le_bytes());
        }
        tmp.write_all(&raw).unwrap();
        tmp.flush().unwrap();
        let arr = MmapArray1I64::from_raw_file(tmp.path()).unwrap();
        assert_eq!(arr.len(), 4);
        assert_eq!(arr.get(0), 10);
        assert_eq!(arr.get(3), 40);
        let materialized = arr.to_owned();
        assert_eq!(materialized[1], 20);
        assert_eq!(materialized[2], 30);
    }

    /// Write with `write_array2_f32`, read back through the mmap reader, and
    /// require the round-tripped array to compare equal.
    #[test]
    fn test_write_read_roundtrip() {
        let tmp = NamedTempFile::new().unwrap();
        let expected =
            Array2::from_shape_vec((2, 3), vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).unwrap();
        write_array2_f32(&expected, tmp.path()).unwrap();
        let loaded = MmapArray2F32::from_raw_file(tmp.path()).unwrap().to_owned();
        assert_eq!(expected, loaded);
    }
}