1use std::fs::File;
11use std::path::Path;
12
13#[cfg(feature = "npy")]
14use std::collections::HashMap;
15#[cfg(feature = "npy")]
16use std::fs;
17#[cfg(feature = "npy")]
18use std::io::{BufReader, BufWriter, Write};
19
20use byteorder::{LittleEndian, ReadBytesExt};
21use memmap2::Mmap;
22use ndarray::{Array1, Array2, ArrayView1, ArrayView2};
23
24use crate::error::{Error, Result};
25
/// Read-only, zero-copy view of a 2-D f32 matrix stored in the crate's raw
/// format: a 16-byte header (i64 nrows, i64 ncols, little-endian) followed
/// by row-major f32 data.
pub struct MmapArray2F32 {
    // Underscore-prefixed but used via indexing in the accessors below; also
    // keeps the mapping alive for the lifetime of this struct.
    _mmap: Mmap,
    // (nrows, ncols) parsed from the header.
    shape: (usize, usize),
    // Byte offset where the matrix data begins (always 16 for this format).
    data_offset: usize,
}
34
impl MmapArray2F32 {
    /// Opens and memory-maps a raw matrix file (16-byte header: i64 nrows,
    /// i64 ncols, little-endian; then row-major f32 data).
    ///
    /// # Errors
    /// `Error::IndexLoad` if the file cannot be opened/mapped, is smaller
    /// than the header, or is too small for the declared shape.
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        // SAFETY: mapping is unsafe because the underlying file could be
        // truncated or modified by another process while mapped; index files
        // are treated as immutable once written.
        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 16 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..16]);
        let nrows = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
            as usize;
        let ncols = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
            as usize;

        // Validate the declared shape against the file size up front so the
        // accessors below can hand out slices without re-checking.
        let expected_size = 16 + nrows * ncols * 4;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for shape ({}, {})",
                mmap.len(),
                nrows,
                ncols
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape: (nrows, ncols),
            data_offset: 16,
        })
    }

    /// Returns (nrows, ncols).
    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    /// Number of rows.
    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    /// Number of columns.
    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    /// Zero-copy view of row `idx`. Panics (via slice indexing) if `idx` is
    /// out of bounds.
    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
        let start = self.data_offset + idx * self.shape.1 * 4;
        let bytes = &self._mmap[start..start + self.shape.1 * 4];

        // SAFETY: the range was bounds-checked by the slice above; the file
        // size was validated in `from_raw_file`; data starts at offset 16 of
        // a page-aligned mapping, so it is 4-byte aligned for f32 reads.
        let data =
            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };

        ArrayView1::from_shape(self.shape.1, data).unwrap()
    }

    /// Copies rows `start..end` into an owned array. Panics if the range is
    /// out of bounds or reversed.
    pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1 * 4;
        let byte_end = self.data_offset + end * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        // SAFETY: same bounds/alignment argument as in `row`.
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
        };

        Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
    }

    /// Copies the whole matrix into an owned array.
    pub fn to_owned(&self) -> Array2<f32> {
        self.load_rows(0, self.shape.0)
    }
}
130
/// Read-only, zero-copy view of a 2-D u8 matrix stored in the crate's raw
/// format: a 16-byte header (i64 nrows, i64 ncols, little-endian) followed
/// by row-major bytes.
pub struct MmapArray2U8 {
    // Used via indexing in the accessors; keeps the mapping alive.
    _mmap: Mmap,
    // (nrows, ncols) parsed from the header.
    shape: (usize, usize),
    // Byte offset where the matrix data begins (always 16 for this format).
    data_offset: usize,
}
137
138impl MmapArray2U8 {
139 pub fn from_raw_file(path: &Path) -> Result<Self> {
141 let file = File::open(path)
142 .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
143
144 let mmap = unsafe {
145 Mmap::map(&file)
146 .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
147 };
148
149 if mmap.len() < 16 {
150 return Err(Error::IndexLoad("File too small for header".into()));
151 }
152
153 let mut cursor = std::io::Cursor::new(&mmap[..16]);
154 let nrows = cursor
155 .read_i64::<LittleEndian>()
156 .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
157 as usize;
158 let ncols = cursor
159 .read_i64::<LittleEndian>()
160 .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
161 as usize;
162
163 let expected_size = 16 + nrows * ncols;
164 if mmap.len() < expected_size {
165 return Err(Error::IndexLoad(format!(
166 "File size {} too small for shape ({}, {})",
167 mmap.len(),
168 nrows,
169 ncols
170 )));
171 }
172
173 Ok(Self {
174 _mmap: mmap,
175 shape: (nrows, ncols),
176 data_offset: 16,
177 })
178 }
179
180 pub fn shape(&self) -> (usize, usize) {
182 self.shape
183 }
184
185 pub fn view(&self) -> ArrayView2<'_, u8> {
187 let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
188 ArrayView2::from_shape(self.shape, bytes).unwrap()
189 }
190
191 pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
193 let nrows = end - start;
194 let byte_start = self.data_offset + start * self.shape.1;
195 let byte_end = self.data_offset + end * self.shape.1;
196 let bytes = &self._mmap[byte_start..byte_end];
197
198 Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
199 }
200
201 pub fn to_owned(&self) -> Array2<u8> {
203 self.load_rows(0, self.shape.0)
204 }
205}
206
/// Read-only, zero-copy view of a 1-D i64 vector stored in the crate's raw
/// format: an 8-byte header (i64 length, little-endian) followed by the
/// elements, little-endian.
pub struct MmapArray1I64 {
    // Used via indexing in the accessors; keeps the mapping alive.
    _mmap: Mmap,
    // Element count parsed from the header.
    len: usize,
    // Byte offset where the data begins (always 8 for this format).
    data_offset: usize,
}
213
impl MmapArray1I64 {
    /// Opens and memory-maps a raw i64 vector file (8-byte header: i64
    /// length, little-endian; then the elements).
    ///
    /// # Errors
    /// `Error::IndexLoad` if the file cannot be opened/mapped, is smaller
    /// than the header, or is too small for the declared length.
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        // SAFETY: the file must not be truncated/modified while mapped.
        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 8 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..8]);
        let len = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
            as usize;

        // Validate the declared length against the file size up front.
        let expected_size = 8 + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for length {}",
                mmap.len(),
                len
            )));
        }

        Ok(Self {
            _mmap: mmap,
            len,
            data_offset: 8,
        })
    }

    /// Number of elements.
    pub fn len(&self) -> usize {
        self.len
    }

    /// True when the vector has no elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Reads element `idx` via a byte copy (no alignment requirement).
    /// Panics (via slice indexing) if out of bounds.
    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }

    /// Copies the whole vector into an owned array.
    pub fn to_owned(&self) -> Array1<i64> {
        let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];

        // SAFETY: bounds validated in `from_raw_file`; data starts at offset
        // 8 of a page-aligned mapping, so it is 8-byte aligned for i64 reads.
        let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };

        Array1::from_vec(data.to_vec())
    }
}
278
279pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
281 use std::io::Write;
282
283 let file = File::create(path)
284 .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
285 let mut writer = std::io::BufWriter::new(file);
286
287 let nrows = array.nrows() as i64;
288 let ncols = array.ncols() as i64;
289
290 writer
291 .write_all(&nrows.to_le_bytes())
292 .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
293 writer
294 .write_all(&ncols.to_le_bytes())
295 .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
296
297 for val in array.iter() {
298 writer
299 .write_all(&val.to_le_bytes())
300 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
301 }
302
303 writer
304 .flush()
305 .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
306
307 Ok(())
308}
309
310pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
312 use std::io::Write;
313
314 let file = File::create(path)
315 .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
316 let mut writer = std::io::BufWriter::new(file);
317
318 let nrows = array.nrows() as i64;
319 let ncols = array.ncols() as i64;
320
321 writer
322 .write_all(&nrows.to_le_bytes())
323 .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
324 writer
325 .write_all(&ncols.to_le_bytes())
326 .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
327
328 for row in array.rows() {
329 writer
330 .write_all(row.as_slice().unwrap())
331 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
332 }
333
334 writer
335 .flush()
336 .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
337
338 Ok(())
339}
340
341pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
343 use std::io::Write;
344
345 let file = File::create(path)
346 .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
347 let mut writer = std::io::BufWriter::new(file);
348
349 let len = array.len() as i64;
350
351 writer
352 .write_all(&len.to_le_bytes())
353 .map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;
354
355 for val in array.iter() {
356 writer
357 .write_all(&val.to_le_bytes())
358 .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
359 }
360
361 writer
362 .flush()
363 .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
364
365 Ok(())
366}
367
/// Magic bytes at the start of every NPY file: `\x93NUMPY`.
const NPY_MAGIC: &[u8] = b"\x93NUMPY";
374
/// Parses an NPY (v1 or v2) header from the start of `mmap`.
///
/// Returns `(shape, data_offset, fortran_order)`, where `data_offset` is the
/// byte offset at which the array data begins (i.e. the end of the header).
///
/// The dtype (`'descr'`) is NOT validated here; callers assume it matches the
/// element type they cast the data to.
///
/// # Errors
/// `Error::IndexLoad` on a short file, bad magic, unsupported major version,
/// a header that extends past the end of the file, or a header that is not
/// valid UTF-8.
fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
    if mmap.len() < 10 {
        return Err(Error::IndexLoad("NPY file too small".into()));
    }

    if &mmap[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY magic".into()));
    }

    let major_version = mmap[6];
    let _minor_version = mmap[7];

    // v1 stores HEADER_LEN as a u16 at offset 8; v2 widens it to a u32.
    let header_len = if major_version == 1 {
        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
    } else if major_version == 2 {
        if mmap.len() < 12 {
            return Err(Error::IndexLoad("NPY v2 file too small".into()));
        }
        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
    } else {
        return Err(Error::IndexLoad(format!(
            "Unsupported NPY version: {}",
            major_version
        )));
    };

    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_end = header_start + header_len;

    if mmap.len() < header_end {
        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
    }

    // NOTE(review): the NPY spec permits latin-1 in headers; NumPy-written
    // headers are ASCII in practice, so UTF-8 decoding should be fine —
    // confirm if headers from other producers are expected.
    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;

    let shape = parse_shape_from_header(header_str)?;
    // Simple substring test against the canonical dict formatting NumPy emits.
    let fortran_order = header_str.contains("'fortran_order': True");

    Ok((shape, header_end, fortran_order))
}
421
422fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
424 let shape_start = header
426 .find("'shape':")
427 .ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;
428
429 let after_shape = &header[shape_start + 8..];
430 let paren_start = after_shape
431 .find('(')
432 .ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
433 let paren_end = after_shape
434 .find(')')
435 .ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;
436
437 let shape_content = &after_shape[paren_start + 1..paren_end];
438
439 let mut shape = Vec::new();
441 for part in shape_content.split(',') {
442 let trimmed = part.trim();
443 if !trimmed.is_empty() {
444 let dim: usize = trimmed.parse().map_err(|e| {
445 Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
446 })?;
447 shape.push(dim);
448 }
449 }
450
451 Ok(shape)
452}
453
/// Read-only, zero-copy view of a 1-D i64 array stored in NPY format.
pub struct MmapNpyArray1I64 {
    // Used via indexing in the accessors; keeps the mapping alive.
    _mmap: Mmap,
    // Element count (first dimension of the NPY shape).
    len: usize,
    // Byte offset where array data begins (end of the NPY header).
    data_offset: usize,
}
462
impl MmapNpyArray1I64 {
    /// Opens and memory-maps a 1-D NPY file of i64 values.
    ///
    /// The dtype in the header is not verified; the caller must only point
    /// this at `<i8` files.
    ///
    /// # Errors
    /// `Error::IndexLoad` on open/map failure, a malformed header, an empty
    /// shape, or a file too small for the declared length.
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        // SAFETY: the file must not be truncated/modified while mapped.
        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape.is_empty() {
            return Err(Error::IndexLoad("Empty shape in NPY file".into()));
        }

        // For multi-dimensional inputs only the first dimension is used.
        let len = shape[0];

        let expected_size = data_offset + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for {} elements",
                mmap.len(),
                len
            )));
        }

        Ok(Self {
            _mmap: mmap,
            len,
            data_offset,
        })
    }

    /// Number of elements.
    pub fn len(&self) -> usize {
        self.len
    }

    /// True when there are no elements.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Zero-copy view of elements `start..end`. Panics (via slice indexing)
    /// if the range is out of bounds or reversed.
    pub fn slice(&self, start: usize, end: usize) -> &[i64] {
        let byte_start = self.data_offset + start * 8;
        let byte_end = self.data_offset + end * 8;
        let bytes = &self._mmap[byte_start..byte_end];

        // SAFETY: bounds were checked by the slice indexing above.
        // NOTE(review): this cast also requires `data_offset` to be 8-byte
        // aligned. NumPy pads headers to a multiple of 64, but
        // `write_npy_header_1d` in this file pads to 64*k + 1 bytes, so
        // self-written merged files would make this an unaligned (UB) load —
        // confirm the writer's padding is corrected.
        unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, end - start) }
    }

    /// Reads element `idx` via a byte copy (no alignment requirement).
    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }
}
530
/// Read-only, zero-copy view of a 2-D f32 array stored in NPY format.
pub struct MmapNpyArray2F32 {
    // Used via indexing in the accessors; keeps the mapping alive.
    _mmap: Mmap,
    // (nrows, ncols) from the NPY header shape.
    shape: (usize, usize),
    // Byte offset where array data begins (end of the NPY header).
    data_offset: usize,
}
541
impl MmapNpyArray2F32 {
    /// Opens and memory-maps a 2-D NPY file of f32 values.
    ///
    /// Neither the dtype nor `fortran_order` in the header is verified; the
    /// caller must only point this at C-order `<f4` files.
    ///
    /// # Errors
    /// `Error::IndexLoad` on open/map failure, a malformed header, a non-2-D
    /// shape, or a file too small for the declared shape.
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        // SAFETY: the file must not be truncated/modified while mapped.
        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape_vec.len() != 2 {
            return Err(Error::IndexLoad(format!(
                "Expected 2D array, got {}D",
                shape_vec.len()
            )));
        }

        let shape = (shape_vec[0], shape_vec[1]);

        // Validate the declared shape against the file size up front.
        let expected_size = data_offset + shape.0 * shape.1 * 4;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for shape {:?}",
                mmap.len(),
                shape
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape,
            data_offset,
        })
    }

    /// Returns (nrows, ncols).
    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    /// Number of rows.
    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    /// Number of columns.
    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    /// Zero-copy view of the whole matrix.
    pub fn view(&self) -> ArrayView2<'_, f32> {
        let byte_start = self.data_offset;
        let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        // SAFETY: bounds were validated in `from_npy_file`. NOTE(review):
        // this cast also assumes `data_offset` is 4-byte aligned; NumPy pads
        // headers to a multiple of 64, but `write_npy_header_2d` in this
        // file pads to 64*k + 1, so self-written files would make this an
        // unaligned load — confirm the writer's padding is corrected.
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
        };

        ArrayView2::from_shape(self.shape, data).unwrap()
    }

    /// Zero-copy view of row `idx`. Panics (via slice indexing) if out of
    /// bounds.
    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
        let byte_start = self.data_offset + idx * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];

        // SAFETY: same bounds/alignment argument as in `view`.
        let data =
            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };

        ArrayView1::from_shape(self.shape.1, data).unwrap()
    }

    /// Zero-copy view of rows `start..end`. Panics if the range is out of
    /// bounds or reversed.
    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1 * 4;
        let byte_end = self.data_offset + end * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        // SAFETY: same bounds/alignment argument as in `view`.
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
        };

        ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
    }

    /// Copies the whole matrix into an owned array.
    pub fn to_owned(&self) -> Array2<f32> {
        self.view().to_owned()
    }
}
647
/// Read-only, zero-copy view of a 2-D u8 array stored in NPY format.
pub struct MmapNpyArray2U8 {
    // Used via indexing in the accessors; keeps the mapping alive.
    _mmap: Mmap,
    // (nrows, ncols) from the NPY header shape.
    shape: (usize, usize),
    // Byte offset where array data begins (end of the NPY header).
    data_offset: usize,
}
656
657impl MmapNpyArray2U8 {
658 pub fn from_npy_file(path: &Path) -> Result<Self> {
660 let file = File::open(path)
661 .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
662
663 let mmap = unsafe {
664 Mmap::map(&file).map_err(|e| {
665 Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
666 })?
667 };
668
669 let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
670
671 if shape_vec.len() != 2 {
672 return Err(Error::IndexLoad(format!(
673 "Expected 2D array, got {}D",
674 shape_vec.len()
675 )));
676 }
677
678 let shape = (shape_vec[0], shape_vec[1]);
679
680 let expected_size = data_offset + shape.0 * shape.1;
682 if mmap.len() < expected_size {
683 return Err(Error::IndexLoad(format!(
684 "NPY file size {} too small for shape {:?}",
685 mmap.len(),
686 shape
687 )));
688 }
689
690 Ok(Self {
691 _mmap: mmap,
692 shape,
693 data_offset,
694 })
695 }
696
697 pub fn shape(&self) -> (usize, usize) {
699 self.shape
700 }
701
702 pub fn nrows(&self) -> usize {
704 self.shape.0
705 }
706
707 pub fn ncols(&self) -> usize {
709 self.shape.1
710 }
711
712 pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
714 let nrows = end - start;
715 let byte_start = self.data_offset + start * self.shape.1;
716 let byte_end = self.data_offset + end * self.shape.1;
717 let bytes = &self._mmap[byte_start..byte_end];
718
719 ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
720 }
721
722 pub fn view(&self) -> ArrayView2<'_, u8> {
724 self.slice_rows(0, self.shape.0)
725 }
726
727 pub fn row(&self, idx: usize) -> &[u8] {
729 let byte_start = self.data_offset + idx * self.shape.1;
730 let byte_end = byte_start + self.shape.1;
731 &self._mmap[byte_start..byte_end]
732 }
733}
734
#[cfg(feature = "npy")]
/// Per-chunk state persisted in the merge manifest, used to decide whether a
/// chunk changed since the last merge.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkManifestEntry {
    /// Row count the chunk contributed at the last merge.
    pub rows: usize,
    /// Modification time (seconds since the Unix epoch) observed at the
    /// last merge.
    pub mtime: f64,
}
746
#[cfg(feature = "npy")]
/// Maps a chunk's file name to the state recorded at the last merge.
pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;
750
#[cfg(feature = "npy")]
/// Loads a chunk manifest from `manifest_path`.
///
/// Returns `None` when the file is missing, unreadable, or not valid JSON —
/// callers treat all of these identically as "no previous merge state".
fn load_manifest(manifest_path: &Path) -> Option<ChunkManifest> {
    let file = File::open(manifest_path).ok()?;
    serde_json::from_reader(BufReader::new(file)).ok()
}
763
#[cfg(feature = "npy")]
/// Writes the chunk manifest to `manifest_path` as JSON.
///
/// # Errors
/// `Error::IndexLoad` if the file cannot be created, serialization fails, or
/// the final flush fails.
fn save_manifest(manifest_path: &Path, manifest: &ChunkManifest) -> Result<()> {
    let file = File::create(manifest_path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create manifest: {}", e)))?;
    let mut writer = BufWriter::new(file);
    serde_json::to_writer(&mut writer, manifest)
        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
    // BufWriter's Drop flushes but silently swallows errors; flush explicitly
    // so a full disk / I/O failure surfaces instead of leaving a truncated
    // manifest that the next merge would trust.
    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
    Ok(())
}
773
#[cfg(feature = "npy")]
/// Returns `path`'s modification time as fractional seconds since the Unix
/// epoch.
///
/// # Errors
/// `Error::IndexLoad` if metadata is unavailable, the platform cannot report
/// an mtime, or the mtime predates the epoch.
fn get_mtime(path: &Path) -> Result<f64> {
    let modified = fs::metadata(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?
        .modified()
        .map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
    let since_epoch = modified
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
    Ok(since_epoch.as_secs_f64())
}
787
#[cfg(feature = "npy")]
/// Writes an NPY v1 header for a 1-D array of `len` elements with dtype
/// string `dtype` (e.g. `"<i8"`), returning the total header size in bytes
/// (i.e. the data offset).
///
/// Padding is computed so that magic (6) + version (2) + HEADER_LEN (2) +
/// header dict *including its trailing newline* is a multiple of 64 bytes,
/// as the NPY spec requires. This also makes the data section 64-byte
/// aligned, which the mmap readers in this file rely on for their
/// `*const i64` casts.
fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
    let header_dict = format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({},), }}",
        dtype, len
    );

    // Include the trailing newline byte (+1) in the padding computation; the
    // previous formula omitted it, leaving the data offset at 64*k + 1 and
    // the payload misaligned for every element type wider than u8.
    let header_len = header_dict.len();
    let padding = (64 - ((10 + header_len + 1) % 64)) % 64;
    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));

    writer
        .write_all(NPY_MAGIC)
        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
    writer
        .write_all(&[1, 0])
        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;

    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
    writer
        .write_all(&header_len_bytes)
        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;

    writer
        .write_all(padded_header.as_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;

    Ok(10 + padded_header.len())
}
823
#[cfg(feature = "npy")]
/// Writes an NPY v1 header for a 2-D `nrows` x `ncols` array with dtype
/// string `dtype` (e.g. `"|u1"`), returning the total header size in bytes
/// (i.e. the data offset).
///
/// Padding is computed so that magic (6) + version (2) + HEADER_LEN (2) +
/// header dict *including its trailing newline* is a multiple of 64 bytes,
/// as the NPY spec requires. This also keeps the data section 64-byte
/// aligned for the mmap readers' pointer casts.
fn write_npy_header_2d(
    writer: &mut impl Write,
    nrows: usize,
    ncols: usize,
    dtype: &str,
) -> Result<usize> {
    let header_dict = format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}), }}",
        dtype, nrows, ncols
    );

    // Include the trailing newline byte (+1) in the padding computation; the
    // previous formula omitted it, leaving the preamble at 64*k + 1 bytes
    // and the data section misaligned.
    let header_len = header_dict.len();
    let padding = (64 - ((10 + header_len + 1) % 64)) % 64;
    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));

    writer
        .write_all(NPY_MAGIC)
        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
    writer
        .write_all(&[1, 0])
        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;

    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
    writer
        .write_all(&header_len_bytes)
        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;

    writer
        .write_all(padded_header.as_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;

    Ok(10 + padded_header.len())
}
864
#[cfg(feature = "npy")]
/// In-memory record of one on-disk chunk considered during a merge.
struct ChunkInfo {
    // Full path to the chunk file.
    path: std::path::PathBuf,
    // File name only; used as the manifest key.
    filename: String,
    // Row count of the chunk (only non-empty chunks are recorded).
    rows: usize,
    // Modification time (seconds since epoch) observed at scan time.
    mtime: f64,
    // True when this chunk (or any earlier one) changed since the last
    // merge; currently only feeds the all-or-nothing rewrite decision.
    #[allow(dead_code)]
    needs_write: bool,
}
875
#[cfg(feature = "npy")]
/// Concatenates the existing `{i}.codes.npy` chunk files under `index_path`
/// (for `i` in `0..num_chunks`) into `merged_codes.npy` (1-D i64), appending
/// `padding_rows` zero entries at the end.
///
/// A JSON manifest of per-chunk (rows, mtime) lets a later call skip the
/// rewrite entirely when every chunk is unchanged and the merged file still
/// exists. Missing and empty chunks are skipped.
///
/// Returns the path of the merged file.
///
/// # Errors
/// `Error::IndexLoad` when no chunk contributes any data, plus any I/O or
/// NPY-decoding error encountered along the way.
pub fn merge_codes_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_codes.npy");
    let manifest_path = index_path.join("merged_codes.manifest.json");

    let old_manifest = load_manifest(&manifest_path);

    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.codes.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            // Each chunk is fully deserialized here just to learn its length.
            let file = File::open(&path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            let rows = arr.len();

            if rows > 0 {
                total_rows += rows;

                // A chunk is "clean" when the manifest recorded the same
                // mtime and row count at the previous merge.
                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                // Once any chunk is dirty, every later chunk's offset in the
                // merged file shifts, so the whole tail is invalidated.
                let needs_write = if chain_broken || !is_clean {
                    chain_broken = true;
                    true
                } else {
                    false
                };

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                    needs_write,
                });
            }
        }
    }

    if total_rows == 0 {
        return Err(Error::IndexLoad("No data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    // NOTE(review): `needs_write` is tracked per chunk but only a full
    // rewrite is ever performed; a partial (tail-only) rewrite path is not
    // implemented.
    let needs_full_rewrite = !merged_path.exists() || chain_broken;

    if needs_full_rewrite {
        let file = File::create(&merged_path)?;
        let mut writer = BufWriter::new(file);

        write_npy_header_1d(&mut writer, final_rows, "<i8")?;

        // Re-read each chunk and stream its values into the merged file.
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            for &val in arr.iter() {
                writer.write_all(&val.to_le_bytes())?;
            }
        }

        // Zero-filled padding entries appended after the real data.
        for _ in 0..padding_rows {
            writer.write_all(&0i64.to_le_bytes())?;
        }

        writer.flush()?;
    }

    // Persist the chunk states observed during this merge for the next call.
    let mut new_manifest = ChunkManifest::new();
    for chunk in &chunks {
        new_manifest.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    save_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}
990
#[cfg(feature = "npy")]
/// Concatenates the existing `{i}.residuals.npy` chunk files (2-D u8) under
/// `index_path` into `merged_residuals.npy`, appending `padding_rows` zero
/// rows at the end.
///
/// A JSON manifest of per-chunk (rows, mtime) lets a later call skip the
/// rewrite when every chunk is unchanged and the merged file still exists.
/// Missing and empty chunks are skipped.
///
/// Returns the path of the merged file.
///
/// # Errors
/// `Error::IndexLoad` when no chunk contributes any data (or width is zero),
/// plus any I/O or NPY-decoding error encountered along the way.
pub fn merge_residuals_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_residuals.npy");
    let manifest_path = index_path.join("merged_residuals.manifest.json");

    let old_manifest = load_manifest(&manifest_path);

    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut ncols = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.residuals.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            // Each chunk is fully deserialized just to learn its dimensions.
            let file = File::open(&path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            let rows = arr.nrows();
            // NOTE(review): ncols is taken from the last chunk seen; chunks
            // with mismatched widths are not detected and would corrupt the
            // merged file — confirm all chunks share one width upstream.
            ncols = arr.ncols();

            if rows > 0 {
                total_rows += rows;

                // "Clean" means the manifest recorded the same mtime and
                // row count at the previous merge.
                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                // Any dirty chunk shifts all later offsets, invalidating the
                // whole tail of the merged file.
                let needs_write = if chain_broken || !is_clean {
                    chain_broken = true;
                    true
                } else {
                    false
                };

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                    needs_write,
                });
            }
        }
    }

    if total_rows == 0 || ncols == 0 {
        return Err(Error::IndexLoad("No residual data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    // NOTE(review): only a full rewrite is implemented; `needs_write` is
    // bookkeeping for a future incremental path.
    let needs_full_rewrite = !merged_path.exists() || chain_broken;

    if needs_full_rewrite {
        let file = File::create(&merged_path)?;
        let mut writer = BufWriter::new(file);

        write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;

        // Re-read each chunk and stream its rows into the merged file.
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            for row in arr.rows() {
                writer.write_all(row.as_slice().unwrap())?;
            }
        }

        // Zero rows appended after the real data.
        let zero_row = vec![0u8; ncols];
        for _ in 0..padding_rows {
            writer.write_all(&zero_row)?;
        }

        writer.flush()?;
    }

    // Persist the chunk states observed during this merge for the next call.
    let mut new_manifest = ChunkManifest::new();
    for chunk in &chunks {
        new_manifest.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    save_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}
1102
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Hand-writes a 3x2 raw f32 file and checks shape, row access, and the
    // owned copy.
    #[test]
    fn test_mmap_array2_f32() {
        let mut file = NamedTempFile::new().unwrap();

        // Header: nrows = 3, ncols = 2 (little-endian i64s).
        file.write_all(&3i64.to_le_bytes()).unwrap();
        file.write_all(&2i64.to_le_bytes()).unwrap();

        // Row-major payload.
        for val in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        let mmap = MmapArray2F32::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.shape(), (3, 2));

        let row0 = mmap.row(0);
        assert_eq!(row0[0], 1.0);
        assert_eq!(row0[1], 2.0);

        let owned = mmap.to_owned();
        assert_eq!(owned[[2, 0]], 5.0);
        assert_eq!(owned[[2, 1]], 6.0);
    }

    // Hand-writes a length-4 raw i64 file and checks indexed and owned reads.
    #[test]
    fn test_mmap_array1_i64() {
        let mut file = NamedTempFile::new().unwrap();

        // Header: length = 4 (little-endian i64).
        file.write_all(&4i64.to_le_bytes()).unwrap();

        for val in [10i64, 20, 30, 40] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        let mmap = MmapArray1I64::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.len(), 4);
        assert_eq!(mmap.get(0), 10);
        assert_eq!(mmap.get(3), 40);

        let owned = mmap.to_owned();
        assert_eq!(owned[1], 20);
        assert_eq!(owned[2], 30);
    }

    // Writes with `write_array2_f32` and reads back through the mmap reader.
    #[test]
    fn test_write_read_roundtrip() {
        let file = NamedTempFile::new().unwrap();
        let path = file.path();

        let array = Array2::from_shape_vec((2, 3), vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).unwrap();

        write_array2_f32(&array, path).unwrap();

        let mmap = MmapArray2F32::from_raw_file(path).unwrap();
        let loaded = mmap.to_owned();

        assert_eq!(array, loaded);
    }
}