use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;

use byteorder::{LittleEndian, ReadBytesExt};
use fs2::FileExt;
use memmap2::{Mmap, MmapMut};
use ndarray::{Array1, Array2, ArrayView1, ArrayView2};

use crate::error::{Error, Result};

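/// RAII guard holding an exclusive advisory lock on a lock file (via
/// `fs2::FileExt::lock_exclusive`). The lock is released when the guard is
/// dropped, so concurrent merge operations on the same index serialize on it.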
struct FileLockGuard {
    _file: File,
}

impl FileLockGuard {
    fn acquire(lock_path: &Path) -> Result<Self> {
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(lock_path)
            .map_err(|e| {
                Error::IndexLoad(format!("Failed to open lock file {:?}: {}", lock_path, e))
            })?;

        file.lock_exclusive().map_err(|e| {
            Error::IndexLoad(format!("Failed to acquire lock on {:?}: {}", lock_path, e))
        })?;

        Ok(Self { _file: file })
    }
}

impl Drop for FileLockGuard {
    fn drop(&mut self) {
        let _ = self._file.unlock();
    }
}

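/// Read-only memory-mapped 2-D `f32` array in a simple raw layout: a 16-byte
/// header holding `nrows` and `ncols` as little-endian `i64`, followed by
/// row-major `f32` data.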
pub struct MmapArray2F32 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapArray2F32 {
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 16 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..16]);
        let nrows = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
            as usize;
        let ncols = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
            as usize;

        let expected_size = 16 + nrows * ncols * 4;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for shape ({}, {})",
                mmap.len(),
                nrows,
                ncols
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape: (nrows, ncols),
            data_offset: 16,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
        let start = self.data_offset + idx * self.shape.1 * 4;
        let bytes = &self._mmap[start..start + self.shape.1 * 4];

        let data =
            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };

        ArrayView1::from_shape(self.shape.1, data).unwrap()
    }

    pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1 * 4;
        let byte_end = self.data_offset + end * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
        };

        Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
    }

    pub fn to_owned(&self) -> Array2<f32> {
        self.load_rows(0, self.shape.0)
    }
}

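/// Read-only memory-mapped 2-D `u8` array in the same raw layout as
/// [`MmapArray2F32`]: a 16-byte `nrows`/`ncols` header followed by row-major bytes.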
pub struct MmapArray2U8 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapArray2U8 {
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 16 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..16]);
        let nrows = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
            as usize;
        let ncols = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
            as usize;

        let expected_size = 16 + nrows * ncols;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for shape ({}, {})",
                mmap.len(),
                nrows,
                ncols
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape: (nrows, ncols),
            data_offset: 16,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn view(&self) -> ArrayView2<'_, u8> {
        let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
        ArrayView2::from_shape(self.shape, bytes).unwrap()
    }

    pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1;
        let byte_end = self.data_offset + end * self.shape.1;
        let bytes = &self._mmap[byte_start..byte_end];

        Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
    }

    pub fn to_owned(&self) -> Array2<u8> {
        self.load_rows(0, self.shape.0)
    }
}

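/// Read-only memory-mapped 1-D `i64` array: an 8-byte little-endian length
/// header followed by the little-endian values.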
pub struct MmapArray1I64 {
    _mmap: Mmap,
    len: usize,
    data_offset: usize,
}

impl MmapArray1I64 {
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 8 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..8]);
        let len = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
            as usize;

        let expected_size = 8 + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for length {}",
                mmap.len(),
                len
            )));
        }

        Ok(Self {
            _mmap: mmap,
            len,
            data_offset: 8,
        })
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }

    pub fn to_owned(&self) -> Array1<i64> {
        let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];

        let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };

        Array1::from_vec(data.to_vec())
    }
}

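/// Writes a 2-D `f32` array in the raw format read by [`MmapArray2F32`].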
pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
    use std::io::Write;

    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
    let mut writer = std::io::BufWriter::new(file);

    let nrows = array.nrows() as i64;
    let ncols = array.ncols() as i64;

    writer
        .write_all(&nrows.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
    writer
        .write_all(&ncols.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;

    for val in array.iter() {
        writer
            .write_all(&val.to_le_bytes())
            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    }

    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;

    Ok(())
}

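/// Writes a 2-D `u8` array in the raw format read by [`MmapArray2U8`].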
pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
    use std::io::Write;

    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
    let mut writer = std::io::BufWriter::new(file);

    let nrows = array.nrows() as i64;
    let ncols = array.ncols() as i64;

    writer
        .write_all(&nrows.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
    writer
        .write_all(&ncols.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;

    for row in array.rows() {
        writer
            .write_all(row.as_slice().unwrap())
            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    }

    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;

    Ok(())
}

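/// Writes a 1-D `i64` array in the raw format read by [`MmapArray1I64`].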
pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
    use std::io::Write;

    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
    let mut writer = std::io::BufWriter::new(file);

    let len = array.len() as i64;

    writer
        .write_all(&len.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;

    for val in array.iter() {
        writer
            .write_all(&val.to_le_bytes())
            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    }

    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;

    Ok(())
}

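/// Magic bytes that open every NPY (NumPy `.npy`) file. The magic is followed
/// by a two-byte version, the header length (`u16` for v1, `u32` for v2), and a
/// Python-dict-style header describing `descr`, `fortran_order`, and `shape`.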
const NPY_MAGIC: &[u8] = b"\x93NUMPY";

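/// Extracts the `descr` dtype string (for example `<f4` or `|u1`) from an NPY
/// header dict.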
fn parse_dtype_from_header(header: &str) -> Result<String> {
    let descr_start = header
        .find("'descr':")
        .ok_or_else(|| Error::IndexLoad("No descr in NPY header".into()))?;

    let after_descr = &header[descr_start + 8..];
    let quote_start = after_descr
        .find('\'')
        .ok_or_else(|| Error::IndexLoad("No dtype quote in NPY header".into()))?;
    let rest = &after_descr[quote_start + 1..];
    let quote_end = rest
        .find('\'')
        .ok_or_else(|| Error::IndexLoad("Unclosed dtype quote in NPY header".into()))?;

    Ok(rest[..quote_end].to_string())
}

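/// Reads just enough of an NPY file to return its dtype descriptor string.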
pub fn detect_npy_dtype(path: &Path) -> Result<String> {
    let file = File::open(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

    let mmap = unsafe {
        Mmap::map(&file)
            .map_err(|e| Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e)))?
    };

    if mmap.len() < 10 {
        return Err(Error::IndexLoad("NPY file too small".into()));
    }

    if &mmap[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY magic".into()));
    }

    let major_version = mmap[6];

    let header_len = if major_version == 1 {
        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
    } else if major_version == 2 {
        if mmap.len() < 12 {
            return Err(Error::IndexLoad("NPY v2 file too small".into()));
        }
        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
    } else {
        return Err(Error::IndexLoad(format!(
            "Unsupported NPY version: {}",
            major_version
        )));
    };

    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_end = header_start + header_len;

    if mmap.len() < header_end {
        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
    }

    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;

    parse_dtype_from_header(header_str)
}

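/// Rewrites a `float16` NPY file in place as `float32`, preserving its 1-D or
/// 2-D shape.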
pub fn convert_f16_to_f32_npy(path: &Path) -> Result<()> {
    use half::f16;
    use std::io::Read;

    let mut file = File::open(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
    let mut data = Vec::new();
    file.read_to_end(&mut data)
        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;

    if data.len() < 10 || &data[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY file".into()));
    }

    let major_version = data[6];
    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_len = if major_version == 1 {
        u16::from_le_bytes([data[8], data[9]]) as usize
    } else {
        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
    };
    let header_end = header_start + header_len;

    let header_str = std::str::from_utf8(&data[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
    let shape = parse_shape_from_header(header_str)?;

    let total_elements: usize = shape.iter().product();
    let f16_data = &data[header_end..header_end + total_elements * 2];

    let mut f32_data = Vec::with_capacity(total_elements * 4);
    for chunk in f16_data.chunks(2) {
        let f16_val = f16::from_le_bytes([chunk[0], chunk[1]]);
        let f32_val: f32 = f16_val.to_f32();
        f32_data.extend_from_slice(&f32_val.to_le_bytes());
    }

    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(file);

    if shape.len() == 1 {
        write_npy_header_1d(&mut writer, shape[0], "<f4")?;
    } else if shape.len() == 2 {
        write_npy_header_2d(&mut writer, shape[0], shape[1], "<f4")?;
    } else {
        return Err(Error::IndexLoad("Unsupported shape dimensions".into()));
    }

    writer
        .write_all(&f32_data)
        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    writer.flush()?;

    Ok(())
}

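/// Rewrites a 1-D `int64` NPY file in place as `int32`. Values are narrowed
/// with `as i32`, so they must already fit in 32 bits.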
pub fn convert_i64_to_i32_npy(path: &Path) -> Result<()> {
    use std::io::Read;

    let mut file = File::open(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
    let mut data = Vec::new();
    file.read_to_end(&mut data)
        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;

    if data.len() < 10 || &data[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY file".into()));
    }

    let major_version = data[6];
    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_len = if major_version == 1 {
        u16::from_le_bytes([data[8], data[9]]) as usize
    } else {
        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
    };
    let header_end = header_start + header_len;

    let header_str = std::str::from_utf8(&data[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
    let shape = parse_shape_from_header(header_str)?;

    if shape.len() != 1 {
        return Err(Error::IndexLoad("Expected 1D array for i64->i32".into()));
    }

    let len = shape[0];
    let i64_data = &data[header_end..header_end + len * 8];

    let mut i32_data = Vec::with_capacity(len * 4);
    for chunk in i64_data.chunks(8) {
        let i64_val = i64::from_le_bytes(chunk.try_into().unwrap());
        let i32_val = i64_val as i32;
        i32_data.extend_from_slice(&i32_val.to_le_bytes());
    }

    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(file);

    write_npy_header_1d(&mut writer, len, "<i4")?;

    writer
        .write_all(&i32_data)
        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    writer.flush()?;

    Ok(())
}

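/// Rewrites a 2-D `u8` NPY file in place with the `|u1` dtype descriptor,
/// leaving the data bytes unchanged.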
pub fn normalize_u8_npy(path: &Path) -> Result<()> {
    use std::io::Read;

    let mut file = File::open(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
    let mut data = Vec::new();
    file.read_to_end(&mut data)
        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;

    if data.len() < 10 || &data[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY file".into()));
    }

    let major_version = data[6];
    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_len = if major_version == 1 {
        u16::from_le_bytes([data[8], data[9]]) as usize
    } else {
        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
    };
    let header_end = header_start + header_len;

    let header_str = std::str::from_utf8(&data[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
    let shape = parse_shape_from_header(header_str)?;

    if shape.len() != 2 {
        return Err(Error::IndexLoad(
            "Expected 2D array for u8 normalization".into(),
        ));
    }

    let nrows = shape[0];
    let ncols = shape[1];
    let u8_data = &data[header_end..header_end + nrows * ncols];

    let new_file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(new_file);

    write_npy_header_2d(&mut writer, nrows, ncols, "|u1")?;

    writer
        .write_all(u8_data)
        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    writer.flush()?;

    Ok(())
}

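/// Parses an NPY header and returns `(shape, data_offset, fortran_order)`,
/// where `data_offset` is the byte offset of the first data element.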
fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
    if mmap.len() < 10 {
        return Err(Error::IndexLoad("NPY file too small".into()));
    }

    if &mmap[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY magic".into()));
    }

    let major_version = mmap[6];
    let _minor_version = mmap[7];

    let header_len = if major_version == 1 {
        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
    } else if major_version == 2 {
        if mmap.len() < 12 {
            return Err(Error::IndexLoad("NPY v2 file too small".into()));
        }
        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
    } else {
        return Err(Error::IndexLoad(format!(
            "Unsupported NPY version: {}",
            major_version
        )));
    };

    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_end = header_start + header_len;

    if mmap.len() < header_end {
        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
    }

    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;

    let shape = parse_shape_from_header(header_str)?;
    let fortran_order = header_str.contains("'fortran_order': True");

    Ok((shape, header_end, fortran_order))
}

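/// Extracts the shape tuple from an NPY header dict, e.g. `(3, 128)` becomes
/// `vec![3, 128]`.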
fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
    let shape_start = header
        .find("'shape':")
        .ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;

    let after_shape = &header[shape_start + 8..];
    let paren_start = after_shape
        .find('(')
        .ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
    let paren_end = after_shape
        .find(')')
        .ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;

    let shape_content = &after_shape[paren_start + 1..paren_end];

    let mut shape = Vec::new();
    for part in shape_content.split(',') {
        let trimmed = part.trim();
        if !trimmed.is_empty() {
            let dim: usize = trimmed.parse().map_err(|e| {
                Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
            })?;
            shape.push(dim);
        }
    }

    Ok(shape)
}

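/// Read-only memory-mapped view over a 1-D `int64` NPY file.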
pub struct MmapNpyArray1I64 {
    _mmap: Mmap,
    len: usize,
    data_offset: usize,
}

impl MmapNpyArray1I64 {
    pub fn empty() -> Self {
        let mmap = MmapMut::map_anon(1)
            .expect("failed to create anonymous mmap")
            .make_read_only()
            .expect("failed to make anonymous mmap read-only");
        Self {
            _mmap: mmap,
            len: 0,
            data_offset: 0,
        }
    }

    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape.is_empty() {
            return Err(Error::IndexLoad("Empty shape in NPY file".into()));
        }

        let len = shape[0];

        let expected_size = data_offset + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for {} elements",
                mmap.len(),
                len
            )));
        }

        Ok(Self {
            _mmap: mmap,
            len,
            data_offset,
        })
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    pub fn slice(&self, start: usize, end: usize) -> Vec<i64> {
        let count = end - start;
        let mut result = Vec::with_capacity(count);

        for i in start..end {
            result.push(self.get(i));
        }

        result
    }

    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }
}

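/// Read-only memory-mapped view over a 2-D `float32` NPY file, exposed as
/// row-major `ndarray` views without copying.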
pub struct MmapNpyArray2F32 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapNpyArray2F32 {
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape_vec.len() != 2 {
            return Err(Error::IndexLoad(format!(
                "Expected 2D array, got {}D",
                shape_vec.len()
            )));
        }

        let shape = (shape_vec[0], shape_vec[1]);

        let expected_size = data_offset + shape.0 * shape.1 * 4;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for shape {:?}",
                mmap.len(),
                shape
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape,
            data_offset,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    pub fn view(&self) -> ArrayView2<'_, f32> {
        let byte_start = self.data_offset;
        let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
        };

        ArrayView2::from_shape(self.shape, data).unwrap()
    }

    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
        let byte_start = self.data_offset + idx * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];

        let data =
            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };

        ArrayView1::from_shape(self.shape.1, data).unwrap()
    }

    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1 * 4;
        let byte_end = self.data_offset + end * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
        };

        ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
    }

    pub fn to_owned(&self) -> Array2<f32> {
        self.view().to_owned()
    }
}

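/// Read-only memory-mapped view over a 2-D `uint8` NPY file.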
pub struct MmapNpyArray2U8 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapNpyArray2U8 {
    pub fn empty() -> Self {
        let mmap = MmapMut::map_anon(1)
            .expect("failed to create anonymous mmap")
            .make_read_only()
            .expect("failed to make anonymous mmap read-only");
        Self {
            _mmap: mmap,
            shape: (0, 0),
            data_offset: 0,
        }
    }

    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape_vec.len() != 2 {
            return Err(Error::IndexLoad(format!(
                "Expected 2D array, got {}D",
                shape_vec.len()
            )));
        }

        let shape = (shape_vec[0], shape_vec[1]);

        let expected_size = data_offset + shape.0 * shape.1;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for shape {:?}",
                mmap.len(),
                shape
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape,
            data_offset,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1;
        let byte_end = self.data_offset + end * self.shape.1;
        let bytes = &self._mmap[byte_start..byte_end];

        ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
    }

    pub fn view(&self) -> ArrayView2<'_, u8> {
        self.slice_rows(0, self.shape.0)
    }

    pub fn row(&self, idx: usize) -> &[u8] {
        let byte_start = self.data_offset + idx * self.shape.1;
        let byte_end = byte_start + self.shape.1;
        &self._mmap[byte_start..byte_end]
    }
}

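/// Per-chunk bookkeeping stored in the merge manifest: the chunk's row count
/// and its mtime when it was last merged.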
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkManifestEntry {
    pub rows: usize,
    pub mtime: f64,
}

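/// Manifest describing the last successful merge: which chunks went in (and at
/// what mtime/row count), plus the padding and overall geometry of the merged
/// array. The extra fields default to zero so older manifests still parse.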
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MergeManifest {
    pub chunks: HashMap<String, ChunkManifestEntry>,
    #[serde(default)]
    pub padding_rows: usize,
    #[serde(default)]
    pub total_rows: usize,
    #[serde(default)]
    pub ncols: usize,
}

pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;

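/// Loads a merge manifest, accepting both the current format and the legacy
/// bare chunk map.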
fn load_merge_manifest(manifest_path: &Path) -> Option<MergeManifest> {
    if manifest_path.exists() {
        if let Ok(file) = File::open(manifest_path) {
            let reader = BufReader::new(file);
            if let Ok(manifest) = serde_json::from_reader::<_, MergeManifest>(reader) {
                return Some(manifest);
            }
            if let Ok(file) = File::open(manifest_path) {
                if let Ok(chunks) =
                    serde_json::from_reader::<_, ChunkManifest>(BufReader::new(file))
                {
                    return Some(MergeManifest {
                        chunks,
                        padding_rows: 0,
                        total_rows: 0,
                        ncols: 0,
                    });
                }
            }
        }
    }
    None
}

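/// Writes the manifest atomically: serialize to a temp file, fsync, then rename
/// over the final path.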
fn save_merge_manifest(manifest_path: &Path, manifest: &MergeManifest) -> Result<()> {
    let temp_path = manifest_path.with_extension("manifest.json.tmp");

    let file = File::create(&temp_path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create temp manifest: {}", e)))?;
    let mut writer = BufWriter::new(file);
    serde_json::to_writer(&mut writer, manifest)
        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush manifest: {}", e)))?;

    writer
        .into_inner()
        .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?
        .sync_all()
        .map_err(|e| Error::IndexLoad(format!("Failed to sync manifest: {}", e)))?;

    fs::rename(&temp_path, manifest_path)
        .map_err(|e| Error::IndexLoad(format!("Failed to rename manifest: {}", e)))?;

    Ok(())
}

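/// Returns a file's modification time as seconds since the Unix epoch.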
fn get_mtime(path: &Path) -> Result<f64> {
    let metadata = fs::metadata(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?;
    let mtime = metadata
        .modified()
        .map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
    let duration = mtime
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
    Ok(duration.as_secs_f64())
}

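/// Writes a v1 NPY header for a 1-D array and returns the number of header
/// bytes written (magic, version, length field, and padded dict).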
fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
    let header_dict = format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({},), }}",
        dtype, len
    );

    let header_len = header_dict.len();
    let padding = (64 - ((10 + header_len) % 64)) % 64;
    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));

    writer
        .write_all(NPY_MAGIC)
        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
    writer
        .write_all(&[1, 0])
        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;

    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
    writer
        .write_all(&header_len_bytes)
        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;

    writer
        .write_all(padded_header.as_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;

    Ok(10 + padded_header.len())
}

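/// Writes a v1 NPY header for a 2-D array and returns the number of header
/// bytes written.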
fn write_npy_header_2d(
    writer: &mut impl Write,
    nrows: usize,
    ncols: usize,
    dtype: &str,
) -> Result<usize> {
    let header_dict = format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}), }}",
        dtype, nrows, ncols
    );

    let header_len = header_dict.len();
    let padding = (64 - ((10 + header_len) % 64)) % 64;
    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));

    writer
        .write_all(NPY_MAGIC)
        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
    writer
        .write_all(&[1, 0])
        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;

    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
    writer
        .write_all(&header_len_bytes)
        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;

    writer
        .write_all(padded_header.as_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;

    Ok(10 + padded_header.len())
}

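/// A chunk file found on disk during a merge, with its row count and mtime.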
struct ChunkInfo {
    path: std::path::PathBuf,
    filename: String,
    rows: usize,
    mtime: f64,
}

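/// Merges the per-chunk `{i}.codes.npy` files into a single `merged_codes.npy`
/// with `padding_rows` zero entries appended, guarded by a lock file. The
/// rewrite is skipped when the manifest shows every chunk is unchanged.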
pub fn merge_codes_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_codes.npy");
    let manifest_path = index_path.join("merged_codes.manifest.json");
    let temp_path = index_path.join("merged_codes.npy.tmp");
    let lock_path = index_path.join("merged_codes.lock");

    let _lock = FileLockGuard::acquire(&lock_path)?;

    let old_manifest = load_merge_manifest(&manifest_path);

    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.codes.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            let file = File::open(&path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            let rows = arr.len();

            if rows > 0 {
                total_rows += rows;

                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .chunks
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                if !is_clean {
                    chain_broken = true;
                }

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }

    if total_rows == 0 {
        return Err(Error::IndexLoad("No data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    let padding_changed = old_manifest
        .as_ref()
        .map(|m| m.padding_rows != padding_rows)
        .unwrap_or(true);
    let total_rows_mismatch = old_manifest
        .as_ref()
        .map(|m| m.total_rows != final_rows)
        .unwrap_or(true);

    let needs_full_rewrite =
        !merged_path.exists() || chain_broken || padding_changed || total_rows_mismatch;

    if needs_full_rewrite {
        let file = File::create(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
        let mut writer = BufWriter::new(file);

        let header_size = write_npy_header_1d(&mut writer, final_rows, "<i8")?;

        let mut written_rows = 0usize;
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            for &val in arr.iter() {
                writer.write_all(&val.to_le_bytes())?;
            }
            written_rows += arr.len();
        }

        for _ in 0..padding_rows {
            writer.write_all(&0i64.to_le_bytes())?;
        }
        written_rows += padding_rows;

        writer
            .flush()
            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged file: {}", e)))?;
        let file = writer
            .into_inner()
            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
        file.sync_all()
            .map_err(|e| Error::IndexLoad(format!("Failed to sync merged file to disk: {}", e)))?;

        let expected_size = header_size + written_rows * 8;
        let actual_size = fs::metadata(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
            .len() as usize;

        if actual_size != expected_size {
            let _ = fs::remove_file(&temp_path);
            return Err(Error::IndexLoad(format!(
                "Merged codes file size mismatch: expected {} bytes, got {} bytes",
                expected_size, actual_size
            )));
        }

        fs::rename(&temp_path, &merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged file: {}", e)))?;
    } else if merged_path.exists() {
        let file_size = fs::metadata(&merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get merged file metadata: {}", e)))?
            .len() as usize;

        let min_expected_size = 64 + final_rows * 8;
        if file_size < min_expected_size {
            let _ = fs::remove_file(&merged_path);
            let _ = fs::remove_file(&manifest_path);
            drop(_lock);
            return merge_codes_chunks(index_path, num_chunks, padding_rows);
        }
    }

    let mut chunk_map = HashMap::new();
    for chunk in &chunks {
        chunk_map.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    let new_manifest = MergeManifest {
        chunks: chunk_map,
        padding_rows,
        total_rows: final_rows,
        ncols: 0,
    };
    save_merge_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}

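/// Merges the per-chunk `{i}.residuals.npy` files into `merged_residuals.npy`
/// with `padding_rows` zero rows appended, using the same lock/manifest
/// protocol as [`merge_codes_chunks`].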
pub fn merge_residuals_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_residuals.npy");
    let manifest_path = index_path.join("merged_residuals.manifest.json");
    let temp_path = index_path.join("merged_residuals.npy.tmp");
    let lock_path = index_path.join("merged_residuals.lock");

    let _lock = FileLockGuard::acquire(&lock_path)?;

    let old_manifest = load_merge_manifest(&manifest_path);

    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut ncols = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.residuals.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            let file = File::open(&path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            let rows = arr.nrows();
            ncols = arr.ncols();

            if rows > 0 {
                total_rows += rows;

                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .chunks
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                if !is_clean {
                    chain_broken = true;
                }

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }

    if total_rows == 0 || ncols == 0 {
        return Err(Error::IndexLoad("No residual data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    let padding_changed = old_manifest
        .as_ref()
        .map(|m| m.padding_rows != padding_rows)
        .unwrap_or(true);
    let total_rows_mismatch = old_manifest
        .as_ref()
        .map(|m| m.total_rows != final_rows)
        .unwrap_or(true);
    let ncols_mismatch = old_manifest
        .as_ref()
        .map(|m| m.ncols != ncols && m.ncols != 0)
        .unwrap_or(false);

    let needs_full_rewrite = !merged_path.exists()
        || chain_broken
        || padding_changed
        || total_rows_mismatch
        || ncols_mismatch;

    if needs_full_rewrite {
        let file = File::create(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
        let mut writer = BufWriter::new(file);

        let header_size = write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;

        let mut written_rows = 0usize;
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            for row in arr.rows() {
                writer.write_all(row.as_slice().unwrap())?;
            }
            written_rows += arr.nrows();
        }

        let zero_row = vec![0u8; ncols];
        for _ in 0..padding_rows {
            writer.write_all(&zero_row)?;
        }
        written_rows += padding_rows;

        writer
            .flush()
            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged residuals: {}", e)))?;
        let file = writer
            .into_inner()
            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
        file.sync_all().map_err(|e| {
            Error::IndexLoad(format!("Failed to sync merged residuals to disk: {}", e))
        })?;

        let expected_size = header_size + written_rows * ncols;
        let actual_size = fs::metadata(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
            .len() as usize;

        if actual_size != expected_size {
            let _ = fs::remove_file(&temp_path);
            return Err(Error::IndexLoad(format!(
                "Merged residuals file size mismatch: expected {} bytes, got {} bytes",
                expected_size, actual_size
            )));
        }

        fs::rename(&temp_path, &merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged residuals: {}", e)))?;
    } else if merged_path.exists() {
        let file_size = fs::metadata(&merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get merged file metadata: {}", e)))?
            .len() as usize;

        let min_expected_size = 64 + final_rows * ncols;
        if file_size < min_expected_size {
            let _ = fs::remove_file(&merged_path);
            let _ = fs::remove_file(&manifest_path);
            drop(_lock);
            return merge_residuals_chunks(index_path, num_chunks, padding_rows);
        }
    }

    let mut chunk_map = HashMap::new();
    for chunk in &chunks {
        chunk_map.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    let new_manifest = MergeManifest {
        chunks: chunk_map,
        padding_rows,
        total_rows: final_rows,
        ncols,
    };
    save_merge_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}

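/// Removes merged files, temp files, and manifests (but not the lock files,
/// which are held while deleting) so the next merge starts from scratch.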
pub fn clear_merged_files(index_path: &Path) -> Result<()> {
    let codes_lock_path = index_path.join("merged_codes.lock");
    let residuals_lock_path = index_path.join("merged_residuals.lock");
    let _codes_lock = FileLockGuard::acquire(&codes_lock_path)?;
    let _residuals_lock = FileLockGuard::acquire(&residuals_lock_path)?;

    let files_to_remove = [
        "merged_codes.npy",
        "merged_codes.npy.tmp",
        "merged_codes.manifest.json",
        "merged_codes.manifest.json.tmp",
        "merged_residuals.npy",
        "merged_residuals.npy.tmp",
        "merged_residuals.manifest.json",
        "merged_residuals.manifest.json.tmp",
    ];

    for filename in files_to_remove {
        let path = index_path.join(filename);
        if path.exists() {
            fs::remove_file(&path)
                .map_err(|e| Error::IndexLoad(format!("Failed to remove {}: {}", filename, e)))?;
        }
    }

    Ok(())
}

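/// Converts index files from the fastplaid layout to the nextplaid layout in
/// place: float16 arrays are widened to float32, `ivf_lengths.npy` is narrowed
/// to int32, and `*.residuals.npy` dtype descriptors are normalized to `|u1`.
/// Returns `true` if any file was rewritten.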
pub fn convert_fastplaid_to_nextplaid(index_path: &Path) -> Result<bool> {
    let mut converted = false;

    let float_files = [
        "centroids.npy",
        "avg_residual.npy",
        "bucket_cutoffs.npy",
        "bucket_weights.npy",
    ];

    for filename in float_files {
        let path = index_path.join(filename);
        if path.exists() {
            let dtype = detect_npy_dtype(&path)?;
            if dtype == "<f2" {
                eprintln!(" Converting {} from float16 to float32", filename);
                convert_f16_to_f32_npy(&path)?;
                converted = true;
            }
        }
    }

    let ivf_lengths_path = index_path.join("ivf_lengths.npy");
    if ivf_lengths_path.exists() {
        let dtype = detect_npy_dtype(&ivf_lengths_path)?;
        if dtype == "<i8" {
            eprintln!(" Converting ivf_lengths.npy from int64 to int32");
            convert_i64_to_i32_npy(&ivf_lengths_path)?;
            converted = true;
        }
    }

    for entry in fs::read_dir(index_path)? {
        let entry = entry?;
        let filename = entry.file_name().to_string_lossy().to_string();
        if filename.ends_with(".residuals.npy") {
            let path = entry.path();
            let dtype = detect_npy_dtype(&path)?;
            if dtype == "<u1" {
                eprintln!(
                    " Normalizing {} dtype descriptor from <u1 to |u1",
                    filename
                );
                normalize_u8_npy(&path)?;
                converted = true;
            }
        }
    }

    Ok(converted)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_mmap_array2_f32() {
        let mut file = NamedTempFile::new().unwrap();

        file.write_all(&3i64.to_le_bytes()).unwrap();
        file.write_all(&2i64.to_le_bytes()).unwrap();

        for val in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        let mmap = MmapArray2F32::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.shape(), (3, 2));

        let row0 = mmap.row(0);
        assert_eq!(row0[0], 1.0);
        assert_eq!(row0[1], 2.0);

        let owned = mmap.to_owned();
        assert_eq!(owned[[2, 0]], 5.0);
        assert_eq!(owned[[2, 1]], 6.0);
    }

    #[test]
    fn test_mmap_array1_i64() {
        let mut file = NamedTempFile::new().unwrap();

        file.write_all(&4i64.to_le_bytes()).unwrap();

        for val in [10i64, 20, 30, 40] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        let mmap = MmapArray1I64::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.len(), 4);
        assert_eq!(mmap.get(0), 10);
        assert_eq!(mmap.get(3), 40);

        let owned = mmap.to_owned();
        assert_eq!(owned[1], 20);
        assert_eq!(owned[2], 30);
    }

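    // Extra sketch tests for the NPY helpers above; they rely only on items
    // defined in this module.
    #[test]
    fn test_parse_npy_header_strings() {
        let header = "{'descr': '<f4', 'fortran_order': False, 'shape': (3, 128), }";
        assert_eq!(parse_dtype_from_header(header).unwrap(), "<f4");
        assert_eq!(parse_shape_from_header(header).unwrap(), vec![3, 128]);
    }

    #[test]
    fn test_npy_write_read_roundtrip_i64() {
        // Write a tiny 1-D int64 NPY file with write_npy_header_1d, then map it
        // back through MmapNpyArray1I64.
        let mut file = NamedTempFile::new().unwrap();

        write_npy_header_1d(&mut file, 3, "<i8").unwrap();
        for val in [7i64, 8, 9] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }
        file.flush().unwrap();

        let mmap = MmapNpyArray1I64::from_npy_file(file.path()).unwrap();
        assert_eq!(mmap.len(), 3);
        assert_eq!(mmap.slice(0, 3), vec![7, 8, 9]);
    }
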
    #[test]
    fn test_write_read_roundtrip() {
        let file = NamedTempFile::new().unwrap();
        let path = file.path();

        let array = Array2::from_shape_vec((2, 3), vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).unwrap();

        write_array2_f32(&array, path).unwrap();

        let mmap = MmapArray2F32::from_raw_file(path).unwrap();
        let loaded = mmap.to_owned();

        assert_eq!(array, loaded);
    }
}