genegraph_storage/
lance.rs

1//! Lance storage backend for graph embeddings.
2//!
3//! Async-first implementation that matches the async `StorageBackend` trait:
4//! - All I/O is async, no internal `block_on` or runtime creation.
5//! - Callers (CLI, tests, services) are responsible for providing a Tokio runtime.
6
7use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use arrow::array::{Float64Array, Int64Array, UInt32Array};
12use arrow::datatypes::{DataType, Field, Schema};
13use arrow_array::{Array as ArrowArray, FixedSizeListArray, RecordBatch, RecordBatchIterator};
14use futures::StreamExt;
15use lance::dataset::{Dataset, WriteMode, WriteParams};
16use log::{debug, info, trace};
17use smartcore::linalg::basic::arrays::Array;
18use smartcore::linalg::basic::matrix::DenseMatrix;
19use sprs::{CsMat, TriMat};
20
21use crate::metadata::FileInfo;
22use crate::metadata::GeneMetadata;
23use crate::traits::StorageBackend;
24use crate::{StorageError, StorageResult};
25
/// Lance-based storage backend for ArrowSpace graph embeddings.
///
/// Sparse matrices are stored as (`row`, `col`, `value`) triplet columns
/// with the true dimensions recorded in the schema metadata; dense matrices
/// are stored as a single `vector` FixedSizeList<Float64> column (one list
/// entry per matrix row) for efficient random and columnar access.
#[derive(Debug, Clone)]
pub struct LanceStorage {
    // Base directory that holds every Lance dataset and the metadata JSON.
    pub(crate) _base: String,
    // Name prefix applied to every file owned by this storage instance.
    pub(crate) _name: String,
}
36
37impl LanceStorage {
38    /// Creates a new Lance storage backend.
39    ///
40    /// This is used for on-the-fly creation. For proper setup use `Genefold<...>::seed`.
41    ///
42    /// # Arguments
43    ///
44    /// * `_base` - Base directory path for all storage files
45    /// * `_name` - Name prefix for this storage instance
46    pub fn new(_base: String, _name: String) -> Self {
47        info!("Creating LanceStorage at base={}, name={}", _base, _name);
48        Self { _base, _name }
49    }
50
51    /// Validates that the storage directory is properly initialized with metadata.
52    ///
53    /// # Returns
54    ///
55    /// Returns `Ok(())` if metadata file exists, otherwise returns an error.
56    fn validate_initialized(&self, md_path: &Path) -> StorageResult<()> {
57        assert_eq!(self.metadata_path(), *md_path);
58        if !md_path.exists() {
59            return Err(StorageError::Invalid(format!(
60                "Storage not initialized: metadata file missing at {:?}. \
61Call save_metadata() or save_eigenmaps_all()/save_energymaps_all() first.",
62                md_path
63            )));
64        }
65        Ok(())
66    }
67
68    /// Converts a dense matrix to a RecordBatch in vector format (Lance-optimized).
69    /// Each row of the matrix becomes a single FixedSizeList entry.
70    ///
71    /// Arguments:
72    /// * matrix - Dense matrix to convert (N rows × F cols)
73    ///
74    /// Returns:
75    /// RecordBatch with schema: { vector: FixedSizeList<Float64>[F] }
76    fn to_dense_record_batch(
77        &self,
78        matrix: &DenseMatrix<f64>,
79    ) -> Result<RecordBatch, StorageError> {
80        let (rows, cols) = (matrix.shape().0, matrix.shape().1);
81
82        debug!(
83            "Converting dense matrix to RecordBatch (vector format): {}x{}",
84            rows, cols
85        );
86
87        if rows == 0 || cols == 0 {
88            return Err(StorageError::Invalid(
89                "Cannot convert empty matrix to RecordBatch".to_string(),
90            ));
91        }
92
93        // Flatten matrix row-by-row into a single Vec<f64>
94        let mut values: Vec<f64> = Vec::with_capacity(rows * cols);
95        for r in 0..rows {
96            for c in 0..cols {
97                values.push(*matrix.get((r, c)));
98            }
99        }
100
101        // Create FixedSizeList field: each entry is a vector of length cols
102        let value_field = Field::new("item", DataType::Float64, false);
103        let list_field = Field::new(
104            "vector",
105            DataType::FixedSizeList(Arc::new(value_field), cols as i32),
106            false,
107        );
108
109        let schema = Schema::new(vec![list_field]);
110
111        // Build the FixedSizeList array
112        let values_array = Float64Array::from(values);
113        let list_array = FixedSizeListArray::new(
114            Arc::new(Field::new("item", DataType::Float64, false)),
115            cols as i32,
116            Arc::new(values_array),
117            None, // No nulls
118        );
119
120        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)])
121            .map_err(|e| StorageError::Lance(e.to_string()))?;
122
123        trace!(
124            "RecordBatch created with {} rows (vectors of length {})",
125            batch.num_rows(),
126            cols
127        );
128
129        Ok(batch)
130    }
131
    /// Reconstructs a dense matrix from a RecordBatch in vector format.
    ///
    /// Arguments:
    /// * batch - RecordBatch containing FixedSizeList<Float64> vectors
    ///
    /// Returns:
    /// DenseMatrix in column-major format (smartcore convention)
    ///
    /// Errors:
    /// * batch does not have exactly one FixedSizeList<Float64> column
    /// * decoded matrix would allocate more than the 4 GiB cap
    #[allow(clippy::wrong_self_convention)]
    fn from_dense_record_batch(
        &self,
        batch: &RecordBatch,
    ) -> Result<DenseMatrix<f64>, StorageError> {
        use std::mem;

        debug!("Reconstructing dense matrix from RecordBatch (vector format)");
        debug!("Batch has {} columns", batch.num_columns());

        // Anything other than a single column means the data is in the old
        // wide (feature-per-column) layout; the error message includes a
        // one-liner to convert such parquet files.
        if batch.num_columns() != 1 {
            return Err(StorageError::Invalid(format!(
                "Expected Lance row-major format with 1 FixedSizeList<Float64> column, but found {} columns. \
                  This parquet file appears to be in wide format (feature-per-column). \
                  Convert it first using: \
                  `python -c \"import pyarrow.parquet as pq; import pyarrow.compute as pc; \
                  tbl = pq.read_table('input.parquet'); \
                  import pyarrow as pa; \
                  vectors = pa.array([row.as_py() for row in tbl.to_pylist()], type=pa.list_(pa.float64(), len(tbl.column_names))); \
                  new_tbl = pa.table({{'vector': vectors}}); \
                  pq.write_table(new_tbl, 'output.parquet')\"` \
                  or use a Lance-native writer in your data pipeline.",
                batch.num_columns()
            )));
        }

        debug!("Extracting FixedSizeList column");
        let column = batch.column(0);
        let list_array = column
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .ok_or_else(|| {
                StorageError::Invalid(format!(
                    "Column 0 is not FixedSizeList (found type: {:?}). \
                      Expected Lance row-major format with a single FixedSizeList<Float64> column.",
                    column.data_type()
                ))
            })?;

        // One list entry per matrix row; the fixed list width is the column count.
        let rows = list_array.len();
        let cols = list_array.value_length() as usize;

        debug!("Matrix dimensions: {}x{}", rows, cols);

        // Guard against excessive allocations
        let total = rows
            .checked_mul(cols)
            .ok_or_else(|| StorageError::Invalid("Matrix size overflow (rows*cols)".to_string()))?;
        let bytes = total
            .checked_mul(mem::size_of::<f64>())
            .ok_or_else(|| StorageError::Invalid("Byte size overflow".to_string()))?;

        const MAX_BYTES: usize = 4usize * 1024 * 1024 * 1024; // 4 GiB
        if bytes > MAX_BYTES {
            return Err(StorageError::Invalid(format!(
                "Dense load would allocate {} bytes for {}x{} matrix; exceeds 4GiB cap. \
                  Enable --reduce-dim or shard your input data.",
                bytes, rows, cols
            )));
        }

        // Extract Float64 values
        // NOTE(review): assumes the list values carry no nulls — the writer
        // (`to_dense_record_batch`) never produces them; confirm for
        // externally-produced datasets.
        let values_array = list_array
            .values()
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| {
                StorageError::Invalid("FixedSizeList values are not Float64Array".to_string())
            })?;

        debug!("Converting row-major to column-major");
        let mut data = vec![0.0f64; total];
        for r in 0..rows {
            for c in 0..cols {
                let row_major_idx = r * cols + c;
                let col_major_idx = c * rows + r;
                data[col_major_idx] = values_array.value(row_major_idx);
            }
        }

        debug!("Creating DenseMatrix");
        // Final flag selects the layout of `data` — presumably column-major,
        // matching the transpose loop above; TODO confirm against smartcore docs.
        DenseMatrix::new(rows, cols, data, true).map_err(|e| StorageError::Invalid(e.to_string()))
    }
222
223    /// Spawn a LanceStorage from an existing seeded directory (with metadata.json)
224    pub async fn spawn(base_path: String) -> Result<(LanceStorage, GeneMetadata), StorageError> {
225        // Reuse the generic `exists` helper from the StorageBackend trait
226        let (exists, md_path) = <LanceStorage as StorageBackend>::exists(&base_path);
227        assert!(
228            exists && md_path.is_some(),
229            "Metadata does not exist in this base path"
230        );
231
232        // Load metadata from the discovered metadata.json
233        let metadata = GeneMetadata::read(md_path.unwrap()).await?;
234
235        // Construct the LanceStorage using the metadata-provided nameid
236        let storage = LanceStorage::new(base_path.clone(), metadata.name_id.clone());
237
238        Ok((storage, metadata))
239    }
240
241    /// Converts a sparse CSR matrix to a RecordBatch in columnar format.
242    ///
243    /// Only non-zero entries are stored.
244    fn to_sparse_record_batch(&self, m: &CsMat<f64>) -> StorageResult<RecordBatch> {
245        debug!(
246            "Converting sparse matrix to RecordBatch: {} x {}, nnz={}",
247            m.rows(),
248            m.cols(),
249            m.nnz()
250        );
251
252        let mut row_idx = Vec::with_capacity(m.nnz());
253        let mut col_idx = Vec::with_capacity(m.nnz());
254        let mut vals = Vec::with_capacity(m.nnz());
255
256        for (v, (r, c)) in m.iter() {
257            row_idx.push(r as u32);
258            col_idx.push(c as u32);
259            vals.push(*v);
260        }
261
262        // Store actual dimensions in schema metadata
263        let mut schema_metadata = std::collections::HashMap::new();
264        schema_metadata.insert("rows".to_string(), m.rows().to_string());
265        schema_metadata.insert("cols".to_string(), m.cols().to_string());
266        schema_metadata.insert("nnz".to_string(), m.nnz().to_string());
267
268        let schema = Schema::new(vec![
269            Field::new("row", DataType::UInt32, false),
270            Field::new("col", DataType::UInt32, false),
271            Field::new("value", DataType::Float64, false),
272        ])
273        .with_metadata(schema_metadata);
274
275        let batch = RecordBatch::try_new(
276            Arc::new(schema),
277            vec![
278                Arc::new(UInt32Array::from(row_idx)) as _,
279                Arc::new(UInt32Array::from(col_idx)) as _,
280                Arc::new(Float64Array::from(vals)) as _,
281            ],
282        )
283        .map_err(|e| StorageError::Lance(e.to_string()))?;
284
285        trace!(
286            "Sparse RecordBatch created with {} entries",
287            batch.num_rows()
288        );
289        Ok(batch)
290    }
291
292    /// Reconstructs a sparse CSR matrix from a RecordBatch in columnar format.
293    ///
294    /// * `batch` - RecordBatch containing (`row`, `col`, `value`) triplets
295    /// * `expected_rows` / `expected_cols` - dimensions taken from metadata
296    #[allow(clippy::wrong_self_convention)]
297    fn from_sparse_record_batch(
298        &self,
299        batch: RecordBatch,
300        expected_rows: usize,
301        expected_cols: usize,
302    ) -> StorageResult<CsMat<f64>> {
303        use arrow::array::UInt32Array;
304
305        debug!("Reconstructing sparse matrix from RecordBatch");
306
307        let row_arr = batch
308            .column(0)
309            .as_any()
310            .downcast_ref::<UInt32Array>()
311            .ok_or_else(|| StorageError::Invalid("row column type mismatch".into()))?;
312        let col_arr = batch
313            .column(1)
314            .as_any()
315            .downcast_ref::<UInt32Array>()
316            .ok_or_else(|| StorageError::Invalid("col column type mismatch".into()))?;
317        let val_arr = batch
318            .column(2)
319            .as_any()
320            .downcast_ref::<Float64Array>()
321            .ok_or_else(|| StorageError::Invalid("value column type mismatch".into()))?;
322
323        let n = row_arr.len();
324        if n == 0 {
325            debug!(
326                "Empty RecordBatch, returning {}x{} sparse matrix",
327                expected_rows, expected_cols
328            );
329            return Ok(CsMat::zero((expected_rows, expected_cols)));
330        }
331
332        // Try to read dimensions from schema metadata (for validation)
333        let schema = batch.schema();
334        let schema_metadata = schema.metadata();
335        if let (Some(rows_str), Some(cols_str)) =
336            (schema_metadata.get("rows"), schema_metadata.get("cols"))
337        {
338            let schema_rows = rows_str.parse::<usize>().ok();
339            let schema_cols = cols_str.parse::<usize>().ok();
340            if schema_rows != Some(expected_rows) || schema_cols != Some(expected_cols) {
341                panic!(
342                    "Schema metadata dimensions ({:?}x{:?}) don't match storage metadata ({}x{})",
343                    schema_rows, schema_cols, expected_rows, expected_cols
344                );
345            } else {
346                debug!(
347                    "Schema metadata matches storage metadata: {}x{}",
348                    expected_rows, expected_cols
349                );
350            }
351        }
352
353        let rows = expected_rows;
354        let cols = expected_cols;
355        debug!(
356            "Reconstructing {}x{} sparse matrix from {} entries",
357            rows, cols, n
358        );
359
360        let mut trimat = TriMat::new((rows, cols));
361        for i in 0..n {
362            let r = row_arr.value(i) as usize;
363            let c = col_arr.value(i) as usize;
364            let v = val_arr.value(i);
365
366            if r >= rows || c >= cols {
367                return Err(StorageError::Invalid(format!(
368                    "Index out of bounds: ({}, {}) in {}x{} matrix",
369                    r, c, rows, cols
370                )));
371            }
372            trimat.add_triplet(r, c, v);
373        }
374
375        let result = trimat.to_csr();
376        if result.rows() != rows || result.cols() != cols {
377            return Err(StorageError::Invalid(format!(
378                "Dimension mismatch after reconstruction: expected {}x{}, got {}x{}",
379                rows,
380                cols,
381                result.rows(),
382                result.cols()
383            )));
384        }
385
386        Ok(result)
387    }
388
389    /// Async helper: write a RecordBatch to a Lance dataset.
390    async fn write_lance_batch_async(&self, path: &Path, batch: RecordBatch) -> StorageResult<()> {
391        let uri = Self::path_to_uri(path);
392        info!("Writing Lance dataset to {}", uri);
393
394        let schema = batch.schema();
395        let batches = vec![batch];
396        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
397
398        let params = WriteParams {
399            mode: WriteMode::Create,
400            ..WriteParams::default()
401        };
402
403        Dataset::write(reader, &uri, Some(params))
404            .await
405            .map_err(|e| StorageError::Lance(e.to_string()))?;
406
407        info!("Successfully wrote Lance dataset to {}", uri);
408        Ok(())
409    }
410
411    /// Async helper: read and concatenate all RecordBatches from a Lance dataset.
412    async fn read_lance_all_batches_async(&self, path: &Path) -> StorageResult<RecordBatch> {
413        let uri = Self::path_to_uri(path);
414        info!("Reading Lance dataset from {}", uri);
415
416        let dataset = Dataset::open(&uri)
417            .await
418            .map_err(|e| StorageError::Lance(e.to_string()))?;
419        let scanner = dataset.scan();
420        let mut stream = scanner
421            .try_into_stream()
422            .await
423            .map_err(|e| StorageError::Lance(e.to_string()))?;
424
425        let mut batches = Vec::new();
426        while let Some(batch_result) = stream.next().await {
427            let batch = batch_result.map_err(|e| StorageError::Lance(e.to_string()))?;
428            batches.push(batch);
429        }
430
431        if batches.is_empty() {
432            return Err(StorageError::Invalid("Empty Lance dataset".into()));
433        }
434
435        let schema = batches[0].schema();
436        let combined = arrow::compute::concat_batches(&schema, &batches)
437            .map_err(|e| StorageError::Lance(format!("Failed to concatenate batches: {}", e)))?;
438
439        debug!(
440            "Combined Lance batch for {:?} has {} rows",
441            path,
442            combined.num_rows()
443        );
444        Ok(combined)
445    }
446
447    /// Async helper: read the first RecordBatch from a Lance dataset.
448    async fn read_lance_first_batch_async(&self, path: &Path) -> StorageResult<RecordBatch> {
449        let uri = Self::path_to_uri(path);
450        info!("Reading first batch from Lance dataset {}", uri);
451
452        let dataset = Dataset::open(&uri)
453            .await
454            .map_err(|e| StorageError::Lance(e.to_string()))?;
455        let scanner = dataset.scan();
456        let mut stream = scanner
457            .try_into_stream()
458            .await
459            .map_err(|e| StorageError::Lance(e.to_string()))?;
460
461        let batch = stream
462            .next()
463            .await
464            .ok_or_else(|| StorageError::Lance("empty Lance dataset".to_string()))?
465            .map_err(|e| StorageError::Lance(e.to_string()))?;
466
467        debug!(
468            "Read first RecordBatch for path {:?} with {} rows",
469            path,
470            batch.num_rows()
471        );
472        Ok(batch)
473    }
474}
475
476impl StorageBackend for LanceStorage {
477    fn get_base(&self) -> String {
478        self._base.clone()
479    }
480
481    fn get_name(&self) -> String {
482        self._name.clone()
483    }
484
485    fn base_path(&self) -> PathBuf {
486        PathBuf::from(&self._base)
487    }
488
489    fn metadata_path(&self) -> PathBuf {
490        self.base_path()
491            .join(format!("{}_metadata.json", self._name))
492    }
493
494    /// Converts the base path for the store to a `file://` URI for Lance.
495    fn basepath_to_uri(&self) -> String {
496        Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
497    }
498
499    /// Converts a full file path to a `file://` URI for Lance.
500    fn path_to_uri(path: &Path) -> String {
501        path.canonicalize()
502            .unwrap_or_else(|_| {
503                if path.is_absolute() {
504                    path.to_path_buf()
505                } else if path.is_relative() {
506                    std::env::current_dir()
507                        .unwrap_or_else(|_| PathBuf::from("/"))
508                        .join(path)
509                } else {
510                    PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(path)
511                }
512            })
513            .to_string_lossy()
514            .to_string()
515    }
516
    // Dense-matrix persistence (Lance `vector` format): save/load helpers.
518
    /// Save dense matrix using Lance-optimized vector format.
    ///
    /// Each row of the matrix becomes a FixedSizeList entry for efficient vector operations.
    /// This format is optimized for vector search and enables Lance's full-zip encoding.
    ///
    /// # Arguments
    /// * `key` - Type identifier (e.g., "rawinput", "sub_centroids")
    /// * `matrix` - Dense matrix to save (N rows × F cols)
    /// * `md_path` - Metadata file path for validation
    ///
    /// # Errors
    /// Fails when the store is uninitialized, the matrix is empty, the batch
    /// row count disagrees with the matrix, or the Lance write fails.
    ///
    /// NOTE(review): unlike `save_sparse`, this does not record a `FileInfo`
    /// entry in the metadata — confirm callers track dense files separately.
    async fn save_dense(
        &self,
        key: &str,
        matrix: &DenseMatrix<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        let (n_rows, n_cols) = matrix.shape();

        info!(
            "Saving dense {} matrix: {} x {} at {:?}",
            key, n_rows, n_cols, path
        );

        // Convert to Lance-optimized RecordBatch (FixedSizeList format)
        let batch = self.to_dense_record_batch(matrix)?;

        // Verify batch has correct number of rows
        if batch.num_rows() != n_rows {
            return Err(StorageError::Invalid(format!(
                "RecordBatch has {} rows but matrix has {} rows",
                batch.num_rows(),
                n_rows
            )));
        }

        // Write to Lance
        self.write_lance_batch_async(&path, batch).await?;

        info!("Dense {} matrix saved successfully", key);
        Ok(())
    }
561
    /// Load dense matrix from Lance-optimized vector format.
    ///
    /// Reads FixedSizeList vectors and reconstructs a column-major DenseMatrix.
    ///
    /// # Arguments
    /// * `key` - Type identifier (e.g., "rawinput", "sub_centroids")
    ///
    /// # Returns
    /// Column-major DenseMatrix matching smartcore conventions
    ///
    /// # Errors
    /// Fails when the dataset is missing or empty, the layout is not a single
    /// FixedSizeList<Float64> column, or the decoded matrix would exceed the
    /// 4 GiB cap enforced by `from_dense_record_batch`.
    async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
        let path = self.file_path(key);
        info!("Loading dense {} matrix from {:?}", key, path);

        // Read all batches from Lance (may span multiple batches for large datasets)
        let batch = self.read_lance_all_batches_async(&path).await?;

        // Convert from FixedSizeList format to DenseMatrix
        let matrix = self.from_dense_record_batch(&batch)?;

        let (n_rows, n_cols) = matrix.shape();
        info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);

        Ok(matrix)
    }
586
    /// Load initial data using columnar format from a file path.
    ///
    /// Async test helper that avoids any internal blocking runtimes.
    ///
    /// Supported extensions:
    /// * `.lance`   - decoded from the single `vector` FixedSizeList<Float64>
    ///   column via the async Lance reader.
    /// * `.parquet` - read via the Arrow parquet reader; accepts both the
    ///   vector layout and the legacy wide layout (`col_*` Float64 columns).
    ///
    /// NOTE(review): the parquet branch performs synchronous file I/O on the
    /// async executor thread — fine for a test helper, confirm before using
    /// on a hot path.
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        // Dispatch purely on the file extension; anything else is rejected.
        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                // Use a temporary LanceStorage rooted at the file's parent dir,
                // same pattern as save_dense_to_file_async.
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage =
                    crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));

                // Reuse the async Lance reader logic.
                let batch = tmp_storage.read_lance_all_batches_async(path).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                // 1. Read from Parquet into a single RecordBatch
                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                // 2. Detect layout: vector (FixedSizeList) vs old wide columnar (col_* Float64)
                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                            if matches!(inner.data_type(), DataType::Float64)
                    );

                // Wide layout requires every column to be Float64 and at least
                // one column name starting with "col_".
                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                // 3. Build DenseMatrix from the RecordBatch
                let matrix = if is_vector {
                    // New format already: vector column (FixedSizeList<Float64>)
                    // Reuse the same decoding as Lance.
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage =
                        crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    // Old wide columnar: columns like col_0, col_1, ... as Float64
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        // Build column-major storage: all rows for col 0, then col 1, ...
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }
758
759    fn file_path(&self, key: &str) -> PathBuf {
760        self.base_path()
761            .join(format!("{}_{}.lance", self._name, key))
762    }
763
764    // =========
765    // ASYNC API (matches StorageBackend)
766    // =========
767
768    async fn save_sparse(
769        &self,
770        key: &str,
771        matrix: &CsMat<f64>,
772        md_path: &Path,
773    ) -> StorageResult<()> {
774        self.validate_initialized(md_path)?;
775        let path = self.file_path(key);
776        info!(
777            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
778            key,
779            matrix.rows(),
780            matrix.cols(),
781            matrix.nnz(),
782            path
783        );
784
785        let mut metadata = self.load_metadata().await?;
786        let filetype = FileInfo::which_filetype(key);
787        metadata.files.insert(
788            key.to_string(),
789            FileInfo {
790                filename: format!("{}_{}.lance", self.get_name(), key),
791                filetype: filetype.to_string(),
792                storage_format: FileInfo::which_format(&filetype),
793                rows: matrix.rows(),
794                cols: matrix.cols(),
795                nnz: Some(matrix.nnz()),
796                size_bytes: None,
797            },
798        );
799        self.save_metadata(&metadata).await?;
800
801        let batch = self.to_sparse_record_batch(matrix)?;
802        self.write_lance_batch_async(&path, batch).await?;
803        info!("Sparse matrix {} saved successfully", filetype);
804        Ok(())
805    }
806
807    async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
808        info!("Loading sparse {} matrix", key);
809
810        let metadata = self.load_metadata().await?;
811        let filetype = FileInfo::which_filetype(key);
812        let file_info = metadata
813            .files
814            .get(key)
815            .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
816
817        let expected_rows = file_info.rows;
818        let expected_cols = file_info.cols;
819        debug!(
820            "Expected dimensions from storage metadata: {} x {}",
821            expected_rows, expected_cols
822        );
823
824        let path = self.file_path(key);
825        let batch = self.read_lance_first_batch_async(&path).await?;
826        let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
827        info!(
828            "Sparse {} matrix loaded: {} x {}, nnz={}",
829            filetype,
830            matrix.rows(),
831            matrix.cols(),
832            matrix.nnz()
833        );
834        Ok(matrix)
835    }
836
837    async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
838        self.validate_initialized(md_path)?;
839        let path = self.file_path("lambdas");
840        info!("Saving {} lambda values", lambdas.len());
841
842        let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
843        let batch = RecordBatch::try_new(
844            Arc::new(schema),
845            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
846        )
847        .map_err(|e| StorageError::Lance(e.to_string()))?;
848
849        self.write_lance_batch_async(&path, batch).await?;
850        info!("Lambda values saved successfully");
851        Ok(())
852    }
853
854    async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
855        let path = self.file_path("lambdas");
856        info!("Loading lambda values from {:?}", path);
857
858        let batch = self.read_lance_first_batch_async(&path).await?;
859        let arr = batch
860            .column(0)
861            .as_any()
862            .downcast_ref::<Float64Array>()
863            .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
864
865        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
866        info!("Loaded {} lambda values", lambdas.len());
867        Ok(lambdas)
868    }
869
870    async fn save_metadata(&self, metadata: &GeneMetadata) -> StorageResult<PathBuf> {
871        let path = self.metadata_path();
872        info!("Saving metadata to {:?}", path);
873        fs::create_dir_all(self.base_path()).map_err(|e| StorageError::Io(e.to_string()))?;
874        let s = serde_json::to_string_pretty(metadata).map_err(StorageError::Serde)?;
875        fs::write(&path, s).map_err(|e| StorageError::Io(e.to_string()))?;
876        info!("Metadata saved successfully");
877        Ok(path)
878    }
879
880    async fn load_metadata(&self) -> StorageResult<GeneMetadata> {
881        let filename = self.metadata_path();
882        info!("Loading metadata from {:?}", filename);
883        let s = fs::read_to_string(filename).map_err(|e| StorageError::Io(e.to_string()))?;
884        let md: GeneMetadata = serde_json::from_str(&s).map_err(StorageError::Serde)?;
885        info!("Metadata loaded successfully");
886        Ok(md)
887    }
888
889    async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
890        self.validate_initialized(md_path)?;
891        let path = self.file_path(key);
892        info!("Saving {} values for vector {}", vector.len(), key);
893
894        let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
895        let float64_array = Float64Array::from_iter_values::<Vec<f64>>(vector.into());
896        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
897            .map_err(|e| StorageError::Lance(e.to_string()))?;
898
899        self.write_lance_batch_async(&path, batch).await?;
900        info!("Index {} saved successfully", key);
901        Ok(())
902    }
903
904    async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
905        self.validate_initialized(md_path)?;
906        let path = self.file_path(key);
907        info!("Saving {} values for index {}", vector.len(), key);
908
909        let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
910        let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
911        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
912            .map_err(|e| StorageError::Lance(e.to_string()))?;
913
914        self.write_lance_batch_async(&path, batch).await?;
915        info!("Index {} saved successfully", key);
916        Ok(())
917    }
918
919    async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
920        let path = self.file_path(filename);
921        info!("Loading vector {} from {:?}", filename, path);
922
923        let batch = self.read_lance_first_batch_async(&path).await?;
924        let arr = batch
925            .column(0)
926            .as_any()
927            .downcast_ref::<Float64Array>()
928            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
929
930        let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
931        info!("Loaded {} vector values for {}", vector.len(), filename);
932        Ok(vector)
933    }
934
935    async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
936        let path = self.file_path(filename);
937        info!("Loading vector {} from {:?}", filename, path);
938
939        let batch = self.read_lance_first_batch_async(&path).await?;
940        let arr = batch
941            .column(0)
942            .as_any()
943            .downcast_ref::<UInt32Array>()
944            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
945
946        let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
947        info!("Loaded {} vector values for {}", vector.len(), filename);
948        Ok(vector)
949    }
950
951    /// Save dense matrix to file in columnar format (col_0, col_1, ..., col_N)
952    ///
953    /// Async test helper that avoids any internal blocking runtimes.
954    #[cfg(test)]
955    async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &PathBuf) -> StorageResult<()> {
956        use tokio::fs as tokio_fs;
957
958        info!("Saving dense matrix to file (async): {:?}", path);
959
960        // Ensure parent dir exists for the test file.
961        if let Some(parent) = path.parent() {
962            tokio_fs::try_exists(parent).await.map_err(|e| {
963                StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
964            })?;
965        }
966
967        // Create a temporary storage only to store the file.
968        let tmp_storage = LanceStorage::new(
969            String::from(path.parent().unwrap().to_str().unwrap()),
970            String::from("tmp_storage"),
971        );
972
973        let extension = path
974            .extension()
975            .and_then(|e| e.to_str())
976            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
977
978        let (n_rows, n_cols) = data.shape();
979        info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
980
981        match extension {
982            "lance" => {
983                let batch = tmp_storage.to_dense_record_batch(data)?;
984                debug!(
985                    "Created RecordBatch with {} rows for Lance",
986                    batch.num_rows()
987                );
988
989                // Verify all rows are in the batch
990                if batch.num_rows() != n_rows {
991                    return Err(StorageError::Invalid(format!(
992                        "RecordBatch has {} rows but matrix has {} rows",
993                        batch.num_rows(),
994                        n_rows
995                    )));
996                }
997
998                tmp_storage.write_lance_batch_async(path, batch).await?;
999                info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
1000                Ok(())
1001            }
1002            "parquet" => {
1003                use parquet::arrow::ArrowWriter;
1004                use parquet::file::properties::WriterProperties;
1005                use std::fs::File;
1006
1007                // For tests we still use sync parquet writer; directory was created with tokio_fs.
1008                let batch = tmp_storage.to_dense_record_batch(data)?;
1009                debug!(
1010                    "Created RecordBatch with {} rows for Parquet",
1011                    batch.num_rows()
1012                );
1013
1014                if batch.num_rows() != n_rows {
1015                    return Err(StorageError::Invalid(format!(
1016                        "RecordBatch has {} rows but matrix has {} rows",
1017                        batch.num_rows(),
1018                        n_rows
1019                    )));
1020                }
1021
1022                let file = File::create(path).map_err(|e| {
1023                    StorageError::Io(format!("Failed to create parquet file: {}", e))
1024                })?;
1025
1026                let props = WriterProperties::builder()
1027                    .set_compression(parquet::basic::Compression::SNAPPY)
1028                    .build();
1029
1030                let mut writer =
1031                    ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
1032                        StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
1033                    })?;
1034
1035                writer
1036                    .write(&batch)
1037                    .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
1038
1039                writer
1040                    .close()
1041                    .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
1042
1043                info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
1044                Ok(())
1045            }
1046            _ => Err(StorageError::Invalid(format!(
1047                "Unsupported file format: {}. Only .lance and .parquet are supported",
1048                extension
1049            ))),
1050        }
1051    }
1052
1053    /// Save centroid_map (item-to-centroid assignments)
1054    async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
1055        self.validate_initialized(md_path)?;
1056        let path = self.file_path("centroid_map");
1057        info!("Saving {} centroid map entries", map.len());
1058
1059        let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
1060        let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
1061        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
1062            .map_err(|e| StorageError::Lance(e.to_string()))?;
1063
1064        self.write_lance_batch_async(&path, batch).await?;
1065        info!("Centroid map saved successfully");
1066        Ok(())
1067    }
1068
1069    /// Load centroid_map
1070    async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
1071        let path = self.file_path("centroid_map");
1072        info!("Loading centroid map from {:?}", path);
1073
1074        let batch = self.read_lance_first_batch_async(&path).await?;
1075        let arr = batch
1076            .column(0)
1077            .as_any()
1078            .downcast_ref::<UInt32Array>()
1079            .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
1080
1081        let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
1082        info!("Loaded {} centroid map entries", map.len());
1083        Ok(map)
1084    }
1085
1086    /// Save subcentroid_lambdas (tau values for subcentroids)
1087    async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
1088        self.validate_initialized(md_path)?;
1089        let path = self.file_path("subcentroid_lambdas");
1090        info!("Saving {} subcentroid lambda values", lambdas.len());
1091
1092        let schema = Schema::new(vec![Field::new(
1093            "subcentroid_lambda",
1094            DataType::Float64,
1095            false,
1096        )]);
1097        let batch = RecordBatch::try_new(
1098            Arc::new(schema),
1099            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
1100        )
1101        .map_err(|e| StorageError::Lance(e.to_string()))?;
1102
1103        self.write_lance_batch_async(&path, batch).await?;
1104        info!("Subcentroid lambda values saved successfully");
1105        Ok(())
1106    }
1107
1108    /// Load subcentroid_lambdas
1109    async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
1110        let path = self.file_path("subcentroid_lambdas");
1111        info!("Loading subcentroid lambda values from {:?}", path);
1112
1113        let batch = self.read_lance_first_batch_async(&path).await?;
1114        let arr = batch
1115            .column(0)
1116            .as_any()
1117            .downcast_ref::<Float64Array>()
1118            .ok_or_else(|| {
1119                StorageError::Invalid("subcentroid_lambda column type mismatch".into())
1120            })?;
1121
1122        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
1123        info!("Loaded {} subcentroid lambda values", lambdas.len());
1124        Ok(lambdas)
1125    }
1126
1127    /// Save subcentroids (dense matrix)
1128    async fn save_subcentroids(
1129        &self,
1130        subcentroids: &DenseMatrix<f64>,
1131        md_path: &Path,
1132    ) -> StorageResult<()> {
1133        self.validate_initialized(md_path)?;
1134        let path = self.file_path("sub_centroids");
1135        let (n_rows, n_cols) = subcentroids.shape();
1136        info!(
1137            "Saving subcentroids matrix {} x {} at {:?}",
1138            n_rows, n_cols, path
1139        );
1140
1141        let batch = self.to_dense_record_batch(subcentroids)?;
1142        self.write_lance_batch_async(&path, batch).await?;
1143        debug!("Subcentroids matrix saved successfully");
1144        Ok(())
1145    }
1146
1147    /// Load subcentroids as Vec<Vec<f64>>
1148    async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
1149        let path = self.file_path("sub_centroids");
1150        info!("Loading sub_centroids from {:?}", path);
1151
1152        let batch = self.read_lance_all_batches_async(&path).await?;
1153        let matrix = self.from_dense_record_batch(&batch)?;
1154
1155        // Convert DenseMatrix to Vec<Vec<f64>>
1156        let (n_rows, n_cols) = matrix.shape();
1157        let mut result = Vec::with_capacity(n_rows);
1158
1159        for row_idx in 0..n_rows {
1160            let row: Vec<f64> = (0..n_cols)
1161                .map(|col_idx| *matrix.get((row_idx, col_idx)))
1162                .collect();
1163            result.push(row);
1164        }
1165
1166        info!(
1167            "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
1168            n_rows, n_cols
1169        );
1170        Ok(result)
1171    }
1172
1173    /// Save item norms vector
1174    async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
1175        self.validate_initialized(md_path)?;
1176        let path = self.file_path("item_norms");
1177        info!("Saving {} item norm values", item_norms.len());
1178
1179        let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
1180        let batch = RecordBatch::try_new(
1181            Arc::new(schema),
1182            vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
1183        )
1184        .map_err(|e| StorageError::Lance(e.to_string()))?;
1185
1186        self.write_lance_batch_async(&path, batch).await?;
1187        info!("Item norms saved successfully");
1188        Ok(())
1189    }
1190
1191    /// Load item norms vector
1192    async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
1193        let path = self.file_path("item_norms");
1194        info!("Loading item norms from {:?}", path);
1195
1196        let batch = self.read_lance_first_batch_async(&path).await?;
1197        let arr = batch
1198            .column(0)
1199            .as_any()
1200            .downcast_ref::<Float64Array>()
1201            .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
1202
1203        let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
1204        info!("Loaded {} item norm values", norms.len());
1205        Ok(norms)
1206    }
1207
1208    async fn save_cluster_assignments(
1209        &self,
1210        assignments: &[Option<usize>],
1211        md_path: &Path,
1212    ) -> StorageResult<()> {
1213        self.validate_initialized(md_path)?;
1214        let path = self.file_path("cluster_assignments");
1215        info!("Saving {} cluster assignments", assignments.len());
1216
1217        // Convert Option<usize> to i64 (-1 for None)
1218        let values: Vec<i64> = assignments
1219            .iter()
1220            .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
1221            .collect();
1222
1223        let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
1224        let batch = RecordBatch::try_new(
1225            Arc::new(schema),
1226            vec![Arc::new(Int64Array::from(values)) as _],
1227        )
1228        .map_err(|e| StorageError::Lance(e.to_string()))?;
1229
1230        self.write_lance_batch_async(&path, batch).await?;
1231        info!("Cluster assignments saved successfully");
1232        Ok(())
1233    }
1234
1235    async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
1236        let path = self.file_path("cluster_assignments");
1237        info!("Loading cluster assignments from {:?}", path);
1238
1239        let batch = self.read_lance_first_batch_async(&path).await?;
1240        let arr = batch
1241            .column(0)
1242            .as_any()
1243            .downcast_ref::<Int64Array>()
1244            .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
1245
1246        let assignments: Vec<Option<usize>> = (0..arr.len())
1247            .map(|i| {
1248                let val = arr.value(i);
1249                if val < 0 { None } else { Some(val as usize) }
1250            })
1251            .collect();
1252
1253        info!("Loaded {} cluster assignments", assignments.len());
1254        Ok(assignments)
1255    }
1256}