Skip to main content

next_plaid/
mmap.rs

1//! Memory-mapped file support for efficient large index loading.
2//!
3//! This module provides utilities for loading large arrays from disk using
4//! memory-mapped files, avoiding the need to load entire arrays into RAM.
5//!
6//! Two formats are supported:
7//! - Custom raw binary format (legacy): 8-byte header with shape, then raw data
8//! - NPY format: Standard NumPy format with header, used for index files
9
10use std::collections::HashMap;
11use std::fs;
12use std::fs::File;
13use std::io::{BufReader, BufWriter, Write};
14use std::path::Path;
15
16use byteorder::{LittleEndian, ReadBytesExt};
17use fs2::FileExt;
18use memmap2::{Mmap, MmapMut};
19use ndarray::{Array1, Array2, ArrayView1, ArrayView2};
20
21use crate::error::{Error, Result};
22
23/// RAII guard for file-based locking to coordinate concurrent processes.
24/// The lock is released when this guard is dropped.
25struct FileLockGuard {
26    _file: File,
27}
28
29impl FileLockGuard {
30    /// Acquire an exclusive lock on the given lock file path.
31    /// Creates the lock file if it doesn't exist.
32    /// Blocks until the lock is acquired.
33    fn acquire(lock_path: &Path) -> Result<Self> {
34        let file = std::fs::OpenOptions::new()
35            .read(true)
36            .write(true)
37            .create(true)
38            .truncate(false)
39            .open(lock_path)
40            .map_err(|e| {
41                Error::IndexLoad(format!("Failed to open lock file {:?}: {}", lock_path, e))
42            })?;
43
44        file.lock_exclusive().map_err(|e| {
45            Error::IndexLoad(format!("Failed to acquire lock on {:?}: {}", lock_path, e))
46        })?;
47
48        Ok(Self { _file: file })
49    }
50}
51
52impl Drop for FileLockGuard {
53    fn drop(&mut self) {
54        // Lock is automatically released when file is closed
55        let _ = self._file.unlock();
56    }
57}
58
59/// A memory-mapped array of f32 values.
60///
61/// This struct provides zero-copy access to large arrays stored on disk.
62pub struct MmapArray2F32 {
63    _mmap: Mmap,
64    shape: (usize, usize),
65    data_offset: usize,
66}
67
68impl MmapArray2F32 {
69    /// Load a 2D f32 array from a raw binary file.
70    ///
71    /// The file format is:
72    /// - 8 bytes: nrows (i64 little-endian)
73    /// - 8 bytes: ncols (i64 little-endian)
74    /// - nrows * ncols * 4 bytes: f32 data (little-endian)
75    pub fn from_raw_file(path: &Path) -> Result<Self> {
76        let file = File::open(path)
77            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
78
79        let mmap = unsafe {
80            Mmap::map(&file)
81                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
82        };
83
84        if mmap.len() < 16 {
85            return Err(Error::IndexLoad("File too small for header".into()));
86        }
87
88        // Read shape from header
89        let mut cursor = std::io::Cursor::new(&mmap[..16]);
90        let nrows = cursor
91            .read_i64::<LittleEndian>()
92            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
93            as usize;
94        let ncols = cursor
95            .read_i64::<LittleEndian>()
96            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
97            as usize;
98
99        let expected_size = 16 + nrows * ncols * 4;
100        if mmap.len() < expected_size {
101            return Err(Error::IndexLoad(format!(
102                "File size {} too small for shape ({}, {})",
103                mmap.len(),
104                nrows,
105                ncols
106            )));
107        }
108
109        Ok(Self {
110            _mmap: mmap,
111            shape: (nrows, ncols),
112            data_offset: 16,
113        })
114    }
115
116    /// Get the shape of the array.
117    pub fn shape(&self) -> (usize, usize) {
118        self.shape
119    }
120
121    /// Get the number of rows.
122    pub fn nrows(&self) -> usize {
123        self.shape.0
124    }
125
126    /// Get the number of columns.
127    pub fn ncols(&self) -> usize {
128        self.shape.1
129    }
130
131    /// Get a view of a row.
132    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
133        let start = self.data_offset + idx * self.shape.1 * 4;
134        let bytes = &self._mmap[start..start + self.shape.1 * 4];
135
136        // Safety: We've verified the bounds and alignment
137        let data =
138            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
139
140        ArrayView1::from_shape(self.shape.1, data).unwrap()
141    }
142
143    /// Load a range of rows into an owned Array2.
144    pub fn load_rows(&self, start: usize, end: usize) -> Array2<f32> {
145        let nrows = end - start;
146        let byte_start = self.data_offset + start * self.shape.1 * 4;
147        let byte_end = self.data_offset + end * self.shape.1 * 4;
148        let bytes = &self._mmap[byte_start..byte_end];
149
150        // Safety: We've verified the bounds
151        let data = unsafe {
152            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
153        };
154
155        Array2::from_shape_vec((nrows, self.shape.1), data.to_vec()).unwrap()
156    }
157
158    /// Convert to an owned Array2 (loads all data into memory).
159    pub fn to_owned(&self) -> Array2<f32> {
160        self.load_rows(0, self.shape.0)
161    }
162}
163
164/// A memory-mapped array of u8 values.
165pub struct MmapArray2U8 {
166    _mmap: Mmap,
167    shape: (usize, usize),
168    data_offset: usize,
169}
170
171impl MmapArray2U8 {
172    /// Load a 2D u8 array from a raw binary file.
173    pub fn from_raw_file(path: &Path) -> Result<Self> {
174        let file = File::open(path)
175            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
176
177        let mmap = unsafe {
178            Mmap::map(&file)
179                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
180        };
181
182        if mmap.len() < 16 {
183            return Err(Error::IndexLoad("File too small for header".into()));
184        }
185
186        let mut cursor = std::io::Cursor::new(&mmap[..16]);
187        let nrows = cursor
188            .read_i64::<LittleEndian>()
189            .map_err(|e| Error::IndexLoad(format!("Failed to read nrows: {}", e)))?
190            as usize;
191        let ncols = cursor
192            .read_i64::<LittleEndian>()
193            .map_err(|e| Error::IndexLoad(format!("Failed to read ncols: {}", e)))?
194            as usize;
195
196        let expected_size = 16 + nrows * ncols;
197        if mmap.len() < expected_size {
198            return Err(Error::IndexLoad(format!(
199                "File size {} too small for shape ({}, {})",
200                mmap.len(),
201                nrows,
202                ncols
203            )));
204        }
205
206        Ok(Self {
207            _mmap: mmap,
208            shape: (nrows, ncols),
209            data_offset: 16,
210        })
211    }
212
213    /// Get the shape of the array.
214    pub fn shape(&self) -> (usize, usize) {
215        self.shape
216    }
217
218    /// Get a view of the data as ArrayView2.
219    pub fn view(&self) -> ArrayView2<'_, u8> {
220        let bytes = &self._mmap[self.data_offset..self.data_offset + self.shape.0 * self.shape.1];
221        ArrayView2::from_shape(self.shape, bytes).unwrap()
222    }
223
224    /// Load a range of rows into an owned Array2.
225    pub fn load_rows(&self, start: usize, end: usize) -> Array2<u8> {
226        let nrows = end - start;
227        let byte_start = self.data_offset + start * self.shape.1;
228        let byte_end = self.data_offset + end * self.shape.1;
229        let bytes = &self._mmap[byte_start..byte_end];
230
231        Array2::from_shape_vec((nrows, self.shape.1), bytes.to_vec()).unwrap()
232    }
233
234    /// Convert to an owned Array2.
235    pub fn to_owned(&self) -> Array2<u8> {
236        self.load_rows(0, self.shape.0)
237    }
238}
239
240/// A memory-mapped array of i64 values.
241pub struct MmapArray1I64 {
242    _mmap: Mmap,
243    len: usize,
244    data_offset: usize,
245}
246
247impl MmapArray1I64 {
248    /// Load a 1D i64 array from a raw binary file.
249    pub fn from_raw_file(path: &Path) -> Result<Self> {
250        let file = File::open(path)
251            .map_err(|e| Error::IndexLoad(format!("Failed to open file {:?}: {}", path, e)))?;
252
253        let mmap = unsafe {
254            Mmap::map(&file)
255                .map_err(|e| Error::IndexLoad(format!("Failed to mmap file {:?}: {}", path, e)))?
256        };
257
258        if mmap.len() < 8 {
259            return Err(Error::IndexLoad("File too small for header".into()));
260        }
261
262        let mut cursor = std::io::Cursor::new(&mmap[..8]);
263        let len = cursor
264            .read_i64::<LittleEndian>()
265            .map_err(|e| Error::IndexLoad(format!("Failed to read length: {}", e)))?
266            as usize;
267
268        let expected_size = 8 + len * 8;
269        if mmap.len() < expected_size {
270            return Err(Error::IndexLoad(format!(
271                "File size {} too small for length {}",
272                mmap.len(),
273                len
274            )));
275        }
276
277        Ok(Self {
278            _mmap: mmap,
279            len,
280            data_offset: 8,
281        })
282    }
283
284    /// Get the length of the array.
285    pub fn len(&self) -> usize {
286        self.len
287    }
288
289    /// Returns true if the array is empty.
290    pub fn is_empty(&self) -> bool {
291        self.len == 0
292    }
293
294    /// Get a value at an index.
295    pub fn get(&self, idx: usize) -> i64 {
296        let start = self.data_offset + idx * 8;
297        let bytes = &self._mmap[start..start + 8];
298        i64::from_le_bytes(bytes.try_into().unwrap())
299    }
300
301    /// Convert to an owned Array1.
302    pub fn to_owned(&self) -> Array1<i64> {
303        let bytes = &self._mmap[self.data_offset..self.data_offset + self.len * 8];
304
305        // Safety: We've verified the bounds
306        let data = unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const i64, self.len) };
307
308        Array1::from_vec(data.to_vec())
309    }
310}
311
312/// Write an `Array2<f32>` to a raw binary file format.
313pub fn write_array2_f32(array: &Array2<f32>, path: &Path) -> Result<()> {
314    use std::io::Write;
315
316    let file = File::create(path)
317        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
318    let mut writer = std::io::BufWriter::new(file);
319
320    let nrows = array.nrows() as i64;
321    let ncols = array.ncols() as i64;
322
323    writer
324        .write_all(&nrows.to_le_bytes())
325        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
326    writer
327        .write_all(&ncols.to_le_bytes())
328        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
329
330    for val in array.iter() {
331        writer
332            .write_all(&val.to_le_bytes())
333            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
334    }
335
336    writer
337        .flush()
338        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
339
340    Ok(())
341}
342
343/// Write an `Array2<u8>` to a raw binary file format.
344pub fn write_array2_u8(array: &Array2<u8>, path: &Path) -> Result<()> {
345    use std::io::Write;
346
347    let file = File::create(path)
348        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
349    let mut writer = std::io::BufWriter::new(file);
350
351    let nrows = array.nrows() as i64;
352    let ncols = array.ncols() as i64;
353
354    writer
355        .write_all(&nrows.to_le_bytes())
356        .map_err(|e| Error::IndexLoad(format!("Failed to write nrows: {}", e)))?;
357    writer
358        .write_all(&ncols.to_le_bytes())
359        .map_err(|e| Error::IndexLoad(format!("Failed to write ncols: {}", e)))?;
360
361    for row in array.rows() {
362        writer
363            .write_all(row.as_slice().unwrap())
364            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
365    }
366
367    writer
368        .flush()
369        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
370
371    Ok(())
372}
373
374/// Write an `Array1<i64>` to a raw binary file format.
375pub fn write_array1_i64(array: &Array1<i64>, path: &Path) -> Result<()> {
376    use std::io::Write;
377
378    let file = File::create(path)
379        .map_err(|e| Error::IndexLoad(format!("Failed to create file {:?}: {}", path, e)))?;
380    let mut writer = std::io::BufWriter::new(file);
381
382    let len = array.len() as i64;
383
384    writer
385        .write_all(&len.to_le_bytes())
386        .map_err(|e| Error::IndexLoad(format!("Failed to write length: {}", e)))?;
387
388    for val in array.iter() {
389        writer
390            .write_all(&val.to_le_bytes())
391            .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
392    }
393
394    writer
395        .flush()
396        .map_err(|e| Error::IndexLoad(format!("Failed to flush: {}", e)))?;
397
398    Ok(())
399}
400
401// ============================================================================
402// NPY Format Memory-Mapped Arrays
403// ============================================================================
404
405/// NPY file magic bytes
406const NPY_MAGIC: &[u8] = b"\x93NUMPY";
407
408/// Parse dtype from NPY header string (e.g., "<f2" for float16, "<f4" for float32)
409fn parse_dtype_from_header(header: &str) -> Result<String> {
410    // Find 'descr': '...'
411    let descr_start = header
412        .find("'descr':")
413        .ok_or_else(|| Error::IndexLoad("No descr in NPY header".into()))?;
414
415    let after_descr = &header[descr_start + 8..];
416    let quote_start = after_descr
417        .find('\'')
418        .ok_or_else(|| Error::IndexLoad("No dtype quote in NPY header".into()))?;
419    let rest = &after_descr[quote_start + 1..];
420    let quote_end = rest
421        .find('\'')
422        .ok_or_else(|| Error::IndexLoad("Unclosed dtype quote in NPY header".into()))?;
423
424    Ok(rest[..quote_end].to_string())
425}
426
427/// Detect NPY file dtype without loading the entire file
428pub fn detect_npy_dtype(path: &Path) -> Result<String> {
429    let file = File::open(path)
430        .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
431
432    let mmap = unsafe {
433        Mmap::map(&file)
434            .map_err(|e| Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e)))?
435    };
436
437    if mmap.len() < 10 {
438        return Err(Error::IndexLoad("NPY file too small".into()));
439    }
440
441    // Check magic
442    if &mmap[..6] != NPY_MAGIC {
443        return Err(Error::IndexLoad("Invalid NPY magic".into()));
444    }
445
446    let major_version = mmap[6];
447
448    // Read header length
449    let header_len = if major_version == 1 {
450        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
451    } else if major_version == 2 {
452        if mmap.len() < 12 {
453            return Err(Error::IndexLoad("NPY v2 file too small".into()));
454        }
455        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
456    } else {
457        return Err(Error::IndexLoad(format!(
458            "Unsupported NPY version: {}",
459            major_version
460        )));
461    };
462
463    let header_start = if major_version == 1 { 10 } else { 12 };
464    let header_end = header_start + header_len;
465
466    if mmap.len() < header_end {
467        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
468    }
469
470    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
471        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
472
473    parse_dtype_from_header(header_str)
474}
475
476/// Convert a float16 NPY file to float32 in place
477pub fn convert_f16_to_f32_npy(path: &Path) -> Result<()> {
478    use half::f16;
479    use std::io::Read;
480
481    // Read the entire file
482    let mut file = File::open(path)
483        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
484    let mut data = Vec::new();
485    file.read_to_end(&mut data)
486        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
487
488    if data.len() < 10 || &data[..6] != NPY_MAGIC {
489        return Err(Error::IndexLoad("Invalid NPY file".into()));
490    }
491
492    let major_version = data[6];
493    let header_start = if major_version == 1 { 10 } else { 12 };
494    let header_len = if major_version == 1 {
495        u16::from_le_bytes([data[8], data[9]]) as usize
496    } else {
497        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
498    };
499    let header_end = header_start + header_len;
500
501    // Parse header to get shape
502    let header_str = std::str::from_utf8(&data[header_start..header_end])
503        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
504    let shape = parse_shape_from_header(header_str)?;
505
506    // Calculate total elements
507    let total_elements: usize = shape.iter().product();
508    let f16_data = &data[header_end..header_end + total_elements * 2];
509
510    // Convert f16 to f32
511    let mut f32_data = Vec::with_capacity(total_elements * 4);
512    for chunk in f16_data.chunks(2) {
513        let f16_val = f16::from_le_bytes([chunk[0], chunk[1]]);
514        let f32_val: f32 = f16_val.to_f32();
515        f32_data.extend_from_slice(&f32_val.to_le_bytes());
516    }
517
518    // Write new file with f32 dtype
519    let file = File::create(path)
520        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
521    let mut writer = BufWriter::new(file);
522
523    if shape.len() == 1 {
524        write_npy_header_1d(&mut writer, shape[0], "<f4")?;
525    } else if shape.len() == 2 {
526        write_npy_header_2d(&mut writer, shape[0], shape[1], "<f4")?;
527    } else {
528        return Err(Error::IndexLoad("Unsupported shape dimensions".into()));
529    }
530
531    writer
532        .write_all(&f32_data)
533        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
534    writer.flush()?;
535
536    Ok(())
537}
538
539/// Convert an int64 NPY file to int32 in place
540pub fn convert_i64_to_i32_npy(path: &Path) -> Result<()> {
541    use std::io::Read;
542
543    // Read the entire file
544    let mut file = File::open(path)
545        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
546    let mut data = Vec::new();
547    file.read_to_end(&mut data)
548        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
549
550    if data.len() < 10 || &data[..6] != NPY_MAGIC {
551        return Err(Error::IndexLoad("Invalid NPY file".into()));
552    }
553
554    let major_version = data[6];
555    let header_start = if major_version == 1 { 10 } else { 12 };
556    let header_len = if major_version == 1 {
557        u16::from_le_bytes([data[8], data[9]]) as usize
558    } else {
559        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
560    };
561    let header_end = header_start + header_len;
562
563    // Parse header to get shape
564    let header_str = std::str::from_utf8(&data[header_start..header_end])
565        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
566    let shape = parse_shape_from_header(header_str)?;
567
568    if shape.len() != 1 {
569        return Err(Error::IndexLoad("Expected 1D array for i64->i32".into()));
570    }
571
572    let len = shape[0];
573    let i64_data = &data[header_end..header_end + len * 8];
574
575    // Convert i64 to i32
576    let mut i32_data = Vec::with_capacity(len * 4);
577    for chunk in i64_data.chunks(8) {
578        let i64_val = i64::from_le_bytes(chunk.try_into().unwrap());
579        let i32_val = i64_val as i32;
580        i32_data.extend_from_slice(&i32_val.to_le_bytes());
581    }
582
583    // Write new file with i32 dtype
584    let file = File::create(path)
585        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
586    let mut writer = BufWriter::new(file);
587
588    write_npy_header_1d(&mut writer, len, "<i4")?;
589
590    writer
591        .write_all(&i32_data)
592        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
593    writer.flush()?;
594
595    Ok(())
596}
597
598/// Re-save a u8 NPY file to ensure dtype descriptor is "|u1" (platform-independent)
599///
600/// Note: We can't use ndarray_npy::ReadNpyExt here because it doesn't accept "<u1"
601/// descriptor, so we manually read the raw data and resave with "|u1".
602pub fn normalize_u8_npy(path: &Path) -> Result<()> {
603    use std::io::Read;
604
605    // Read the entire file
606    let mut file = File::open(path)
607        .map_err(|e| Error::IndexLoad(format!("Failed to open {:?}: {}", path, e)))?;
608    let mut data = Vec::new();
609    file.read_to_end(&mut data)
610        .map_err(|e| Error::IndexLoad(format!("Failed to read {:?}: {}", path, e)))?;
611
612    if data.len() < 10 || &data[..6] != NPY_MAGIC {
613        return Err(Error::IndexLoad("Invalid NPY file".into()));
614    }
615
616    let major_version = data[6];
617    let header_start = if major_version == 1 { 10 } else { 12 };
618    let header_len = if major_version == 1 {
619        u16::from_le_bytes([data[8], data[9]]) as usize
620    } else {
621        u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize
622    };
623    let header_end = header_start + header_len;
624
625    // Parse header to get shape
626    let header_str = std::str::from_utf8(&data[header_start..header_end])
627        .map_err(|e| Error::IndexLoad(format!("Invalid header: {}", e)))?;
628    let shape = parse_shape_from_header(header_str)?;
629
630    if shape.len() != 2 {
631        return Err(Error::IndexLoad(
632            "Expected 2D array for u8 normalization".into(),
633        ));
634    }
635
636    let nrows = shape[0];
637    let ncols = shape[1];
638    let u8_data = &data[header_end..header_end + nrows * ncols];
639
640    // Re-write with explicit "|u1" dtype
641    let new_file = File::create(path)
642        .map_err(|e| Error::IndexLoad(format!("Failed to create {:?}: {}", path, e)))?;
643    let mut writer = BufWriter::new(new_file);
644
645    write_npy_header_2d(&mut writer, nrows, ncols, "|u1")?;
646
647    writer
648        .write_all(u8_data)
649        .map_err(|e| Error::IndexLoad(format!("Failed to write data: {}", e)))?;
650    writer.flush()?;
651
652    Ok(())
653}
654
655/// Parse NPY header and return (shape, data_offset, is_fortran_order)
656fn parse_npy_header(mmap: &Mmap) -> Result<(Vec<usize>, usize, bool)> {
657    if mmap.len() < 10 {
658        return Err(Error::IndexLoad("NPY file too small".into()));
659    }
660
661    // Check magic
662    if &mmap[..6] != NPY_MAGIC {
663        return Err(Error::IndexLoad("Invalid NPY magic".into()));
664    }
665
666    let major_version = mmap[6];
667    let _minor_version = mmap[7];
668
669    // Read header length
670    let header_len = if major_version == 1 {
671        u16::from_le_bytes([mmap[8], mmap[9]]) as usize
672    } else if major_version == 2 {
673        if mmap.len() < 12 {
674            return Err(Error::IndexLoad("NPY v2 file too small".into()));
675        }
676        u32::from_le_bytes([mmap[8], mmap[9], mmap[10], mmap[11]]) as usize
677    } else {
678        return Err(Error::IndexLoad(format!(
679            "Unsupported NPY version: {}",
680            major_version
681        )));
682    };
683
684    let header_start = if major_version == 1 { 10 } else { 12 };
685    let header_end = header_start + header_len;
686
687    if mmap.len() < header_end {
688        return Err(Error::IndexLoad("NPY header exceeds file size".into()));
689    }
690
691    // Parse header dict (simplified Python dict parsing)
692    let header_str = std::str::from_utf8(&mmap[header_start..header_end])
693        .map_err(|e| Error::IndexLoad(format!("Invalid NPY header encoding: {}", e)))?;
694
695    // Extract shape from header like: {'descr': '<i8', 'fortran_order': False, 'shape': (12345,), }
696    let shape = parse_shape_from_header(header_str)?;
697    let fortran_order = header_str.contains("'fortran_order': True");
698
699    Ok((shape, header_end, fortran_order))
700}
701
702/// Parse shape tuple from NPY header string
703fn parse_shape_from_header(header: &str) -> Result<Vec<usize>> {
704    // Find 'shape': (...)
705    let shape_start = header
706        .find("'shape':")
707        .ok_or_else(|| Error::IndexLoad("No shape in NPY header".into()))?;
708
709    let after_shape = &header[shape_start + 8..];
710    let paren_start = after_shape
711        .find('(')
712        .ok_or_else(|| Error::IndexLoad("No shape tuple in NPY header".into()))?;
713    let paren_end = after_shape
714        .find(')')
715        .ok_or_else(|| Error::IndexLoad("Unclosed shape tuple in NPY header".into()))?;
716
717    let shape_content = &after_shape[paren_start + 1..paren_end];
718
719    // Parse comma-separated numbers
720    let mut shape = Vec::new();
721    for part in shape_content.split(',') {
722        let trimmed = part.trim();
723        if !trimmed.is_empty() {
724            let dim: usize = trimmed.parse().map_err(|e| {
725                Error::IndexLoad(format!("Invalid shape dimension '{}': {}", trimmed, e))
726            })?;
727            shape.push(dim);
728        }
729    }
730
731    Ok(shape)
732}
733
734/// Memory-mapped NPY array for i64 values (used for codes).
735///
736/// This struct provides zero-copy access to 1D i64 arrays stored in NPY format.
737pub struct MmapNpyArray1I64 {
738    _mmap: Mmap,
739    len: usize,
740    data_offset: usize,
741}
742
743impl MmapNpyArray1I64 {
744    /// Create an empty instance backed by an anonymous mmap (no file).
745    ///
746    /// Used to release file-backed mmap handles before file operations on Windows,
747    /// where deleting or renaming a memory-mapped file causes OS error 1224.
748    pub fn empty() -> Self {
749        let mmap = MmapMut::map_anon(1)
750            .expect("failed to create anonymous mmap")
751            .make_read_only()
752            .expect("failed to make anonymous mmap read-only");
753        Self {
754            _mmap: mmap,
755            len: 0,
756            data_offset: 0,
757        }
758    }
759
760    /// Load a 1D i64 array from an NPY file.
761    pub fn from_npy_file(path: &Path) -> Result<Self> {
762        let file = File::open(path)
763            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
764
765        let mmap = unsafe {
766            Mmap::map(&file).map_err(|e| {
767                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
768            })?
769        };
770
771        let (shape, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
772
773        if shape.is_empty() {
774            return Err(Error::IndexLoad("Empty shape in NPY file".into()));
775        }
776
777        let len = shape[0];
778
779        // Verify file size
780        let expected_size = data_offset + len * 8;
781        if mmap.len() < expected_size {
782            return Err(Error::IndexLoad(format!(
783                "NPY file size {} too small for {} elements",
784                mmap.len(),
785                len
786            )));
787        }
788
789        Ok(Self {
790            _mmap: mmap,
791            len,
792            data_offset,
793        })
794    }
795
796    /// Get the length of the array.
797    pub fn len(&self) -> usize {
798        self.len
799    }
800
801    /// Returns true if the array is empty.
802    pub fn is_empty(&self) -> bool {
803        self.len == 0
804    }
805
806    /// Get a slice of the data as &[i64].
807    ///
808    /// Returns a `Vec<i64>` instead of &[i64] to handle unaligned data safely.
809    ///
810    /// # Safety
811    /// The caller must ensure start <= end <= len.
812    pub fn slice(&self, start: usize, end: usize) -> Vec<i64> {
813        let count = end - start;
814        let mut result = Vec::with_capacity(count);
815
816        for i in start..end {
817            result.push(self.get(i));
818        }
819
820        result
821    }
822
823    /// Get a value at an index.
824    pub fn get(&self, idx: usize) -> i64 {
825        let start = self.data_offset + idx * 8;
826        let bytes = &self._mmap[start..start + 8];
827        i64::from_le_bytes(bytes.try_into().unwrap())
828    }
829}
830
831/// Memory-mapped NPY array for f32 values (used for centroids).
832///
833/// This struct provides zero-copy access to 2D f32 arrays stored in NPY format.
834/// Unlike loading into an owned `Array2<f32>`, this approach lets the OS manage
835/// paging, reducing resident memory usage for large centroid matrices.
836pub struct MmapNpyArray2F32 {
837    _mmap: Mmap,
838    shape: (usize, usize),
839    data_offset: usize,
840}
841
842impl MmapNpyArray2F32 {
843    /// Load a 2D f32 array from an NPY file.
844    pub fn from_npy_file(path: &Path) -> Result<Self> {
845        let file = File::open(path)
846            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
847
848        let mmap = unsafe {
849            Mmap::map(&file).map_err(|e| {
850                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
851            })?
852        };
853
854        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
855
856        if shape_vec.len() != 2 {
857            return Err(Error::IndexLoad(format!(
858                "Expected 2D array, got {}D",
859                shape_vec.len()
860            )));
861        }
862
863        let shape = (shape_vec[0], shape_vec[1]);
864
865        // Verify file size (f32 = 4 bytes)
866        let expected_size = data_offset + shape.0 * shape.1 * 4;
867        if mmap.len() < expected_size {
868            return Err(Error::IndexLoad(format!(
869                "NPY file size {} too small for shape {:?}",
870                mmap.len(),
871                shape
872            )));
873        }
874
875        Ok(Self {
876            _mmap: mmap,
877            shape,
878            data_offset,
879        })
880    }
881
882    /// Get the shape of the array.
883    pub fn shape(&self) -> (usize, usize) {
884        self.shape
885    }
886
887    /// Get the number of rows.
888    pub fn nrows(&self) -> usize {
889        self.shape.0
890    }
891
892    /// Get the number of columns.
893    pub fn ncols(&self) -> usize {
894        self.shape.1
895    }
896
897    /// Get a view of the entire array as ArrayView2.
898    ///
899    /// This provides zero-copy access to the memory-mapped data.
900    pub fn view(&self) -> ArrayView2<'_, f32> {
901        let byte_start = self.data_offset;
902        let byte_end = self.data_offset + self.shape.0 * self.shape.1 * 4;
903        let bytes = &self._mmap[byte_start..byte_end];
904
905        // Safety: We've verified bounds and f32 is 4-byte aligned in NPY format
906        let data = unsafe {
907            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.0 * self.shape.1)
908        };
909
910        ArrayView2::from_shape(self.shape, data).unwrap()
911    }
912
913    /// Get a view of a single row.
914    pub fn row(&self, idx: usize) -> ArrayView1<'_, f32> {
915        let byte_start = self.data_offset + idx * self.shape.1 * 4;
916        let bytes = &self._mmap[byte_start..byte_start + self.shape.1 * 4];
917
918        // Safety: We've verified bounds and alignment
919        let data =
920            unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const f32, self.shape.1) };
921
922        ArrayView1::from_shape(self.shape.1, data).unwrap()
923    }
924
925    /// Get a view of rows [start..end] as ArrayView2.
926    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, f32> {
927        let nrows = end - start;
928        let byte_start = self.data_offset + start * self.shape.1 * 4;
929        let byte_end = self.data_offset + end * self.shape.1 * 4;
930        let bytes = &self._mmap[byte_start..byte_end];
931
932        // Safety: We've verified bounds
933        let data = unsafe {
934            std::slice::from_raw_parts(bytes.as_ptr() as *const f32, nrows * self.shape.1)
935        };
936
937        ArrayView2::from_shape((nrows, self.shape.1), data).unwrap()
938    }
939
940    /// Convert to an owned Array2 (loads all data into memory).
941    ///
942    /// Use this only when you need an owned copy; prefer `view()` for read-only access.
943    pub fn to_owned(&self) -> Array2<f32> {
944        self.view().to_owned()
945    }
946}
947
948/// Memory-mapped NPY array for u8 values (used for residuals).
949///
950/// This struct provides zero-copy access to 2D u8 arrays stored in NPY format.
951pub struct MmapNpyArray2U8 {
952    _mmap: Mmap,
953    shape: (usize, usize),
954    data_offset: usize,
955}
956
957impl MmapNpyArray2U8 {
958    /// Create an empty instance backed by an anonymous mmap (no file).
959    ///
960    /// Used to release file-backed mmap handles before file operations on Windows,
961    /// where deleting or renaming a memory-mapped file causes OS error 1224.
962    pub fn empty() -> Self {
963        let mmap = MmapMut::map_anon(1)
964            .expect("failed to create anonymous mmap")
965            .make_read_only()
966            .expect("failed to make anonymous mmap read-only");
967        Self {
968            _mmap: mmap,
969            shape: (0, 0),
970            data_offset: 0,
971        }
972    }
973
974    /// Load a 2D u8 array from an NPY file.
975    pub fn from_npy_file(path: &Path) -> Result<Self> {
976        let file = File::open(path)
977            .map_err(|e| Error::IndexLoad(format!("Failed to open NPY file {:?}: {}", path, e)))?;
978
979        let mmap = unsafe {
980            Mmap::map(&file).map_err(|e| {
981                Error::IndexLoad(format!("Failed to mmap NPY file {:?}: {}", path, e))
982            })?
983        };
984
985        let (shape_vec, data_offset, _fortran_order) = parse_npy_header(&mmap)?;
986
987        if shape_vec.len() != 2 {
988            return Err(Error::IndexLoad(format!(
989                "Expected 2D array, got {}D",
990                shape_vec.len()
991            )));
992        }
993
994        let shape = (shape_vec[0], shape_vec[1]);
995
996        // Verify file size
997        let expected_size = data_offset + shape.0 * shape.1;
998        if mmap.len() < expected_size {
999            return Err(Error::IndexLoad(format!(
1000                "NPY file size {} too small for shape {:?}",
1001                mmap.len(),
1002                shape
1003            )));
1004        }
1005
1006        Ok(Self {
1007            _mmap: mmap,
1008            shape,
1009            data_offset,
1010        })
1011    }
1012
1013    /// Get the shape of the array.
1014    pub fn shape(&self) -> (usize, usize) {
1015        self.shape
1016    }
1017
1018    /// Get the number of rows.
1019    pub fn nrows(&self) -> usize {
1020        self.shape.0
1021    }
1022
1023    /// Get the number of columns.
1024    pub fn ncols(&self) -> usize {
1025        self.shape.1
1026    }
1027
1028    /// Get a view of rows [start..end] as ArrayView2.
1029    pub fn slice_rows(&self, start: usize, end: usize) -> ArrayView2<'_, u8> {
1030        let nrows = end - start;
1031        let byte_start = self.data_offset + start * self.shape.1;
1032        let byte_end = self.data_offset + end * self.shape.1;
1033        let bytes = &self._mmap[byte_start..byte_end];
1034
1035        ArrayView2::from_shape((nrows, self.shape.1), bytes).unwrap()
1036    }
1037
1038    /// Get a view of the entire array.
1039    pub fn view(&self) -> ArrayView2<'_, u8> {
1040        self.slice_rows(0, self.shape.0)
1041    }
1042
1043    /// Get a single row as a slice.
1044    pub fn row(&self, idx: usize) -> &[u8] {
1045        let byte_start = self.data_offset + idx * self.shape.1;
1046        let byte_end = byte_start + self.shape.1;
1047        &self._mmap[byte_start..byte_end]
1048    }
1049}
1050
1051// ============================================================================
1052// Merged File Creation
1053// ============================================================================
1054
1055/// Manifest entry for tracking chunk files
1056#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1057pub struct ChunkManifestEntry {
1058    pub rows: usize,
1059    pub mtime: f64,
1060}
1061
1062/// Manifest for merged files, including metadata about the merge
1063#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1064pub struct MergeManifest {
1065    /// Chunk information
1066    pub chunks: HashMap<String, ChunkManifestEntry>,
1067    /// Number of padding rows used in the merge
1068    #[serde(default)]
1069    pub padding_rows: usize,
1070    /// Number of chunks expected (for fast-path validation)
1071    #[serde(default)]
1072    pub num_chunks: usize,
1073    /// Mtime of metadata.json at merge time (detects any index modifications)
1074    #[serde(default)]
1075    pub metadata_mtime: f64,
1076    /// Total rows in the merged file (including padding)
1077    #[serde(default)]
1078    pub total_rows: usize,
1079    /// Number of columns (for 2D arrays like residuals)
1080    #[serde(default)]
1081    pub ncols: usize,
1082}
1083
1084/// Legacy manifest type (for backwards compatibility during migration)
1085pub type ChunkManifest = HashMap<String, ChunkManifestEntry>;
1086
1087/// Load manifest from disk if it exists
1088/// Handles both new MergeManifest format and legacy ChunkManifest format
1089fn load_merge_manifest(manifest_path: &Path) -> Option<MergeManifest> {
1090    if manifest_path.exists() {
1091        if let Ok(file) = File::open(manifest_path) {
1092            // Try to load as new format first
1093            let reader = BufReader::new(file);
1094            if let Ok(manifest) = serde_json::from_reader::<_, MergeManifest>(reader) {
1095                return Some(manifest);
1096            }
1097            // Try legacy format
1098            if let Ok(file) = File::open(manifest_path) {
1099                if let Ok(chunks) =
1100                    serde_json::from_reader::<_, ChunkManifest>(BufReader::new(file))
1101                {
1102                    // Convert legacy format - missing padding info means we need to regenerate
1103                    return Some(MergeManifest {
1104                        chunks,
1105                        padding_rows: 0,
1106                        total_rows: 0,
1107                        ncols: 0,
1108                        num_chunks: 0,
1109                        metadata_mtime: 0.0,
1110                    });
1111                }
1112            }
1113        }
1114    }
1115    None
1116}
1117
1118/// Save manifest to disk atomically (write to temp file, then rename)
1119fn save_merge_manifest(manifest_path: &Path, manifest: &MergeManifest) -> Result<()> {
1120    let temp_path = manifest_path.with_extension("manifest.json.tmp");
1121
1122    // Write to temp file
1123    let file = File::create(&temp_path)
1124        .map_err(|e| Error::IndexLoad(format!("Failed to create temp manifest: {}", e)))?;
1125    let mut writer = BufWriter::new(file);
1126    serde_json::to_writer(&mut writer, manifest)
1127        .map_err(|e| Error::IndexLoad(format!("Failed to write manifest: {}", e)))?;
1128    writer
1129        .flush()
1130        .map_err(|e| Error::IndexLoad(format!("Failed to flush manifest: {}", e)))?;
1131
1132    // Sync to disk
1133    writer
1134        .into_inner()
1135        .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?
1136        .sync_all()
1137        .map_err(|e| Error::IndexLoad(format!("Failed to sync manifest: {}", e)))?;
1138
1139    // Atomic rename
1140    fs::rename(&temp_path, manifest_path)
1141        .map_err(|e| Error::IndexLoad(format!("Failed to rename manifest: {}", e)))?;
1142
1143    Ok(())
1144}
1145
1146/// Get file modification time as f64 seconds since epoch
1147fn get_mtime(path: &Path) -> Result<f64> {
1148    let metadata = fs::metadata(path)
1149        .map_err(|e| Error::IndexLoad(format!("Failed to get metadata for {:?}: {}", path, e)))?;
1150    let mtime = metadata
1151        .modified()
1152        .map_err(|e| Error::IndexLoad(format!("Failed to get mtime: {}", e)))?;
1153    let duration = mtime
1154        .duration_since(std::time::UNIX_EPOCH)
1155        .map_err(|e| Error::IndexLoad(format!("Invalid mtime: {}", e)))?;
1156    Ok(duration.as_secs_f64())
1157}
1158
1159/// Build the NPY header dict string and compute the total header size (magic + version + len + padded dict).
1160fn npy_header_layout(header_dict: &str) -> (usize, usize) {
1161    let header_len = header_dict.len();
1162    let padding = (64 - ((10 + header_len) % 64)) % 64;
1163    let total = 10 + header_len + padding + 1; // +1 for the trailing newline
1164    (padding, total)
1165}
1166
1167fn npy_header_dict_1d(len: usize, dtype: &str) -> String {
1168    format!(
1169        "{{'descr': '{}', 'fortran_order': False, 'shape': ({},), }}",
1170        dtype, len
1171    )
1172}
1173
1174fn npy_header_dict_2d(nrows: usize, ncols: usize, dtype: &str) -> String {
1175    format!(
1176        "{{'descr': '{}', 'fortran_order': False, 'shape': ({}, {}), }}",
1177        dtype, nrows, ncols
1178    )
1179}
1180
1181/// Compute the NPY header size for a 1D array (without writing).
1182fn npy_header_size_1d(len: usize, dtype: &str) -> usize {
1183    let dict = npy_header_dict_1d(len, dtype);
1184    npy_header_layout(&dict).1
1185}
1186
1187/// Compute the NPY header size for a 2D array (without writing).
1188fn npy_header_size_2d(nrows: usize, ncols: usize, dtype: &str) -> usize {
1189    let dict = npy_header_dict_2d(nrows, ncols, dtype);
1190    npy_header_layout(&dict).1
1191}
1192
1193/// Write an NPY header (shared implementation for 1D and 2D).
1194fn write_npy_header(writer: &mut impl Write, header_dict: &str) -> Result<usize> {
1195    let (padding, total) = npy_header_layout(header_dict);
1196    let padded_header = format!("{}{}\n", header_dict, " ".repeat(padding));
1197
1198    // Write magic + version (v1.0)
1199    writer
1200        .write_all(NPY_MAGIC)
1201        .map_err(|e| Error::IndexLoad(format!("Failed to write NPY magic: {}", e)))?;
1202    writer
1203        .write_all(&[1, 0])
1204        .map_err(|e| Error::IndexLoad(format!("Failed to write version: {}", e)))?;
1205
1206    // Write header length (2 bytes for v1.0)
1207    let header_len_bytes = (padded_header.len() as u16).to_le_bytes();
1208    writer
1209        .write_all(&header_len_bytes)
1210        .map_err(|e| Error::IndexLoad(format!("Failed to write header len: {}", e)))?;
1211
1212    // Write header
1213    writer
1214        .write_all(padded_header.as_bytes())
1215        .map_err(|e| Error::IndexLoad(format!("Failed to write header: {}", e)))?;
1216
1217    Ok(total)
1218}
1219
1220/// Write NPY header for a 1D array
1221fn write_npy_header_1d(writer: &mut impl Write, len: usize, dtype: &str) -> Result<usize> {
1222    write_npy_header(writer, &npy_header_dict_1d(len, dtype))
1223}
1224
1225/// Write NPY header for a 2D array
1226fn write_npy_header_2d(
1227    writer: &mut impl Write,
1228    nrows: usize,
1229    ncols: usize,
1230    dtype: &str,
1231) -> Result<usize> {
1232    write_npy_header(writer, &npy_header_dict_2d(nrows, ncols, dtype))
1233}
1234
1235/// Information about a chunk file for merging
1236struct ChunkInfo {
1237    path: std::path::PathBuf,
1238    filename: String,
1239    rows: usize,
1240    mtime: f64,
1241}
1242
1243/// Merge chunked codes NPY files into a single merged file.
1244///
1245/// Uses incremental persistence with manifest tracking to skip unchanged chunks.
1246/// Uses atomic writes to prevent corruption from interrupted writes.
1247/// Uses file-based locking to coordinate concurrent processes.
1248/// Returns the path to the merged file.
1249pub fn merge_codes_chunks(
1250    index_path: &Path,
1251    num_chunks: usize,
1252    padding_rows: usize,
1253) -> Result<std::path::PathBuf> {
1254    use ndarray_npy::ReadNpyExt;
1255
1256    let merged_path = index_path.join("merged_codes.npy");
1257    let manifest_path = index_path.join("merged_codes.manifest.json");
1258    let temp_path = index_path.join("merged_codes.npy.tmp");
1259    let lock_path = index_path.join("merged_codes.lock");
1260
1261    // Fast path: if manifest exists with matching params, metadata.json hasn't changed,
1262    // and merged file exists with correct size, skip chunk scanning entirely.
1263    let metadata_json_path = index_path.join("metadata.json");
1264    let current_metadata_mtime = get_mtime(&metadata_json_path).unwrap_or(0.0);
1265    if let Some(ref manifest) = load_merge_manifest(&manifest_path) {
1266        let mtime_matches = manifest.metadata_mtime > 0.0
1267            && (manifest.metadata_mtime - current_metadata_mtime).abs() < 0.001;
1268        if manifest.num_chunks == num_chunks
1269            && manifest.padding_rows == padding_rows
1270            && manifest.chunks.len() == num_chunks
1271            && manifest.total_rows > 0
1272            && mtime_matches
1273            && merged_path.exists()
1274        {
1275            if let Ok(meta) = std::fs::metadata(&merged_path) {
1276                let expected_size = npy_header_size_1d(manifest.total_rows, "<i8")
1277                    + manifest.total_rows * std::mem::size_of::<i64>();
1278                if meta.len() == expected_size as u64 {
1279                    return Ok(merged_path);
1280                }
1281            }
1282        }
1283    }
1284
1285    // Acquire exclusive lock to prevent concurrent merge operations.
1286    // This is critical for multi-process scenarios (e.g., multiple API workers).
1287    let _lock = FileLockGuard::acquire(&lock_path)?;
1288
1289    // After acquiring the lock, re-check if merge is still needed.
1290    // Another process might have completed the merge while we were waiting.
1291
1292    // Load previous manifest (re-read after acquiring lock)
1293    let old_manifest = load_merge_manifest(&manifest_path);
1294
1295    // Scan chunks and detect changes
1296    let mut chunks: Vec<ChunkInfo> = Vec::new();
1297    let mut total_rows = 0usize;
1298    let mut chain_broken = false;
1299
1300    for i in 0..num_chunks {
1301        let filename = format!("{}.codes.npy", i);
1302        let path = index_path.join(&filename);
1303
1304        if path.exists() {
1305            let mtime = get_mtime(&path)?;
1306
1307            // Get shape by reading header only
1308            let file = File::open(&path)?;
1309            let arr: Array1<i64> = Array1::read_npy(file)?;
1310            let rows = arr.len();
1311
1312            if rows > 0 {
1313                total_rows += rows;
1314
1315                // Check if this chunk changed
1316                let is_clean = if let Some(ref manifest) = old_manifest {
1317                    manifest
1318                        .chunks
1319                        .get(&filename)
1320                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
1321                } else {
1322                    false
1323                };
1324
1325                if !is_clean {
1326                    chain_broken = true;
1327                }
1328
1329                chunks.push(ChunkInfo {
1330                    path,
1331                    filename,
1332                    rows,
1333                    mtime,
1334                });
1335            }
1336        }
1337    }
1338
1339    if total_rows == 0 {
1340        return Err(Error::IndexLoad("No data to merge".into()));
1341    }
1342
1343    let final_rows = total_rows + padding_rows;
1344
1345    // Check if we need to rewrite:
1346    // 1. Merged file doesn't exist
1347    // 2. Chunks have changed
1348    // 3. Padding has changed (stored in manifest)
1349    // 4. Total rows don't match (safety check)
1350    let padding_changed = old_manifest
1351        .as_ref()
1352        .map(|m| m.padding_rows != padding_rows)
1353        .unwrap_or(true);
1354    let total_rows_mismatch = old_manifest
1355        .as_ref()
1356        .map(|m| m.total_rows != final_rows)
1357        .unwrap_or(true);
1358
1359    let needs_full_rewrite =
1360        !merged_path.exists() || chain_broken || padding_changed || total_rows_mismatch;
1361
1362    if needs_full_rewrite {
1363        // Write to temp file first (atomic write pattern)
1364        let file = File::create(&temp_path)
1365            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
1366        let mut writer = BufWriter::new(file);
1367
1368        // Write header
1369        let header_size = write_npy_header_1d(&mut writer, final_rows, "<i8")?;
1370
1371        // Write chunk data
1372        let mut written_rows = 0usize;
1373        for chunk in &chunks {
1374            let file = File::open(&chunk.path)?;
1375            let arr: Array1<i64> = Array1::read_npy(file)?;
1376            for &val in arr.iter() {
1377                writer.write_all(&val.to_le_bytes())?;
1378            }
1379            written_rows += arr.len();
1380        }
1381
1382        // Write padding zeros
1383        for _ in 0..padding_rows {
1384            writer.write_all(&0i64.to_le_bytes())?;
1385        }
1386        written_rows += padding_rows;
1387
1388        // Flush and sync to disk
1389        writer
1390            .flush()
1391            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged file: {}", e)))?;
1392        let file = writer
1393            .into_inner()
1394            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
1395        file.sync_all()
1396            .map_err(|e| Error::IndexLoad(format!("Failed to sync merged file to disk: {}", e)))?;
1397
1398        // Verify file size before renaming
1399        let expected_size = header_size + written_rows * 8;
1400        let actual_size = fs::metadata(&temp_path)
1401            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
1402            .len() as usize;
1403
1404        if actual_size != expected_size {
1405            // Clean up temp file and return error
1406            let _ = fs::remove_file(&temp_path);
1407            return Err(Error::IndexLoad(format!(
1408                "Merged codes file size mismatch: expected {} bytes, got {} bytes",
1409                expected_size, actual_size
1410            )));
1411        }
1412
1413        // Atomic rename (overwrites existing file)
1414        fs::rename(&temp_path, &merged_path)
1415            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged file: {}", e)))?;
1416    } else {
1417        // Validate existing merged file before using it
1418        if merged_path.exists() {
1419            let file_size = fs::metadata(&merged_path)
1420                .map_err(|e| {
1421                    Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
1422                })?
1423                .len() as usize;
1424
1425            // NPY header is at least 64 bytes, data is final_rows * 8 bytes
1426            let min_expected_size = 64 + final_rows * 8;
1427            if file_size < min_expected_size {
1428                // File is corrupted, force regeneration by recursing with empty manifest
1429                let _ = fs::remove_file(&merged_path);
1430                let _ = fs::remove_file(&manifest_path);
1431                // Lock is held, so we can safely drop it before recursing
1432                drop(_lock);
1433                return merge_codes_chunks(index_path, num_chunks, padding_rows);
1434            }
1435        }
1436    }
1437
1438    // Build and save manifest with full metadata
1439    let mut chunk_map = HashMap::new();
1440    for chunk in &chunks {
1441        chunk_map.insert(
1442            chunk.filename.clone(),
1443            ChunkManifestEntry {
1444                rows: chunk.rows,
1445                mtime: chunk.mtime,
1446            },
1447        );
1448    }
1449    let new_manifest = MergeManifest {
1450        chunks: chunk_map,
1451        padding_rows,
1452        total_rows: final_rows,
1453        ncols: 0, // Not used for 1D codes array
1454        num_chunks,
1455        metadata_mtime: current_metadata_mtime,
1456    };
1457    save_merge_manifest(&manifest_path, &new_manifest)?;
1458
1459    Ok(merged_path)
1460}
1461
1462/// Merge chunked residuals NPY files into a single merged file.
1463///
1464/// Uses atomic writes to prevent corruption from interrupted writes.
1465/// Uses file-based locking to coordinate concurrent processes.
1466pub fn merge_residuals_chunks(
1467    index_path: &Path,
1468    num_chunks: usize,
1469    padding_rows: usize,
1470) -> Result<std::path::PathBuf> {
1471    use ndarray_npy::ReadNpyExt;
1472
1473    let merged_path = index_path.join("merged_residuals.npy");
1474    let manifest_path = index_path.join("merged_residuals.manifest.json");
1475    let temp_path = index_path.join("merged_residuals.npy.tmp");
1476    let lock_path = index_path.join("merged_residuals.lock");
1477
1478    // Fast path: if manifest exists with matching params, metadata.json hasn't changed,
1479    // and merged file has correct size, skip chunk scanning entirely.
1480    let metadata_json_path = index_path.join("metadata.json");
1481    let current_metadata_mtime = get_mtime(&metadata_json_path).unwrap_or(0.0);
1482    if let Some(ref manifest) = load_merge_manifest(&manifest_path) {
1483        if manifest.num_chunks == num_chunks
1484            && manifest.padding_rows == padding_rows
1485            && manifest.chunks.len() == num_chunks
1486            && manifest.total_rows > 0
1487            && manifest.ncols > 0
1488            && manifest.metadata_mtime > 0.0
1489            && (manifest.metadata_mtime - current_metadata_mtime).abs() < 0.001
1490            && merged_path.exists()
1491        {
1492            if let Ok(meta) = std::fs::metadata(&merged_path) {
1493                let expected_size = npy_header_size_2d(manifest.total_rows, manifest.ncols, "|u1")
1494                    + manifest.total_rows * manifest.ncols;
1495                if meta.len() == expected_size as u64 {
1496                    return Ok(merged_path);
1497                }
1498            }
1499        }
1500    }
1501
1502    // Acquire exclusive lock to prevent concurrent merge operations.
1503    // This is critical for multi-process scenarios (e.g., multiple API workers).
1504    let _lock = FileLockGuard::acquire(&lock_path)?;
1505
1506    // After acquiring the lock, re-check if merge is still needed.
1507    // Another process might have completed the merge while we were waiting.
1508
1509    // Load previous manifest (re-read after acquiring lock)
1510    let old_manifest = load_merge_manifest(&manifest_path);
1511
1512    // Scan chunks and detect changes
1513    let mut chunks: Vec<ChunkInfo> = Vec::new();
1514    let mut total_rows = 0usize;
1515    let mut ncols = 0usize;
1516    let mut chain_broken = false;
1517
1518    for i in 0..num_chunks {
1519        let filename = format!("{}.residuals.npy", i);
1520        let path = index_path.join(&filename);
1521
1522        if path.exists() {
1523            let mtime = get_mtime(&path)?;
1524
1525            // Get shape by reading header
1526            let file = File::open(&path)?;
1527            let arr: Array2<u8> = Array2::read_npy(file)?;
1528            let rows = arr.nrows();
1529            ncols = arr.ncols();
1530
1531            if rows > 0 {
1532                total_rows += rows;
1533
1534                let is_clean = if let Some(ref manifest) = old_manifest {
1535                    manifest
1536                        .chunks
1537                        .get(&filename)
1538                        .is_some_and(|entry| entry.mtime == mtime && entry.rows == rows)
1539                } else {
1540                    false
1541                };
1542
1543                if !is_clean {
1544                    chain_broken = true;
1545                }
1546
1547                chunks.push(ChunkInfo {
1548                    path,
1549                    filename,
1550                    rows,
1551                    mtime,
1552                });
1553            }
1554        }
1555    }
1556
1557    if total_rows == 0 || ncols == 0 {
1558        return Err(Error::IndexLoad("No residual data to merge".into()));
1559    }
1560
1561    let final_rows = total_rows + padding_rows;
1562
1563    // Check if we need to rewrite:
1564    // 1. Merged file doesn't exist
1565    // 2. Chunks have changed
1566    // 3. Padding has changed
1567    // 4. Total rows or ncols don't match
1568    let padding_changed = old_manifest
1569        .as_ref()
1570        .map(|m| m.padding_rows != padding_rows)
1571        .unwrap_or(true);
1572    let total_rows_mismatch = old_manifest
1573        .as_ref()
1574        .map(|m| m.total_rows != final_rows)
1575        .unwrap_or(true);
1576    let ncols_mismatch = old_manifest
1577        .as_ref()
1578        .map(|m| m.ncols != ncols && m.ncols != 0)
1579        .unwrap_or(false);
1580
1581    let needs_full_rewrite = !merged_path.exists()
1582        || chain_broken
1583        || padding_changed
1584        || total_rows_mismatch
1585        || ncols_mismatch;
1586
1587    if needs_full_rewrite {
1588        // Write to temp file first (atomic write pattern)
1589        let file = File::create(&temp_path)
1590            .map_err(|e| Error::IndexLoad(format!("Failed to create temp merged file: {}", e)))?;
1591        let mut writer = BufWriter::new(file);
1592
1593        // Write header
1594        let header_size = write_npy_header_2d(&mut writer, final_rows, ncols, "|u1")?;
1595
1596        // Write chunk data
1597        let mut written_rows = 0usize;
1598        for chunk in &chunks {
1599            let file = File::open(&chunk.path)?;
1600            let arr: Array2<u8> = Array2::read_npy(file)?;
1601            for row in arr.rows() {
1602                writer.write_all(row.as_slice().unwrap())?;
1603            }
1604            written_rows += arr.nrows();
1605        }
1606
1607        // Write padding zeros
1608        let zero_row = vec![0u8; ncols];
1609        for _ in 0..padding_rows {
1610            writer.write_all(&zero_row)?;
1611        }
1612        written_rows += padding_rows;
1613
1614        // Flush and sync to disk
1615        writer
1616            .flush()
1617            .map_err(|e| Error::IndexLoad(format!("Failed to flush merged residuals: {}", e)))?;
1618        let file = writer
1619            .into_inner()
1620            .map_err(|e| Error::IndexLoad(format!("Failed to get inner file: {}", e)))?;
1621        file.sync_all().map_err(|e| {
1622            Error::IndexLoad(format!("Failed to sync merged residuals to disk: {}", e))
1623        })?;
1624
1625        // Verify file size before renaming
1626        let expected_size = header_size + written_rows * ncols;
1627        let actual_size = fs::metadata(&temp_path)
1628            .map_err(|e| Error::IndexLoad(format!("Failed to get temp file metadata: {}", e)))?
1629            .len() as usize;
1630
1631        if actual_size != expected_size {
1632            // Clean up temp file and return error
1633            let _ = fs::remove_file(&temp_path);
1634            return Err(Error::IndexLoad(format!(
1635                "Merged residuals file size mismatch: expected {} bytes, got {} bytes",
1636                expected_size, actual_size
1637            )));
1638        }
1639
1640        // Atomic rename
1641        fs::rename(&temp_path, &merged_path)
1642            .map_err(|e| Error::IndexLoad(format!("Failed to rename merged residuals: {}", e)))?;
1643    } else {
1644        // Validate existing merged file before using it
1645        if merged_path.exists() {
1646            let file_size = fs::metadata(&merged_path)
1647                .map_err(|e| {
1648                    Error::IndexLoad(format!("Failed to get merged file metadata: {}", e))
1649                })?
1650                .len() as usize;
1651
1652            // NPY header is at least 64 bytes, data is final_rows * ncols bytes
1653            let min_expected_size = 64 + final_rows * ncols;
1654            if file_size < min_expected_size {
1655                // File is corrupted, force regeneration
1656                let _ = fs::remove_file(&merged_path);
1657                let _ = fs::remove_file(&manifest_path);
1658                // Lock is held, so we can safely drop it before recursing
1659                drop(_lock);
1660                return merge_residuals_chunks(index_path, num_chunks, padding_rows);
1661            }
1662        }
1663    }
1664
1665    // Build and save manifest with full metadata
1666    let mut chunk_map = HashMap::new();
1667    for chunk in &chunks {
1668        chunk_map.insert(
1669            chunk.filename.clone(),
1670            ChunkManifestEntry {
1671                rows: chunk.rows,
1672                mtime: chunk.mtime,
1673            },
1674        );
1675    }
1676    let new_manifest = MergeManifest {
1677        chunks: chunk_map,
1678        padding_rows,
1679        total_rows: final_rows,
1680        ncols,
1681        num_chunks,
1682        metadata_mtime: current_metadata_mtime,
1683    };
1684    save_merge_manifest(&manifest_path, &new_manifest)?;
1685
1686    Ok(merged_path)
1687}
1688
1689/// Clear merged files and manifests to force regeneration on next load.
1690///
1691/// This should be called after index updates to ensure the merged files
1692/// are regenerated with the latest data. The function silently ignores
1693/// missing files.
1694///
1695/// Acquires file locks to prevent racing with concurrent merge operations
1696/// in multi-process deployments.
1697pub fn clear_merged_files(index_path: &Path) -> Result<()> {
1698    // Acquire locks to prevent racing with ongoing merge operations.
1699    // This is important in multi-process scenarios where one process might
1700    // be loading (merging) while another is updating (clearing).
1701    let codes_lock_path = index_path.join("merged_codes.lock");
1702    let residuals_lock_path = index_path.join("merged_residuals.lock");
1703    let _codes_lock = FileLockGuard::acquire(&codes_lock_path)?;
1704    let _residuals_lock = FileLockGuard::acquire(&residuals_lock_path)?;
1705
1706    let files_to_remove = [
1707        "merged_codes.npy",
1708        "merged_codes.npy.tmp",
1709        "merged_codes.manifest.json",
1710        "merged_codes.manifest.json.tmp",
1711        "merged_residuals.npy",
1712        "merged_residuals.npy.tmp",
1713        "merged_residuals.manifest.json",
1714        "merged_residuals.manifest.json.tmp",
1715    ];
1716
1717    for filename in files_to_remove {
1718        let path = index_path.join(filename);
1719        if path.exists() {
1720            fs::remove_file(&path)
1721                .map_err(|e| Error::IndexLoad(format!("Failed to remove {}: {}", filename, e)))?;
1722        }
1723    }
1724
1725    Ok(())
1726}
1727
1728// ============================================================================
1729// Fast-PLAID Compatibility Conversion
1730// ============================================================================
1731
1732/// Convert a fast-plaid index to next-plaid compatible format.
1733///
1734/// This function detects and converts:
1735/// - float16 → float32 for centroids, avg_residual, bucket_cutoffs, bucket_weights
1736/// - int64 → int32 for ivf_lengths
1737/// - `<u1` → `|u1` for residuals
1738///
1739/// Returns true if any conversion was performed, false if already compatible.
1740pub fn convert_fastplaid_to_nextplaid(index_path: &Path) -> Result<bool> {
1741    let mut converted = false;
1742
1743    // Float files to convert from f16 to f32
1744    let float_files = [
1745        "centroids.npy",
1746        "avg_residual.npy",
1747        "bucket_cutoffs.npy",
1748        "bucket_weights.npy",
1749    ];
1750
1751    for filename in float_files {
1752        let path = index_path.join(filename);
1753        if path.exists() {
1754            let dtype = detect_npy_dtype(&path)?;
1755            if dtype == "<f2" {
1756                eprintln!("  Converting {} from float16 to float32", filename);
1757                convert_f16_to_f32_npy(&path)?;
1758                converted = true;
1759            }
1760        }
1761    }
1762
1763    // Convert ivf_lengths from i64 to i32
1764    let ivf_lengths_path = index_path.join("ivf_lengths.npy");
1765    if ivf_lengths_path.exists() {
1766        let dtype = detect_npy_dtype(&ivf_lengths_path)?;
1767        if dtype == "<i8" {
1768            eprintln!("  Converting ivf_lengths.npy from int64 to int32");
1769            convert_i64_to_i32_npy(&ivf_lengths_path)?;
1770            converted = true;
1771        }
1772    }
1773
1774    // Normalize residual files to use "|u1" descriptor
1775    // fast-plaid uses "<u1" which ndarray_npy doesn't accept
1776    for entry in fs::read_dir(index_path)? {
1777        let entry = entry?;
1778        let filename = entry.file_name().to_string_lossy().to_string();
1779        if filename.ends_with(".residuals.npy") {
1780            let path = entry.path();
1781            let dtype = detect_npy_dtype(&path)?;
1782            if dtype == "<u1" {
1783                eprintln!(
1784                    "  Normalizing {} dtype descriptor from <u1 to |u1",
1785                    filename
1786                );
1787                normalize_u8_npy(&path)?;
1788                converted = true;
1789            }
1790        }
1791    }
1792
1793    Ok(converted)
1794}
1795
1796#[cfg(test)]
1797mod tests {
1798    use super::*;
1799    use std::io::Write;
1800    use tempfile::NamedTempFile;
1801
1802    #[test]
1803    fn test_mmap_array2_f32() {
1804        // Create a test file
1805        let mut file = NamedTempFile::new().unwrap();
1806
1807        // Write header (3 rows, 2 cols)
1808        file.write_all(&3i64.to_le_bytes()).unwrap();
1809        file.write_all(&2i64.to_le_bytes()).unwrap();
1810
1811        // Write data
1812        for val in [1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0] {
1813            file.write_all(&val.to_le_bytes()).unwrap();
1814        }
1815
1816        file.flush().unwrap();
1817
1818        // Load and verify
1819        let mmap = MmapArray2F32::from_raw_file(file.path()).unwrap();
1820        assert_eq!(mmap.shape(), (3, 2));
1821
1822        let row0 = mmap.row(0);
1823        assert_eq!(row0[0], 1.0);
1824        assert_eq!(row0[1], 2.0);
1825
1826        let owned = mmap.to_owned();
1827        assert_eq!(owned[[2, 0]], 5.0);
1828        assert_eq!(owned[[2, 1]], 6.0);
1829    }
1830
1831    #[test]
1832    fn test_mmap_array1_i64() {
1833        let mut file = NamedTempFile::new().unwrap();
1834
1835        // Write header (4 elements)
1836        file.write_all(&4i64.to_le_bytes()).unwrap();
1837
1838        // Write data
1839        for val in [10i64, 20, 30, 40] {
1840            file.write_all(&val.to_le_bytes()).unwrap();
1841        }
1842
1843        file.flush().unwrap();
1844
1845        let mmap = MmapArray1I64::from_raw_file(file.path()).unwrap();
1846        assert_eq!(mmap.len(), 4);
1847        assert_eq!(mmap.get(0), 10);
1848        assert_eq!(mmap.get(3), 40);
1849
1850        let owned = mmap.to_owned();
1851        assert_eq!(owned[1], 20);
1852        assert_eq!(owned[2], 30);
1853    }
1854
1855    #[test]
1856    fn test_write_read_roundtrip() {
1857        let file = NamedTempFile::new().unwrap();
1858        let path = file.path();
1859
1860        // Create test array
1861        let array = Array2::from_shape_vec((2, 3), vec![1.0f32, 2.0, 3.0, 4.0, 5.0, 6.0]).unwrap();
1862
1863        // Write
1864        write_array2_f32(&array, path).unwrap();
1865
1866        // Read back
1867        let mmap = MmapArray2F32::from_raw_file(path).unwrap();
1868        let loaded = mmap.to_owned();
1869
1870        assert_eq!(array, loaded);
1871    }
1872}