// next_plaid/mmap.rs
1//! Memory-mapped file support for efficient large index loading.
2//!
3//! This module provides utilities for loading large arrays from disk using
4//! memory-mapped files, avoiding the need to load entire arrays into RAM.
5//!
6//! Two formats are supported:
7//! - Custom raw binary format (legacy): 8-byte header with shape, then raw data
8//! - NPY format: Standard NumPy format with header, used for index files
9
10use std::collections::HashMap;
11use std::fs;
12use std::fs::File;
13use std::io::{BufReader, BufWriter, Write};
14use std::path::Path;
15
16use byteorder::{LittleEndian, ReadBytesExt};
17use fs2::FileExt;
18use memmap2::{Mmap, MmapMut};
19use ndarray::{Array1, Array2, ArrayView1, ArrayView2};
20
21use crate::error::{Error, Result};
22
/// RAII guard for file-based locking to coordinate concurrent processes.
/// The lock is released when this guard is dropped.
struct FileLockGuard {
    // Held open for the guard's lifetime; the advisory lock taken in
    // `acquire` is tied to this handle and released in `Drop`.
    _file: File,
}
28
29impl FileLockGuard {
30    /// Acquire an exclusive lock on the given lock file path.
31    /// Creates the lock file if it doesn't exist.
32    /// Blocks until the lock is acquired.
33    fn acquire(lock_path: &Path) -> Result<Self> {
34        let file = std::fs::OpenOptions::new()
35            .read(true)
36            .write(true)
37            .create(true)
38            .truncate(false)
39            .open(lock_path)
40            .map_err(|e| {
41                Error::IndexLoad(format!("Failed to open lock file {:?}: {}", lock_path, e))
42            })?;
43
44        file.lock_exclusive().map_err(|e| {
45            Error::IndexLoad(format!("Failed to acquire lock on {:?}: {}", lock_path, e))
46        })?;
47
48        Ok(Self { _file: file })
49    }
50}
51
impl Drop for FileLockGuard {
    fn drop(&mut self) {
        // Explicitly release the advisory lock; the result is ignored because
        // the OS also releases the lock when the handle is closed.
        let _ = self._file.unlock();
    }
}
58
/// A memory-mapped array of f32 values.
///
/// This struct provides zero-copy access to large arrays stored on disk.
pub struct MmapArray2F32 {
    // Keeps the mapping alive; all views borrow from this buffer.
    _mmap: Mmap,
    // (nrows, ncols) as read from the 16-byte file header.
    shape: (usize, usize),
    // Byte offset where the f32 data begins (16 for the raw format).
    data_offset: usize,
}
67
68impl MmapArray2F32 {
69    /// Load a 2D f32 array from a raw binary file.
70    ///
71    /// The file format is:
72    /// - 8 bytes: nrows (i64 little-endian)
73    /// - 8 bytes: ncols (i64 little-endian)
74    /// - nrows * ncols * 4 bytes: f32 data (little-endian)
75    pub fn from_raw_file(path: &Path) -> Result<Self> {
76        let file = File::open(path)
77            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
78
79        let mmap = unsafe {
80            Mmap::map(&file)
81                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
82        };
83
84        if mmap.len() < 16 {
85            return Err(Error::IndexLoad("File too small for header".into()));
86        }
87
88        // Read shape from header
89        let mut cursor = std::io::Cursor::new(&mmap[..16]);
90        let nrows = cursor
91            .read_i64::<LittleEndian>()
92            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
93            as usize;
94        let ncols = cursor
95            .read_i64::<LittleEndian>()
96            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
97            as usize;
98
99        let expected_size = 16 + nrows * ncols * 4;
100        if mmap.len() < expected_size {
101            return Err(Error::IndexLoad(format!(
102                "File size {} too small for shape ({}, {})",
103                mmap.len(),
104                nrows,
105                ncols
106            )));
107        }
108
109        Ok(Self {
110            _mmap: mmap,
111            shape: (nrows, ncols),
112            data_offset: 16,
113        })
114    }
115
116    /// Get the shape of the array.
117    pub fn shape(&self) -> (usize, usize) {
118        self.shape
119    }
120
121    /// Get the number of rows.
122    pub fn nrows(&self) -> usize {
123        self.shape.0
124    }
125
126    /// Get the number of columns.
127    pub fn ncols(&self) -> usize {
128        self.shape.1
129    }
130
131    /// Get a view of a row.
132    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
133        let start = self.data_offset + idx * self.shape.1 * 4;
134        let bytes = &self._mmap[start..start + self.shape.1 * 4];
135
136        // Safety: We've verified the bounds and alignment
137        let data =
138            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
139
140        ArrayView1::from_shape(self.shape.1, data).unwrap()
141    }
142
143    /// Load a range of rows into an owned Array2.
144    pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
145        let nrows = end - start;
146        let byte_start = self.data_offset + start * self.shape.1 * 4;
147        let byte_end = self.data_offset + end * self.shape.1 * 4;
148        let bytes = &self._mmap[byte_start..byte_end];
149
150        // Safety: We've verified the bounds
151        let data = unsafe {
152            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
153        };
154
155        Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
156    }
157
158    /// Convert to an owned Array2 (loads all data into memory).
159    pub fn to_owned(&self) -> Array2<f32> {
160        self.load_rows(0, self.shape.0)
161    }
162}
163
/// A memory-mapped array of u8 values.
pub struct MmapArray2U8 {
    // Keeps the mapping alive; all views borrow from this buffer.
    _mmap: Mmap,
    // (nrows, ncols) as read from the 16-byte file header.
    shape: (usize, usize),
    // Byte offset where the u8 data begins (16 for the raw format).
    data_offset: usize,
}
170
171impl MmapArray2U8 {
172    /// Load a 2D u8 array from a raw binary file.
173    pub fn from_raw_file(path: &Path) -> Result<Self> {
174        let file = File::open(path)
175            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
176
177        let mmap = unsafe {
178            Mmap::map(&file)
179                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
180        };
181
182        if mmap.len() < 16 {
183            return Err(Error::IndexLoad("File too small for header".into()));
184        }
185
186        let mut cursor = std::io::Cursor::new(&mmap[..16]);
187        let nrows = cursor
188            .read_i64::<LittleEndian>()
189            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
190            as usize;
191        let ncols = cursor
192            .read_i64::<LittleEndian>()
193            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
194            as usize;
195
196        let expected_size = 16 + nrows * ncols;
197        if mmap.len() < expected_size {
198            return Err(Error::IndexLoad(format!(
199                "File size {} too small for shape ({}, {})",
200                mmap.len(),
201                nrows,
202                ncols
203            )));
204        }
205
206        Ok(Self {
207            _mmap: mmap,
208            shape: (nrows, ncols),
209            data_offset: 16,
210        })
211    }
212
213    /// Get the shape of the array.
214    pub fn shape(&self) -> (usize, usize) {
215        self.shape
216    }
217
218    /// Get a view of the data as ArrayView2.
219    pub fn view(&self) -> ArrayView2<'_, u8> {
220        let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
221        ArrayView2::from_shape(self.shape, bytes).unwrap()
222    }
223
224    /// Load a range of rows into an owned Array2.
225    pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
226        let nrows = end - start;
227        let byte_start = self.data_offset + start * self.shape.1;
228        let byte_end = self.data_offset + end * self.shape.1;
229        let bytes = &self._mmap[byte_start..byte_end];
230
231        Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
232    }
233
234    /// Convert to an owned Array2.
235    pub fn to_owned(&self) -> Array2<u8> {
236        self.load_rows(0, self.shape.0)
237    }
238}
239
/// A memory-mapped array of i64 values.
pub struct MmapArray1I64 {
    // Keeps the mapping alive; reads borrow from this buffer.
    _mmap: Mmap,
    // Element count as read from the 8-byte file header.
    len: usize,
    // Byte offset where the i64 data begins (8 for the raw format).
    data_offset: usize,
}
246
247impl MmapArray1I64 {
248    /// Load a 1D i64 array from a raw binary file.
249    pub fn from_raw_file(path: &Path) -> Result<Self> {
250        let file = File::open(path)
251            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
252
253        let mmap = unsafe {
254            Mmap::map(&file)
255                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
256        };
257
258        if mmap.len() < 8 {
259            return Err(Error::IndexLoad("File too small for header".into()));
260        }
261
262        let mut cursor = std::io::Cursor::new(&mmap[..8]);
263        let len = cursor
264            .read_i64::<LittleEndian>()
265            .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
266            as usize;
267
268        let expected_size = 8 + len * 8;
269        if mmap.len() < expected_size {
270            return Err(Error::IndexLoad(format!(
271                "File size {} too small for length {}",
272                mmap.len(),
273                len
274            )));
275        }
276
277        Ok(Self {
278            _mmap: mmap,
279            len,
280            data_offset: 8,
281        })
282    }
283
284    /// Get the length of the array.
285    pub fn len(&self) -> usize {
286        self.len
287    }
288
289    /// Returns true if the array is empty.
290    pub fn is_empty(&self) -> bool {
291        self.len == 0
292    }
293
294    /// Get a value at an index.
295    pub fn get(&self, idx: usize) -> i64 {
296        let start = self.data_offset + idx * 8;
297        let bytes = &self._mmap[start..start + 8];
298        i64::from_le_bytes(bytes.try_into().unwrap())
299    }
300
301    /// Convert to an owned Array1.
302    pub fn to_owned(&self) -> Array1<i64> {
303        let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];
304
305        // Safety: We've verified the bounds
306        let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };
307
308        Array1::from_vec(data.to_vec())
309    }
310}
311
312/// Write an `Array2<f32>` to a raw binary file format.
313pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
314    use std::io::Write;
315
316    let file = File::create(path)
317        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
318    let mut writer = std::io::BufWriter::new(file);
319
320    let nrows = array.nrows() as i64;
321    let ncols = array.ncols() as i64;
322
323    writer
324        .write_all(&nrows.to_le_bytes())
325        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
326    writer
327        .write_all(&ncols.to_le_bytes())
328        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
329
330    for val in array.iter() {
331        writer
332            .write_all(&val.to_le_bytes())
333            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
334    }
335
336    writer
337        .flush()
338        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
339
340    Ok(())
341}
342
343/// Write an `Array2<u8>` to a raw binary file format.
344pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
345    use std::io::Write;
346
347    let file = File::create(path)
348        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
349    let mut writer = std::io::BufWriter::new(file);
350
351    let nrows = array.nrows() as i64;
352    let ncols = array.ncols() as i64;
353
354    writer
355        .write_all(&nrows.to_le_bytes())
356        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
357    writer
358        .write_all(&ncols.to_le_bytes())
359        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
360
361    for row in array.rows() {
362        writer
363            .write_all(row.as_slice().unwrap())
364            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
365    }
366
367    writer
368        .flush()
369        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
370
371    Ok(())
372}
373
374/// Write an `Array1<i64>` to a raw binary file format.
375pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
376    use std::io::Write;
377
378    let file = File::create(path)
379        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
380    let mut writer = std::io::BufWriter::new(file);
381
382    let len = array.len() as i64;
383
384    writer
385        .write_all(&len.to_le_bytes())
386        .map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;
387
388    for val in array.iter() {
389        writer
390            .write_all(&val.to_le_bytes())
391            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
392    }
393
394    writer
395        .flush()
396        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
397
398    Ok(())
399}
400
401// ============================================================================
402// NPY Format Memory-Mapped Arrays
403// ============================================================================
404
/// NPY file magic bytes (`\x93NUMPY`); every .npy header starts with this
/// 6-byte sequence, followed by two version bytes.
const NPY_MAGIC: &[u8] = b"\x93NUMPY";
407
408/// Parse dtype from NPY header string (e.g., "<f2" for float16, "<f4" for float32)
409fn parse_dtype_from_header(header: &str) -> Result<String> {
410    // Find 'descr': '...'
411    let descr_start = header
412        .find("'descr':")
413        .ok_or_else(|| Error::IndexLoad("No descr in NPY header".into()))?;
414
415    let after_descr = &header[descr_start + 8..];
416    let quote_start = after_descr
417        .find('\'')
418        .ok_or_else(|| Error::IndexLoad("No dtype quote in NPY header".into()))?;
419    let rest = &after_descr[quote_start + 1..];
420    let quote_end = rest
421        .find('\'')
422        .ok_or_else(|| Error::IndexLoad("Unclosed dtype quote in NPY header".into()))?;
423
424    Ok(rest[..quote_end].to_string())
425}
426
427/// Detect NPY file dtype without loading the entire file
428pub fn detect_npy_dtype(path: &Path) -> Result<String> {
429    let file = File::open(path)
430        .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
431
432    let mmap = unsafe {
433        Mmap::map(&file)
434            .map_err(|e| Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e)))?
435    };
436
437    if mmap.len() < 10 {
438        return Err(Error::IndexLoad("NPY file too small".into()));
439    }
440
441    // Check magic
442    if &mmap[..6] != NPY_MAGIC {
443        return Err(Error::IndexLoad("Invalid NPY magic".into()));
444    }
445
446    let major_version = mmap[6];
447
448    // Read header length
449    let header_len = if major_version == 1 {
450        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
451    } else if major_version == 2 {
452        if mmap.len() < 12 {
453            return Err(Error::IndexLoad("NPY v2 file too small".into()));
454        }
455        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
456    } else {
457        return Err(Error::IndexLoad(format!(
458            "Unsupported NPY version: {}",
459            major_version
460        )));
461    };
462
463    let header_start = if major_version == 1 { 10 } else { 12 };
464    let header_end = header_start + header_len;
465
466    if mmap.len() < header_end {
467        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
468    }
469
470    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
471        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
472
473    parse_dtype_from_header(header_str)
474}
475
476/// Convert a float16 NPY file to float32 in place
477pub fn convert_f16_to_f32_npy(path: &Path) -> Result<()> {
478    use half::f16;
479    use std::io::Read;
480
481    // Read the entire file
482    let mut file = File::open(path)
483        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
484    let mut data = Vec::new();
485    file.read_to_end(&mut data)
486        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
487
488    if data.len() < 10 || &data[..6] != NPY_MAGIC {
489        return Err(Error::IndexLoad("Invalid NPY file".into()));
490    }
491
492    let major_version = data[6];
493    let header_start = if major_version == 1 { 10 } else { 12 };
494    let header_len = if major_version == 1 {
495        u16::from_le_bytes([data[8], data[9]]) as usize
496    } else {
497        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
498    };
499    let header_end = header_start + header_len;
500
501    // Parse header to get shape
502    let header_str = std::str::from_utf8(&data[header_start..header_end])
503        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
504    let shape = parse_shape_from_header(header_str)?;
505
506    // Calculate total elements
507    let total_elements: usize = shape.iter().product();
508    let f16_data = &data[header_end..header_end + total_elements * 2];
509
510    // Convert f16 to f32
511    let mut f32_data = Vec::with_capacity(total_elements * 4);
512    for chunk in f16_data.chunks(2) {
513        let f16_val = f16::from_le_bytes([chunk[0], chunk[1]]);
514        let f32_val: f32 = f16_val.to_f32();
515        f32_data.extend_from_slice(&f32_val.to_le_bytes());
516    }
517
518    // Write new file with f32 dtype
519    let file = File::create(path)
520        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
521    let mut writer = BufWriter::new(file);
522
523    if shape.len() == 1 {
524        write_npy_header_1d(&mut writer, shape[0], "<f4")?;
525    } else if shape.len() == 2 {
526        write_npy_header_2d(&mut writer, shape[0], shape[1], "<f4")?;
527    } else {
528        return Err(Error::IndexLoad("Unsupported shape dimensions".into()));
529    }
530
531    writer
532        .write_all(&f32_data)
533        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
534    writer.flush()?;
535
536    Ok(())
537}
538
539/// Convert an int64 NPY file to int32 in place
540pub fn convert_i64_to_i32_npy(path: &Path) -> Result<()> {
541    use std::io::Read;
542
543    // Read the entire file
544    let mut file = File::open(path)
545        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
546    let mut data = Vec::new();
547    file.read_to_end(&mut data)
548        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
549
550    if data.len() < 10 || &data[..6] != NPY_MAGIC {
551        return Err(Error::IndexLoad("Invalid NPY file".into()));
552    }
553
554    let major_version = data[6];
555    let header_start = if major_version == 1 { 10 } else { 12 };
556    let header_len = if major_version == 1 {
557        u16::from_le_bytes([data[8], data[9]]) as usize
558    } else {
559        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
560    };
561    let header_end = header_start + header_len;
562
563    // Parse header to get shape
564    let header_str = std::str::from_utf8(&data[header_start..header_end])
565        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
566    let shape = parse_shape_from_header(header_str)?;
567
568    if shape.len() != 1 {
569        return Err(Error::IndexLoad("Expected 1D array for i64->i32".into()));
570    }
571
572    let len = shape[0];
573    let i64_data = &data[header_end..header_end + len * 8];
574
575    // Convert i64 to i32
576    let mut i32_data = Vec::with_capacity(len * 4);
577    for chunk in i64_data.chunks(8) {
578        let i64_val = i64::from_le_bytes(chunk.try_into().unwrap());
579        let i32_val = i64_val as i32;
580        i32_data.extend_from_slice(&i32_val.to_le_bytes());
581    }
582
583    // Write new file with i32 dtype
584    let file = File::create(path)
585        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
586    let mut writer = BufWriter::new(file);
587
588    write_npy_header_1d(&mut writer, len, "<i4")?;
589
590    writer
591        .write_all(&i32_data)
592        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
593    writer.flush()?;
594
595    Ok(())
596}
597
598/// Re-save a u8 NPY file to ensure dtype descriptor is "|u1" (platform-independent)
599///
600/// Note: We can't use ndarray_npy::ReadNpyExt here because it doesn't accept "<u1"
601/// descriptor, so we manually read the raw data and resave with "|u1".
602pub fn normalize_u8_npy(path: &Path) -> Result<()> {
603    use std::io::Read;
604
605    // Read the entire file
606    let mut file = File::open(path)
607        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
608    let mut data = Vec::new();
609    file.read_to_end(&mut data)
610        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
611
612    if data.len() < 10 || &data[..6] != NPY_MAGIC {
613        return Err(Error::IndexLoad("Invalid NPY file".into()));
614    }
615
616    let major_version = data[6];
617    let header_start = if major_version == 1 { 10 } else { 12 };
618    let header_len = if major_version == 1 {
619        u16::from_le_bytes([data[8], data[9]]) as usize
620    } else {
621        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
622    };
623    let header_end = header_start + header_len;
624
625    // Parse header to get shape
626    let header_str = std::str::from_utf8(&data[header_start..header_end])
627        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
628    let shape = parse_shape_from_header(header_str)?;
629
630    if shape.len() != 2 {
631        return Err(Error::IndexLoad(
632            "Expected 2D array for u8 normalization".into(),
633        ));
634    }
635
636    let nrows = shape[0];
637    let ncols = shape[1];
638    let u8_data = &data[header_end..header_end + nrows * ncols];
639
640    // Re-write with explicit "|u1" dtype
641    let new_file = File::create(path)
642        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
643    let mut writer = BufWriter::new(new_file);
644
645    write_npy_header_2d(&mut writer, nrows, ncols, "|u1")?;
646
647    writer
648        .write_all(u8_data)
649        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
650    writer.flush()?;
651
652    Ok(())
653}
654
655/// Parse NPY header and return (shape, data_offset, is_fortran_order)
656fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
657    if mmap.len() < 10 {
658        return Err(Error::IndexLoad("NPY file too small".into()));
659    }
660
661    // Check magic
662    if &mmap[..6] != NPY_MAGIC {
663        return Err(Error::IndexLoad("Invalid NPY magic".into()));
664    }
665
666    let major_version = mmap[6];
667    let _minor_version = mmap[7];
668
669    // Read header length
670    let header_len = if major_version == 1 {
671        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
672    } else if major_version == 2 {
673        if mmap.len() < 12 {
674            return Err(Error::IndexLoad("NPY v2 file too small".into()));
675        }
676        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
677    } else {
678        return Err(Error::IndexLoad(format!(
679            "Unsupported NPY version: {}",
680            major_version
681        )));
682    };
683
684    let header_start = if major_version == 1 { 10 } else { 12 };
685    let header_end = header_start + header_len;
686
687    if mmap.len() < header_end {
688        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
689    }
690
691    // Parse header dict (simplified Python dict parsing)
692    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
693        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
694
695    // Extract shape from header like: {'descr': '<i8', 'fortran_order': False, 'shape': (12345,), }
696    let shape = parse_shape_from_header(header_str)?;
697    let fortran_order = header_str.contains("'fortran_order': True");
698
699    Ok((shape, header_end, fortran_order))
700}
701
702/// Parse shape tuple from NPY header string
703fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
704    // Find 'shape': (...)
705    let shape_start = header
706        .find("'shape':")
707        .ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;
708
709    let after_shape = &header[shape_start + 8..];
710    let paren_start = after_shape
711        .find('(')
712        .ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
713    let paren_end = after_shape
714        .find(')')
715        .ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;
716
717    let shape_content = &after_shape[paren_start + 1..paren_end];
718
719    // Parse comma-separated numbers
720    let mut shape = Vec::new();
721    for part in shape_content.split(',') {
722        let trimmed = part.trim();
723        if !trimmed.is_empty() {
724            let dim: usize = trimmed.parse().map_err(|e| {
725                Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
726            })?;
727            shape.push(dim);
728        }
729    }
730
731    Ok(shape)
732}
733
/// Memory-mapped NPY array for i64 values (used for codes).
///
/// This struct provides zero-copy access to 1D i64 arrays stored in NPY format.
pub struct MmapNpyArray1I64 {
    // Keeps the mapping alive; reads borrow from this buffer.
    _mmap: Mmap,
    // Element count taken from the NPY header's shape.
    len: usize,
    // Byte offset where the i64 data begins (end of the NPY header).
    data_offset: usize,
}
742
743impl MmapNpyArray1I64 {
744    /// Create an empty instance backed by an anonymous mmap (no file).
745    ///
746    /// Used to release file-backed mmap handles before file operations on Windows,
747    /// where deleting or renaming a memory-mapped file causes OS error 1224.
748    pub fn empty() -> Self {
749        let mmap = MmapMut::map_anon(1)
750            .expect("failed to create anonymous mmap")
751            .make_read_only()
752            .expect("failed to make anonymous mmap read-only");
753        Self {
754            _mmap: mmap,
755            len: 0,
756            data_offset: 0,
757        }
758    }
759
760    /// Load a 1D i64 array from an NPY file.
761    pub fn from_npy_file(path: &Path) -> Result<Self> {
762        let file = File::open(path)
763            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
764
765        let mmap = unsafe {
766            Mmap::map(&file).map_err(|e| {
767                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
768            })?
769        };
770
771        let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
772
773        if shape.is_empty() {
774            return Err(Error::IndexLoad("Empty shape in NPY file".into()));
775        }
776
777        let len = shape[0];
778
779        // Verify file size
780        let expected_size = data_offset + len * 8;
781        if mmap.len() < expected_size {
782            return Err(Error::IndexLoad(format!(
783                "NPY file size {} too small for {} elements",
784                mmap.len(),
785                len
786            )));
787        }
788
789        Ok(Self {
790            _mmap: mmap,
791            len,
792            data_offset,
793        })
794    }
795
796    /// Get the length of the array.
797    pub fn len(&self) -> usize {
798        self.len
799    }
800
801    /// Returns true if the array is empty.
802    pub fn is_empty(&self) -> bool {
803        self.len == 0
804    }
805
806    /// Get a slice of the data as &[i64].
807    ///
808    /// Returns a `Vec<i64>` instead of &[i64] to handle unaligned data safely.
809    ///
810    /// # Safety
811    /// The caller must ensure start <= end <= len.
812    pub fn slice(&self, start: usize, end: usize) -> Vec<i64> {
813        let count = end - start;
814        let mut result = Vec::with_capacity(count);
815
816        for i in start..end {
817            result.push(self.get(i));
818        }
819
820        result
821    }
822
823    /// Get a value at an index.
824    pub fn get(&self, idx: usize) -> i64 {
825        let start = self.data_offset + idx * 8;
826        let bytes = &self._mmap[start..start + 8];
827        i64::from_le_bytes(bytes.try_into().unwrap())
828    }
829}
830
/// Memory-mapped NPY array for f32 values (used for centroids).
///
/// This struct provides zero-copy access to 2D f32 arrays stored in NPY format.
/// Unlike loading into an owned `Array2<f32>`, this approach lets the OS manage
/// paging, reducing resident memory usage for large centroid matrices.
pub struct MmapNpyArray2F32 {
    // Keeps the mapping alive; all views borrow from this buffer.
    _mmap: Mmap,
    // (nrows, ncols) taken from the NPY header's shape.
    shape: (usize, usize),
    // Byte offset where the f32 data begins (end of the NPY header).
    data_offset: usize,
}
841
842impl MmapNpyArray2F32 {
843    /// Load a 2D f32 array from an NPY file.
844    pub fn from_npy_file(path: &Path) -> Result<Self> {
845        let file = File::open(path)
846            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
847
848        let mmap = unsafe {
849            Mmap::map(&file).map_err(|e| {
850                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
851            })?
852        };
853
854        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
855
856        if shape_vec.len() != 2 {
857            return Err(Error::IndexLoad(format!(
858                "Expected 2D array, got {}D",
859                shape_vec.len()
860            )));
861        }
862
863        let shape = (shape_vec[0], shape_vec[1]);
864
865        // Verify file size (f32 = 4 bytes)
866        let expected_size = data_offset + shape.0 * shape.1 * 4;
867        if mmap.len() < expected_size {
868            return Err(Error::IndexLoad(format!(
869                "NPY file size {} too small for shape {:?}",
870                mmap.len(),
871                shape
872            )));
873        }
874
875        Ok(Self {
876            _mmap: mmap,
877            shape,
878            data_offset,
879        })
880    }
881
882    /// Get the shape of the array.
883    pub fn shape(&self) -> (usize, usize) {
884        self.shape
885    }
886
887    /// Get the number of rows.
888    pub fn nrows(&self) -> usize {
889        self.shape.0
890    }
891
892    /// Get the number of columns.
893    pub fn ncols(&self) -> usize {
894        self.shape.1
895    }
896
897    /// Get a view of the entire array as ArrayView2.
898    ///
899    /// This provides zero-copy access to the memory-mapped data.
900    pub fn view(&self) -> ArrayView2<'_, f32> {
901        let byte_start = self.data_offset;
902        let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
903        let bytes = &self._mmap[byte_start..byte_end];
904
905        // Safety: We've verified bounds and f32 is 4-byte aligned in NPY format
906        let data = unsafe {
907            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
908        };
909
910        ArrayView2::from_shape(self.shape, data).unwrap()
911    }
912
913    /// Get a view of a single row.
914    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
915        let byte_start = self.data_offset + idx * self.shape.1 * 4;
916        let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];
917
918        // Safety: We've verified bounds and alignment
919        let data =
920            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
921
922        ArrayView1::from_shape(self.shape.1, data).unwrap()
923    }
924
925    /// Get a view of rows [start..end] as ArrayView2.
926    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
927        let nrows = end - start;
928        let byte_start = self.data_offset + start * self.shape.1 * 4;
929        let byte_end = self.data_offset + end * self.shape.1 * 4;
930        let bytes = &self._mmap[byte_start..byte_end];
931
932        // Safety: We've verified bounds
933        let data = unsafe {
934            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
935        };
936
937        ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
938    }
939
940    /// Convert to an owned Array2 (loads all data into memory).
941    ///
942    /// Use this only when you need an owned copy; prefer `view()` for read-only access.
943    pub fn to_owned(&self) -> Array2<f32> {
944        self.view().to_owned()
945    }
946}
947
/// Memory-mapped NPY array for u8 values (used for residuals).
///
/// This struct provides zero-copy access to 2D u8 arrays stored in NPY format.
pub struct MmapNpyArray2U8 {
    // Keeps the mapping alive; all views borrow from this buffer.
    _mmap: Mmap,
    // (rows, cols) parsed from the NPY header.
    shape: (usize, usize),
    // Byte offset of the data section (immediately after the NPY header).
    data_offset: usize,
}
956
957impl MmapNpyArray2U8 {
958    /// Create an empty instance backed by an anonymous mmap (no file).
959    ///
960    /// Used to release file-backed mmap handles before file operations on Windows,
961    /// where deleting or renaming a memory-mapped file causes OS error 1224.
962    pub fn empty() -> Self {
963        let mmap = MmapMut::map_anon(1)
964            .expect("failed to create anonymous mmap")
965            .make_read_only()
966            .expect("failed to make anonymous mmap read-only");
967        Self {
968            _mmap: mmap,
969            shape: (0, 0),
970            data_offset: 0,
971        }
972    }
973
974    /// Load a 2D u8 array from an NPY file.
975    pub fn from_npy_file(path: &Path) -> Result<Self> {
976        let file = File::open(path)
977            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
978
979        let mmap = unsafe {
980            Mmap::map(&file).map_err(|e| {
981                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
982            })?
983        };
984
985        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
986
987        if shape_vec.len() != 2 {
988            return Err(Error::IndexLoad(format!(
989                "Expected 2D array, got {}D",
990                shape_vec.len()
991            )));
992        }
993
994        let shape = (shape_vec[0], shape_vec[1]);
995
996        // Verify file size
997        let expected_size = data_offset + shape.0 * shape.1;
998        if mmap.len() < expected_size {
999            return Err(Error::IndexLoad(format!(
1000                "NPY file size {} too small for shape {:?}",
1001                mmap.len(),
1002                shape
1003            )));
1004        }
1005
1006        Ok(Self {
1007            _mmap: mmap,
1008            shape,
1009            data_offset,
1010        })
1011    }
1012
1013    /// Get the shape of the array.
1014    pub fn shape(&self) -> (usize, usize) {
1015        self.shape
1016    }
1017
1018    /// Get the number of rows.
1019    pub fn nrows(&self) -> usize {
1020        self.shape.0
1021    }
1022
1023    /// Get the number of columns.
1024    pub fn ncols(&self) -> usize {
1025        self.shape.1
1026    }
1027
1028    /// Get a view of rows [start..end] as ArrayView2.
1029    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
1030        let nrows = end - start;
1031        let byte_start = self.data_offset + start * self.shape.1;
1032        let byte_end = self.data_offset + end * self.shape.1;
1033        let bytes = &self._mmap[byte_start..byte_end];
1034
1035        ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
1036    }
1037
1038    /// Get a view of the entire array.
1039    pub fn view(&self) -> ArrayView2<'_, u8> {
1040        self.slice_rows(0, self.shape.0)
1041    }
1042
1043    /// Get a single row as a slice.
1044    pub fn row(&self, idx: usize) -> &[u8] {
1045        let byte_start = self.data_offset + idx * self.shape.1;
1046        let byte_end = byte_start + self.shape.1;
1047        &self._mmap[byte_start..byte_end]
1048    }
1049}
1050
1051// ============================================================================
1052// Merged File Creation
1053// ============================================================================
1054
/// Manifest entry for tracking chunk files
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkManifestEntry {
    /// Number of rows the chunk contributed at merge time.
    pub rows: usize,
    /// Modification time (seconds since epoch) recorded at merge time;
    /// compared for exact equality to detect chunk changes.
    pub mtime: f64,
}
1061
/// Manifest for merged files, including metadata about the merge
///
/// Persisted as JSON next to the merged file and compared against the current
/// chunk set to decide whether the merged file can be reused as-is.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MergeManifest {
    /// Chunk information, keyed by chunk file name
    pub chunks: HashMap<String, ChunkManifestEntry>,
    /// Number of padding rows used in the merge
    #[serde(default)]
    pub padding_rows: usize,
    /// Total rows in the merged file (including padding)
    #[serde(default)]
    pub total_rows: usize,
    /// Number of columns (for 2D arrays like residuals)
    #[serde(default)]
    pub ncols: usize,
}
1077
/// Legacy manifest type (for backwards compatibility during migration):
/// a bare map of chunk file name to entry, without merge-level metadata.
pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;
1080
1081/// Load manifest from disk if it exists
1082/// Handles both new MergeManifest format and legacy ChunkManifest format
1083fn load_merge_manifest(manifest_path: &Path) -> Option<MergeManifest> {
1084    if manifest_path.exists() {
1085        if let Ok(file) = File::open(manifest_path) {
1086            // Try to load as new format first
1087            let reader = BufReader::new(file);
1088            if let Ok(manifest) = serde_json::from_reader::<_, MergeManifest>(reader) {
1089                return Some(manifest);
1090            }
1091            // Try legacy format
1092            if let Ok(file) = File::open(manifest_path) {
1093                if let Ok(chunks) =
1094                    serde_json::from_reader::<_, ChunkManifest>(BufReader::new(file))
1095                {
1096                    // Convert legacy format - missing padding info means we need to regenerate
1097                    return Some(MergeManifest {
1098                        chunks,
1099                        padding_rows: 0,
1100                        total_rows: 0,
1101                        ncols: 0,
1102                    });
1103                }
1104            }
1105        }
1106    }
1107    None
1108}
1109
1110/// Save manifest to disk atomically (write to temp file, then rename)
1111fn save_merge_manifest(manifest_path: &Path, manifest: &MergeManifest) -> Result<()> {
1112    let temp_path = manifest_path.with_extension("manifest.json.tmp");
1113
1114    // Write to temp file
1115    let file = File::create(&temp_path)
1116        .map_err(|e| Error::IndexLoad(format!("Failed to create temp manifest: {}", e)))?;
1117    let mut writer = BufWriter::new(file);
1118    serde_json::to_writer(&mut writer, manifest)
1119        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
1120    writer
1121        .flush()
1122        .map_err(|e| Error::IndexLoad(format!("Failed to flush manifest: {}", e)))?;
1123
1124    // Sync to disk
1125    writer
1126        .into_inner()
1127        .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?
1128        .sync_all()
1129        .map_err(|e| Error::IndexLoad(format!("Failed to sync manifest: {}", e)))?;
1130
1131    // Atomic rename
1132    fs::rename(&temp_path, manifest_path)
1133        .map_err(|e| Error::IndexLoad(format!("Failed to rename manifest: {}", e)))?;
1134
1135    Ok(())
1136}
1137
1138/// Get file modification time as f64 seconds since epoch
1139fn get_mtime(path: &Path) -> Result<f64> {
1140    let metadata = fs::metadata(path)
1141        .map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?;
1142    let mtime = metadata
1143        .modified()
1144        .map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
1145    let duration = mtime
1146        .duration_since(std::time::UNIX_EPOCH)
1147        .map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
1148    Ok(duration.as_secs_f64())
1149}
1150
1151/// Write NPY header for a 1D array
1152fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
1153    // Build header dict
1154    let header_dict = format!(
1155        "{{'descr': '{}', 'fortran_order': False, 'shape': ({},), }}",
1156        dtype, len
1157    );
1158
1159    // Pad to 64-byte alignment (NPY requirement)
1160    let header_len = header_dict.len();
1161    let padding = (64 - ((10 + header_len) % 64)) % 64;
1162    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));
1163
1164    // Write magic + version
1165    writer
1166        .write_all(NPY_MAGIC)
1167        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
1168    writer
1169        .write_all(&[1, 0])
1170        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?; // v1.0
1171
1172    // Write header length (2 bytes for v1.0)
1173    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
1174    writer
1175        .write_all(&header_len_bytes)
1176        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;
1177
1178    // Write header
1179    writer
1180        .write_all(padded_header.as_bytes())
1181        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;
1182
1183    Ok(10 + padded_header.len())
1184}
1185
1186/// Write NPY header for a 2D array
1187fn write_npy_header_2d(
1188    writer: &mut impl Write,
1189    nrows: usize,
1190    ncols: usize,
1191    dtype: &str,
1192) -> Result<usize> {
1193    // Build header dict
1194    let header_dict = format!(
1195        "{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}), }}",
1196        dtype, nrows, ncols
1197    );
1198
1199    // Pad to 64-byte alignment
1200    let header_len = header_dict.len();
1201    let padding = (64 - ((10 + header_len) % 64)) % 64;
1202    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));
1203
1204    // Write magic + version
1205    writer
1206        .write_all(NPY_MAGIC)
1207        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
1208    writer
1209        .write_all(&[1, 0])
1210        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;
1211
1212    // Write header length
1213    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
1214    writer
1215        .write_all(&header_len_bytes)
1216        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;
1217
1218    // Write header
1219    writer
1220        .write_all(padded_header.as_bytes())
1221        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;
1222
1223    Ok(10 + padded_header.len())
1224}
1225
/// Information about a chunk file for merging
struct ChunkInfo {
    // Full path to the chunk file.
    path: std::path::PathBuf,
    // File name only; used as the manifest key.
    filename: String,
    // Number of rows in the chunk.
    rows: usize,
    // Modification time in seconds since epoch (see get_mtime).
    mtime: f64,
}
1233
/// Merge chunked codes NPY files into a single merged file.
///
/// Uses incremental persistence with manifest tracking to skip unchanged chunks.
/// Uses atomic writes to prevent corruption from interrupted writes.
/// Uses file-based locking to coordinate concurrent processes.
/// Returns the path to the merged file.
///
/// # Arguments
/// * `index_path` - Directory containing the `<i>.codes.npy` chunk files.
/// * `num_chunks` - Number of chunk files to look for; missing files are skipped.
/// * `padding_rows` - Number of zero rows appended after the chunk data.
///
/// # Errors
/// Returns `Error::IndexLoad` if locking, reading, writing, or size
/// verification fails, or if no chunk contains any data.
pub fn merge_codes_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_codes.npy");
    let manifest_path = index_path.join("merged_codes.manifest.json");
    let temp_path = index_path.join("merged_codes.npy.tmp");
    let lock_path = index_path.join("merged_codes.lock");

    // Acquire exclusive lock to prevent concurrent merge operations.
    // This is critical for multi-process scenarios (e.g., multiple API workers).
    // Blocks until the lock is available.
    let _lock = FileLockGuard::acquire(&lock_path)?;

    // After acquiring the lock, re-check if merge is still needed.
    // Another process might have completed the merge while we were waiting.

    // Load previous manifest (re-read after acquiring lock)
    let old_manifest = load_merge_manifest(&manifest_path);

    // Scan chunks and detect changes
    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.codes.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            // Determine the chunk's row count.
            // NOTE(review): this deserializes the entire array even though only
            // the length is needed; a header-only read would be cheaper.
            let file = File::open(&path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            let rows = arr.len();

            if rows > 0 {
                total_rows += rows;

                // A chunk is "clean" when its mtime and row count match the
                // previous merge exactly; f64 mtimes are compared for exact
                // equality on purpose (pure change detection, not ordering).
                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .chunks
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                if !is_clean {
                    chain_broken = true;
                }

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }

    if total_rows == 0 {
        return Err(Error::IndexLoad("No data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    // Check if we need to rewrite:
    // 1. Merged file doesn't exist
    // 2. Chunks have changed
    // 3. Padding has changed (stored in manifest)
    // 4. Total rows don't match (safety check)
    let padding_changed = old_manifest
        .as_ref()
        .map(|m| m.padding_rows != padding_rows)
        .unwrap_or(true);
    let total_rows_mismatch = old_manifest
        .as_ref()
        .map(|m| m.total_rows != final_rows)
        .unwrap_or(true);

    let needs_full_rewrite =
        !merged_path.exists() || chain_broken || padding_changed || total_rows_mismatch;

    if needs_full_rewrite {
        // Write to temp file first (atomic write pattern)
        let file = File::create(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
        let mut writer = BufWriter::new(file);

        // Write header (1D int64 array with final_rows elements)
        let header_size = write_npy_header_1d(&mut writer, final_rows, "<i8")?;

        // Write chunk data as little-endian i64, matching the "<i8" descriptor
        let mut written_rows = 0usize;
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            for &val in arr.iter() {
                writer.write_all(&val.to_le_bytes())?;
            }
            written_rows += arr.len();
        }

        // Write padding zeros
        for _ in 0..padding_rows {
            writer.write_all(&0i64.to_le_bytes())?;
        }
        written_rows += padding_rows;

        // Flush and sync to disk so the rename cannot expose a partial file
        writer
            .flush()
            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged file: {}", e)))?;
        let file = writer
            .into_inner()
            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
        file.sync_all()
            .map_err(|e| Error::IndexLoad(format!("Failed to sync merged file to disk: {}", e)))?;

        // Verify file size before renaming (8 bytes per i64 row)
        let expected_size = header_size + written_rows * 8;
        let actual_size = fs::metadata(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
            .len() as usize;

        if actual_size != expected_size {
            // Clean up temp file and return error
            let _ = fs::remove_file(&temp_path);
            return Err(Error::IndexLoad(format!(
                "Merged codes file size mismatch: expected {} bytes, got {} bytes",
                expected_size, actual_size
            )));
        }

        // Atomic rename (overwrites existing file)
        fs::rename(&temp_path, &merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged file: {}", e)))?;
    } else {
        // Validate existing merged file before using it
        if merged_path.exists() {
            let file_size = fs::metadata(&merged_path)
                .map_err(|e| {
                    Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
                })?
                .len() as usize;

            // NPY header is at least 64 bytes, data is final_rows * 8 bytes
            let min_expected_size = 64 + final_rows * 8;
            if file_size < min_expected_size {
                // File is corrupted: delete it and the manifest, then retry.
                let _ = fs::remove_file(&merged_path);
                let _ = fs::remove_file(&manifest_path);
                // Release the lock so the recursive call can re-acquire it.
                // With the merged file and manifest gone, the retry takes the
                // full-rewrite branch, so recursion terminates after one level.
                drop(_lock);
                return merge_codes_chunks(index_path, num_chunks, padding_rows);
            }
        }
    }

    // Build and save manifest with full metadata
    let mut chunk_map = HashMap::new();
    for chunk in &chunks {
        chunk_map.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    let new_manifest = MergeManifest {
        chunks: chunk_map,
        padding_rows,
        total_rows: final_rows,
        ncols: 0, // Not used for 1D codes array
    };
    save_merge_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}
1426
/// Merge chunked residuals NPY files into a single merged file.
///
/// Uses atomic writes to prevent corruption from interrupted writes.
/// Uses file-based locking to coordinate concurrent processes.
///
/// # Arguments
/// * `index_path` - Directory containing the `<i>.residuals.npy` chunk files.
/// * `num_chunks` - Number of chunk files to look for; missing files are skipped.
/// * `padding_rows` - Number of zero rows appended after the chunk data.
///
/// # Errors
/// Returns `Error::IndexLoad` if locking, reading, writing, or size
/// verification fails, or if no chunk contains any data.
pub fn merge_residuals_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_residuals.npy");
    let manifest_path = index_path.join("merged_residuals.manifest.json");
    let temp_path = index_path.join("merged_residuals.npy.tmp");
    let lock_path = index_path.join("merged_residuals.lock");

    // Acquire exclusive lock to prevent concurrent merge operations.
    // This is critical for multi-process scenarios (e.g., multiple API workers).
    // Blocks until the lock is available.
    let _lock = FileLockGuard::acquire(&lock_path)?;

    // After acquiring the lock, re-check if merge is still needed.
    // Another process might have completed the merge while we were waiting.

    // Load previous manifest (re-read after acquiring lock)
    let old_manifest = load_merge_manifest(&manifest_path);

    // Scan chunks and detect changes
    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut ncols = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.residuals.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            // Determine the chunk's dimensions.
            // NOTE(review): this deserializes the entire array even though only
            // the shape is needed; a header-only read would be cheaper.
            let file = File::open(&path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            let rows = arr.nrows();
            // NOTE(review): ncols is overwritten each iteration (last chunk
            // wins); chunks are assumed to share a column count — not validated.
            ncols = arr.ncols();

            if rows > 0 {
                total_rows += rows;

                // A chunk is "clean" when its mtime and row count match the
                // previous merge exactly; f64 mtimes are compared for exact
                // equality on purpose (pure change detection, not ordering).
                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .chunks
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                if !is_clean {
                    chain_broken = true;
                }

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }

    if total_rows == 0 || ncols == 0 {
        return Err(Error::IndexLoad("No residual data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    // Check if we need to rewrite:
    // 1. Merged file doesn't exist
    // 2. Chunks have changed
    // 3. Padding has changed
    // 4. Total rows or ncols don't match
    let padding_changed = old_manifest
        .as_ref()
        .map(|m| m.padding_rows != padding_rows)
        .unwrap_or(true);
    let total_rows_mismatch = old_manifest
        .as_ref()
        .map(|m| m.total_rows != final_rows)
        .unwrap_or(true);
    let ncols_mismatch = old_manifest
        .as_ref()
        .map(|m| m.ncols != ncols && m.ncols != 0)
        .unwrap_or(false);

    let needs_full_rewrite = !merged_path.exists()
        || chain_broken
        || padding_changed
        || total_rows_mismatch
        || ncols_mismatch;

    if needs_full_rewrite {
        // Write to temp file first (atomic write pattern)
        let file = File::create(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
        let mut writer = BufWriter::new(file);

        // Write header (2D u8 array: final_rows x ncols)
        let header_size = write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;

        // Write chunk data row by row
        let mut written_rows = 0usize;
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array2<u8> = Array2::read_npy(file)?;
            for row in arr.rows() {
                // Rows of a standard-layout (C-order) Array2 are contiguous,
                // so as_slice() succeeds here.
                writer.write_all(row.as_slice().unwrap())?;
            }
            written_rows += arr.nrows();
        }

        // Write padding zeros
        let zero_row = vec![0u8; ncols];
        for _ in 0..padding_rows {
            writer.write_all(&zero_row)?;
        }
        written_rows += padding_rows;

        // Flush and sync to disk so the rename cannot expose a partial file
        writer
            .flush()
            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged residuals: {}", e)))?;
        let file = writer
            .into_inner()
            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
        file.sync_all().map_err(|e| {
            Error::IndexLoad(format!("Failed to sync merged residuals to disk: {}", e))
        })?;

        // Verify file size before renaming (1 byte per u8 element)
        let expected_size = header_size + written_rows * ncols;
        let actual_size = fs::metadata(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
            .len() as usize;

        if actual_size != expected_size {
            // Clean up temp file and return error
            let _ = fs::remove_file(&temp_path);
            return Err(Error::IndexLoad(format!(
                "Merged residuals file size mismatch: expected {} bytes, got {} bytes",
                expected_size, actual_size
            )));
        }

        // Atomic rename
        fs::rename(&temp_path, &merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged residuals: {}", e)))?;
    } else {
        // Validate existing merged file before using it
        if merged_path.exists() {
            let file_size = fs::metadata(&merged_path)
                .map_err(|e| {
                    Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
                })?
                .len() as usize;

            // NPY header is at least 64 bytes, data is final_rows * ncols bytes
            let min_expected_size = 64 + final_rows * ncols;
            if file_size < min_expected_size {
                // File is corrupted: delete it and the manifest, then retry.
                let _ = fs::remove_file(&merged_path);
                let _ = fs::remove_file(&manifest_path);
                // Release the lock so the recursive call can re-acquire it.
                // With the merged file and manifest gone, the retry takes the
                // full-rewrite branch, so recursion terminates after one level.
                drop(_lock);
                return merge_residuals_chunks(index_path, num_chunks, padding_rows);
            }
        }
    }

    // Build and save manifest with full metadata
    let mut chunk_map = HashMap::new();
    for chunk in &chunks {
        chunk_map.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    let new_manifest = MergeManifest {
        chunks: chunk_map,
        padding_rows,
        total_rows: final_rows,
        ncols,
    };
    save_merge_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}
1627
1628/// Clear merged files and manifests to force regeneration on next load.
1629///
1630/// This should be called after index updates to ensure the merged files
1631/// are regenerated with the latest data. The function silently ignores
1632/// missing files.
1633///
1634/// Acquires file locks to prevent racing with concurrent merge operations
1635/// in multi-process deployments.
1636pub fn clear_merged_files(index_path: &Path) -> Result<()> {
1637    // Acquire locks to prevent racing with ongoing merge operations.
1638    // This is important in multi-process scenarios where one process might
1639    // be loading (merging) while another is updating (clearing).
1640    let codes_lock_path = index_path.join("merged_codes.lock");
1641    let residuals_lock_path = index_path.join("merged_residuals.lock");
1642    let _codes_lock = FileLockGuard::acquire(&codes_lock_path)?;
1643    let _residuals_lock = FileLockGuard::acquire(&residuals_lock_path)?;
1644
1645    let files_to_remove = [
1646        "merged_codes.npy",
1647        "merged_codes.npy.tmp",
1648        "merged_codes.manifest.json",
1649        "merged_codes.manifest.json.tmp",
1650        "merged_residuals.npy",
1651        "merged_residuals.npy.tmp",
1652        "merged_residuals.manifest.json",
1653        "merged_residuals.manifest.json.tmp",
1654    ];
1655
1656    for filename in files_to_remove {
1657        let path = index_path.join(filename);
1658        if path.exists() {
1659            fs::remove_file(&path)
1660                .map_err(|e| Error::IndexLoad(format!("Failed to remove {}: {}", filename, e)))?;
1661        }
1662    }
1663
1664    Ok(())
1665}
1666
1667// ============================================================================
1668// Fast-PLAID Compatibility Conversion
1669// ============================================================================
1670
1671/// Convert a fast-plaid index to next-plaid compatible format.
1672///
1673/// This function detects and converts:
1674/// - float16 → float32 for centroids, avg_residual, bucket_cutoffs, bucket_weights
1675/// - int64 → int32 for ivf_lengths
1676/// - `<u1` → `|u1` for residuals
1677///
1678/// Returns true if any conversion was performed, false if already compatible.
1679pub fn convert_fastplaid_to_nextplaid(index_path: &Path) -> Result<bool> {
1680    let mut converted = false;
1681
1682    // Float files to convert from f16 to f32
1683    let float_files = [
1684        "centroids.npy",
1685        "avg_residual.npy",
1686        "bucket_cutoffs.npy",
1687        "bucket_weights.npy",
1688    ];
1689
1690    for filename in float_files {
1691        let path = index_path.join(filename);
1692        if path.exists() {
1693            let dtype = detect_npy_dtype(&path)?;
1694            if dtype == "<f2" {
1695                eprintln!("  Converting {} from float16 to float32", filename);
1696                convert_f16_to_f32_npy(&path)?;
1697                converted = true;
1698            }
1699        }
1700    }
1701
1702    // Convert ivf_lengths from i64 to i32
1703    let ivf_lengths_path = index_path.join("ivf_lengths.npy");
1704    if ivf_lengths_path.exists() {
1705        let dtype = detect_npy_dtype(&ivf_lengths_path)?;
1706        if dtype == "<i8" {
1707            eprintln!("  Converting ivf_lengths.npy from int64 to int32");
1708            convert_i64_to_i32_npy(&ivf_lengths_path)?;
1709            converted = true;
1710        }
1711    }
1712
1713    // Normalize residual files to use "|u1" descriptor
1714    // fast-plaid uses "<u1" which ndarray_npy doesn't accept
1715    for entry in fs::read_dir(index_path)? {
1716        let entry = entry?;
1717        let filename = entry.file_name().to_string_lossy().to_string();
1718        if filename.ends_with(".residuals.npy") {
1719            let path = entry.path();
1720            let dtype = detect_npy_dtype(&path)?;
1721            if dtype == "<u1" {
1722                eprintln!(
1723                    "  Normalizing {} dtype descriptor from <u1 to |u1",
1724                    filename
1725                );
1726                normalize_u8_npy(&path)?;
1727                converted = true;
1728            }
1729        }
1730    }
1731
1732    Ok(converted)
1733}
1734
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Raw 2-D format: two little-endian i64 dimensions, then row-major
    /// little-endian f32 data. Verify shape reporting, zero-copy row
    /// access, and conversion to an owned ndarray.
    #[test]
    fn test_mmap_array2_f32() {
        let mut tmp = NamedTempFile::new().unwrap();

        // Header: rows then cols, each a little-endian i64 (3 x 2).
        for dim in [3i64, 2i64] {
            tmp.write_all(&dim.to_le_bytes()).unwrap();
        }

        // Payload: six f32 values in row-major order.
        for v in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
            tmp.write_all(&v.to_le_bytes()).unwrap();
        }
        tmp.flush().unwrap();

        // Map the file and check the reported shape.
        let mapped = MmapArray2F32::from_raw_file(tmp.path()).unwrap();
        assert_eq!(mapped.shape(), (3, 2));

        // First row should expose the first two payload values.
        let first_row = mapped.row(0);
        assert_eq!(first_row[0], 1.0);
        assert_eq!(first_row[1], 2.0);

        // Materializing must preserve the last row as well.
        let materialized = mapped.to_owned();
        assert_eq!(materialized[[2, 0]], 5.0);
        assert_eq!(materialized[[2, 1]], 6.0);
    }

    /// Raw 1-D format: a single little-endian i64 length, then
    /// little-endian i64 elements. Verify length, indexed access,
    /// and conversion to an owned ndarray.
    #[test]
    fn test_mmap_array1_i64() {
        let mut tmp = NamedTempFile::new().unwrap();

        // Header: element count as a little-endian i64.
        tmp.write_all(&4i64.to_le_bytes()).unwrap();

        // Payload: four i64 values.
        for v in [10i64, 20, 30, 40] {
            tmp.write_all(&v.to_le_bytes()).unwrap();
        }
        tmp.flush().unwrap();

        let mapped = MmapArray1I64::from_raw_file(tmp.path()).unwrap();
        assert_eq!(mapped.len(), 4);
        assert_eq!(mapped.get(0), 10);
        assert_eq!(mapped.get(3), 40);

        let materialized = mapped.to_owned();
        assert_eq!(materialized[1], 20);
        assert_eq!(materialized[2], 30);
    }

    /// Writing an array with `write_array2_f32` and mapping it back
    /// must reproduce the original array exactly.
    #[test]
    fn test_write_read_roundtrip() {
        let tmp = NamedTempFile::new().unwrap();

        let expected =
            Array2::from_shape_vec((2, 3), vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).unwrap();

        write_array2_f32(&expected, tmp.path()).unwrap();

        let reloaded = MmapArray2F32::from_raw_file(tmp.path()).unwrap().to_owned();
        assert_eq!(expected, reloaded);
    }
}