// next_plaid/mmap.rs
//! Memory-mapped file support for efficient large index loading.
//!
//! This module provides utilities for loading large arrays from disk using
//! memory-mapped files, avoiding the need to load entire arrays into RAM.
//!
//! Two formats are supported:
//! - Custom raw binary format (legacy): 8-byte header with shape, then raw data
//! - NPY format: Standard NumPy format with header, used for index files
10use std::collections::HashMap;
11use std::fs;
12use std::fs::File;
13use std::io::{BufReader, BufWriter, Write};
14use std::path::Path;
15
16use byteorder::{LittleEndian, ReadBytesExt};
17use fs2::FileExt;
18use memmap2::Mmap;
19use ndarray::{Array1, Array2, ArrayView1, ArrayView2};
20
21use crate::error::{Error, Result};
22
23/// RAII guard for file-based locking to coordinate concurrent processes.
24/// The lock is released when this guard is dropped.
25struct FileLockGuard {
26    _file: File,
27}
28
29impl FileLockGuard {
30    /// Acquire an exclusive lock on the given lock file path.
31    /// Creates the lock file if it doesn't exist.
32    /// Blocks until the lock is acquired.
33    fn acquire(lock_path: &Path) -> Result<Self> {
34        let file = std::fs::OpenOptions::new()
35            .read(true)
36            .write(true)
37            .create(true)
38            .truncate(false)
39            .open(lock_path)
40            .map_err(|e| {
41                Error::IndexLoad(format!("Failed to open lock file {:?}: {}", lock_path, e))
42            })?;
43
44        file.lock_exclusive().map_err(|e| {
45            Error::IndexLoad(format!("Failed to acquire lock on {:?}: {}", lock_path, e))
46        })?;
47
48        Ok(Self { _file: file })
49    }
50}
51
52impl Drop for FileLockGuard {
53    fn drop(&mut self) {
54        // Lock is automatically released when file is closed
55        let _ = self._file.unlock();
56    }
57}
58
/// A memory-mapped array of f32 values.
///
/// This struct provides zero-copy access to large arrays stored on disk.
pub struct MmapArray2F32 {
    // The live mapping; dropping it unmaps the file, so it must outlive all views.
    _mmap: Mmap,
    // (nrows, ncols) as parsed from the 16-byte file header.
    shape: (usize, usize),
    // Byte offset of the first f32 element (16 = two i64 header fields).
    data_offset: usize,
}
67
68impl MmapArray2F32 {
69    /// Load a 2D f32 array from a raw binary file.
70    ///
71    /// The file format is:
72    /// - 8 bytes: nrows (i64 little-endian)
73    /// - 8 bytes: ncols (i64 little-endian)
74    /// - nrows * ncols * 4 bytes: f32 data (little-endian)
75    pub fn from_raw_file(path: &Path) -> Result<Self> {
76        let file = File::open(path)
77            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
78
79        let mmap = unsafe {
80            Mmap::map(&file)
81                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
82        };
83
84        if mmap.len() < 16 {
85            return Err(Error::IndexLoad("File too small for header".into()));
86        }
87
88        // Read shape from header
89        let mut cursor = std::io::Cursor::new(&mmap[..16]);
90        let nrows = cursor
91            .read_i64::<LittleEndian>()
92            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
93            as usize;
94        let ncols = cursor
95            .read_i64::<LittleEndian>()
96            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
97            as usize;
98
99        let expected_size = 16 + nrows * ncols * 4;
100        if mmap.len() < expected_size {
101            return Err(Error::IndexLoad(format!(
102                "File size {} too small for shape ({}, {})",
103                mmap.len(),
104                nrows,
105                ncols
106            )));
107        }
108
109        Ok(Self {
110            _mmap: mmap,
111            shape: (nrows, ncols),
112            data_offset: 16,
113        })
114    }
115
116    /// Get the shape of the array.
117    pub fn shape(&self) -> (usize, usize) {
118        self.shape
119    }
120
121    /// Get the number of rows.
122    pub fn nrows(&self) -> usize {
123        self.shape.0
124    }
125
126    /// Get the number of columns.
127    pub fn ncols(&self) -> usize {
128        self.shape.1
129    }
130
131    /// Get a view of a row.
132    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
133        let start = self.data_offset + idx * self.shape.1 * 4;
134        let bytes = &self._mmap[start..start + self.shape.1 * 4];
135
136        // Safety: We've verified the bounds and alignment
137        let data =
138            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
139
140        ArrayView1::from_shape(self.shape.1, data).unwrap()
141    }
142
143    /// Load a range of rows into an owned Array2.
144    pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
145        let nrows = end - start;
146        let byte_start = self.data_offset + start * self.shape.1 * 4;
147        let byte_end = self.data_offset + end * self.shape.1 * 4;
148        let bytes = &self._mmap[byte_start..byte_end];
149
150        // Safety: We've verified the bounds
151        let data = unsafe {
152            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
153        };
154
155        Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
156    }
157
158    /// Convert to an owned Array2 (loads all data into memory).
159    pub fn to_owned(&self) -> Array2<f32> {
160        self.load_rows(0, self.shape.0)
161    }
162}
163
/// A memory-mapped array of u8 values.
pub struct MmapArray2U8 {
    // The live mapping; dropping it unmaps the file, so it must outlive all views.
    _mmap: Mmap,
    // (nrows, ncols) as parsed from the 16-byte file header.
    shape: (usize, usize),
    // Byte offset of the first data byte (16 = two i64 header fields).
    data_offset: usize,
}
170
171impl MmapArray2U8 {
172    /// Load a 2D u8 array from a raw binary file.
173    pub fn from_raw_file(path: &Path) -> Result<Self> {
174        let file = File::open(path)
175            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
176
177        let mmap = unsafe {
178            Mmap::map(&file)
179                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
180        };
181
182        if mmap.len() < 16 {
183            return Err(Error::IndexLoad("File too small for header".into()));
184        }
185
186        let mut cursor = std::io::Cursor::new(&mmap[..16]);
187        let nrows = cursor
188            .read_i64::<LittleEndian>()
189            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
190            as usize;
191        let ncols = cursor
192            .read_i64::<LittleEndian>()
193            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
194            as usize;
195
196        let expected_size = 16 + nrows * ncols;
197        if mmap.len() < expected_size {
198            return Err(Error::IndexLoad(format!(
199                "File size {} too small for shape ({}, {})",
200                mmap.len(),
201                nrows,
202                ncols
203            )));
204        }
205
206        Ok(Self {
207            _mmap: mmap,
208            shape: (nrows, ncols),
209            data_offset: 16,
210        })
211    }
212
213    /// Get the shape of the array.
214    pub fn shape(&self) -> (usize, usize) {
215        self.shape
216    }
217
218    /// Get a view of the data as ArrayView2.
219    pub fn view(&self) -> ArrayView2<'_, u8> {
220        let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
221        ArrayView2::from_shape(self.shape, bytes).unwrap()
222    }
223
224    /// Load a range of rows into an owned Array2.
225    pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
226        let nrows = end - start;
227        let byte_start = self.data_offset + start * self.shape.1;
228        let byte_end = self.data_offset + end * self.shape.1;
229        let bytes = &self._mmap[byte_start..byte_end];
230
231        Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
232    }
233
234    /// Convert to an owned Array2.
235    pub fn to_owned(&self) -> Array2<u8> {
236        self.load_rows(0, self.shape.0)
237    }
238}
239
/// A memory-mapped array of i64 values.
pub struct MmapArray1I64 {
    // The live mapping; dropping it unmaps the file.
    _mmap: Mmap,
    // Element count as parsed from the 8-byte file header.
    len: usize,
    // Byte offset of the first i64 element (8 = one i64 length field).
    data_offset: usize,
}
246
247impl MmapArray1I64 {
248    /// Load a 1D i64 array from a raw binary file.
249    pub fn from_raw_file(path: &Path) -> Result<Self> {
250        let file = File::open(path)
251            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
252
253        let mmap = unsafe {
254            Mmap::map(&file)
255                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
256        };
257
258        if mmap.len() < 8 {
259            return Err(Error::IndexLoad("File too small for header".into()));
260        }
261
262        let mut cursor = std::io::Cursor::new(&mmap[..8]);
263        let len = cursor
264            .read_i64::<LittleEndian>()
265            .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
266            as usize;
267
268        let expected_size = 8 + len * 8;
269        if mmap.len() < expected_size {
270            return Err(Error::IndexLoad(format!(
271                "File size {} too small for length {}",
272                mmap.len(),
273                len
274            )));
275        }
276
277        Ok(Self {
278            _mmap: mmap,
279            len,
280            data_offset: 8,
281        })
282    }
283
284    /// Get the length of the array.
285    pub fn len(&self) -> usize {
286        self.len
287    }
288
289    /// Returns true if the array is empty.
290    pub fn is_empty(&self) -> bool {
291        self.len == 0
292    }
293
294    /// Get a value at an index.
295    pub fn get(&self, idx: usize) -> i64 {
296        let start = self.data_offset + idx * 8;
297        let bytes = &self._mmap[start..start + 8];
298        i64::from_le_bytes(bytes.try_into().unwrap())
299    }
300
301    /// Convert to an owned Array1.
302    pub fn to_owned(&self) -> Array1<i64> {
303        let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];
304
305        // Safety: We've verified the bounds
306        let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };
307
308        Array1::from_vec(data.to_vec())
309    }
310}
311
312/// Write an `Array2<f32>` to a raw binary file format.
313pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
314    use std::io::Write;
315
316    let file = File::create(path)
317        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
318    let mut writer = std::io::BufWriter::new(file);
319
320    let nrows = array.nrows() as i64;
321    let ncols = array.ncols() as i64;
322
323    writer
324        .write_all(&nrows.to_le_bytes())
325        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
326    writer
327        .write_all(&ncols.to_le_bytes())
328        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
329
330    for val in array.iter() {
331        writer
332            .write_all(&val.to_le_bytes())
333            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
334    }
335
336    writer
337        .flush()
338        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
339
340    Ok(())
341}
342
343/// Write an `Array2<u8>` to a raw binary file format.
344pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
345    use std::io::Write;
346
347    let file = File::create(path)
348        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
349    let mut writer = std::io::BufWriter::new(file);
350
351    let nrows = array.nrows() as i64;
352    let ncols = array.ncols() as i64;
353
354    writer
355        .write_all(&nrows.to_le_bytes())
356        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
357    writer
358        .write_all(&ncols.to_le_bytes())
359        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
360
361    for row in array.rows() {
362        writer
363            .write_all(row.as_slice().unwrap())
364            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
365    }
366
367    writer
368        .flush()
369        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
370
371    Ok(())
372}
373
374/// Write an `Array1<i64>` to a raw binary file format.
375pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
376    use std::io::Write;
377
378    let file = File::create(path)
379        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
380    let mut writer = std::io::BufWriter::new(file);
381
382    let len = array.len() as i64;
383
384    writer
385        .write_all(&len.to_le_bytes())
386        .map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;
387
388    for val in array.iter() {
389        writer
390            .write_all(&val.to_le_bytes())
391            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
392    }
393
394    writer
395        .flush()
396        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
397
398    Ok(())
399}
400
// ============================================================================
// NPY Format Memory-Mapped Arrays
// ============================================================================

/// NPY file magic bytes: the 6-byte prefix every .npy file starts with.
const NPY_MAGIC: &[u8] = b"\x93NUMPY";
407
408/// Parse dtype from NPY header string (e.g., "<f2" for float16, "<f4" for float32)
409fn parse_dtype_from_header(header: &str) -> Result<String> {
410    // Find 'descr': '...'
411    let descr_start = header
412        .find("'descr':")
413        .ok_or_else(|| Error::IndexLoad("No descr in NPY header".into()))?;
414
415    let after_descr = &header[descr_start + 8..];
416    let quote_start = after_descr
417        .find('\'')
418        .ok_or_else(|| Error::IndexLoad("No dtype quote in NPY header".into()))?;
419    let rest = &after_descr[quote_start + 1..];
420    let quote_end = rest
421        .find('\'')
422        .ok_or_else(|| Error::IndexLoad("Unclosed dtype quote in NPY header".into()))?;
423
424    Ok(rest[..quote_end].to_string())
425}
426
427/// Detect NPY file dtype without loading the entire file
428pub fn detect_npy_dtype(path: &Path) -> Result<String> {
429    let file = File::open(path)
430        .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
431
432    let mmap = unsafe {
433        Mmap::map(&file)
434            .map_err(|e| Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e)))?
435    };
436
437    if mmap.len() < 10 {
438        return Err(Error::IndexLoad("NPY file too small".into()));
439    }
440
441    // Check magic
442    if &mmap[..6] != NPY_MAGIC {
443        return Err(Error::IndexLoad("Invalid NPY magic".into()));
444    }
445
446    let major_version = mmap[6];
447
448    // Read header length
449    let header_len = if major_version == 1 {
450        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
451    } else if major_version == 2 {
452        if mmap.len() < 12 {
453            return Err(Error::IndexLoad("NPY v2 file too small".into()));
454        }
455        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
456    } else {
457        return Err(Error::IndexLoad(format!(
458            "Unsupported NPY version: {}",
459            major_version
460        )));
461    };
462
463    let header_start = if major_version == 1 { 10 } else { 12 };
464    let header_end = header_start + header_len;
465
466    if mmap.len() < header_end {
467        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
468    }
469
470    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
471        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
472
473    parse_dtype_from_header(header_str)
474}
475
476/// Convert a float16 NPY file to float32 in place
477pub fn convert_f16_to_f32_npy(path: &Path) -> Result<()> {
478    use half::f16;
479    use std::io::Read;
480
481    // Read the entire file
482    let mut file = File::open(path)
483        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
484    let mut data = Vec::new();
485    file.read_to_end(&mut data)
486        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
487
488    if data.len() < 10 || &data[..6] != NPY_MAGIC {
489        return Err(Error::IndexLoad("Invalid NPY file".into()));
490    }
491
492    let major_version = data[6];
493    let header_start = if major_version == 1 { 10 } else { 12 };
494    let header_len = if major_version == 1 {
495        u16::from_le_bytes([data[8], data[9]]) as usize
496    } else {
497        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
498    };
499    let header_end = header_start + header_len;
500
501    // Parse header to get shape
502    let header_str = std::str::from_utf8(&data[header_start..header_end])
503        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
504    let shape = parse_shape_from_header(header_str)?;
505
506    // Calculate total elements
507    let total_elements: usize = shape.iter().product();
508    let f16_data = &data[header_end..header_end + total_elements * 2];
509
510    // Convert f16 to f32
511    let mut f32_data = Vec::with_capacity(total_elements * 4);
512    for chunk in f16_data.chunks(2) {
513        let f16_val = f16::from_le_bytes([chunk[0], chunk[1]]);
514        let f32_val: f32 = f16_val.to_f32();
515        f32_data.extend_from_slice(&f32_val.to_le_bytes());
516    }
517
518    // Write new file with f32 dtype
519    let file = File::create(path)
520        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
521    let mut writer = BufWriter::new(file);
522
523    if shape.len() == 1 {
524        write_npy_header_1d(&mut writer, shape[0], "<f4")?;
525    } else if shape.len() == 2 {
526        write_npy_header_2d(&mut writer, shape[0], shape[1], "<f4")?;
527    } else {
528        return Err(Error::IndexLoad("Unsupported shape dimensions".into()));
529    }
530
531    writer
532        .write_all(&f32_data)
533        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
534    writer.flush()?;
535
536    Ok(())
537}
538
539/// Convert an int64 NPY file to int32 in place
540pub fn convert_i64_to_i32_npy(path: &Path) -> Result<()> {
541    use std::io::Read;
542
543    // Read the entire file
544    let mut file = File::open(path)
545        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
546    let mut data = Vec::new();
547    file.read_to_end(&mut data)
548        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
549
550    if data.len() < 10 || &data[..6] != NPY_MAGIC {
551        return Err(Error::IndexLoad("Invalid NPY file".into()));
552    }
553
554    let major_version = data[6];
555    let header_start = if major_version == 1 { 10 } else { 12 };
556    let header_len = if major_version == 1 {
557        u16::from_le_bytes([data[8], data[9]]) as usize
558    } else {
559        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
560    };
561    let header_end = header_start + header_len;
562
563    // Parse header to get shape
564    let header_str = std::str::from_utf8(&data[header_start..header_end])
565        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
566    let shape = parse_shape_from_header(header_str)?;
567
568    if shape.len() != 1 {
569        return Err(Error::IndexLoad("Expected 1D array for i64->i32".into()));
570    }
571
572    let len = shape[0];
573    let i64_data = &data[header_end..header_end + len * 8];
574
575    // Convert i64 to i32
576    let mut i32_data = Vec::with_capacity(len * 4);
577    for chunk in i64_data.chunks(8) {
578        let i64_val = i64::from_le_bytes(chunk.try_into().unwrap());
579        let i32_val = i64_val as i32;
580        i32_data.extend_from_slice(&i32_val.to_le_bytes());
581    }
582
583    // Write new file with i32 dtype
584    let file = File::create(path)
585        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
586    let mut writer = BufWriter::new(file);
587
588    write_npy_header_1d(&mut writer, len, "<i4")?;
589
590    writer
591        .write_all(&i32_data)
592        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
593    writer.flush()?;
594
595    Ok(())
596}
597
598/// Re-save a u8 NPY file to ensure dtype descriptor is "|u1" (platform-independent)
599///
600/// Note: We can't use ndarray_npy::ReadNpyExt here because it doesn't accept "<u1"
601/// descriptor, so we manually read the raw data and resave with "|u1".
602pub fn normalize_u8_npy(path: &Path) -> Result<()> {
603    use std::io::Read;
604
605    // Read the entire file
606    let mut file = File::open(path)
607        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
608    let mut data = Vec::new();
609    file.read_to_end(&mut data)
610        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
611
612    if data.len() < 10 || &data[..6] != NPY_MAGIC {
613        return Err(Error::IndexLoad("Invalid NPY file".into()));
614    }
615
616    let major_version = data[6];
617    let header_start = if major_version == 1 { 10 } else { 12 };
618    let header_len = if major_version == 1 {
619        u16::from_le_bytes([data[8], data[9]]) as usize
620    } else {
621        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
622    };
623    let header_end = header_start + header_len;
624
625    // Parse header to get shape
626    let header_str = std::str::from_utf8(&data[header_start..header_end])
627        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
628    let shape = parse_shape_from_header(header_str)?;
629
630    if shape.len() != 2 {
631        return Err(Error::IndexLoad(
632            "Expected 2D array for u8 normalization".into(),
633        ));
634    }
635
636    let nrows = shape[0];
637    let ncols = shape[1];
638    let u8_data = &data[header_end..header_end + nrows * ncols];
639
640    // Re-write with explicit "|u1" dtype
641    let new_file = File::create(path)
642        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
643    let mut writer = BufWriter::new(new_file);
644
645    write_npy_header_2d(&mut writer, nrows, ncols, "|u1")?;
646
647    writer
648        .write_all(u8_data)
649        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
650    writer.flush()?;
651
652    Ok(())
653}
654
655/// Parse NPY header and return (shape, data_offset, is_fortran_order)
656fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
657    if mmap.len() < 10 {
658        return Err(Error::IndexLoad("NPY file too small".into()));
659    }
660
661    // Check magic
662    if &mmap[..6] != NPY_MAGIC {
663        return Err(Error::IndexLoad("Invalid NPY magic".into()));
664    }
665
666    let major_version = mmap[6];
667    let _minor_version = mmap[7];
668
669    // Read header length
670    let header_len = if major_version == 1 {
671        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
672    } else if major_version == 2 {
673        if mmap.len() < 12 {
674            return Err(Error::IndexLoad("NPY v2 file too small".into()));
675        }
676        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
677    } else {
678        return Err(Error::IndexLoad(format!(
679            "Unsupported NPY version: {}",
680            major_version
681        )));
682    };
683
684    let header_start = if major_version == 1 { 10 } else { 12 };
685    let header_end = header_start + header_len;
686
687    if mmap.len() < header_end {
688        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
689    }
690
691    // Parse header dict (simplified Python dict parsing)
692    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
693        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
694
695    // Extract shape from header like: {'descr': '<i8', 'fortran_order': False, 'shape': (12345,), }
696    let shape = parse_shape_from_header(header_str)?;
697    let fortran_order = header_str.contains("'fortran_order': True");
698
699    Ok((shape, header_end, fortran_order))
700}
701
702/// Parse shape tuple from NPY header string
703fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
704    // Find 'shape': (...)
705    let shape_start = header
706        .find("'shape':")
707        .ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;
708
709    let after_shape = &header[shape_start + 8..];
710    let paren_start = after_shape
711        .find('(')
712        .ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
713    let paren_end = after_shape
714        .find(')')
715        .ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;
716
717    let shape_content = &after_shape[paren_start + 1..paren_end];
718
719    // Parse comma-separated numbers
720    let mut shape = Vec::new();
721    for part in shape_content.split(',') {
722        let trimmed = part.trim();
723        if !trimmed.is_empty() {
724            let dim: usize = trimmed.parse().map_err(|e| {
725                Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
726            })?;
727            shape.push(dim);
728        }
729    }
730
731    Ok(shape)
732}
733
/// Memory-mapped NPY array for i64 values (used for codes).
///
/// This struct provides zero-copy access to 1D i64 arrays stored in NPY format.
pub struct MmapNpyArray1I64 {
    // The live mapping; dropping it unmaps the file.
    _mmap: Mmap,
    // Element count (first dimension of the NPY shape).
    len: usize,
    // Byte offset where the payload begins (end of the NPY header).
    data_offset: usize,
}
742
743impl MmapNpyArray1I64 {
744    /// Load a 1D i64 array from an NPY file.
745    pub fn from_npy_file(path: &Path) -> Result<Self> {
746        let file = File::open(path)
747            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
748
749        let mmap = unsafe {
750            Mmap::map(&file).map_err(|e| {
751                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
752            })?
753        };
754
755        let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
756
757        if shape.is_empty() {
758            return Err(Error::IndexLoad("Empty shape in NPY file".into()));
759        }
760
761        let len = shape[0];
762
763        // Verify file size
764        let expected_size = data_offset + len * 8;
765        if mmap.len() < expected_size {
766            return Err(Error::IndexLoad(format!(
767                "NPY file size {} too small for {} elements",
768                mmap.len(),
769                len
770            )));
771        }
772
773        Ok(Self {
774            _mmap: mmap,
775            len,
776            data_offset,
777        })
778    }
779
780    /// Get the length of the array.
781    pub fn len(&self) -> usize {
782        self.len
783    }
784
785    /// Returns true if the array is empty.
786    pub fn is_empty(&self) -> bool {
787        self.len == 0
788    }
789
790    /// Get a slice of the data as &[i64].
791    ///
792    /// Returns a `Vec<i64>` instead of &[i64] to handle unaligned data safely.
793    ///
794    /// # Safety
795    /// The caller must ensure start <= end <= len.
796    pub fn slice(&self, start: usize, end: usize) -> Vec<i64> {
797        let count = end - start;
798        let mut result = Vec::with_capacity(count);
799
800        for i in start..end {
801            result.push(self.get(i));
802        }
803
804        result
805    }
806
807    /// Get a value at an index.
808    pub fn get(&self, idx: usize) -> i64 {
809        let start = self.data_offset + idx * 8;
810        let bytes = &self._mmap[start..start + 8];
811        i64::from_le_bytes(bytes.try_into().unwrap())
812    }
813}
814
/// Memory-mapped NPY array for f32 values (used for centroids).
///
/// This struct provides zero-copy access to 2D f32 arrays stored in NPY format.
/// Unlike loading into an owned `Array2<f32>`, this approach lets the OS manage
/// paging, reducing resident memory usage for large centroid matrices.
pub struct MmapNpyArray2F32 {
    // The live mapping; dropping it unmaps the file, so it must outlive all views.
    _mmap: Mmap,
    // (nrows, ncols) from the NPY header shape tuple.
    shape: (usize, usize),
    // Byte offset where the payload begins (end of the NPY header).
    data_offset: usize,
}
825
826impl MmapNpyArray2F32 {
827    /// Load a 2D f32 array from an NPY file.
828    pub fn from_npy_file(path: &Path) -> Result<Self> {
829        let file = File::open(path)
830            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
831
832        let mmap = unsafe {
833            Mmap::map(&file).map_err(|e| {
834                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
835            })?
836        };
837
838        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
839
840        if shape_vec.len() != 2 {
841            return Err(Error::IndexLoad(format!(
842                "Expected 2D array, got {}D",
843                shape_vec.len()
844            )));
845        }
846
847        let shape = (shape_vec[0], shape_vec[1]);
848
849        // Verify file size (f32 = 4 bytes)
850        let expected_size = data_offset + shape.0 * shape.1 * 4;
851        if mmap.len() < expected_size {
852            return Err(Error::IndexLoad(format!(
853                "NPY file size {} too small for shape {:?}",
854                mmap.len(),
855                shape
856            )));
857        }
858
859        Ok(Self {
860            _mmap: mmap,
861            shape,
862            data_offset,
863        })
864    }
865
866    /// Get the shape of the array.
867    pub fn shape(&self) -> (usize, usize) {
868        self.shape
869    }
870
871    /// Get the number of rows.
872    pub fn nrows(&self) -> usize {
873        self.shape.0
874    }
875
876    /// Get the number of columns.
877    pub fn ncols(&self) -> usize {
878        self.shape.1
879    }
880
881    /// Get a view of the entire array as ArrayView2.
882    ///
883    /// This provides zero-copy access to the memory-mapped data.
884    pub fn view(&self) -> ArrayView2<'_, f32> {
885        let byte_start = self.data_offset;
886        let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
887        let bytes = &self._mmap[byte_start..byte_end];
888
889        // Safety: We've verified bounds and f32 is 4-byte aligned in NPY format
890        let data = unsafe {
891            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
892        };
893
894        ArrayView2::from_shape(self.shape, data).unwrap()
895    }
896
897    /// Get a view of a single row.
898    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
899        let byte_start = self.data_offset + idx * self.shape.1 * 4;
900        let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];
901
902        // Safety: We've verified bounds and alignment
903        let data =
904            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
905
906        ArrayView1::from_shape(self.shape.1, data).unwrap()
907    }
908
909    /// Get a view of rows [start..end] as ArrayView2.
910    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
911        let nrows = end - start;
912        let byte_start = self.data_offset + start * self.shape.1 * 4;
913        let byte_end = self.data_offset + end * self.shape.1 * 4;
914        let bytes = &self._mmap[byte_start..byte_end];
915
916        // Safety: We've verified bounds
917        let data = unsafe {
918            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
919        };
920
921        ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
922    }
923
    /// Convert to an owned Array2 (loads all data into memory).
    ///
    /// Use this only when you need an owned copy; prefer `view()` for read-only access,
    /// since this copies the entire mapped region into a heap allocation.
    pub fn to_owned(&self) -> Array2<f32> {
        self.view().to_owned()
    }
930}
931
/// Memory-mapped NPY array for u8 values (used for residuals).
///
/// This struct provides zero-copy access to 2D u8 arrays stored in NPY format.
pub struct MmapNpyArray2U8 {
    // Underlying file mapping; indexed directly by the accessor methods and
    // kept here so the mapping stays alive as long as any view is in use.
    _mmap: Mmap,
    // (rows, cols) parsed from the NPY header at load time.
    shape: (usize, usize),
    // Byte offset of the first data element (i.e. the NPY header size).
    data_offset: usize,
}
940
941impl MmapNpyArray2U8 {
942    /// Load a 2D u8 array from an NPY file.
943    pub fn from_npy_file(path: &Path) -> Result<Self> {
944        let file = File::open(path)
945            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
946
947        let mmap = unsafe {
948            Mmap::map(&file).map_err(|e| {
949                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
950            })?
951        };
952
953        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
954
955        if shape_vec.len() != 2 {
956            return Err(Error::IndexLoad(format!(
957                "Expected 2D array, got {}D",
958                shape_vec.len()
959            )));
960        }
961
962        let shape = (shape_vec[0], shape_vec[1]);
963
964        // Verify file size
965        let expected_size = data_offset + shape.0 * shape.1;
966        if mmap.len() < expected_size {
967            return Err(Error::IndexLoad(format!(
968                "NPY file size {} too small for shape {:?}",
969                mmap.len(),
970                shape
971            )));
972        }
973
974        Ok(Self {
975            _mmap: mmap,
976            shape,
977            data_offset,
978        })
979    }
980
981    /// Get the shape of the array.
982    pub fn shape(&self) -> (usize, usize) {
983        self.shape
984    }
985
986    /// Get the number of rows.
987    pub fn nrows(&self) -> usize {
988        self.shape.0
989    }
990
991    /// Get the number of columns.
992    pub fn ncols(&self) -> usize {
993        self.shape.1
994    }
995
996    /// Get a view of rows [start..end] as ArrayView2.
997    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
998        let nrows = end - start;
999        let byte_start = self.data_offset + start * self.shape.1;
1000        let byte_end = self.data_offset + end * self.shape.1;
1001        let bytes = &self._mmap[byte_start..byte_end];
1002
1003        ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
1004    }
1005
1006    /// Get a view of the entire array.
1007    pub fn view(&self) -> ArrayView2<'_, u8> {
1008        self.slice_rows(0, self.shape.0)
1009    }
1010
1011    /// Get a single row as a slice.
1012    pub fn row(&self, idx: usize) -> &[u8] {
1013        let byte_start = self.data_offset + idx * self.shape.1;
1014        let byte_end = byte_start + self.shape.1;
1015        &self._mmap[byte_start..byte_end]
1016    }
1017}
1018
1019// ============================================================================
1020// Merged File Creation
1021// ============================================================================
1022
/// Manifest entry for tracking chunk files
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkManifestEntry {
    /// Number of rows the chunk held when it was last merged.
    pub rows: usize,
    /// Modification time (seconds since the Unix epoch) when last merged;
    /// compared for exact equality to detect changed chunks.
    pub mtime: f64,
}
1029
/// Manifest for merged files, including metadata about the merge
///
/// Persisted as JSON next to the merged file. The `#[serde(default)]` fields
/// default to zero when loading a manifest written before they existed, which
/// the merge functions interpret as "metadata unknown — regenerate".
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct MergeManifest {
    /// Chunk information, keyed by chunk file name
    pub chunks: HashMap<String, ChunkManifestEntry>,
    /// Number of padding rows used in the merge
    #[serde(default)]
    pub padding_rows: usize,
    /// Total rows in the merged file (including padding)
    #[serde(default)]
    pub total_rows: usize,
    /// Number of columns (for 2D arrays like residuals; 0 for 1D codes)
    #[serde(default)]
    pub ncols: usize,
}
1045
/// Legacy manifest type (for backwards compatibility during migration):
/// a bare chunk map without the padding/shape metadata of `MergeManifest`.
pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;
1048
1049/// Load manifest from disk if it exists
1050/// Handles both new MergeManifest format and legacy ChunkManifest format
1051fn load_merge_manifest(manifest_path: &Path) -> Option<MergeManifest> {
1052    if manifest_path.exists() {
1053        if let Ok(file) = File::open(manifest_path) {
1054            // Try to load as new format first
1055            let reader = BufReader::new(file);
1056            if let Ok(manifest) = serde_json::from_reader::<_, MergeManifest>(reader) {
1057                return Some(manifest);
1058            }
1059            // Try legacy format
1060            if let Ok(file) = File::open(manifest_path) {
1061                if let Ok(chunks) =
1062                    serde_json::from_reader::<_, ChunkManifest>(BufReader::new(file))
1063                {
1064                    // Convert legacy format - missing padding info means we need to regenerate
1065                    return Some(MergeManifest {
1066                        chunks,
1067                        padding_rows: 0,
1068                        total_rows: 0,
1069                        ncols: 0,
1070                    });
1071                }
1072            }
1073        }
1074    }
1075    None
1076}
1077
1078/// Save manifest to disk atomically (write to temp file, then rename)
1079fn save_merge_manifest(manifest_path: &Path, manifest: &MergeManifest) -> Result<()> {
1080    let temp_path = manifest_path.with_extension("manifest.json.tmp");
1081
1082    // Write to temp file
1083    let file = File::create(&temp_path)
1084        .map_err(|e| Error::IndexLoad(format!("Failed to create temp manifest: {}", e)))?;
1085    let mut writer = BufWriter::new(file);
1086    serde_json::to_writer(&mut writer, manifest)
1087        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
1088    writer
1089        .flush()
1090        .map_err(|e| Error::IndexLoad(format!("Failed to flush manifest: {}", e)))?;
1091
1092    // Sync to disk
1093    writer
1094        .into_inner()
1095        .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?
1096        .sync_all()
1097        .map_err(|e| Error::IndexLoad(format!("Failed to sync manifest: {}", e)))?;
1098
1099    // Atomic rename
1100    fs::rename(&temp_path, manifest_path)
1101        .map_err(|e| Error::IndexLoad(format!("Failed to rename manifest: {}", e)))?;
1102
1103    Ok(())
1104}
1105
1106/// Get file modification time as f64 seconds since epoch
1107fn get_mtime(path: &Path) -> Result<f64> {
1108    let metadata = fs::metadata(path)
1109        .map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?;
1110    let mtime = metadata
1111        .modified()
1112        .map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
1113    let duration = mtime
1114        .duration_since(std::time::UNIX_EPOCH)
1115        .map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
1116    Ok(duration.as_secs_f64())
1117}
1118
1119/// Write NPY header for a 1D array
1120fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
1121    // Build header dict
1122    let header_dict = format!(
1123        "{{'descr': '{}', 'fortran_order': False, 'shape': ({},), }}",
1124        dtype, len
1125    );
1126
1127    // Pad to 64-byte alignment (NPY requirement)
1128    let header_len = header_dict.len();
1129    let padding = (64 - ((10 + header_len) % 64)) % 64;
1130    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));
1131
1132    // Write magic + version
1133    writer
1134        .write_all(NPY_MAGIC)
1135        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
1136    writer
1137        .write_all(&[1, 0])
1138        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?; // v1.0
1139
1140    // Write header length (2 bytes for v1.0)
1141    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
1142    writer
1143        .write_all(&header_len_bytes)
1144        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;
1145
1146    // Write header
1147    writer
1148        .write_all(padded_header.as_bytes())
1149        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;
1150
1151    Ok(10 + padded_header.len())
1152}
1153
1154/// Write NPY header for a 2D array
1155fn write_npy_header_2d(
1156    writer: &mut impl Write,
1157    nrows: usize,
1158    ncols: usize,
1159    dtype: &str,
1160) -> Result<usize> {
1161    // Build header dict
1162    let header_dict = format!(
1163        "{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}), }}",
1164        dtype, nrows, ncols
1165    );
1166
1167    // Pad to 64-byte alignment
1168    let header_len = header_dict.len();
1169    let padding = (64 - ((10 + header_len) % 64)) % 64;
1170    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));
1171
1172    // Write magic + version
1173    writer
1174        .write_all(NPY_MAGIC)
1175        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
1176    writer
1177        .write_all(&[1, 0])
1178        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;
1179
1180    // Write header length
1181    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
1182    writer
1183        .write_all(&header_len_bytes)
1184        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;
1185
1186    // Write header
1187    writer
1188        .write_all(padded_header.as_bytes())
1189        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;
1190
1191    Ok(10 + padded_header.len())
1192}
1193
/// Information about a chunk file for merging
struct ChunkInfo {
    // Absolute path to the chunk's NPY file.
    path: std::path::PathBuf,
    // Bare file name, used as the manifest key.
    filename: String,
    // Row count observed during the scan pass.
    rows: usize,
    // Modification time (seconds since epoch) observed during the scan pass.
    mtime: f64,
}
1201
/// Merge chunked codes NPY files into a single merged file.
///
/// Uses incremental persistence with manifest tracking to skip unchanged chunks.
/// Uses atomic writes to prevent corruption from interrupted writes.
/// Uses file-based locking to coordinate concurrent processes.
/// Returns the path to the merged file.
///
/// # Errors
///
/// Returns an error if no chunk contains data, or if any I/O step (locking,
/// reading chunks, writing/syncing/renaming the merged file) fails.
pub fn merge_codes_chunks(
    index_path: &Path,
    num_chunks: usize,
    padding_rows: usize,
) -> Result<std::path::PathBuf> {
    use ndarray_npy::ReadNpyExt;

    let merged_path = index_path.join("merged_codes.npy");
    let manifest_path = index_path.join("merged_codes.manifest.json");
    let temp_path = index_path.join("merged_codes.npy.tmp");
    let lock_path = index_path.join("merged_codes.lock");

    // Acquire exclusive lock to prevent concurrent merge operations.
    // This is critical for multi-process scenarios (e.g., multiple API workers).
    let _lock = FileLockGuard::acquire(&lock_path)?;

    // After acquiring the lock, re-check if merge is still needed.
    // Another process might have completed the merge while we were waiting.

    // Load previous manifest (re-read after acquiring lock)
    let old_manifest = load_merge_manifest(&manifest_path);

    // Scan chunks and detect changes
    let mut chunks: Vec<ChunkInfo> = Vec::new();
    let mut total_rows = 0usize;
    let mut chain_broken = false;

    for i in 0..num_chunks {
        let filename = format!("{}.codes.npy", i);
        let path = index_path.join(&filename);

        if path.exists() {
            let mtime = get_mtime(&path)?;

            // NOTE(review): this loads the entire chunk into memory just to
            // learn its length, and the data is read again below when a
            // rewrite is needed. Acceptable for moderate chunk sizes.
            let file = File::open(&path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            let rows = arr.len();

            if rows > 0 {
                total_rows += rows;

                // A chunk is "clean" when the manifest recorded the same
                // mtime and row count. Exact f64 equality is intentional:
                // both values derive from the same filesystem metadata.
                let is_clean = if let Some(ref manifest) = old_manifest {
                    manifest
                        .chunks
                        .get(&filename)
                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
                } else {
                    false
                };

                if !is_clean {
                    chain_broken = true;
                }

                chunks.push(ChunkInfo {
                    path,
                    filename,
                    rows,
                    mtime,
                });
            }
        }
    }

    if total_rows == 0 {
        return Err(Error::IndexLoad("No data to merge".into()));
    }

    let final_rows = total_rows + padding_rows;

    // Check if we need to rewrite:
    // 1. Merged file doesn't exist
    // 2. Chunks have changed
    // 3. Padding has changed (stored in manifest)
    // 4. Total rows don't match (safety check)
    let padding_changed = old_manifest
        .as_ref()
        .map(|m| m.padding_rows != padding_rows)
        .unwrap_or(true);
    let total_rows_mismatch = old_manifest
        .as_ref()
        .map(|m| m.total_rows != final_rows)
        .unwrap_or(true);

    let needs_full_rewrite =
        !merged_path.exists() || chain_broken || padding_changed || total_rows_mismatch;

    if needs_full_rewrite {
        // Write to temp file first (atomic write pattern)
        let file = File::create(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
        let mut writer = BufWriter::new(file);

        // Write header
        let header_size = write_npy_header_1d(&mut writer, final_rows, "<i8")?;

        // Write chunk data (second full read of each chunk, see note above)
        let mut written_rows = 0usize;
        for chunk in &chunks {
            let file = File::open(&chunk.path)?;
            let arr: Array1<i64> = Array1::read_npy(file)?;
            for &val in arr.iter() {
                writer.write_all(&val.to_le_bytes())?;
            }
            written_rows += arr.len();
        }

        // Write padding zeros
        for _ in 0..padding_rows {
            writer.write_all(&0i64.to_le_bytes())?;
        }
        written_rows += padding_rows;

        // Flush and sync to disk
        writer
            .flush()
            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged file: {}", e)))?;
        let file = writer
            .into_inner()
            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
        file.sync_all()
            .map_err(|e| Error::IndexLoad(format!("Failed to sync merged file to disk: {}", e)))?;

        // Verify file size before renaming (8 bytes per i64 row)
        let expected_size = header_size + written_rows * 8;
        let actual_size = fs::metadata(&temp_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
            .len() as usize;

        if actual_size != expected_size {
            // Clean up temp file and return error
            let _ = fs::remove_file(&temp_path);
            return Err(Error::IndexLoad(format!(
                "Merged codes file size mismatch: expected {} bytes, got {} bytes",
                expected_size, actual_size
            )));
        }

        // Atomic rename (overwrites existing file)
        fs::rename(&temp_path, &merged_path)
            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged file: {}", e)))?;
    } else {
        // Validate existing merged file before using it
        if merged_path.exists() {
            let file_size = fs::metadata(&merged_path)
                .map_err(|e| {
                    Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
                })?
                .len() as usize;

            // NPY header is at least 64 bytes, data is final_rows * 8 bytes
            let min_expected_size = 64 + final_rows * 8;
            if file_size < min_expected_size {
                // File is corrupted: drop the stale artifacts and recurse.
                // Release the lock first so the recursive call can re-acquire
                // it; the recursion terminates because the merged file no
                // longer exists, which forces the rewrite branch next time.
                let _ = fs::remove_file(&merged_path);
                let _ = fs::remove_file(&manifest_path);
                drop(_lock);
                return merge_codes_chunks(index_path, num_chunks, padding_rows);
            }
        }
    }

    // Build and save manifest with full metadata
    let mut chunk_map = HashMap::new();
    for chunk in &chunks {
        chunk_map.insert(
            chunk.filename.clone(),
            ChunkManifestEntry {
                rows: chunk.rows,
                mtime: chunk.mtime,
            },
        );
    }
    let new_manifest = MergeManifest {
        chunks: chunk_map,
        padding_rows,
        total_rows: final_rows,
        ncols: 0, // Not used for 1D codes array
    };
    save_merge_manifest(&manifest_path, &new_manifest)?;

    Ok(merged_path)
}
1394
1395/// Merge chunked residuals NPY files into a single merged file.
1396///
1397/// Uses atomic writes to prevent corruption from interrupted writes.
1398/// Uses file-based locking to coordinate concurrent processes.
1399pub fn merge_residuals_chunks(
1400    index_path: &Path,
1401    num_chunks: usize,
1402    padding_rows: usize,
1403) -> Result<std::path::PathBuf> {
1404    use ndarray_npy::ReadNpyExt;
1405
1406    let merged_path = index_path.join("merged_residuals.npy");
1407    let manifest_path = index_path.join("merged_residuals.manifest.json");
1408    let temp_path = index_path.join("merged_residuals.npy.tmp");
1409    let lock_path = index_path.join("merged_residuals.lock");
1410
1411    // Acquire exclusive lock to prevent concurrent merge operations.
1412    // This is critical for multi-process scenarios (e.g., multiple API workers).
1413    let _lock = FileLockGuard::acquire(&lock_path)?;
1414
1415    // After acquiring the lock, re-check if merge is still needed.
1416    // Another process might have completed the merge while we were waiting.
1417
1418    // Load previous manifest (re-read after acquiring lock)
1419    let old_manifest = load_merge_manifest(&manifest_path);
1420
1421    // Scan chunks and detect changes
1422    let mut chunks: Vec<ChunkInfo> = Vec::new();
1423    let mut total_rows = 0usize;
1424    let mut ncols = 0usize;
1425    let mut chain_broken = false;
1426
1427    for i in 0..num_chunks {
1428        let filename = format!("{}.residuals.npy", i);
1429        let path = index_path.join(&filename);
1430
1431        if path.exists() {
1432            let mtime = get_mtime(&path)?;
1433
1434            // Get shape by reading header
1435            let file = File::open(&path)?;
1436            let arr: Array2<u8> = Array2::read_npy(file)?;
1437            let rows = arr.nrows();
1438            ncols = arr.ncols();
1439
1440            if rows > 0 {
1441                total_rows += rows;
1442
1443                let is_clean = if let Some(ref manifest) = old_manifest {
1444                    manifest
1445                        .chunks
1446                        .get(&filename)
1447                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
1448                } else {
1449                    false
1450                };
1451
1452                if !is_clean {
1453                    chain_broken = true;
1454                }
1455
1456                chunks.push(ChunkInfo {
1457                    path,
1458                    filename,
1459                    rows,
1460                    mtime,
1461                });
1462            }
1463        }
1464    }
1465
1466    if total_rows == 0 || ncols == 0 {
1467        return Err(Error::IndexLoad("No residual data to merge".into()));
1468    }
1469
1470    let final_rows = total_rows + padding_rows;
1471
1472    // Check if we need to rewrite:
1473    // 1. Merged file doesn't exist
1474    // 2. Chunks have changed
1475    // 3. Padding has changed
1476    // 4. Total rows or ncols don't match
1477    let padding_changed = old_manifest
1478        .as_ref()
1479        .map(|m| m.padding_rows != padding_rows)
1480        .unwrap_or(true);
1481    let total_rows_mismatch = old_manifest
1482        .as_ref()
1483        .map(|m| m.total_rows != final_rows)
1484        .unwrap_or(true);
1485    let ncols_mismatch = old_manifest
1486        .as_ref()
1487        .map(|m| m.ncols != ncols && m.ncols != 0)
1488        .unwrap_or(false);
1489
1490    let needs_full_rewrite = !merged_path.exists()
1491        || chain_broken
1492        || padding_changed
1493        || total_rows_mismatch
1494        || ncols_mismatch;
1495
1496    if needs_full_rewrite {
1497        // Write to temp file first (atomic write pattern)
1498        let file = File::create(&temp_path)
1499            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
1500        let mut writer = BufWriter::new(file);
1501
1502        // Write header
1503        let header_size = write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;
1504
1505        // Write chunk data
1506        let mut written_rows = 0usize;
1507        for chunk in &chunks {
1508            let file = File::open(&chunk.path)?;
1509            let arr: Array2<u8> = Array2::read_npy(file)?;
1510            for row in arr.rows() {
1511                writer.write_all(row.as_slice().unwrap())?;
1512            }
1513            written_rows += arr.nrows();
1514        }
1515
1516        // Write padding zeros
1517        let zero_row = vec![0u8; ncols];
1518        for _ in 0..padding_rows {
1519            writer.write_all(&zero_row)?;
1520        }
1521        written_rows += padding_rows;
1522
1523        // Flush and sync to disk
1524        writer
1525            .flush()
1526            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged residuals: {}", e)))?;
1527        let file = writer
1528            .into_inner()
1529            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
1530        file.sync_all().map_err(|e| {
1531            Error::IndexLoad(format!("Failed to sync merged residuals to disk: {}", e))
1532        })?;
1533
1534        // Verify file size before renaming
1535        let expected_size = header_size + written_rows * ncols;
1536        let actual_size = fs::metadata(&temp_path)
1537            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
1538            .len() as usize;
1539
1540        if actual_size != expected_size {
1541            // Clean up temp file and return error
1542            let _ = fs::remove_file(&temp_path);
1543            return Err(Error::IndexLoad(format!(
1544                "Merged residuals file size mismatch: expected {} bytes, got {} bytes",
1545                expected_size, actual_size
1546            )));
1547        }
1548
1549        // Atomic rename
1550        fs::rename(&temp_path, &merged_path)
1551            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged residuals: {}", e)))?;
1552    } else {
1553        // Validate existing merged file before using it
1554        if merged_path.exists() {
1555            let file_size = fs::metadata(&merged_path)
1556                .map_err(|e| {
1557                    Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
1558                })?
1559                .len() as usize;
1560
1561            // NPY header is at least 64 bytes, data is final_rows * ncols bytes
1562            let min_expected_size = 64 + final_rows * ncols;
1563            if file_size < min_expected_size {
1564                // File is corrupted, force regeneration
1565                let _ = fs::remove_file(&merged_path);
1566                let _ = fs::remove_file(&manifest_path);
1567                // Lock is held, so we can safely drop it before recursing
1568                drop(_lock);
1569                return merge_residuals_chunks(index_path, num_chunks, padding_rows);
1570            }
1571        }
1572    }
1573
1574    // Build and save manifest with full metadata
1575    let mut chunk_map = HashMap::new();
1576    for chunk in &chunks {
1577        chunk_map.insert(
1578            chunk.filename.clone(),
1579            ChunkManifestEntry {
1580                rows: chunk.rows,
1581                mtime: chunk.mtime,
1582            },
1583        );
1584    }
1585    let new_manifest = MergeManifest {
1586        chunks: chunk_map,
1587        padding_rows,
1588        total_rows: final_rows,
1589        ncols,
1590    };
1591    save_merge_manifest(&manifest_path, &new_manifest)?;
1592
1593    Ok(merged_path)
1594}
1595
1596/// Clear merged files and manifests to force regeneration on next load.
1597///
1598/// This should be called after index updates to ensure the merged files
1599/// are regenerated with the latest data. The function silently ignores
1600/// missing files.
1601///
1602/// Acquires file locks to prevent racing with concurrent merge operations
1603/// in multi-process deployments.
1604pub fn clear_merged_files(index_path: &Path) -> Result<()> {
1605    // Acquire locks to prevent racing with ongoing merge operations.
1606    // This is important in multi-process scenarios where one process might
1607    // be loading (merging) while another is updating (clearing).
1608    let codes_lock_path = index_path.join("merged_codes.lock");
1609    let residuals_lock_path = index_path.join("merged_residuals.lock");
1610    let _codes_lock = FileLockGuard::acquire(&codes_lock_path)?;
1611    let _residuals_lock = FileLockGuard::acquire(&residuals_lock_path)?;
1612
1613    let files_to_remove = [
1614        "merged_codes.npy",
1615        "merged_codes.npy.tmp",
1616        "merged_codes.manifest.json",
1617        "merged_codes.manifest.json.tmp",
1618        "merged_residuals.npy",
1619        "merged_residuals.npy.tmp",
1620        "merged_residuals.manifest.json",
1621        "merged_residuals.manifest.json.tmp",
1622    ];
1623
1624    for filename in files_to_remove {
1625        let path = index_path.join(filename);
1626        if path.exists() {
1627            fs::remove_file(&path)
1628                .map_err(|e| Error::IndexLoad(format!("Failed to remove {}: {}", filename, e)))?;
1629        }
1630    }
1631
1632    Ok(())
1633}
1634
1635// ============================================================================
1636// Fast-PLAID Compatibility Conversion
1637// ============================================================================
1638
1639/// Convert a fast-plaid index to next-plaid compatible format.
1640///
1641/// This function detects and converts:
1642/// - float16 → float32 for centroids, avg_residual, bucket_cutoffs, bucket_weights
1643/// - int64 → int32 for ivf_lengths
1644/// - `<u1` → `|u1` for residuals
1645///
1646/// Returns true if any conversion was performed, false if already compatible.
1647pub fn convert_fastplaid_to_nextplaid(index_path: &Path) -> Result<bool> {
1648    let mut converted = false;
1649
1650    // Float files to convert from f16 to f32
1651    let float_files = [
1652        "centroids.npy",
1653        "avg_residual.npy",
1654        "bucket_cutoffs.npy",
1655        "bucket_weights.npy",
1656    ];
1657
1658    for filename in float_files {
1659        let path = index_path.join(filename);
1660        if path.exists() {
1661            let dtype = detect_npy_dtype(&path)?;
1662            if dtype == "<f2" {
1663                eprintln!("  Converting {} from float16 to float32", filename);
1664                convert_f16_to_f32_npy(&path)?;
1665                converted = true;
1666            }
1667        }
1668    }
1669
1670    // Convert ivf_lengths from i64 to i32
1671    let ivf_lengths_path = index_path.join("ivf_lengths.npy");
1672    if ivf_lengths_path.exists() {
1673        let dtype = detect_npy_dtype(&ivf_lengths_path)?;
1674        if dtype == "<i8" {
1675            eprintln!("  Converting ivf_lengths.npy from int64 to int32");
1676            convert_i64_to_i32_npy(&ivf_lengths_path)?;
1677            converted = true;
1678        }
1679    }
1680
1681    // Normalize residual files to use "|u1" descriptor
1682    // fast-plaid uses "<u1" which ndarray_npy doesn't accept
1683    for entry in fs::read_dir(index_path)? {
1684        let entry = entry?;
1685        let filename = entry.file_name().to_string_lossy().to_string();
1686        if filename.ends_with(".residuals.npy") {
1687            let path = entry.path();
1688            let dtype = detect_npy_dtype(&path)?;
1689            if dtype == "<u1" {
1690                eprintln!(
1691                    "  Normalizing {} dtype descriptor from <u1 to |u1",
1692                    filename
1693                );
1694                normalize_u8_npy(&path)?;
1695                converted = true;
1696            }
1697        }
1698    }
1699
1700    Ok(converted)
1701}
1702
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Round-trips a small hand-written raw-format (legacy) 2D f32 file
    // through MmapArray2F32 and checks shape, row access, and to_owned.
    #[test]
    fn test_mmap_array2_f32() {
        // Create a test file
        let mut file = NamedTempFile::new().unwrap();

        // Write header (3 rows, 2 cols) — raw format: two little-endian i64s
        file.write_all(&3i64.to_le_bytes()).unwrap();
        file.write_all(&2i64.to_le_bytes()).unwrap();

        // Write data (row-major f32 values)
        for val in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        // Load and verify
        let mmap = MmapArray2F32::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.shape(), (3, 2));

        let row0 = mmap.row(0);
        assert_eq!(row0[0], 1.0);
        assert_eq!(row0[1], 2.0);

        let owned = mmap.to_owned();
        assert_eq!(owned[[2, 0]], 5.0);
        assert_eq!(owned[[2, 1]], 6.0);
    }

    // Same exercise for the raw-format 1D i64 reader.
    #[test]
    fn test_mmap_array1_i64() {
        let mut file = NamedTempFile::new().unwrap();

        // Write header (4 elements) — raw format: one little-endian i64
        file.write_all(&4i64.to_le_bytes()).unwrap();

        // Write data
        for val in [10i64, 20, 30, 40] {
            file.write_all(&val.to_le_bytes()).unwrap();
        }

        file.flush().unwrap();

        let mmap = MmapArray1I64::from_raw_file(file.path()).unwrap();
        assert_eq!(mmap.len(), 4);
        assert_eq!(mmap.get(0), 10);
        assert_eq!(mmap.get(3), 40);

        let owned = mmap.to_owned();
        assert_eq!(owned[1], 20);
        assert_eq!(owned[2], 30);
    }

    // write_array2_f32 followed by MmapArray2F32 must reproduce the array.
    #[test]
    fn test_write_read_roundtrip() {
        let file = NamedTempFile::new().unwrap();
        let path = file.path();

        // Create test array
        let array = Array2::from_shape_vec((2, 3), vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).unwrap();

        // Write
        write_array2_f32(&array, path).unwrap();

        // Read back
        let mmap = MmapArray2F32::from_raw_file(path).unwrap();
        let loaded = mmap.to_owned();

        assert_eq!(array, loaded);
    }
}