// genegraph_storage/lance.rs

1//! Lance storage backend for graph embeddings.
2//!
3//! Async-first implementation that matches the async `StorageBackend` trait:
4//! - All I/O is async, no internal `block_on` or runtime creation.
5//! - Callers (CLI, tests, services) are responsible for providing a Tokio runtime.
6
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use arrow::array::{Float64Array, Int64Array, UInt32Array};
12use arrow::datatypes::{DataType, Field, Schema};
13use arrow_array::{Array as ArrowArray, FixedSizeListArray, RecordBatch, RecordBatchIterator};
14use futures::StreamExt;
15use lance::dataset::{Dataset, WriteMode, WriteParams};
16use log::{debug, info, trace};
17use smartcore::linalg::basic::arrays::Array;
18use smartcore::linalg::basic::matrix::DenseMatrix;
19use sprs::{CsMat, TriMat};
20
21use crate::metadata::FileInfo;
22use crate::metadata::GeneMetadata;
23use crate::traits::StorageBackend;
24use crate::{StorageError, StorageResult};
25
/// Lance-based storage backend for ArrowSpace graph embeddings.
///
/// Stores dense and sparse matrices as Lance datasets:
/// - dense: a single `vector` column of `FixedSizeList<Float64>` (one list per
///   matrix row), enabling efficient columnar/vector access;
/// - sparse: COO triplets in `row`/`col`/`value` columns, with the true matrix
///   dimensions recorded in the schema metadata.
///
/// Datasets live at `<_base>/<_name>_<key>.lance`; the JSON metadata file is
/// `<_base>/<_name>_metadata.json`.
#[derive(Debug, Clone)]
pub struct LanceStorage {
    /// Base directory that holds every dataset and the metadata file.
    pub(crate) _base: String,
    /// Name prefix shared by all files of this storage instance.
    pub(crate) _name: String,
}
36
37impl LanceStorage {
38    /// Creates a new Lance storage backend.
39    ///
40    /// This is used for on-the-fly creation. For proper setup use `Genefold<...>::seed`.
41    ///
42    /// # Arguments
43    ///
44    /// * `_base` - Base directory path for all storage files
45    /// * `_name` - Name prefix for this storage instance
46    pub fn new(_base: String, _name: String) -> Self {
47        info!("Creating LanceStorage at base={}, name={}", _base, _name);
48        Self { _base, _name }
49    }
50
51    /// Validates that the storage directory is properly initialized with metadata.
52    ///
53    /// # Returns
54    ///
55    /// Returns `Ok(())` if metadata file exists, otherwise returns an error.
56    fn validate_initialized(&self, md_path: &Path) -> StorageResult<()> {
57        assert_eq!(self.metadata_path(), *md_path);
58        if !md_path.exists() {
59            return Err(StorageError::Invalid(format!(
60                "Storage not initialized: metadata file missing at {:?}. \
61Call save_metadata() or save_eigenmaps_all()/save_energymaps_all() first.",
62                md_path
63            )));
64        }
65        Ok(())
66    }
67
68    /// Converts a dense matrix to a RecordBatch in vector format (Lance-optimized).
69    /// Each row of the matrix becomes a single FixedSizeList entry.
70    ///
71    /// Arguments:
72    /// * matrix - Dense matrix to convert (N rows × F cols)
73    ///
74    /// Returns:
75    /// RecordBatch with schema: { vector: FixedSizeList<Float64>[F] }
76    fn to_dense_record_batch(
77        &self,
78        matrix: &DenseMatrix<f64>,
79    ) -> Result<RecordBatch, StorageError> {
80        let (rows, cols) = (matrix.shape().0, matrix.shape().1);
81
82        debug!(
83            "Converting dense matrix to RecordBatch (vector format): {}x{}",
84            rows, cols
85        );
86
87        if rows == 0 || cols == 0 {
88            return Err(StorageError::Invalid(
89                "Cannot convert empty matrix to RecordBatch".to_string(),
90            ));
91        }
92
93        // Flatten matrix row-by-row into a single Vec<f64>
94        let mut values: Vec<f64> = Vec::with_capacity(rows * cols);
95        for r in 0..rows {
96            for c in 0..cols {
97                values.push(*matrix.get((r, c)));
98            }
99        }
100
101        // Create FixedSizeList field: each entry is a vector of length cols
102        let value_field = Field::new("item", DataType::Float64, false);
103        let list_field = Field::new(
104            "vector",
105            DataType::FixedSizeList(Arc::new(value_field), cols as i32),
106            false,
107        );
108
109        let schema = Schema::new(vec![list_field]);
110
111        // Build the FixedSizeList array
112        let values_array = Float64Array::from(values);
113        let list_array = FixedSizeListArray::new(
114            Arc::new(Field::new("item", DataType::Float64, false)),
115            cols as i32,
116            Arc::new(values_array),
117            None, // No nulls
118        );
119
120        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)])
121            .map_err(|e| StorageError::Lance(e.to_string()))?;
122
123        trace!(
124            "RecordBatch created with {} rows (vectors of length {})",
125            batch.num_rows(),
126            cols
127        );
128
129        Ok(batch)
130    }
131
    /// Reconstructs a dense matrix from a RecordBatch in vector format.
    ///
    /// Arguments:
    /// * batch - RecordBatch containing FixedSizeList<Float64> vectors
    ///
    /// Returns:
    /// DenseMatrix in column-major format (smartcore convention)
    ///
    /// Errors when the batch is not a single FixedSizeList<Float64> column, or
    /// when the decoded matrix would exceed the 4 GiB allocation cap.
    fn from_dense_record_batch(
        &self,
        batch: &RecordBatch,
    ) -> Result<DenseMatrix<f64>, StorageError> {
        use std::mem;

        debug!("Reconstructing dense matrix from RecordBatch (vector format)");
        debug!("Batch has {} columns", batch.num_columns());

        // A multi-column batch means the data is in the legacy wide layout
        // (one Float64 column per feature); the error text tells the user how
        // to convert it to the single-vector-column layout this reader expects.
        if batch.num_columns() != 1 {
            return Err(StorageError::Invalid(format!(
                "Expected Lance row-major format with 1 FixedSizeList<Float64> column, but found {} columns. \
                  This parquet file appears to be in wide format (feature-per-column). \
                  Convert it first using: \
                  `python -c \"import pyarrow.parquet as pq; import pyarrow.compute as pc; \
                  tbl = pq.read_table('input.parquet'); \
                  import pyarrow as pa; \
                  vectors = pa.array([row.as_py() for row in tbl.to_pylist()], type=pa.list_(pa.float64(), len(tbl.column_names))); \
                  new_tbl = pa.table({{'vector': vectors}}); \
                  pq.write_table(new_tbl, 'output.parquet')\"` \
                  or use a Lance-native writer in your data pipeline.",
                batch.num_columns()
            )));
        }

        debug!("Extracting FixedSizeList column");
        let column = batch.column(0);
        let list_array = column
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .ok_or_else(|| {
                StorageError::Invalid(format!(
                    "Column 0 is not FixedSizeList (found type: {:?}). \
                      Expected Lance row-major format with a single FixedSizeList<Float64> column.",
                    column.data_type()
                ))
            })?;

        // Each list entry is one matrix row; the fixed list length is the
        // column count.
        let rows = list_array.len();
        let cols = list_array.value_length() as usize;

        debug!("Matrix dimensions: {}x{}", rows, cols);

        // Guard against excessive allocations: rows*cols and the resulting
        // byte count use checked arithmetic so a corrupt file cannot overflow.
        let total = rows
            .checked_mul(cols)
            .ok_or_else(|| StorageError::Invalid("Matrix size overflow (rows*cols)".to_string()))?;
        let bytes = total
            .checked_mul(mem::size_of::<f64>())
            .ok_or_else(|| StorageError::Invalid("Byte size overflow".to_string()))?;

        const MAX_BYTES: usize = 4usize * 1024 * 1024 * 1024; // 4 GiB
        if bytes > MAX_BYTES {
            return Err(StorageError::Invalid(format!(
                "Dense load would allocate {} bytes for {}x{} matrix; exceeds 4GiB cap. \
                  Enable --reduce-dim or shard your input data.",
                bytes, rows, cols
            )));
        }

        // Extract Float64 values
        let values_array = list_array
            .values()
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| {
                StorageError::Invalid("FixedSizeList values are not Float64Array".to_string())
            })?;

        // Arrow stores the vectors row-major; transpose the flat buffer into
        // column-major order for smartcore.
        debug!("Converting row-major to column-major");
        let mut data = vec![0.0f64; total];
        for r in 0..rows {
            for c in 0..cols {
                let row_major_idx = r * cols + c;
                let col_major_idx = c * rows + r;
                data[col_major_idx] = values_array.value(row_major_idx);
            }
        }

        // NOTE(review): the trailing `true` presumably flags column-major data
        // in smartcore's `DenseMatrix::new` — confirm against the smartcore
        // version in use.
        debug!("Creating DenseMatrix");
        DenseMatrix::new(rows, cols, data, true).map_err(|e| StorageError::Invalid(e.to_string()))
    }
221
222    /// Converts a sparse CSR matrix to a RecordBatch in columnar format.
223    ///
224    /// Only non-zero entries are stored.
225    fn to_sparse_record_batch(&self, m: &CsMat<f64>) -> StorageResult<RecordBatch> {
226        debug!(
227            "Converting sparse matrix to RecordBatch: {} x {}, nnz={}",
228            m.rows(),
229            m.cols(),
230            m.nnz()
231        );
232
233        let mut row_idx = Vec::with_capacity(m.nnz());
234        let mut col_idx = Vec::with_capacity(m.nnz());
235        let mut vals = Vec::with_capacity(m.nnz());
236
237        for (v, (r, c)) in m.iter() {
238            row_idx.push(r as u32);
239            col_idx.push(c as u32);
240            vals.push(*v);
241        }
242
243        // Store actual dimensions in schema metadata
244        let mut schema_metadata = std::collections::HashMap::new();
245        schema_metadata.insert("rows".to_string(), m.rows().to_string());
246        schema_metadata.insert("cols".to_string(), m.cols().to_string());
247        schema_metadata.insert("nnz".to_string(), m.nnz().to_string());
248
249        let schema = Schema::new(vec![
250            Field::new("row", DataType::UInt32, false),
251            Field::new("col", DataType::UInt32, false),
252            Field::new("value", DataType::Float64, false),
253        ])
254        .with_metadata(schema_metadata);
255
256        let batch = RecordBatch::try_new(
257            Arc::new(schema),
258            vec![
259                Arc::new(UInt32Array::from(row_idx)) as _,
260                Arc::new(UInt32Array::from(col_idx)) as _,
261                Arc::new(Float64Array::from(vals)) as _,
262            ],
263        )
264        .map_err(|e| StorageError::Lance(e.to_string()))?;
265
266        trace!(
267            "Sparse RecordBatch created with {} entries",
268            batch.num_rows()
269        );
270        Ok(batch)
271    }
272
273    /// Reconstructs a sparse CSR matrix from a RecordBatch in columnar format.
274    ///
275    /// * `batch` - RecordBatch containing (`row`, `col`, `value`) triplets
276    /// * `expected_rows` / `expected_cols` - dimensions taken from metadata
277    #[allow(clippy::wrong_self_convention)]
278    fn from_sparse_record_batch(
279        &self,
280        batch: RecordBatch,
281        expected_rows: usize,
282        expected_cols: usize,
283    ) -> StorageResult<CsMat<f64>> {
284        use arrow::array::UInt32Array;
285
286        debug!("Reconstructing sparse matrix from RecordBatch");
287
288        let row_arr = batch
289            .column(0)
290            .as_any()
291            .downcast_ref::<UInt32Array>()
292            .ok_or_else(|| StorageError::Invalid("row column type mismatch".into()))?;
293        let col_arr = batch
294            .column(1)
295            .as_any()
296            .downcast_ref::<UInt32Array>()
297            .ok_or_else(|| StorageError::Invalid("col column type mismatch".into()))?;
298        let val_arr = batch
299            .column(2)
300            .as_any()
301            .downcast_ref::<Float64Array>()
302            .ok_or_else(|| StorageError::Invalid("value column type mismatch".into()))?;
303
304        let n = row_arr.len();
305        if n == 0 {
306            debug!(
307                "Empty RecordBatch, returning {}x{} sparse matrix",
308                expected_rows, expected_cols
309            );
310            return Ok(CsMat::zero((expected_rows, expected_cols)));
311        }
312
313        // Try to read dimensions from schema metadata (for validation)
314        let schema = batch.schema();
315        let schema_metadata = schema.metadata();
316        if let (Some(rows_str), Some(cols_str)) =
317            (schema_metadata.get("rows"), schema_metadata.get("cols"))
318        {
319            let schema_rows = rows_str.parse::<usize>().ok();
320            let schema_cols = cols_str.parse::<usize>().ok();
321            if schema_rows != Some(expected_rows) || schema_cols != Some(expected_cols) {
322                panic!(
323                    "Schema metadata dimensions ({:?}x{:?}) don't match storage metadata ({}x{})",
324                    schema_rows, schema_cols, expected_rows, expected_cols
325                );
326            } else {
327                debug!(
328                    "Schema metadata matches storage metadata: {}x{}",
329                    expected_rows, expected_cols
330                );
331            }
332        }
333
334        let rows = expected_rows;
335        let cols = expected_cols;
336        debug!(
337            "Reconstructing {}x{} sparse matrix from {} entries",
338            rows, cols, n
339        );
340
341        let mut trimat = TriMat::new((rows, cols));
342        for i in 0..n {
343            let r = row_arr.value(i) as usize;
344            let c = col_arr.value(i) as usize;
345            let v = val_arr.value(i);
346
347            if r >= rows || c >= cols {
348                return Err(StorageError::Invalid(format!(
349                    "Index out of bounds: ({}, {}) in {}x{} matrix",
350                    r, c, rows, cols
351                )));
352            }
353            trimat.add_triplet(r, c, v);
354        }
355
356        let result = trimat.to_csr();
357        if result.rows() != rows || result.cols() != cols {
358            return Err(StorageError::Invalid(format!(
359                "Dimension mismatch after reconstruction: expected {}x{}, got {}x{}",
360                rows,
361                cols,
362                result.rows(),
363                result.cols()
364            )));
365        }
366
367        Ok(result)
368    }
369
370    /// Async helper: write a RecordBatch to a Lance dataset.
371    async fn write_lance_batch_async(&self, path: &Path, batch: RecordBatch) -> StorageResult<()> {
372        let uri = Self::path_to_uri(path);
373        info!("Writing Lance dataset to {}", uri);
374
375        let schema = batch.schema();
376        let batches = vec![batch];
377        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
378
379        let params = WriteParams {
380            mode: WriteMode::Create,
381            ..WriteParams::default()
382        };
383
384        Dataset::write(reader, &uri, Some(params))
385            .await
386            .map_err(|e| StorageError::Lance(e.to_string()))?;
387
388        info!("Successfully wrote Lance dataset to {}", uri);
389        Ok(())
390    }
391
392    /// Async helper: read and concatenate all RecordBatches from a Lance dataset.
393    async fn read_lance_all_batches_async(&self, path: &Path) -> StorageResult<RecordBatch> {
394        let uri = Self::path_to_uri(path);
395        info!("Reading Lance dataset from {}", uri);
396
397        let dataset = Dataset::open(&uri)
398            .await
399            .map_err(|e| StorageError::Lance(e.to_string()))?;
400        let scanner = dataset.scan();
401        let mut stream = scanner
402            .try_into_stream()
403            .await
404            .map_err(|e| StorageError::Lance(e.to_string()))?;
405
406        let mut batches = Vec::new();
407        while let Some(batch_result) = stream.next().await {
408            let batch = batch_result.map_err(|e| StorageError::Lance(e.to_string()))?;
409            batches.push(batch);
410        }
411
412        if batches.is_empty() {
413            return Err(StorageError::Invalid("Empty Lance dataset".into()));
414        }
415
416        let schema = batches[0].schema();
417        let combined = arrow::compute::concat_batches(&schema, &batches)
418            .map_err(|e| StorageError::Lance(format!("Failed to concatenate batches: {}", e)))?;
419
420        debug!(
421            "Combined Lance batch for {:?} has {} rows",
422            path,
423            combined.num_rows()
424        );
425        Ok(combined)
426    }
427
428    /// Async helper: read the first RecordBatch from a Lance dataset.
429    async fn read_lance_first_batch_async(&self, path: &Path) -> StorageResult<RecordBatch> {
430        let uri = Self::path_to_uri(path);
431        info!("Reading first batch from Lance dataset {}", uri);
432
433        let dataset = Dataset::open(&uri)
434            .await
435            .map_err(|e| StorageError::Lance(e.to_string()))?;
436        let scanner = dataset.scan();
437        let mut stream = scanner
438            .try_into_stream()
439            .await
440            .map_err(|e| StorageError::Lance(e.to_string()))?;
441
442        let batch = stream
443            .next()
444            .await
445            .ok_or_else(|| StorageError::Lance("empty Lance dataset".to_string()))?
446            .map_err(|e| StorageError::Lance(e.to_string()))?;
447
448        debug!(
449            "Read first RecordBatch for path {:?} with {} rows",
450            path,
451            batch.num_rows()
452        );
453        Ok(batch)
454    }
455}
456
457impl StorageBackend for LanceStorage {
458    fn get_base(&self) -> String {
459        self._base.clone()
460    }
461
462    fn get_name(&self) -> String {
463        self._name.clone()
464    }
465
466    fn base_path(&self) -> PathBuf {
467        PathBuf::from(&self._base)
468    }
469
470    fn metadata_path(&self) -> PathBuf {
471        self.base_path()
472            .join(format!("{}_metadata.json", self._name))
473    }
474
475    /// Converts a filesystem path to a `file://` URI for Lance.
476    fn path_to_uri(path: &Path) -> String {
477        // Ensure we have an absolute path
478        let abs_path = path.canonicalize().unwrap_or_else(|_| {
479            if path.is_absolute() {
480                path.to_path_buf()
481            } else {
482                std::env::current_dir()
483                    .unwrap_or_else(|_| PathBuf::from("/"))
484                    .join(path)
485            }
486        });
487        format!("file://{}", abs_path.display())
488    }
489
    /// Save dense matrix using Lance-optimized vector format.
    ///
    /// Each row of the matrix becomes a FixedSizeList entry for efficient vector operations.
    /// This format is optimized for vector search and enables Lance's full-zip encoding.
    ///
    /// NOTE(review): unlike `save_sparse`, this does not register a `FileInfo`
    /// entry in the storage metadata — confirm whether dense files are tracked
    /// elsewhere or whether this is an oversight.
    ///
    /// # Arguments
    /// * `key` - Type identifier (e.g., "rawinput", "sub_centroids")
    /// * `matrix` - Dense matrix to save (N rows × F cols)
    /// * `md_path` - Metadata file path for validation
    async fn save_dense(
        &self,
        key: &str,
        matrix: &DenseMatrix<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        let (n_rows, n_cols) = matrix.shape();

        info!(
            "Saving dense {} matrix: {} x {} at {:?}",
            key, n_rows, n_cols, path
        );

        // Convert to Lance-optimized RecordBatch (FixedSizeList format)
        let batch = self.to_dense_record_batch(matrix)?;

        // Verify batch has correct number of rows (one list entry per matrix row)
        if batch.num_rows() != n_rows {
            return Err(StorageError::Invalid(format!(
                "RecordBatch has {} rows but matrix has {} rows",
                batch.num_rows(),
                n_rows
            )));
        }

        // Write to Lance
        self.write_lance_batch_async(&path, batch).await?;

        info!("Dense {} matrix saved successfully", key);
        Ok(())
    }
534
535    /// Load dense matrix from Lance-optimized vector format.
536    ///
537    /// Reads FixedSizeList vectors and reconstructs a column-major DenseMatrix.
538    ///
539    /// # Arguments
540    /// * `filename` - Type identifier (e.g., "rawinput", "sub_centroids")
541    ///
542    /// # Returns
543    /// Column-major DenseMatrix matching smartcore conventions
544    async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
545        let path = self.file_path(key);
546        info!("Loading dense {} matrix from {:?}", key, path);
547
548        // Read all batches from Lance (may span multiple batches for large datasets)
549        let batch = self.read_lance_all_batches_async(&path).await?;
550
551        // Convert from FixedSizeList format to DenseMatrix
552        let matrix = self.from_dense_record_batch(&batch)?;
553
554        let (n_rows, n_cols) = matrix.shape();
555        info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);
556
557        Ok(matrix)
558    }
559
    /// Load initial data using columnar format from a file path.
    ///
    /// Supports two on-disk formats, selected by file extension:
    /// - `.lance`: read via the async Lance reader and decoded from the single
    ///   FixedSizeList<Float64> `vector` column;
    /// - `.parquet`: read via the parquet crate; both the vector layout and the
    ///   legacy wide `col_*` Float64 layout are accepted.
    ///
    /// NOTE(review): the parquet branch performs blocking `std::fs` I/O inside
    /// an async fn, which conflicts with the module's "all I/O is async" goal —
    /// consider `spawn_blocking` if this runs on a shared runtime.
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                // Use a temporary LanceStorage rooted at the file's parent dir,
                // same pattern as save_dense_to_file_async.
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage =
                    crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));

                // Reuse the async Lance reader logic.
                let batch = tmp_storage.read_lance_all_batches_async(path).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                // 1. Read from Parquet into a single RecordBatch
                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                // 2. Detect layout: vector (FixedSizeList) vs old wide columnar (col_* Float64)
                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                            if matches!(inner.data_type(), DataType::Float64)
                    );

                // Wide layout: every column is Float64 and at least one is
                // named col_* (checked only when the vector layout didn't match).
                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                // 3. Build DenseMatrix from the RecordBatch
                let matrix = if is_vector {
                    // New format already: vector column (FixedSizeList<Float64>)
                    // Reuse the same decoding as Lance.
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage =
                        crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    // Old wide columnar: columns like col_0, col_1, ... as Float64
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        // Build column-major storage: all rows for col 0, then col 1, ...
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }
731
732    fn file_path(&self, key: &str) -> PathBuf {
733        self.base_path()
734            .join(format!("{}_{}.lance", self._name, key))
735    }
736
737    // =========
738    // ASYNC API (matches StorageBackend)
739    // =========
740
741    async fn save_sparse(
742        &self,
743        key: &str,
744        matrix: &CsMat<f64>,
745        md_path: &Path,
746    ) -> StorageResult<()> {
747        self.validate_initialized(md_path)?;
748        let path = self.file_path(key);
749        info!(
750            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
751            key,
752            matrix.rows(),
753            matrix.cols(),
754            matrix.nnz(),
755            path
756        );
757
758        let mut metadata = self.load_metadata().await?;
759        let filetype = FileInfo::which_filetype(key);
760        metadata.files.insert(
761            key.to_string(),
762            FileInfo {
763                filename: format!("{}_{}.lance", self.get_name(), key),
764                filetype: filetype.to_string(),
765                storage_format: FileInfo::which_format(&filetype),
766                rows: matrix.rows(),
767                cols: matrix.cols(),
768                nnz: Some(matrix.nnz()),
769                size_bytes: None,
770            },
771        );
772        self.save_metadata(&metadata).await?;
773
774        let batch = self.to_sparse_record_batch(matrix)?;
775        self.write_lance_batch_async(&path, batch).await?;
776        info!("Sparse matrix {} saved successfully", filetype);
777        Ok(())
778    }
779
780    async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
781        info!("Loading sparse {} matrix", key);
782
783        let metadata = self.load_metadata().await?;
784        let filetype = FileInfo::which_filetype(key);
785        let file_info = metadata
786            .files
787            .get(key)
788            .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
789
790        let expected_rows = file_info.rows;
791        let expected_cols = file_info.cols;
792        debug!(
793            "Expected dimensions from storage metadata: {} x {}",
794            expected_rows, expected_cols
795        );
796
797        let path = self.file_path(key);
798        let batch = self.read_lance_first_batch_async(&path).await?;
799        let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
800        info!(
801            "Sparse {} matrix loaded: {} x {}, nnz={}",
802            filetype,
803            matrix.rows(),
804            matrix.cols(),
805            matrix.nnz()
806        );
807        Ok(matrix)
808    }
809
810    async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
811        self.validate_initialized(md_path)?;
812        let path = self.file_path("lambdas");
813        info!("Saving {} lambda values", lambdas.len());
814
815        let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
816        let batch = RecordBatch::try_new(
817            Arc::new(schema),
818            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
819        )
820        .map_err(|e| StorageError::Lance(e.to_string()))?;
821
822        self.write_lance_batch_async(&path, batch).await?;
823        info!("Lambda values saved successfully");
824        Ok(())
825    }
826
827    async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
828        let path = self.file_path("lambdas");
829        info!("Loading lambda values from {:?}", path);
830
831        let batch = self.read_lance_first_batch_async(&path).await?;
832        let arr = batch
833            .column(0)
834            .as_any()
835            .downcast_ref::<Float64Array>()
836            .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
837
838        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
839        info!("Loaded {} lambda values", lambdas.len());
840        Ok(lambdas)
841    }
842
843    async fn save_metadata(&self, metadata: &GeneMetadata) -> StorageResult<PathBuf> {
844        let path = self.metadata_path();
845        info!("Saving metadata to {:?}", path);
846        fs::create_dir_all(self.base_path()).map_err(|e| StorageError::Io(e.to_string()))?;
847        let s = serde_json::to_string_pretty(metadata).map_err(StorageError::Serde)?;
848        fs::write(&path, s).map_err(|e| StorageError::Io(e.to_string()))?;
849        info!("Metadata saved successfully");
850        Ok(path)
851    }
852
853    async fn load_metadata(&self) -> StorageResult<GeneMetadata> {
854        let filename = self.metadata_path();
855        info!("Loading metadata from {:?}", filename);
856        let s = fs::read_to_string(filename).map_err(|e| StorageError::Io(e.to_string()))?;
857        let md: GeneMetadata = serde_json::from_str(&s).map_err(StorageError::Serde)?;
858        info!("Metadata loaded successfully");
859        Ok(md)
860    }
861
862    async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
863        self.validate_initialized(md_path)?;
864        let path = self.file_path(key);
865        info!("Saving {} values for vector {}", vector.len(), key);
866
867        let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
868        let float64_array = Float64Array::from_iter_values(vector.iter().map(|&x| x as f64));
869        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
870            .map_err(|e| StorageError::Lance(e.to_string()))?;
871
872        self.write_lance_batch_async(&path, batch).await?;
873        info!("Index {} saved successfully", key);
874        Ok(())
875    }
876
877    async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
878        self.validate_initialized(md_path)?;
879        let path = self.file_path(key);
880        info!("Saving {} values for index {}", vector.len(), key);
881
882        let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
883        let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
884        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
885            .map_err(|e| StorageError::Lance(e.to_string()))?;
886
887        self.write_lance_batch_async(&path, batch).await?;
888        info!("Index {} saved successfully", key);
889        Ok(())
890    }
891
892    async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
893        let path = self.file_path(filename);
894        info!("Loading vector {} from {:?}", filename, path);
895
896        let batch = self.read_lance_first_batch_async(&path).await?;
897        let arr = batch
898            .column(0)
899            .as_any()
900            .downcast_ref::<Float64Array>()
901            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
902
903        let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i) as f64).collect();
904        info!("Loaded {} vector values for {}", vector.len(), filename);
905        Ok(vector)
906    }
907
908    async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
909        let path = self.file_path(filename);
910        info!("Loading vector {} from {:?}", filename, path);
911
912        let batch = self.read_lance_first_batch_async(&path).await?;
913        let arr = batch
914            .column(0)
915            .as_any()
916            .downcast_ref::<UInt32Array>()
917            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
918
919        let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
920        info!("Loaded {} vector values for {}", vector.len(), filename);
921        Ok(vector)
922    }
923
924    /// Save dense matrix to file in columnar format (col_0, col_1, ..., col_N)
925    ///
926    /// Async test helper that avoids any internal blocking runtimes.
927    #[cfg(test)]
928    async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &PathBuf) -> StorageResult<()> {
929        use tokio::fs as tokio_fs;
930
931        info!("Saving dense matrix to file (async): {:?}", path);
932
933        // Ensure parent dir exists for the test file.
934        if let Some(parent) = path.parent() {
935            tokio_fs::try_exists(parent).await.map_err(|e| {
936                StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
937            })?;
938        }
939
940        // Create a temporary storage only to store the file.
941        let tmp_storage = LanceStorage::new(
942            String::from(path.parent().unwrap().to_str().unwrap()),
943            String::from("tmp_storage"),
944        );
945
946        let extension = path
947            .extension()
948            .and_then(|e| e.to_str())
949            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
950
951        let (n_rows, n_cols) = data.shape();
952        info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
953
954        match extension {
955            "lance" => {
956                let batch = tmp_storage.to_dense_record_batch(data)?;
957                debug!(
958                    "Created RecordBatch with {} rows for Lance",
959                    batch.num_rows()
960                );
961
962                // Verify all rows are in the batch
963                if batch.num_rows() != n_rows {
964                    return Err(StorageError::Invalid(format!(
965                        "RecordBatch has {} rows but matrix has {} rows",
966                        batch.num_rows(),
967                        n_rows
968                    )));
969                }
970
971                tmp_storage.write_lance_batch_async(path, batch).await?;
972                info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
973                Ok(())
974            }
975            "parquet" => {
976                use parquet::arrow::ArrowWriter;
977                use parquet::file::properties::WriterProperties;
978                use std::fs::File;
979
980                // For tests we still use sync parquet writer; directory was created with tokio_fs.
981                let batch = tmp_storage.to_dense_record_batch(data)?;
982                debug!(
983                    "Created RecordBatch with {} rows for Parquet",
984                    batch.num_rows()
985                );
986
987                if batch.num_rows() != n_rows {
988                    return Err(StorageError::Invalid(format!(
989                        "RecordBatch has {} rows but matrix has {} rows",
990                        batch.num_rows(),
991                        n_rows
992                    )));
993                }
994
995                let file = File::create(path).map_err(|e| {
996                    StorageError::Io(format!("Failed to create parquet file: {}", e))
997                })?;
998
999                let props = WriterProperties::builder()
1000                    .set_compression(parquet::basic::Compression::SNAPPY)
1001                    .build();
1002
1003                let mut writer =
1004                    ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
1005                        StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
1006                    })?;
1007
1008                writer
1009                    .write(&batch)
1010                    .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
1011
1012                writer
1013                    .close()
1014                    .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
1015
1016                info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
1017                Ok(())
1018            }
1019            _ => Err(StorageError::Invalid(format!(
1020                "Unsupported file format: {}. Only .lance and .parquet are supported",
1021                extension
1022            ))),
1023        }
1024    }
1025
1026    /// Save centroid_map (item-to-centroid assignments)
1027    async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
1028        self.validate_initialized(md_path)?;
1029        let path = self.file_path("centroid_map");
1030        info!("Saving {} centroid map entries", map.len());
1031
1032        let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
1033        let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
1034        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
1035            .map_err(|e| StorageError::Lance(e.to_string()))?;
1036
1037        self.write_lance_batch_async(&path, batch).await?;
1038        info!("Centroid map saved successfully");
1039        Ok(())
1040    }
1041
1042    /// Load centroid_map
1043    async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
1044        let path = self.file_path("centroid_map");
1045        info!("Loading centroid map from {:?}", path);
1046
1047        let batch = self.read_lance_first_batch_async(&path).await?;
1048        let arr = batch
1049            .column(0)
1050            .as_any()
1051            .downcast_ref::<UInt32Array>()
1052            .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
1053
1054        let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
1055        info!("Loaded {} centroid map entries", map.len());
1056        Ok(map)
1057    }
1058
1059    /// Save subcentroid_lambdas (tau values for subcentroids)
1060    async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
1061        self.validate_initialized(md_path)?;
1062        let path = self.file_path("subcentroid_lambdas");
1063        info!("Saving {} subcentroid lambda values", lambdas.len());
1064
1065        let schema = Schema::new(vec![Field::new(
1066            "subcentroid_lambda",
1067            DataType::Float64,
1068            false,
1069        )]);
1070        let batch = RecordBatch::try_new(
1071            Arc::new(schema),
1072            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
1073        )
1074        .map_err(|e| StorageError::Lance(e.to_string()))?;
1075
1076        self.write_lance_batch_async(&path, batch).await?;
1077        info!("Subcentroid lambda values saved successfully");
1078        Ok(())
1079    }
1080
1081    /// Load subcentroid_lambdas
1082    async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
1083        let path = self.file_path("subcentroid_lambdas");
1084        info!("Loading subcentroid lambda values from {:?}", path);
1085
1086        let batch = self.read_lance_first_batch_async(&path).await?;
1087        let arr = batch
1088            .column(0)
1089            .as_any()
1090            .downcast_ref::<Float64Array>()
1091            .ok_or_else(|| {
1092                StorageError::Invalid("subcentroid_lambda column type mismatch".into())
1093            })?;
1094
1095        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
1096        info!("Loaded {} subcentroid lambda values", lambdas.len());
1097        Ok(lambdas)
1098    }
1099
1100    /// Save subcentroids (dense matrix)
1101    async fn save_subcentroids(
1102        &self,
1103        subcentroids: &DenseMatrix<f64>,
1104        md_path: &Path,
1105    ) -> StorageResult<()> {
1106        self.validate_initialized(md_path)?;
1107        let path = self.file_path("sub_centroids");
1108        let (n_rows, n_cols) = subcentroids.shape();
1109        info!(
1110            "Saving subcentroids matrix {} x {} at {:?}",
1111            n_rows, n_cols, path
1112        );
1113
1114        let batch = self.to_dense_record_batch(subcentroids)?;
1115        self.write_lance_batch_async(&path, batch).await?;
1116        debug!("Subcentroids matrix saved successfully");
1117        Ok(())
1118    }
1119
1120    /// Load subcentroids as Vec<Vec<f64>>
1121    async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
1122        let path = self.file_path("sub_centroids");
1123        info!("Loading sub_centroids from {:?}", path);
1124
1125        let batch = self.read_lance_all_batches_async(&path).await?;
1126        let matrix = self.from_dense_record_batch(&batch)?;
1127
1128        // Convert DenseMatrix to Vec<Vec<f64>>
1129        let (n_rows, n_cols) = matrix.shape();
1130        let mut result = Vec::with_capacity(n_rows);
1131
1132        for row_idx in 0..n_rows {
1133            let row: Vec<f64> = (0..n_cols)
1134                .map(|col_idx| *matrix.get((row_idx, col_idx)))
1135                .collect();
1136            result.push(row);
1137        }
1138
1139        info!(
1140            "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
1141            n_rows, n_cols
1142        );
1143        Ok(result)
1144    }
1145
1146    /// Save item norms vector
1147    async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
1148        self.validate_initialized(md_path)?;
1149        let path = self.file_path("item_norms");
1150        info!("Saving {} item norm values", item_norms.len());
1151
1152        let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
1153        let batch = RecordBatch::try_new(
1154            Arc::new(schema),
1155            vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
1156        )
1157        .map_err(|e| StorageError::Lance(e.to_string()))?;
1158
1159        self.write_lance_batch_async(&path, batch).await?;
1160        info!("Item norms saved successfully");
1161        Ok(())
1162    }
1163
1164    /// Load item norms vector
1165    async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
1166        let path = self.file_path("item_norms");
1167        info!("Loading item norms from {:?}", path);
1168
1169        let batch = self.read_lance_first_batch_async(&path).await?;
1170        let arr = batch
1171            .column(0)
1172            .as_any()
1173            .downcast_ref::<Float64Array>()
1174            .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
1175
1176        let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
1177        info!("Loaded {} item norm values", norms.len());
1178        Ok(norms)
1179    }
1180
1181    async fn save_cluster_assignments(
1182        &self,
1183        assignments: &[Option<usize>],
1184        md_path: &Path,
1185    ) -> StorageResult<()> {
1186        self.validate_initialized(md_path)?;
1187        let path = self.file_path("cluster_assignments");
1188        info!("Saving {} cluster assignments", assignments.len());
1189
1190        // Convert Option<usize> to i64 (-1 for None)
1191        let values: Vec<i64> = assignments
1192            .iter()
1193            .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
1194            .collect();
1195
1196        let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
1197        let batch = RecordBatch::try_new(
1198            Arc::new(schema),
1199            vec![Arc::new(Int64Array::from(values)) as _],
1200        )
1201        .map_err(|e| StorageError::Lance(e.to_string()))?;
1202
1203        self.write_lance_batch_async(&path, batch).await?;
1204        info!("Cluster assignments saved successfully");
1205        Ok(())
1206    }
1207
1208    async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
1209        let path = self.file_path("cluster_assignments");
1210        info!("Loading cluster assignments from {:?}", path);
1211
1212        let batch = self.read_lance_first_batch_async(&path).await?;
1213        let arr = batch
1214            .column(0)
1215            .as_any()
1216            .downcast_ref::<Int64Array>()
1217            .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
1218
1219        let assignments: Vec<Option<usize>> = (0..arr.len())
1220            .map(|i| {
1221                let val = arr.value(i);
1222                if val < 0 { None } else { Some(val as usize) }
1223            })
1224            .collect();
1225
1226        info!("Loaded {} cluster assignments", assignments.len());
1227        Ok(assignments)
1228    }
1229}