use std::collections::HashMap;
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;

use byteorder::{LittleEndian, ReadBytesExt};
use memmap2::Mmap;
use ndarray::{Array1, Array2, ArrayView1, ArrayView2};

use crate::error::{Error, Result};

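/// Memory-mapped 2-D `f32` array stored in this module's raw format: a 16-byte
/// header holding `nrows` and `ncols` as little-endian `i64`, followed by
/// row-major little-endian `f32` data.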
pub struct MmapArray2F32 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapArray2F32 {
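    /// Opens `path`, memory-maps it, and validates the header and minimum file size.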
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 16 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..16]);
        let nrows = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
            as usize;
        let ncols = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
            as usize;

        let expected_size = 16 + nrows * ncols * 4;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for shape ({}, {})",
                mmap.len(),
                nrows,
                ncols
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape: (nrows, ncols),
            data_offset: 16,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
        let start = self.data_offset + idx * self.shape.1 * 4;
        let bytes = &self._mmap[start..start + self.shape.1 * 4];

        // SAFETY: the mapped bytes are little-endian f32 values; the offset is 16 plus a
        // multiple of 4 from a page-aligned mapping, so the pointer is 4-byte aligned.
        // Correctness assumes a little-endian host.
        let data =
            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };

        ArrayView1::from_shape(self.shape.1, data).unwrap()
    }

    pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1 * 4;
        let byte_end = self.data_offset + end * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        // SAFETY: same layout and alignment argument as in `row`.
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
        };

        Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
    }

    pub fn to_owned(&self) -> Array2<f32> {
        self.load_rows(0, self.shape.0)
    }
}

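/// Memory-mapped 2-D `u8` array in the same raw format as [`MmapArray2F32`]:
/// a 16-byte header of two little-endian `i64` values (`nrows`, `ncols`)
/// followed by row-major bytes.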
pub struct MmapArray2U8 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapArray2U8 {
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 16 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..16]);
        let nrows = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
            as usize;
        let ncols = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
            as usize;

        let expected_size = 16 + nrows * ncols;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for shape ({}, {})",
                mmap.len(),
                nrows,
                ncols
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape: (nrows, ncols),
            data_offset: 16,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn view(&self) -> ArrayView2<'_, u8> {
        let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
        ArrayView2::from_shape(self.shape, bytes).unwrap()
    }

    pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1;
        let byte_end = self.data_offset + end * self.shape.1;
        let bytes = &self._mmap[byte_start..byte_end];

        Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
    }

    pub fn to_owned(&self) -> Array2<u8> {
        self.load_rows(0, self.shape.0)
    }
}

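/// Memory-mapped 1-D `i64` array in the raw format: an 8-byte little-endian
/// `i64` length header followed by the little-endian `i64` values.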
pub struct MmapArray1I64 {
    _mmap: Mmap,
    len: usize,
    data_offset: usize,
}

impl MmapArray1I64 {
    pub fn from_raw_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file)
                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
        };

        if mmap.len() < 8 {
            return Err(Error::IndexLoad("File too small for header".into()));
        }

        let mut cursor = std::io::Cursor::new(&mmap[..8]);
        let len = cursor
            .read_i64::<LittleEndian>()
            .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
            as usize;

        let expected_size = 8 + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "File size {} too small for length {}",
                mmap.len(),
                len
            )));
        }

        Ok(Self {
            _mmap: mmap,
            len,
            data_offset: 8,
        })
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }

    pub fn to_owned(&self) -> Array1<i64> {
        let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];

        // SAFETY: offset 8 plus multiples of 8 from a page-aligned mapping keeps the
        // pointer 8-byte aligned; assumes a little-endian host.
        let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };

        Array1::from_vec(data.to_vec())
    }
}

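/// Writes `array` to `path` in the raw format read by [`MmapArray2F32`]
/// (little-endian `i64` `nrows`/`ncols` header, then row-major `f32` data).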
pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(file);

    let nrows = array.nrows() as i64;
    let ncols = array.ncols() as i64;

    writer
        .write_all(&nrows.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
    writer
        .write_all(&ncols.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;

    for val in array.iter() {
        writer
            .write_all(&val.to_le_bytes())
            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    }

    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;

    Ok(())
}

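/// Writes `array` to `path` in the raw format read by [`MmapArray2U8`].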
pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(file);

    let nrows = array.nrows() as i64;
    let ncols = array.ncols() as i64;

    writer
        .write_all(&nrows.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
    writer
        .write_all(&ncols.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;

    // Each row of a standard-layout (row-major) array is a contiguous slice;
    // `as_slice` returns `None` (and this unwrap panics) for other layouts.
    for row in array.rows() {
        writer
            .write_all(row.as_slice().unwrap())
            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    }

    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;

    Ok(())
}

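/// Writes `array` to `path` in the raw format read by [`MmapArray1I64`]
/// (little-endian `i64` length header, then the values).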
pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
    let file = File::create(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
    let mut writer = BufWriter::new(file);

    let len = array.len() as i64;

    writer
        .write_all(&len.to_le_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;

    for val in array.iter() {
        writer
            .write_all(&val.to_le_bytes())
            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
    }

    writer
        .flush()
        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;

    Ok(())
}

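/// Magic bytes at the start of every NPY file.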
const NPY_MAGIC: &[u8] = b"\x93NUMPY";

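/// Parses an NPY v1/v2 header, returning `(shape, data_offset, fortran_order)`,
/// where `data_offset` is the byte offset at which the array data begins.
/// The dtype descriptor is not validated here; callers assume it matches.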
fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
    if mmap.len() < 10 {
        return Err(Error::IndexLoad("NPY file too small".into()));
    }

    if &mmap[..6] != NPY_MAGIC {
        return Err(Error::IndexLoad("Invalid NPY magic".into()));
    }

    let major_version = mmap[6];
    let _minor_version = mmap[7];

    let header_len = if major_version == 1 {
        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
    } else if major_version == 2 {
        if mmap.len() < 12 {
            return Err(Error::IndexLoad("NPY v2 file too small".into()));
        }
        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
    } else {
        return Err(Error::IndexLoad(format!(
            "Unsupported NPY version: {}",
            major_version
        )));
    };

    let header_start = if major_version == 1 { 10 } else { 12 };
    let header_end = header_start + header_len;

    if mmap.len() < header_end {
        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
    }

    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;

    let shape = parse_shape_from_header(header_str)?;
    let fortran_order = header_str.contains("'fortran_order': True");

    Ok((shape, header_end, fortran_order))
}

fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
    let shape_start = header
        .find("'shape':")
        .ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;

    let after_shape = &header[shape_start + 8..];
    let paren_start = after_shape
        .find('(')
        .ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
    let paren_end = after_shape
        .find(')')
        .ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;

    let shape_content = &after_shape[paren_start + 1..paren_end];

    let mut shape = Vec::new();
    for part in shape_content.split(',') {
        let trimmed = part.trim();
        if !trimmed.is_empty() {
            let dim: usize = trimmed.parse().map_err(|e| {
                Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
            })?;
            shape.push(dim);
        }
    }

    Ok(shape)
}

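/// Memory-mapped 1-D `i64` array backed by an NPY file. Elements are read as
/// little-endian `i64`; the header's dtype is not validated.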
pub struct MmapNpyArray1I64 {
    _mmap: Mmap,
    len: usize,
    data_offset: usize,
}

impl MmapNpyArray1I64 {
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape.is_empty() {
            return Err(Error::IndexLoad("Empty shape in NPY file".into()));
        }

        let len = shape[0];

        let expected_size = data_offset + len * 8;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for {} elements",
                mmap.len(),
                len
            )));
        }

        Ok(Self {
            _mmap: mmap,
            len,
            data_offset,
        })
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    pub fn slice(&self, start: usize, end: usize) -> Vec<i64> {
        let count = end - start;
        let mut result = Vec::with_capacity(count);

        for i in start..end {
            result.push(self.get(i));
        }

        result
    }

    pub fn get(&self, idx: usize) -> i64 {
        let start = self.data_offset + idx * 8;
        let bytes = &self._mmap[start..start + 8];
        i64::from_le_bytes(bytes.try_into().unwrap())
    }
}

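/// Memory-mapped 2-D `f32` array backed by an NPY file. Elements are read as
/// little-endian `f32` in C (row-major) order; the `fortran_order` flag is
/// parsed but ignored.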
pub struct MmapNpyArray2F32 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapNpyArray2F32 {
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape_vec.len() != 2 {
            return Err(Error::IndexLoad(format!(
                "Expected 2D array, got {}D",
                shape_vec.len()
            )));
        }

        let shape = (shape_vec[0], shape_vec[1]);

        let expected_size = data_offset + shape.0 * shape.1 * 4;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for shape {:?}",
                mmap.len(),
                shape
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape,
            data_offset,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    pub fn view(&self) -> ArrayView2<'_, f32> {
        let byte_start = self.data_offset;
        let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        // SAFETY: reinterprets the mapped bytes as f32. Assumes a little-endian host and
        // a data offset that is at least 4-byte aligned (true for files written by NumPy,
        // which pads its headers for alignment).
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
        };

        ArrayView2::from_shape(self.shape, data).unwrap()
    }

    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
        let byte_start = self.data_offset + idx * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];

        // SAFETY: same alignment and endianness assumptions as in `view`.
        let data =
            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };

        ArrayView1::from_shape(self.shape.1, data).unwrap()
    }

    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1 * 4;
        let byte_end = self.data_offset + end * self.shape.1 * 4;
        let bytes = &self._mmap[byte_start..byte_end];

        // SAFETY: same alignment and endianness assumptions as in `view`.
        let data = unsafe {
            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
        };

        ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
    }

    pub fn to_owned(&self) -> Array2<f32> {
        self.view().to_owned()
    }
}

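/// Memory-mapped 2-D `u8` array backed by an NPY file; each row is exposed as a
/// contiguous byte slice.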
pub struct MmapNpyArray2U8 {
    _mmap: Mmap,
    shape: (usize, usize),
    data_offset: usize,
}

impl MmapNpyArray2U8 {
    pub fn from_npy_file(path: &Path) -> Result<Self> {
        let file = File::open(path)
            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;

        let mmap = unsafe {
            Mmap::map(&file).map_err(|e| {
                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
            })?
        };

        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;

        if shape_vec.len() != 2 {
            return Err(Error::IndexLoad(format!(
                "Expected 2D array, got {}D",
                shape_vec.len()
            )));
        }

        let shape = (shape_vec[0], shape_vec[1]);

        let expected_size = data_offset + shape.0 * shape.1;
        if mmap.len() < expected_size {
            return Err(Error::IndexLoad(format!(
                "NPY file size {} too small for shape {:?}",
                mmap.len(),
                shape
            )));
        }

        Ok(Self {
            _mmap: mmap,
            shape,
            data_offset,
        })
    }

    pub fn shape(&self) -> (usize, usize) {
        self.shape
    }

    pub fn nrows(&self) -> usize {
        self.shape.0
    }

    pub fn ncols(&self) -> usize {
        self.shape.1
    }

    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
        let nrows = end - start;
        let byte_start = self.data_offset + start * self.shape.1;
        let byte_end = self.data_offset + end * self.shape.1;
        let bytes = &self._mmap[byte_start..byte_end];

        ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
    }

    pub fn view(&self) -> ArrayView2<'_, u8> {
        self.slice_rows(0, self.shape.0)
    }

    pub fn row(&self, idx: usize) -> &[u8] {
        let byte_start = self.data_offset + idx * self.shape.1;
        let byte_end = byte_start + self.shape.1;
        &self._mmap[byte_start..byte_end]
    }
}

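/// Per-chunk entry recorded in a merge manifest: the chunk's row count and
/// modification time, used to detect whether the chunk changed since the last merge.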
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkManifestEntry {
    pub rows: usize,
    pub mtime: f64,
}

/// Maps a chunk filename to its manifest entry.
pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;

fn load_manifest(manifest_path: &Path) -> Option<ChunkManifest> {
    if manifest_path.exists() {
        if let Ok(file) = File::open(manifest_path) {
            if let Ok(manifest) = serde_json::from_reader(BufReader::new(file)) {
                return Some(manifest);
            }
        }
    }
    None
}

fn save_manifest(manifest_path: &Path, manifest: &ChunkManifest) -> Result<()> {
    let file = File::create(manifest_path)
        .map_err(|e| Error::IndexLoad(format!("Failed to create manifest: {}", e)))?;
    serde_json::to_writer(BufWriter::new(file), manifest)
        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
    Ok(())
}

fn get_mtime(path: &Path) -> Result<f64> {
    let metadata = fs::metadata(path)
        .map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?;
    let mtime = metadata
        .modified()
        .map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
    let duration = mtime
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
    Ok(duration.as_secs_f64())
}

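/// Writes an NPY v1 header for a 1-D array of `len` elements with the given dtype
/// string (e.g. `"<i8"`), returning the total header size in bytes, i.e. the offset
/// at which the data section begins.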
fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
    let header_dict = format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({},), }}",
        dtype, len
    );

    // Pad with spaces so that magic + version + header-length field + header
    // (including the trailing newline) is a multiple of 64 bytes, keeping the
    // data section 64-byte aligned as the NPY format recommends.
    let header_len = header_dict.len();
    let padding = (64 - ((10 + header_len + 1) % 64)) % 64;
    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));

    writer
        .write_all(NPY_MAGIC)
        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
    writer
        .write_all(&[1, 0])
        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;

    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
    writer
        .write_all(&header_len_bytes)
        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;

    writer
        .write_all(padded_header.as_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;

    Ok(10 + padded_header.len())
}

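/// Writes an NPY v1 header for a 2-D C-order array of shape `(nrows, ncols)` with
/// the given dtype string (e.g. `"|u1"`), returning the offset at which the data
/// section begins.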
fn write_npy_header_2d(
    writer: &mut impl Write,
    nrows: usize,
    ncols: usize,
    dtype: &str,
) -> Result<usize> {
    let header_dict = format!(
        "{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}), }}",
        dtype, nrows, ncols
    );

    // Same padding rule as `write_npy_header_1d`: keep the data section 64-byte aligned.
    let header_len = header_dict.len();
    let padding = (64 - ((10 + header_len + 1) % 64)) % 64;
    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));

    writer
        .write_all(NPY_MAGIC)
        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
    writer
        .write_all(&[1, 0])
        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;

    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
    writer
        .write_all(&header_len_bytes)
        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;

    writer
        .write_all(padded_header.as_bytes())
        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;

    Ok(10 + padded_header.len())
}

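/// Metadata for one on-disk chunk file considered during a merge.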
struct ChunkInfo {
    path: std::path::PathBuf,
    filename: String,
    rows: usize,
    mtime: f64,
}

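/// Merges the per-chunk `{i}.codes.npy` files under `index_path` into a single
/// `merged_codes.npy`, appending `padding_rows` zero entries at the end. The rewrite is
/// skipped when the merged file exists and the manifest shows every present chunk is
/// unchanged; the manifest is then refreshed and the merged path returned.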
pub fn merge_codes_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_codes.npy");
    let manifest_path = index_path.join("merged_codes.manifest.json");

    let old_manifest = load_manifest(&manifest_path);

    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.codes.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            let file = File::open(&path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            let rows = arr.len();

            if rows > 0 {
                total_rows += rows;

                // A chunk is "clean" if the previous manifest recorded the same mtime
                // and row count for it.
                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                if !is_clean {
                    chain_broken = true;
                }

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }

    if total_rows == 0 {
        return Err(Error::IndexLoad("No data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    let needs_full_rewrite = !merged_path.exists() || chain_broken;

    if needs_full_rewrite {
        let file = File::create(&merged_path)?;
        let mut writer = BufWriter::new(file);

        write_npy_header_1d(&mut writer, final_rows, "<i8")?;

        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            for &val in arr.iter() {
                writer.write_all(&val.to_le_bytes())?;
            }
        }

        for _ in 0..padding_rows {
            writer.write_all(&0i64.to_le_bytes())?;
        }

        writer.flush()?;
    }

    let mut new_manifest = ChunkManifest::new();
    for chunk in &chunks {
        new_manifest.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    save_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}

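/// Merges the per-chunk `{i}.residuals.npy` files under `index_path` into a single
/// `merged_residuals.npy` with `padding_rows` zero rows appended, rewriting only when
/// the merged file is missing or any present chunk is new or changed. Returns the
/// merged file's path.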
pub fn merge_residuals_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_residuals.npy");
    let manifest_path = index_path.join("merged_residuals.manifest.json");

    let old_manifest = load_manifest(&manifest_path);

    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut ncols = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.residuals.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            let file = File::open(&path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            let rows = arr.nrows();
            // All chunks are assumed to share a column count; the last one read wins.
            ncols = arr.ncols();

            if rows > 0 {
                total_rows += rows;

                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                if !is_clean {
                    chain_broken = true;
                }

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }

    if total_rows == 0 || ncols == 0 {
        return Err(Error::IndexLoad("No residual data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    let needs_full_rewrite = !merged_path.exists() || chain_broken;

    if needs_full_rewrite {
        let file = File::create(&merged_path)?;
        let mut writer = BufWriter::new(file);

        write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;

        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            for row in arr.rows() {
                writer.write_all(row.as_slice().unwrap())?;
            }
        }

        let zero_row = vec![0u8; ncols];
        for _ in 0..padding_rows {
            writer.write_all(&zero_row)?;
        }

        writer.flush()?;
    }

    let mut new_manifest = ChunkManifest::new();
    for chunk in &chunks {
        new_manifest.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    save_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_mmap_array2_f32() {
        let mut file = NamedTempFile::new().unwrap();

        file.write_all(&3i64.to_le_bytes()).unwrap();
        file.write_all(&2i64.to_le_bytes()).unwrap();

        for val in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        let mmap = MmapArray2F32::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.shape(), (3, 2));

        let row0 = mmap.row(0);
        assert_eq!(row0[0], 1.0);
        assert_eq!(row0[1], 2.0);

        let owned = mmap.to_owned();
        assert_eq!(owned[[2, 0]], 5.0);
        assert_eq!(owned[[2, 1]], 6.0);
    }

    #[test]
    fn test_mmap_array1_i64() {
        let mut file = NamedTempFile::new().unwrap();

        file.write_all(&4i64.to_le_bytes()).unwrap();

        for val in [10i64, 20, 30, 40] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        let mmap = MmapArray1I64::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.len(), 4);
        assert_eq!(mmap.get(0), 10);
        assert_eq!(mmap.get(3), 40);

        let owned = mmap.to_owned();
        assert_eq!(owned[1], 20);
        assert_eq!(owned[2], 30);
    }

    #[test]
    fn test_write_read_roundtrip() {
        let file = NamedTempFile::new().unwrap();
        let path = file.path();

        let array = Array2::from_shape_vec((2, 3), vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).unwrap();

        write_array2_f32(&array, path).unwrap();

        let mmap = MmapArray2F32::from_raw_file(path).unwrap();
        let loaded = mmap.to_owned();

        assert_eq!(array, loaded);
    }
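
    // Illustrative round-trip check (added sketch): write a small NPY file with
    // `write_npy_header_1d` and read it back through `MmapNpyArray1I64`.
    #[test]
    fn test_npy_header_1d_roundtrip() {
        let file = NamedTempFile::new().unwrap();
        let path = file.path().to_path_buf();

        {
            let out = File::create(&path).unwrap();
            let mut writer = std::io::BufWriter::new(out);
            write_npy_header_1d(&mut writer, 3, "<i8").unwrap();
            for val in [7i64, 8, 9] {
                writer.write_all(&val.to_le_bytes()).unwrap();
            }
            writer.flush().unwrap();
        }

        let arr = MmapNpyArray1I64::from_npy_file(&path).unwrap();
        assert_eq!(arr.len(), 3);
        assert_eq!(arr.get(0), 7);
        assert_eq!(arr.get(2), 9);
        assert_eq!(arr.slice(1, 3), vec![8, 9]);
    }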
}