1use std::collections::HashMap;
11use std::fs;
12use std::fs::File;
13use std::io::{BufReader, BufWriter, Write};
14use std::path::Path;
15
16use byteorder::{LittleEndian, ReadBytesExt};
17use fs2::FileExt;
18use memmap2::{Mmap, MmapMut};
19use ndarray::{Array1, Array2, ArrayView1, ArrayView2};
20
21use crate::error::{Error, Result};
22
23struct FileLockGuard {
26 _file: File,
27}
28
29impl FileLockGuard {
30 fn acquire(lock_path: &Path) -> Result<Self> {
34 let file = std::fs::OpenOptions::new()
35 .read(true)
36 .write(true)
37 .create(true)
38 .truncate(false)
39 .open(lock_path)
40 .map_err(|e| {
41 Error::IndexLoad(format!("Failed to open lock file {:?}: {}", lock_path, e))
42 })?;
43
44 file.lock_exclusive().map_err(|e| {
45 Error::IndexLoad(format!("Failed to acquire lock on {:?}: {}", lock_path, e))
46 })?;
47
48 Ok(Self { _file: file })
49 }
50}
51
52impl Drop for FileLockGuard {
53 fn drop(&mut self) {
54 let _ = self._file.unlock();
56 }
57}
58
59pub struct MmapArray2F32 {
63 _mmap: Mmap,
64 shape: (usize, usize),
65 data_offset: usize,
66}
67
68impl MmapArray2F32 {
69 pub fn from_raw_file(path: &Path) -> Result<Self> {
76 let file = File::open(path)
77 .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
78
79 let mmap = unsafe {
80 Mmap::map(&file)
81 .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
82 };
83
84 if mmap.len() < 16 {
85 return Err(Error::IndexLoad("File too small for header".into()));
86 }
87
88 let mut cursor = std::io::Cursor::new(&mmap[..16]);
90 let nrows = cursor
91 .read_i64::<LittleEndian>()
92 .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
93 as usize;
94 let ncols = cursor
95 .read_i64::<LittleEndian>()
96 .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
97 as usize;
98
99 let expected_size = 16 + nrows * ncols * 4;
100 if mmap.len() < expected_size {
101 return Err(Error::IndexLoad(format!(
102 "File size {} too small for shape ({}, {})",
103 mmap.len(),
104 nrows,
105 ncols
106 )));
107 }
108
109 Ok(Self {
110 _mmap: mmap,
111 shape: (nrows, ncols),
112 data_offset: 16,
113 })
114 }
115
116 pub fn shape(&self) -> (usize, usize) {
118 self.shape
119 }
120
121 pub fn nrows(&self) -> usize {
123 self.shape.0
124 }
125
126 pub fn ncols(&self) -> usize {
128 self.shape.1
129 }
130
131 pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
133 let start = self.data_offset + idx * self.shape.1 * 4;
134 let bytes = &self._mmap[start..start + self.shape.1 * 4];
135
136 let data =
138 unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
139
140 ArrayView1::from_shape(self.shape.1, data).unwrap()
141 }
142
143 pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
145 let nrows = end - start;
146 let byte_start = self.data_offset + start * self.shape.1 * 4;
147 let byte_end = self.data_offset + end * self.shape.1 * 4;
148 let bytes = &self._mmap[byte_start..byte_end];
149
150 let data = unsafe {
152 std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
153 };
154
155 Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
156 }
157
158 pub fn to_owned(&self) -> Array2<f32> {
160 self.load_rows(0, self.shape.0)
161 }
162}
163
164pub struct MmapArray2U8 {
166 _mmap: Mmap,
167 shape: (usize, usize),
168 data_offset: usize,
169}
170
171impl MmapArray2U8 {
172 pub fn from_raw_file(path: &Path) -> Result<Self> {
174 let file = File::open(path)
175 .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
176
177 let mmap = unsafe {
178 Mmap::map(&file)
179 .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
180 };
181
182 if mmap.len() < 16 {
183 return Err(Error::IndexLoad("File too small for header".into()));
184 }
185
186 let mut cursor = std::io::Cursor::new(&mmap[..16]);
187 let nrows = cursor
188 .read_i64::<LittleEndian>()
189 .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
190 as usize;
191 let ncols = cursor
192 .read_i64::<LittleEndian>()
193 .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
194 as usize;
195
196 let expected_size = 16 + nrows * ncols;
197 if mmap.len() < expected_size {
198 return Err(Error::IndexLoad(format!(
199 "File size {} too small for shape ({}, {})",
200 mmap.len(),
201 nrows,
202 ncols
203 )));
204 }
205
206 Ok(Self {
207 _mmap: mmap,
208 shape: (nrows, ncols),
209 data_offset: 16,
210 })
211 }
212
213 pub fn shape(&self) -> (usize, usize) {
215 self.shape
216 }
217
218 pub fn view(&self) -> ArrayView2<'_, u8> {
220 let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
221 ArrayView2::from_shape(self.shape, bytes).unwrap()
222 }
223
224 pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
226 let nrows = end - start;
227 let byte_start = self.data_offset + start * self.shape.1;
228 let byte_end = self.data_offset + end * self.shape.1;
229 let bytes = &self._mmap[byte_start..byte_end];
230
231 Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
232 }
233
234 pub fn to_owned(&self) -> Array2<u8> {
236 self.load_rows(0, self.shape.0)
237 }
238}
239
240pub struct MmapArray1I64 {
242 _mmap: Mmap,
243 len: usize,
244 data_offset: usize,
245}
246
247impl MmapArray1I64 {
248 pub fn from_raw_file(path: &Path) -> Result<Self> {
250 let file = File::open(path)
251 .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
252
253 let mmap = unsafe {
254 Mmap::map(&file)
255 .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
256 };
257
258 if mmap.len() < 8 {
259 return Err(Error::IndexLoad("File too small for header".into()));
260 }
261
262 let mut cursor = std::io::Cursor::new(&mmap[..8]);
263 let len = cursor
264 .read_i64::<LittleEndian>()
265 .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
266 as usize;
267
268 let expected_size = 8 + len * 8;
269 if mmap.len() < expected_size {
270 return Err(Error::IndexLoad(format!(
271 "File size {} too small for length {}",
272 mmap.len(),
273 len
274 )));
275 }
276
277 Ok(Self {
278 _mmap: mmap,
279 len,
280 data_offset: 8,
281 })
282 }
283
284 pub fn len(&self) -> usize {
286 self.len
287 }
288
289 pub fn is_empty(&self) -> bool {
291 self.len == 0
292 }
293
294 pub fn get(&self, idx: usize) -> i64 {
296 let start = self.data_offset + idx * 8;
297 let bytes = &self._mmap[start..start + 8];
298 i64::from_le_bytes(bytes.try_into().unwrap())
299 }
300
301 pub fn to_owned(&self) -> Array1<i64> {
303 let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];
304
305 let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };
307
308 Array1::from_vec(data.to_vec())
309 }
310}
311
312pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
314 use std::io::Write;
315
316 let file = File::create(path)
317 .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
318 let mut writer = std::io::BufWriter::new(file);
319
320 let nrows = array.nrows() as i64;
321 let ncols = array.ncols() as i64;
322
323 writer
324 .write_all(&nrows.to_le_bytes())
325 .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
326 writer
327 .write_all(&ncols.to_le_bytes())
328 .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
329
330 for val in array.iter() {
331 writer
332 .write_all(&val.to_le_bytes())
333 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
334 }
335
336 writer
337 .flush()
338 .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
339
340 Ok(())
341}
342
343pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
345 use std::io::Write;
346
347 let file = File::create(path)
348 .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
349 let mut writer = std::io::BufWriter::new(file);
350
351 let nrows = array.nrows() as i64;
352 let ncols = array.ncols() as i64;
353
354 writer
355 .write_all(&nrows.to_le_bytes())
356 .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
357 writer
358 .write_all(&ncols.to_le_bytes())
359 .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
360
361 for row in array.rows() {
362 writer
363 .write_all(row.as_slice().unwrap())
364 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
365 }
366
367 writer
368 .flush()
369 .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
370
371 Ok(())
372}
373
374pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
376 use std::io::Write;
377
378 let file = File::create(path)
379 .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
380 let mut writer = std::io::BufWriter::new(file);
381
382 let len = array.len() as i64;
383
384 writer
385 .write_all(&len.to_le_bytes())
386 .map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;
387
388 for val in array.iter() {
389 writer
390 .write_all(&val.to_le_bytes())
391 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
392 }
393
394 writer
395 .flush()
396 .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
397
398 Ok(())
399}
400
/// Six-byte magic sequence (`\x93NUMPY`) that the readers in this file require
/// at the very start of a NPY file and that `write_npy_header` emits first.
const NPY_MAGIC: &[u8] = b"\x93NUMPY";
407
408fn parse_dtype_from_header(header: &str) -> Result<String> {
410 let descr_start = header
412 .find("'descr':")
413 .ok_or_else(|| Error::IndexLoad("No descr in NPY header".into()))?;
414
415 let after_descr = &header[descr_start + 8..];
416 let quote_start = after_descr
417 .find('\'')
418 .ok_or_else(|| Error::IndexLoad("No dtype quote in NPY header".into()))?;
419 let rest = &after_descr[quote_start + 1..];
420 let quote_end = rest
421 .find('\'')
422 .ok_or_else(|| Error::IndexLoad("Unclosed dtype quote in NPY header".into()))?;
423
424 Ok(rest[..quote_end].to_string())
425}
426
427pub fn detect_npy_dtype(path: &Path) -> Result<String> {
429 let file = File::open(path)
430 .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
431
432 let mmap = unsafe {
433 Mmap::map(&file)
434 .map_err(|e| Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e)))?
435 };
436
437 if mmap.len() < 10 {
438 return Err(Error::IndexLoad("NPY file too small".into()));
439 }
440
441 if &mmap[..6] != NPY_MAGIC {
443 return Err(Error::IndexLoad("Invalid NPY magic".into()));
444 }
445
446 let major_version = mmap[6];
447
448 let header_len = if major_version == 1 {
450 u16::from_le_bytes([mmap[8], mmap[9]]) as usize
451 } else if major_version == 2 {
452 if mmap.len() < 12 {
453 return Err(Error::IndexLoad("NPY v2 file too small".into()));
454 }
455 u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
456 } else {
457 return Err(Error::IndexLoad(format!(
458 "Unsupported NPY version: {}",
459 major_version
460 )));
461 };
462
463 let header_start = if major_version == 1 { 10 } else { 12 };
464 let header_end = header_start + header_len;
465
466 if mmap.len() < header_end {
467 return Err(Error::IndexLoad("NPY header exceeds file size".into()));
468 }
469
470 let header_str = std::str::from_utf8(&mmap[header_start..header_end])
471 .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
472
473 parse_dtype_from_header(header_str)
474}
475
476pub fn convert_f16_to_f32_npy(path: &Path) -> Result<()> {
478 use half::f16;
479 use std::io::Read;
480
481 let mut file = File::open(path)
483 .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
484 let mut data = Vec::new();
485 file.read_to_end(&mut data)
486 .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
487
488 if data.len() < 10 || &data[..6] != NPY_MAGIC {
489 return Err(Error::IndexLoad("Invalid NPY file".into()));
490 }
491
492 let major_version = data[6];
493 let header_start = if major_version == 1 { 10 } else { 12 };
494 let header_len = if major_version == 1 {
495 u16::from_le_bytes([data[8], data[9]]) as usize
496 } else {
497 u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
498 };
499 let header_end = header_start + header_len;
500
501 let header_str = std::str::from_utf8(&data[header_start..header_end])
503 .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
504 let shape = parse_shape_from_header(header_str)?;
505
506 let total_elements: usize = shape.iter().product();
508 let f16_data = &data[header_end..header_end + total_elements * 2];
509
510 let mut f32_data = Vec::with_capacity(total_elements * 4);
512 for chunk in f16_data.chunks(2) {
513 let f16_val = f16::from_le_bytes([chunk[0], chunk[1]]);
514 let f32_val: f32 = f16_val.to_f32();
515 f32_data.extend_from_slice(&f32_val.to_le_bytes());
516 }
517
518 let file = File::create(path)
520 .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
521 let mut writer = BufWriter::new(file);
522
523 if shape.len() == 1 {
524 write_npy_header_1d(&mut writer, shape[0], "<f4")?;
525 } else if shape.len() == 2 {
526 write_npy_header_2d(&mut writer, shape[0], shape[1], "<f4")?;
527 } else {
528 return Err(Error::IndexLoad("Unsupported shape dimensions".into()));
529 }
530
531 writer
532 .write_all(&f32_data)
533 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
534 writer.flush()?;
535
536 Ok(())
537}
538
539pub fn convert_i64_to_i32_npy(path: &Path) -> Result<()> {
541 use std::io::Read;
542
543 let mut file = File::open(path)
545 .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
546 let mut data = Vec::new();
547 file.read_to_end(&mut data)
548 .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
549
550 if data.len() < 10 || &data[..6] != NPY_MAGIC {
551 return Err(Error::IndexLoad("Invalid NPY file".into()));
552 }
553
554 let major_version = data[6];
555 let header_start = if major_version == 1 { 10 } else { 12 };
556 let header_len = if major_version == 1 {
557 u16::from_le_bytes([data[8], data[9]]) as usize
558 } else {
559 u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
560 };
561 let header_end = header_start + header_len;
562
563 let header_str = std::str::from_utf8(&data[header_start..header_end])
565 .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
566 let shape = parse_shape_from_header(header_str)?;
567
568 if shape.len() != 1 {
569 return Err(Error::IndexLoad("Expected 1D array for i64->i32".into()));
570 }
571
572 let len = shape[0];
573 let i64_data = &data[header_end..header_end + len * 8];
574
575 let mut i32_data = Vec::with_capacity(len * 4);
577 for chunk in i64_data.chunks(8) {
578 let i64_val = i64::from_le_bytes(chunk.try_into().unwrap());
579 let i32_val = i64_val as i32;
580 i32_data.extend_from_slice(&i32_val.to_le_bytes());
581 }
582
583 let file = File::create(path)
585 .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
586 let mut writer = BufWriter::new(file);
587
588 write_npy_header_1d(&mut writer, len, "<i4")?;
589
590 writer
591 .write_all(&i32_data)
592 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
593 writer.flush()?;
594
595 Ok(())
596}
597
598pub fn normalize_u8_npy(path: &Path) -> Result<()> {
603 use std::io::Read;
604
605 let mut file = File::open(path)
607 .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
608 let mut data = Vec::new();
609 file.read_to_end(&mut data)
610 .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
611
612 if data.len() < 10 || &data[..6] != NPY_MAGIC {
613 return Err(Error::IndexLoad("Invalid NPY file".into()));
614 }
615
616 let major_version = data[6];
617 let header_start = if major_version == 1 { 10 } else { 12 };
618 let header_len = if major_version == 1 {
619 u16::from_le_bytes([data[8], data[9]]) as usize
620 } else {
621 u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
622 };
623 let header_end = header_start + header_len;
624
625 let header_str = std::str::from_utf8(&data[header_start..header_end])
627 .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
628 let shape = parse_shape_from_header(header_str)?;
629
630 if shape.len() != 2 {
631 return Err(Error::IndexLoad(
632 "Expected 2D array for u8 normalization".into(),
633 ));
634 }
635
636 let nrows = shape[0];
637 let ncols = shape[1];
638 let u8_data = &data[header_end..header_end + nrows * ncols];
639
640 let new_file = File::create(path)
642 .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
643 let mut writer = BufWriter::new(new_file);
644
645 write_npy_header_2d(&mut writer, nrows, ncols, "|u1")?;
646
647 writer
648 .write_all(u8_data)
649 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
650 writer.flush()?;
651
652 Ok(())
653}
654
655fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
657 if mmap.len() < 10 {
658 return Err(Error::IndexLoad("NPY file too small".into()));
659 }
660
661 if &mmap[..6] != NPY_MAGIC {
663 return Err(Error::IndexLoad("Invalid NPY magic".into()));
664 }
665
666 let major_version = mmap[6];
667 let _minor_version = mmap[7];
668
669 let header_len = if major_version == 1 {
671 u16::from_le_bytes([mmap[8], mmap[9]]) as usize
672 } else if major_version == 2 {
673 if mmap.len() < 12 {
674 return Err(Error::IndexLoad("NPY v2 file too small".into()));
675 }
676 u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
677 } else {
678 return Err(Error::IndexLoad(format!(
679 "Unsupported NPY version: {}",
680 major_version
681 )));
682 };
683
684 let header_start = if major_version == 1 { 10 } else { 12 };
685 let header_end = header_start + header_len;
686
687 if mmap.len() < header_end {
688 return Err(Error::IndexLoad("NPY header exceeds file size".into()));
689 }
690
691 let header_str = std::str::from_utf8(&mmap[header_start..header_end])
693 .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
694
695 let shape = parse_shape_from_header(header_str)?;
697 let fortran_order = header_str.contains("'fortran_order': True");
698
699 Ok((shape, header_end, fortran_order))
700}
701
702fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
704 let shape_start = header
706 .find("'shape':")
707 .ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;
708
709 let after_shape = &header[shape_start + 8..];
710 let paren_start = after_shape
711 .find('(')
712 .ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
713 let paren_end = after_shape
714 .find(')')
715 .ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;
716
717 let shape_content = &after_shape[paren_start + 1..paren_end];
718
719 let mut shape = Vec::new();
721 for part in shape_content.split(',') {
722 let trimmed = part.trim();
723 if !trimmed.is_empty() {
724 let dim: usize = trimmed.parse().map_err(|e| {
725 Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
726 })?;
727 shape.push(dim);
728 }
729 }
730
731 Ok(shape)
732}
733
734pub struct MmapNpyArray1I64 {
738 _mmap: Mmap,
739 len: usize,
740 data_offset: usize,
741}
742
743impl MmapNpyArray1I64 {
744 pub fn empty() -> Self {
749 let mmap = MmapMut::map_anon(1)
750 .expect("failed to create anonymous mmap")
751 .make_read_only()
752 .expect("failed to make anonymous mmap read-only");
753 Self {
754 _mmap: mmap,
755 len: 0,
756 data_offset: 0,
757 }
758 }
759
760 pub fn from_npy_file(path: &Path) -> Result<Self> {
762 let file = File::open(path)
763 .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
764
765 let mmap = unsafe {
766 Mmap::map(&file).map_err(|e| {
767 Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
768 })?
769 };
770
771 let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
772
773 if shape.is_empty() {
774 return Err(Error::IndexLoad("Empty shape in NPY file".into()));
775 }
776
777 let len = shape[0];
778
779 let expected_size = data_offset + len * 8;
781 if mmap.len() < expected_size {
782 return Err(Error::IndexLoad(format!(
783 "NPY file size {} too small for {} elements",
784 mmap.len(),
785 len
786 )));
787 }
788
789 Ok(Self {
790 _mmap: mmap,
791 len,
792 data_offset,
793 })
794 }
795
796 pub fn len(&self) -> usize {
798 self.len
799 }
800
801 pub fn is_empty(&self) -> bool {
803 self.len == 0
804 }
805
806 pub fn slice(&self, start: usize, end: usize) -> Vec<i64> {
813 let count = end - start;
814 let mut result = Vec::with_capacity(count);
815
816 for i in start..end {
817 result.push(self.get(i));
818 }
819
820 result
821 }
822
823 pub fn get(&self, idx: usize) -> i64 {
825 let start = self.data_offset + idx * 8;
826 let bytes = &self._mmap[start..start + 8];
827 i64::from_le_bytes(bytes.try_into().unwrap())
828 }
829}
830
831pub struct MmapNpyArray2F32 {
837 _mmap: Mmap,
838 shape: (usize, usize),
839 data_offset: usize,
840}
841
842impl MmapNpyArray2F32 {
843 pub fn from_npy_file(path: &Path) -> Result<Self> {
845 let file = File::open(path)
846 .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
847
848 let mmap = unsafe {
849 Mmap::map(&file).map_err(|e| {
850 Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
851 })?
852 };
853
854 let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
855
856 if shape_vec.len() != 2 {
857 return Err(Error::IndexLoad(format!(
858 "Expected 2D array, got {}D",
859 shape_vec.len()
860 )));
861 }
862
863 let shape = (shape_vec[0], shape_vec[1]);
864
865 let expected_size = data_offset + shape.0 * shape.1 * 4;
867 if mmap.len() < expected_size {
868 return Err(Error::IndexLoad(format!(
869 "NPY file size {} too small for shape {:?}",
870 mmap.len(),
871 shape
872 )));
873 }
874
875 Ok(Self {
876 _mmap: mmap,
877 shape,
878 data_offset,
879 })
880 }
881
882 pub fn shape(&self) -> (usize, usize) {
884 self.shape
885 }
886
887 pub fn nrows(&self) -> usize {
889 self.shape.0
890 }
891
892 pub fn ncols(&self) -> usize {
894 self.shape.1
895 }
896
897 pub fn view(&self) -> ArrayView2<'_, f32> {
901 let byte_start = self.data_offset;
902 let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
903 let bytes = &self._mmap[byte_start..byte_end];
904
905 let data = unsafe {
907 std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
908 };
909
910 ArrayView2::from_shape(self.shape, data).unwrap()
911 }
912
913 pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
915 let byte_start = self.data_offset + idx * self.shape.1 * 4;
916 let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];
917
918 let data =
920 unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
921
922 ArrayView1::from_shape(self.shape.1, data).unwrap()
923 }
924
925 pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
927 let nrows = end - start;
928 let byte_start = self.data_offset + start * self.shape.1 * 4;
929 let byte_end = self.data_offset + end * self.shape.1 * 4;
930 let bytes = &self._mmap[byte_start..byte_end];
931
932 let data = unsafe {
934 std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
935 };
936
937 ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
938 }
939
940 pub fn to_owned(&self) -> Array2<f32> {
944 self.view().to_owned()
945 }
946}
947
948pub struct MmapNpyArray2U8 {
952 _mmap: Mmap,
953 shape: (usize, usize),
954 data_offset: usize,
955}
956
957impl MmapNpyArray2U8 {
958 pub fn empty() -> Self {
963 let mmap = MmapMut::map_anon(1)
964 .expect("failed to create anonymous mmap")
965 .make_read_only()
966 .expect("failed to make anonymous mmap read-only");
967 Self {
968 _mmap: mmap,
969 shape: (0, 0),
970 data_offset: 0,
971 }
972 }
973
974 pub fn from_npy_file(path: &Path) -> Result<Self> {
976 let file = File::open(path)
977 .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
978
979 let mmap = unsafe {
980 Mmap::map(&file).map_err(|e| {
981 Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
982 })?
983 };
984
985 let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
986
987 if shape_vec.len() != 2 {
988 return Err(Error::IndexLoad(format!(
989 "Expected 2D array, got {}D",
990 shape_vec.len()
991 )));
992 }
993
994 let shape = (shape_vec[0], shape_vec[1]);
995
996 let expected_size = data_offset + shape.0 * shape.1;
998 if mmap.len() < expected_size {
999 return Err(Error::IndexLoad(format!(
1000 "NPY file size {} too small for shape {:?}",
1001 mmap.len(),
1002 shape
1003 )));
1004 }
1005
1006 Ok(Self {
1007 _mmap: mmap,
1008 shape,
1009 data_offset,
1010 })
1011 }
1012
1013 pub fn shape(&self) -> (usize, usize) {
1015 self.shape
1016 }
1017
1018 pub fn nrows(&self) -> usize {
1020 self.shape.0
1021 }
1022
1023 pub fn ncols(&self) -> usize {
1025 self.shape.1
1026 }
1027
1028 pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
1030 let nrows = end - start;
1031 let byte_start = self.data_offset + start * self.shape.1;
1032 let byte_end = self.data_offset + end * self.shape.1;
1033 let bytes = &self._mmap[byte_start..byte_end];
1034
1035 ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
1036 }
1037
1038 pub fn view(&self) -> ArrayView2<'_, u8> {
1040 self.slice_rows(0, self.shape.0)
1041 }
1042
1043 pub fn row(&self, idx: usize) -> &[u8] {
1045 let byte_start = self.data_offset + idx * self.shape.1;
1046 let byte_end = byte_start + self.shape.1;
1047 &self._mmap[byte_start..byte_end]
1048 }
1049}
1050
/// Per-chunk record stored in the merge manifest.
///
/// `merge_codes_chunks` compares both fields against the chunk file on disk to
/// decide whether the chunk is unchanged since the last merge.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkManifestEntry {
    /// Number of rows the chunk held when it was recorded.
    pub rows: usize,
    /// Modification time of the chunk file when it was recorded, in the form
    /// produced by `get_mtime` (seconds since the Unix epoch).
    pub mtime: f64,
}
1061
/// Manifest describing a merged file, serialized as JSON next to it.
///
/// Every field except `chunks` falls back to its default (zero) when absent
/// from the JSON.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MergeManifest {
    /// Recorded chunks, keyed by chunk file name.
    pub chunks: HashMap<String, ChunkManifestEntry>,
    /// Number of padding rows the merged file was built with.
    #[serde(default)]
    pub padding_rows: usize,
    /// Number of chunks the merged file was built from.
    #[serde(default)]
    pub num_chunks: usize,
    /// Modification time of `metadata.json` at merge time, as seconds since the
    /// Unix epoch; a value of zero never counts as a match.
    #[serde(default)]
    pub metadata_mtime: f64,
    /// Row count of the merged file; `merge_codes_chunks` compares it against
    /// data rows plus padding rows.
    #[serde(default)]
    pub total_rows: usize,
    /// Column count of the merged data — presumably zero for 1-D merges;
    /// NOTE(review): confirm against the code that writes this field.
    #[serde(default)]
    pub ncols: usize,
}
1083
/// Bare map from chunk file name to its entry. `load_merge_manifest` accepts a
/// JSON file of this shape as a fallback when it does not parse as a
/// [`MergeManifest`].
pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;
1086
1087fn load_merge_manifest(manifest_path: &Path) -> Option<MergeManifest> {
1090 if manifest_path.exists() {
1091 if let Ok(file) = File::open(manifest_path) {
1092 let reader = BufReader::new(file);
1094 if let Ok(manifest) = serde_json::from_reader::<_, MergeManifest>(reader) {
1095 return Some(manifest);
1096 }
1097 if let Ok(file) = File::open(manifest_path) {
1099 if let Ok(chunks) =
1100 serde_json::from_reader::<_, ChunkManifest>(BufReader::new(file))
1101 {
1102 return Some(MergeManifest {
1104 chunks,
1105 padding_rows: 0,
1106 total_rows: 0,
1107 ncols: 0,
1108 num_chunks: 0,
1109 metadata_mtime: 0.0,
1110 });
1111 }
1112 }
1113 }
1114 }
1115 None
1116}
1117
1118fn save_merge_manifest(manifest_path: &Path, manifest: &MergeManifest) -> Result<()> {
1120 let temp_path = manifest_path.with_extension("manifest.json.tmp");
1121
1122 let file = File::create(&temp_path)
1124 .map_err(|e| Error::IndexLoad(format!("Failed to create temp manifest: {}", e)))?;
1125 let mut writer = BufWriter::new(file);
1126 serde_json::to_writer(&mut writer, manifest)
1127 .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
1128 writer
1129 .flush()
1130 .map_err(|e| Error::IndexLoad(format!("Failed to flush manifest: {}", e)))?;
1131
1132 writer
1134 .into_inner()
1135 .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?
1136 .sync_all()
1137 .map_err(|e| Error::IndexLoad(format!("Failed to sync manifest: {}", e)))?;
1138
1139 fs::rename(&temp_path, manifest_path)
1141 .map_err(|e| Error::IndexLoad(format!("Failed to rename manifest: {}", e)))?;
1142
1143 Ok(())
1144}
1145
1146fn get_mtime(path: &Path) -> Result<f64> {
1148 let metadata = fs::metadata(path)
1149 .map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?;
1150 let mtime = metadata
1151 .modified()
1152 .map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
1153 let duration = mtime
1154 .duration_since(std::time::UNIX_EPOCH)
1155 .map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
1156 Ok(duration.as_secs_f64())
1157}
1158
/// Computes the padding and total size of a version-1 NPY header.
///
/// Returns `(padding, total)`: `padding` is the number of spaces to insert
/// between `header_dict` and the terminating newline, and `total` is the size
/// in bytes of the 10-byte preamble plus the padded, newline-terminated
/// header, i.e. the offset at which the array data starts.
///
/// `total` is always a multiple of 64. The terminating newline is counted
/// before the padding is computed; leaving it out would make every total one
/// byte past a 64-byte boundary and misalign the data that follows.
///
/// Note: files written with a differently padded header remain readable,
/// because readers take the data offset from the header length stored in the
/// file; only sizes predicted from this function differ for such files.
fn npy_header_layout(header_dict: &str) -> (usize, usize) {
    /// Magic (6 bytes) + version (2 bytes) + header length field (2 bytes).
    const PREAMBLE_BYTES: usize = 10;
    /// Alignment of the start of the array data.
    const ALIGNMENT: usize = 64;

    // Size before padding, including the terminating newline.
    let unpadded = PREAMBLE_BYTES + header_dict.len() + 1;
    let padding = (ALIGNMENT - unpadded % ALIGNMENT) % ALIGNMENT;

    (padding, unpadded + padding)
}
1166
/// Builds the NPY header dictionary text for a 1-D, C-ordered array with `len`
/// elements of type `dtype`.
fn npy_header_dict_1d(len: usize, dtype: &str) -> String {
    let descr = dtype;
    let element_count = len;

    format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({},), }}",
        descr, element_count
    )
}
1173
/// Builds the NPY header dictionary text for a 2-D, C-ordered array of shape
/// `(nrows, ncols)` with elements of type `dtype`.
fn npy_header_dict_2d(nrows: usize, ncols: usize, dtype: &str) -> String {
    let descr = dtype;
    let (row_count, col_count) = (nrows, ncols);

    format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}), }}",
        descr, row_count, col_count
    )
}
1180
1181fn npy_header_size_1d(len: usize, dtype: &str) -> usize {
1183 let dict = npy_header_dict_1d(len, dtype);
1184 npy_header_layout(&dict).1
1185}
1186
1187fn npy_header_size_2d(nrows: usize, ncols: usize, dtype: &str) -> usize {
1189 let dict = npy_header_dict_2d(nrows, ncols, dtype);
1190 npy_header_layout(&dict).1
1191}
1192
1193fn write_npy_header(writer: &mut impl Write, header_dict: &str) -> Result<usize> {
1195 let (padding, total) = npy_header_layout(header_dict);
1196 let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));
1197
1198 writer
1200 .write_all(NPY_MAGIC)
1201 .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
1202 writer
1203 .write_all(&[1, 0])
1204 .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;
1205
1206 let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
1208 writer
1209 .write_all(&header_len_bytes)
1210 .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;
1211
1212 writer
1214 .write_all(padded_header.as_bytes())
1215 .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;
1216
1217 Ok(total)
1218}
1219
1220fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
1222 write_npy_header(writer, &npy_header_dict_1d(len, dtype))
1223}
1224
1225fn write_npy_header_2d(
1227 writer: &mut impl Write,
1228 nrows: usize,
1229 ncols: usize,
1230 dtype: &str,
1231) -> Result<usize> {
1232 write_npy_header(writer, &npy_header_dict_2d(nrows, ncols, dtype))
1233}
1234
/// Facts gathered about one non-empty chunk file while preparing a merge.
struct ChunkInfo {
    /// Full path of the chunk file inside the index directory.
    path: std::path::PathBuf,
    /// File name of the chunk, used as its key in the manifest.
    filename: String,
    /// Number of rows read from the chunk.
    rows: usize,
    /// Modification time of the chunk file as returned by `get_mtime`.
    mtime: f64,
}
1242
1243pub fn merge_codes_chunks(
1250 index_path: &Path,
1251 num_chunks: usize,
1252 padding_rows: usize,
1253) -> Result<std::path::PathBuf> {
1254 use ndarray_npy::ReadNpyExt;
1255
1256 let merged_path = index_path.join("merged_codes.npy");
1257 let manifest_path = index_path.join("merged_codes.manifest.json");
1258 let temp_path = index_path.join("merged_codes.npy.tmp");
1259 let lock_path = index_path.join("merged_codes.lock");
1260
1261 let metadata_json_path = index_path.join("metadata.json");
1264 let current_metadata_mtime = get_mtime(&metadata_json_path).unwrap_or(0.0);
1265 if let Some(ref manifest) = load_merge_manifest(&manifest_path) {
1266 let mtime_matches = manifest.metadata_mtime > 0.0
1267 && (manifest.metadata_mtime - current_metadata_mtime).abs() < 0.001;
1268 if manifest.num_chunks == num_chunks
1269 && manifest.padding_rows == padding_rows
1270 && manifest.chunks.len() == num_chunks
1271 && manifest.total_rows > 0
1272 && mtime_matches
1273 && merged_path.exists()
1274 {
1275 if let Ok(meta) = std::fs::metadata(&merged_path) {
1276 let expected_size = npy_header_size_1d(manifest.total_rows, "<i8")
1277 + manifest.total_rows * std::mem::size_of::<i64>();
1278 if meta.len() == expected_size as u64 {
1279 return Ok(merged_path);
1280 }
1281 }
1282 }
1283 }
1284
1285 let _lock = FileLockGuard::acquire(&lock_path)?;
1288
1289 let old_manifest = load_merge_manifest(&manifest_path);
1294
1295 let mut chunks: Vec<ChunkInfo> = Vec::new();
1297 let mut total_rows = 0usize;
1298 let mut chain_broken = false;
1299
1300 for i in 0..num_chunks {
1301 let filename = format!("{}.codes.npy", i);
1302 let path = index_path.join(&filename);
1303
1304 if path.exists() {
1305 let mtime = get_mtime(&path)?;
1306
1307 let file = File::open(&path)?;
1309 let arr: Array1<i64> = Array1::read_npy(file)?;
1310 let rows = arr.len();
1311
1312 if rows > 0 {
1313 total_rows += rows;
1314
1315 let is_clean = if let Some(ref manifest) = old_manifest {
1317 manifest
1318 .chunks
1319 .get(&filename)
1320 .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
1321 } else {
1322 false
1323 };
1324
1325 if !is_clean {
1326 chain_broken = true;
1327 }
1328
1329 chunks.push(ChunkInfo {
1330 path,
1331 filename,
1332 rows,
1333 mtime,
1334 });
1335 }
1336 }
1337 }
1338
1339 if total_rows == 0 {
1340 return Err(Error::IndexLoad("No data to merge".into()));
1341 }
1342
1343 let final_rows = total_rows + padding_rows;
1344
1345 let padding_changed = old_manifest
1351 .as_ref()
1352 .map(|m| m.padding_rows != padding_rows)
1353 .unwrap_or(true);
1354 let total_rows_mismatch = old_manifest
1355 .as_ref()
1356 .map(|m| m.total_rows != final_rows)
1357 .unwrap_or(true);
1358
1359 let needs_full_rewrite =
1360 !merged_path.exists() || chain_broken || padding_changed || total_rows_mismatch;
1361
1362 if needs_full_rewrite {
1363 let file = File::create(&temp_path)
1365 .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
1366 let mut writer = BufWriter::new(file);
1367
1368 let header_size = write_npy_header_1d(&mut writer, final_rows, "<i8")?;
1370
1371 let mut written_rows = 0usize;
1373 for chunk in &chunks {
1374 let file = File::open(&chunk.path)?;
1375 let arr: Array1<i64> = Array1::read_npy(file)?;
1376 for &val in arr.iter() {
1377 writer.write_all(&val.to_le_bytes())?;
1378 }
1379 written_rows += arr.len();
1380 }
1381
1382 for _ in 0..padding_rows {
1384 writer.write_all(&0i64.to_le_bytes())?;
1385 }
1386 written_rows += padding_rows;
1387
1388 writer
1390 .flush()
1391 .map_err(|e| Error::IndexLoad(format!("Failed to flush merged file: {}", e)))?;
1392 let file = writer
1393 .into_inner()
1394 .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
1395 file.sync_all()
1396 .map_err(|e| Error::IndexLoad(format!("Failed to sync merged file to disk: {}", e)))?;
1397
1398 let expected_size = header_size + written_rows * 8;
1400 let actual_size = fs::metadata(&temp_path)
1401 .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
1402 .len() as usize;
1403
1404 if actual_size != expected_size {
1405 let _ = fs::remove_file(&temp_path);
1407 return Err(Error::IndexLoad(format!(
1408 "Merged codes file size mismatch: expected {} bytes, got {} bytes",
1409 expected_size, actual_size
1410 )));
1411 }
1412
1413 fs::rename(&temp_path, &merged_path)
1415 .map_err(|e| Error::IndexLoad(format!("Failed to rename merged file: {}", e)))?;
1416 } else {
1417 if merged_path.exists() {
1419 let file_size = fs::metadata(&merged_path)
1420 .map_err(|e| {
1421 Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
1422 })?
1423 .len() as usize;
1424
1425 let min_expected_size = 64 + final_rows * 8;
1427 if file_size < min_expected_size {
1428 let _ = fs::remove_file(&merged_path);
1430 let _ = fs::remove_file(&manifest_path);
1431 drop(_lock);
1433 return merge_codes_chunks(index_path, num_chunks, padding_rows);
1434 }
1435 }
1436 }
1437
1438 let mut chunk_map = HashMap::new();
1440 for chunk in &chunks {
1441 chunk_map.insert(
1442 chunk.filename.clone(),
1443 ChunkManifestEntry {
1444 rows: chunk.rows,
1445 mtime: chunk.mtime,
1446 },
1447 );
1448 }
1449 let new_manifest = MergeManifest {
1450 chunks: chunk_map,
1451 padding_rows,
1452 total_rows: final_rows,
1453 ncols: 0, num_chunks,
1455 metadata_mtime: current_metadata_mtime,
1456 };
1457 save_merge_manifest(&manifest_path, &new_manifest)?;
1458
1459 Ok(merged_path)
1460}
1461
1462pub fn merge_residuals_chunks(
1467 index_path: &Path,
1468 num_chunks: usize,
1469 padding_rows: usize,
1470) -> Result<std::path::PathBuf> {
1471 use ndarray_npy::ReadNpyExt;
1472
1473 let merged_path = index_path.join("merged_residuals.npy");
1474 let manifest_path = index_path.join("merged_residuals.manifest.json");
1475 let temp_path = index_path.join("merged_residuals.npy.tmp");
1476 let lock_path = index_path.join("merged_residuals.lock");
1477
1478 let metadata_json_path = index_path.join("metadata.json");
1481 let current_metadata_mtime = get_mtime(&metadata_json_path).unwrap_or(0.0);
1482 if let Some(ref manifest) = load_merge_manifest(&manifest_path) {
1483 if manifest.num_chunks == num_chunks
1484 && manifest.padding_rows == padding_rows
1485 && manifest.chunks.len() == num_chunks
1486 && manifest.total_rows > 0
1487 && manifest.ncols > 0
1488 && manifest.metadata_mtime > 0.0
1489 && (manifest.metadata_mtime - current_metadata_mtime).abs() < 0.001
1490 && merged_path.exists()
1491 {
1492 if let Ok(meta) = std::fs::metadata(&merged_path) {
1493 let expected_size = npy_header_size_2d(manifest.total_rows, manifest.ncols, "|u1")
1494 + manifest.total_rows * manifest.ncols;
1495 if meta.len() == expected_size as u64 {
1496 return Ok(merged_path);
1497 }
1498 }
1499 }
1500 }
1501
1502 let _lock = FileLockGuard::acquire(&lock_path)?;
1505
1506 let old_manifest = load_merge_manifest(&manifest_path);
1511
1512 let mut chunks: Vec<ChunkInfo> = Vec::new();
1514 let mut total_rows = 0usize;
1515 let mut ncols = 0usize;
1516 let mut chain_broken = false;
1517
1518 for i in 0..num_chunks {
1519 let filename = format!("{}.residuals.npy", i);
1520 let path = index_path.join(&filename);
1521
1522 if path.exists() {
1523 let mtime = get_mtime(&path)?;
1524
1525 let file = File::open(&path)?;
1527 let arr: Array2<u8> = Array2::read_npy(file)?;
1528 let rows = arr.nrows();
1529 ncols = arr.ncols();
1530
1531 if rows > 0 {
1532 total_rows += rows;
1533
1534 let is_clean = if let Some(ref manifest) = old_manifest {
1535 manifest
1536 .chunks
1537 .get(&filename)
1538 .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
1539 } else {
1540 false
1541 };
1542
1543 if !is_clean {
1544 chain_broken = true;
1545 }
1546
1547 chunks.push(ChunkInfo {
1548 path,
1549 filename,
1550 rows,
1551 mtime,
1552 });
1553 }
1554 }
1555 }
1556
1557 if total_rows == 0 || ncols == 0 {
1558 return Err(Error::IndexLoad("No residual data to merge".into()));
1559 }
1560
1561 let final_rows = total_rows + padding_rows;
1562
1563 let padding_changed = old_manifest
1569 .as_ref()
1570 .map(|m| m.padding_rows != padding_rows)
1571 .unwrap_or(true);
1572 let total_rows_mismatch = old_manifest
1573 .as_ref()
1574 .map(|m| m.total_rows != final_rows)
1575 .unwrap_or(true);
1576 let ncols_mismatch = old_manifest
1577 .as_ref()
1578 .map(|m| m.ncols != ncols && m.ncols != 0)
1579 .unwrap_or(false);
1580
1581 let needs_full_rewrite = !merged_path.exists()
1582 || chain_broken
1583 || padding_changed
1584 || total_rows_mismatch
1585 || ncols_mismatch;
1586
1587 if needs_full_rewrite {
1588 let file = File::create(&temp_path)
1590 .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
1591 let mut writer = BufWriter::new(file);
1592
1593 let header_size = write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;
1595
1596 let mut written_rows = 0usize;
1598 for chunk in &chunks {
1599 let file = File::open(&chunk.path)?;
1600 let arr: Array2<u8> = Array2::read_npy(file)?;
1601 for row in arr.rows() {
1602 writer.write_all(row.as_slice().unwrap())?;
1603 }
1604 written_rows += arr.nrows();
1605 }
1606
1607 let zero_row = vec![0u8; ncols];
1609 for _ in 0..padding_rows {
1610 writer.write_all(&zero_row)?;
1611 }
1612 written_rows += padding_rows;
1613
1614 writer
1616 .flush()
1617 .map_err(|e| Error::IndexLoad(format!("Failed to flush merged residuals: {}", e)))?;
1618 let file = writer
1619 .into_inner()
1620 .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
1621 file.sync_all().map_err(|e| {
1622 Error::IndexLoad(format!("Failed to sync merged residuals to disk: {}", e))
1623 })?;
1624
1625 let expected_size = header_size + written_rows * ncols;
1627 let actual_size = fs::metadata(&temp_path)
1628 .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
1629 .len() as usize;
1630
1631 if actual_size != expected_size {
1632 let _ = fs::remove_file(&temp_path);
1634 return Err(Error::IndexLoad(format!(
1635 "Merged residuals file size mismatch: expected {} bytes, got {} bytes",
1636 expected_size, actual_size
1637 )));
1638 }
1639
1640 fs::rename(&temp_path, &merged_path)
1642 .map_err(|e| Error::IndexLoad(format!("Failed to rename merged residuals: {}", e)))?;
1643 } else {
1644 if merged_path.exists() {
1646 let file_size = fs::metadata(&merged_path)
1647 .map_err(|e| {
1648 Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
1649 })?
1650 .len() as usize;
1651
1652 let min_expected_size = 64 + final_rows * ncols;
1654 if file_size < min_expected_size {
1655 let _ = fs::remove_file(&merged_path);
1657 let _ = fs::remove_file(&manifest_path);
1658 drop(_lock);
1660 return merge_residuals_chunks(index_path, num_chunks, padding_rows);
1661 }
1662 }
1663 }
1664
1665 let mut chunk_map = HashMap::new();
1667 for chunk in &chunks {
1668 chunk_map.insert(
1669 chunk.filename.clone(),
1670 ChunkManifestEntry {
1671 rows: chunk.rows,
1672 mtime: chunk.mtime,
1673 },
1674 );
1675 }
1676 let new_manifest = MergeManifest {
1677 chunks: chunk_map,
1678 padding_rows,
1679 total_rows: final_rows,
1680 ncols,
1681 num_chunks,
1682 metadata_mtime: current_metadata_mtime,
1683 };
1684 save_merge_manifest(&manifest_path, &new_manifest)?;
1685
1686 Ok(merged_path)
1687}
1688
1689pub fn clear_merged_files(index_path: &Path) -> Result<()> {
1698 let codes_lock_path = index_path.join("merged_codes.lock");
1702 let residuals_lock_path = index_path.join("merged_residuals.lock");
1703 let _codes_lock = FileLockGuard::acquire(&codes_lock_path)?;
1704 let _residuals_lock = FileLockGuard::acquire(&residuals_lock_path)?;
1705
1706 let files_to_remove = [
1707 "merged_codes.npy",
1708 "merged_codes.npy.tmp",
1709 "merged_codes.manifest.json",
1710 "merged_codes.manifest.json.tmp",
1711 "merged_residuals.npy",
1712 "merged_residuals.npy.tmp",
1713 "merged_residuals.manifest.json",
1714 "merged_residuals.manifest.json.tmp",
1715 ];
1716
1717 for filename in files_to_remove {
1718 let path = index_path.join(filename);
1719 if path.exists() {
1720 fs::remove_file(&path)
1721 .map_err(|e| Error::IndexLoad(format!("Failed to remove {}: {}", filename, e)))?;
1722 }
1723 }
1724
1725 Ok(())
1726}
1727
1728pub fn convert_fastplaid_to_nextplaid(index_path: &Path) -> Result<bool> {
1741 let mut converted = false;
1742
1743 let float_files = [
1745 "centroids.npy",
1746 "avg_residual.npy",
1747 "bucket_cutoffs.npy",
1748 "bucket_weights.npy",
1749 ];
1750
1751 for filename in float_files {
1752 let path = index_path.join(filename);
1753 if path.exists() {
1754 let dtype = detect_npy_dtype(&path)?;
1755 if dtype == "<f2" {
1756 eprintln!(" Converting {} from float16 to float32", filename);
1757 convert_f16_to_f32_npy(&path)?;
1758 converted = true;
1759 }
1760 }
1761 }
1762
1763 let ivf_lengths_path = index_path.join("ivf_lengths.npy");
1765 if ivf_lengths_path.exists() {
1766 let dtype = detect_npy_dtype(&ivf_lengths_path)?;
1767 if dtype == "<i8" {
1768 eprintln!(" Converting ivf_lengths.npy from int64 to int32");
1769 convert_i64_to_i32_npy(&ivf_lengths_path)?;
1770 converted = true;
1771 }
1772 }
1773
1774 for entry in fs::read_dir(index_path)? {
1777 let entry = entry?;
1778 let filename = entry.file_name().to_string_lossy().to_string();
1779 if filename.ends_with(".residuals.npy") {
1780 let path = entry.path();
1781 let dtype = detect_npy_dtype(&path)?;
1782 if dtype == "<u1" {
1783 eprintln!(
1784 " Normalizing {} dtype descriptor from <u1 to |u1",
1785 filename
1786 );
1787 normalize_u8_npy(&path)?;
1788 converted = true;
1789 }
1790 }
1791 }
1792
1793 Ok(converted)
1794}
1795
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file containing exactly `bytes`, flushed to disk.
    fn temp_file_with(bytes: &[u8]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(bytes).unwrap();
        file.flush().unwrap();
        file
    }

    /// A raw 3x2 f32 file (two little-endian i64 dimensions, then the values)
    /// is readable row by row and as an owned array.
    #[test]
    fn test_mmap_array2_f32() {
        let mut raw = Vec::new();
        // Header: nrows, then ncols.
        raw.extend_from_slice(&3i64.to_le_bytes());
        raw.extend_from_slice(&2i64.to_le_bytes());
        // Payload: six values in row-major order.
        for val in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
            raw.extend_from_slice(&val.to_le_bytes());
        }
        let file = temp_file_with(&raw);

        let mapped = MmapArray2F32::from_raw_file(file.path()).unwrap();
        assert_eq!(mapped.shape(), (3, 2));

        let first_row = mapped.row(0);
        assert_eq!(first_row[0], 1.0);
        assert_eq!(first_row[1], 2.0);

        let owned = mapped.to_owned();
        assert_eq!(owned[[2, 0]], 5.0);
        assert_eq!(owned[[2, 1]], 6.0);
    }

    /// A raw i64 file (a little-endian i64 element count, then the values) is
    /// readable element by element and as an owned array.
    #[test]
    fn test_mmap_array1_i64() {
        let mut raw = Vec::new();
        // Header: element count.
        raw.extend_from_slice(&4i64.to_le_bytes());
        // Payload.
        for val in [10i64, 20, 30, 40] {
            raw.extend_from_slice(&val.to_le_bytes());
        }
        let file = temp_file_with(&raw);

        let mapped = MmapArray1I64::from_raw_file(file.path()).unwrap();
        assert_eq!(mapped.len(), 4);
        assert_eq!(mapped.get(0), 10);
        assert_eq!(mapped.get(3), 40);

        let owned = mapped.to_owned();
        assert_eq!(owned[1], 20);
        assert_eq!(owned[2], 30);
    }

    /// An array written with `write_array2_f32` is read back unchanged through
    /// `MmapArray2F32`.
    #[test]
    fn test_write_read_roundtrip() {
        let file = NamedTempFile::new().unwrap();
        let path = file.path();

        let values = vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0];
        let array = Array2::from_shape_vec((2, 3), values).unwrap();

        write_array2_f32(&array, path).unwrap();

        let loaded = MmapArray2F32::from_raw_file(path).unwrap().to_owned();

        assert_eq!(array, loaded);
    }
}