genegraph_storage/
lance_storage_graph.rs

1//! Lance storage backend for graph embeddings.
2//!
3//! Async-first implementation that matches the async `StorageBackend` trait:
4//! - All I/O is async, no internal `block_on` or runtime creation.
5//! - Callers (CLI, tests, services) are responsible for providing a Tokio runtime.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::{Float64Array, Int64Array, UInt32Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow_array::{Array as ArrowArray, RecordBatch};
13use log::{debug, info};
14use smartcore::linalg::basic::arrays::Array;
15use smartcore::linalg::basic::matrix::DenseMatrix;
16use sprs::CsMat;
17
18use crate::metadata::FileInfo;
19use crate::metadata::GeneMetadata;
20use crate::traits::backend::StorageBackend;
21use crate::traits::lance::LanceStorage;
22use crate::traits::metadata::Metadata;
23use crate::{StorageError, StorageResult};
24
/// Lance-based storage backend for ArrowSpace graph embeddings.
///
/// Stores dense and sparse matrices as Lance datasets using a columnar format
/// (`row`, `col`, `value` for sparse; `col_*` for dense) schema for efficient
/// random and columnar access.
#[derive(Debug, Clone)]
pub struct LanceStorageGraph {
    /// Base directory path for all storage files.
    pub(crate) _base: String,
    /// Name prefix applied to every dataset and metadata file of this instance.
    pub(crate) _name: String,
}
35
36impl LanceStorageGraph {
37    /// Creates a new Lance storage backend.
38    ///
39    /// This is used for on-the-fly creation. For proper setup use `Genefold<...>::seed`.
40    ///
41    /// # Arguments
42    ///
43    /// * `_base` - Base directory path for all storage files
44    /// * `_name` - Name prefix for this storage instance
45    pub fn new(_base: String, _name: String) -> Self {
46        info!("Creating LanceStorage at base={}, name={}", _base, _name);
47        Self { _base, _name }
48    }
49
50    /// Spawn a LanceStorage from an existing seeded directory (with metadata.json)
51    pub async fn spawn(base_path: String) -> Result<(Self, GeneMetadata), StorageError> {
52        // Reuse the generic `exists` helper from the StorageBackend trait
53        let (exists, md_path) = Self::exists(&base_path);
54
55        // Replace assert! with proper error handling
56        if !exists || md_path.is_none() {
57            return Err(StorageError::Invalid(format!(
58                "Metadata does not exist in base path: {}",
59                base_path
60            )));
61        }
62
63        // Load metadata from the discovered metadata.json
64        let metadata = GeneMetadata::read(md_path.unwrap()).await?;
65
66        // Construct the LanceStorage using the metadata-provided nameid
67        let storage = Self::new(base_path.clone(), metadata.name_id.clone());
68        Ok((storage, metadata))
69    }
70}
71
// Empty marker impl: no overrides, so all `LanceStorage` behavior comes from
// the trait's provided default methods.
impl LanceStorage for LanceStorageGraph {}
73
74impl StorageBackend for LanceStorageGraph {
75    fn get_base(&self) -> String {
76        self._base.clone()
77    }
78
79    fn get_name(&self) -> String {
80        self._name.clone()
81    }
82
83    fn base_path(&self) -> PathBuf {
84        PathBuf::from(&self._base)
85    }
86
87    fn metadata_path(&self) -> PathBuf {
88        self.base_path()
89            .join(format!("{}_metadata.json", self._name))
90    }
91
92    /// Converts the base path for the store to a `file://` URI for Lance.
93    fn basepath_to_uri(&self) -> String {
94        Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
95    }
96
97    /// Save dense matrix using Lance-optimized vector format.
98    ///
99    /// Each row of the matrix becomes a FixedSizeList entry for efficient vector operations.
100    /// This format is optimized for vector search and enables Lance's full-zip encoding.
101    ///
102    /// # Arguments
103    /// * `filename` - any name
104    /// * `matrix` - Dense matrix to save (N rows × F cols)
105    /// * `md_path` - Metadata file path for validation
106    async fn save_dense(
107        &self,
108        key: &str,
109        matrix: &DenseMatrix<f64>,
110        md_path: &Path,
111    ) -> StorageResult<()> {
112        self.validate_initialized(md_path)?;
113        let path = self.file_path(key);
114        let (n_rows, n_cols) = matrix.shape();
115
116        info!(
117            "Saving dense {} matrix: {} x {} at {:?}",
118            key, n_rows, n_cols, path
119        );
120
121        // Convert to Lance-optimized RecordBatch (FixedSizeList format)
122        let batch = self.to_dense_record_batch(matrix)?;
123
124        // Verify batch has correct number of rows
125        if batch.num_rows() != n_rows {
126            return Err(StorageError::Invalid(format!(
127                "RecordBatch has {} rows but matrix has {} rows",
128                batch.num_rows(),
129                n_rows
130            )));
131        }
132
133        {
134            // Write to Lance
135            let uri = Self::path_to_uri(&path);
136            self.write_lance_batch_async(uri, batch).await?;
137            let mut md = self.load_metadata().await?;
138            md = md.add_file(
139                key,
140                FileInfo::new(
141                    format!("{}_{}.lance", self.get_name(), key),
142                    "dense",
143                    matrix.shape(),
144                    None,
145                    None,
146                ),
147            );
148            self.save_metadata(&md).await?;
149            info!("Dense {} matrix saved successfully", key);
150        }
151        Ok(())
152    }
153
154    /// Load dense matrix from Lance-optimized vector format.
155    ///
156    /// Reads FixedSizeList vectors and reconstructs a column-major DenseMatrix.
157    ///
158    /// # Arguments
159    /// * `filename` - any name previously assigned
160    ///
161    /// # Returns
162    /// Column-major DenseMatrix matching smartcore conventions
163    async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
164        let path = self.file_path(key);
165        info!("Loading dense {} matrix from {:?}", key, path);
166
167        // Read all batches from Lance (may span multiple batches for large datasets)
168        let uri = Self::path_to_uri(&path);
169        let batch = self.read_lance_all_batches_async(uri).await?;
170
171        // Convert from FixedSizeList format to DenseMatrix
172        let matrix = self.from_dense_record_batch(&batch)?;
173
174        let (n_rows, n_cols) = matrix.shape();
175        info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);
176
177        Ok(matrix)
178    }
179
180    /// Load initial data using columnar format from a file path.
181    ///
182    /// Async test helper that avoids any internal blocking runtimes.
183    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
184        info!("Loading dense matrix from file (async): {:?}", path);
185
186        if !path.exists() {
187            return Err(StorageError::Invalid(format!(
188                "Dense file does not exist: {:?}",
189                path
190            )));
191        }
192
193        let extension = path
194            .extension()
195            .and_then(|e| e.to_str())
196            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
197
198        match extension {
199            "lance" => {
200                // Use a temporary LanceStorage rooted at the file's parent dir,
201                // same pattern as save_dense_to_file_async.
202                let parent = path
203                    .parent()
204                    .ok_or_else(|| {
205                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
206                    })?
207                    .to_str()
208                    .ok_or_else(|| {
209                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
210                    })?
211                    .to_string();
212
213                let tmp_storage = Self::new(parent, String::from("tmp_storage"));
214
215                // Reuse the async Lance reader logic.
216                let uri = Self::path_to_uri(path);
217                let batch = tmp_storage.read_lance_all_batches_async(uri).await?;
218                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
219                info!(
220                    "Loaded dense matrix from Lance: {} x {}",
221                    matrix.shape().0,
222                    matrix.shape().1
223                );
224                Ok(matrix)
225            }
226            "parquet" => {
227                use arrow::datatypes::DataType;
228                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
229                use std::fs::File;
230
231                // 1. Read from Parquet into a single RecordBatch
232                let file = File::open(path)
233                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;
234
235                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
236                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
237                })?;
238                let mut reader = builder.build().map_err(|e| {
239                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
240                })?;
241
242                let mut batches = Vec::new();
243                #[allow(clippy::while_let_on_iterator)]
244                while let Some(batch) = reader.next() {
245                    let batch = batch.map_err(|e| {
246                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
247                    })?;
248                    batches.push(batch);
249                }
250
251                if batches.is_empty() {
252                    return Err(StorageError::Invalid(format!(
253                        "Empty parquet dataset at {:?}",
254                        path
255                    )));
256                }
257
258                let schema = batches[0].schema();
259                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
260                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
261                })?;
262
263                // 2. Detect layout: vector (FixedSizeList) vs old wide columnar (col_* Float64)
264                let fields = schema.fields();
265                let is_vector = fields.len() == 1
266                    && matches!(
267                        fields[0].data_type(),
268                        DataType::FixedSizeList(inner, _)
269                            if matches!(inner.data_type(), DataType::Float64)
270                    );
271
272                let is_wide_col = !is_vector
273                    && !fields.is_empty()
274                    && fields
275                        .iter()
276                        .all(|f| matches!(f.data_type(), DataType::Float64))
277                    && fields.iter().any(|f| f.name().starts_with("col_"));
278
279                // 3. Build DenseMatrix from the RecordBatch
280                let matrix = if is_vector {
281                    // New format already: vector column (FixedSizeList<Float64>)
282                    // Reuse the same decoding as Lance.
283                    let parent = path
284                        .parent()
285                        .ok_or_else(|| {
286                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
287                        })?
288                        .to_str()
289                        .ok_or_else(|| {
290                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
291                        })?
292                        .to_string();
293
294                    let tmp_storage = Self::new(parent, String::from("tmp_storage"));
295                    tmp_storage.from_dense_record_batch(&combined)?
296                } else if is_wide_col {
297                    // Old wide columnar: columns like col_0, col_1, ... as Float64
298                    let n_rows = combined.num_rows();
299                    let n_cols = combined.num_columns();
300                    if n_rows == 0 || n_cols == 0 {
301                        return Err(StorageError::Invalid(format!(
302                            "Cannot load empty wide-column parquet at {:?}",
303                            path
304                        )));
305                    }
306
307                    let mut data = Vec::with_capacity(n_rows * n_cols);
308                    for col_idx in 0..n_cols {
309                        let col = combined.column(col_idx);
310                        let arr = col
311                            .as_any()
312                            .downcast_ref::<arrow_array::Float64Array>()
313                            .ok_or_else(|| {
314                                StorageError::Invalid(format!(
315                                    "Wide-column parquet expects Float64, got {:?} in column {}",
316                                    col.data_type(),
317                                    col_idx
318                                ))
319                            })?;
320                        // Build column-major storage: all rows for col 0, then col 1, ...
321                        for row_idx in 0..n_rows {
322                            data.push(arr.value(row_idx));
323                        }
324                    }
325
326                    DenseMatrix::new(n_rows, n_cols, data, true)
327                        .map_err(|e| StorageError::Invalid(e.to_string()))?
328                } else {
329                    return Err(StorageError::Invalid(format!(
330                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
331                         or wide Float64 columns named col_*",
332                        path
333                    )));
334                };
335
336                info!(
337                    "Loaded dense matrix from Parquet: {} x {}",
338                    matrix.shape().0,
339                    matrix.shape().1
340                );
341
342                Ok(matrix)
343            }
344            _ => Err(StorageError::Invalid(format!(
345                "Unsupported file format: {}. Only .lance and .parquet are supported",
346                extension
347            ))),
348        }
349    }
350
351    fn file_path(&self, key: &str) -> PathBuf {
352        self.base_path()
353            .join(format!("{}_{}.lance", self._name, key))
354    }
355
356    // =========
357    // ASYNC API (matches StorageBackend)
358    // =========
359
360    async fn save_sparse(
361        &self,
362        key: &str,
363        matrix: &CsMat<f64>,
364        md_path: &Path,
365    ) -> StorageResult<()> {
366        self.validate_initialized(md_path)?;
367        let path = self.file_path(key);
368        info!(
369            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
370            key,
371            matrix.rows(),
372            matrix.cols(),
373            matrix.nnz(),
374            path
375        );
376
377        let filetype = FileInfo::which_filetype(key);
378        {
379            let mut metadata = self.load_metadata().await?;
380            metadata = metadata.add_file(
381                key,
382                FileInfo::new(
383                    format!("{}_{}.lance", self.get_name(), key),
384                    filetype.as_str(),
385                    (matrix.rows(), matrix.cols()),
386                    Some(matrix.nnz()),
387                    None,
388                ),
389            );
390            self.save_metadata(&metadata).await?;
391
392            let batch = self.to_sparse_record_batch(matrix)?;
393            let uri = Self::path_to_uri(&path);
394            self.write_lance_batch_async(uri, batch).await?;
395        }
396        info!("Sparse matrix {} saved successfully", filetype);
397        Ok(())
398    }
399
400    async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
401        info!("Loading sparse {} matrix", key);
402
403        let metadata = self.load_metadata().await?;
404        let filetype = FileInfo::which_filetype(key);
405        let file_info = metadata
406            .files
407            .get(key)
408            .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
409
410        let expected_rows = file_info.rows;
411        let expected_cols = file_info.cols;
412        debug!(
413            "Expected dimensions from storage metadata: {} x {}",
414            expected_rows, expected_cols
415        );
416
417        let path = self.file_path(key);
418        let uri = Self::path_to_uri(&path);
419        let batch = self.read_lance_first_batch_async(uri).await?;
420        let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
421        info!(
422            "Sparse {} matrix loaded: {} x {}, nnz={}",
423            filetype,
424            matrix.rows(),
425            matrix.cols(),
426            matrix.nnz()
427        );
428        Ok(matrix)
429    }
430
431    async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
432        self.validate_initialized(md_path)?;
433        let key = "lambdas";
434        let path = self.file_path("lambdas");
435        info!("Saving {} lambda values", lambdas.len());
436
437        let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
438        let batch = RecordBatch::try_new(
439            Arc::new(schema),
440            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
441        )
442        .map_err(|e| StorageError::Lance(e.to_string()))?;
443
444        {
445            let mut metadata = self.load_metadata().await?;
446            metadata = metadata.add_file(
447                key,
448                FileInfo::new(
449                    format!("{}_{}.lance", self.get_name(), key),
450                    "vector",
451                    (lambdas.len(), 1),
452                    None,
453                    None,
454                ),
455            );
456            self.save_metadata(&metadata).await?;
457
458            let uri = Self::path_to_uri(&path);
459            self.write_lance_batch_async(uri, batch).await?;
460        }
461        info!("Lambda values saved successfully");
462        Ok(())
463    }
464
465    async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
466        let path = self.file_path("lambdas");
467        info!("Loading lambda values from {:?}", path);
468
469        let uri = Self::path_to_uri(&path);
470        let batch = self.read_lance_first_batch_async(uri).await?;
471        let arr = batch
472            .column(0)
473            .as_any()
474            .downcast_ref::<Float64Array>()
475            .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
476
477        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
478        info!("Loaded {} lambda values", lambdas.len());
479        Ok(lambdas)
480    }
481
482    async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
483        self.validate_initialized(md_path)?;
484        let path = self.file_path(key);
485        info!("Saving {} values for vector {}", vector.len(), key);
486
487        let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
488        let float64_array = Float64Array::from_iter_values::<Vec<f64>>(vector.into());
489        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
490            .map_err(|e| StorageError::Lance(e.to_string()))?;
491
492        {
493            let mut metadata = self.load_metadata().await?;
494            metadata = metadata.add_file(
495                key,
496                FileInfo::new(
497                    format!("{}_{}.lance", self.get_name(), key),
498                    "vector",
499                    (vector.len(), 1),
500                    None,
501                    None,
502                ),
503            );
504            self.save_metadata(&metadata).await?;
505
506            let uri = Self::path_to_uri(&path);
507            self.write_lance_batch_async(uri, batch).await?;
508        }
509        info!("Index {} saved successfully", key);
510        Ok(())
511    }
512
513    async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
514        self.validate_initialized(md_path)?;
515        let path = self.file_path(key);
516        info!("Saving {} values for index {}", vector.len(), key);
517
518        let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
519        let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
520        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
521            .map_err(|e| StorageError::Lance(e.to_string()))?;
522
523        {
524            let mut metadata = self.load_metadata().await?;
525            metadata = metadata.add_file(
526                key,
527                FileInfo::new(
528                    format!("{}_{}.lance", self.get_name(), key),
529                    "vector",
530                    (vector.len(), 1),
531                    None,
532                    None,
533                ),
534            );
535            self.save_metadata(&metadata).await?;
536
537            let uri = Self::path_to_uri(&path);
538            self.write_lance_batch_async(uri, batch).await?;
539        }
540        info!("Index {} saved successfully", key);
541        Ok(())
542    }
543
544    async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
545        let path = self.file_path(filename);
546        info!("Loading vector {} from {:?}", filename, path);
547
548        let uri = Self::path_to_uri(&path);
549        let batch = self.read_lance_first_batch_async(uri).await?;
550        let arr = batch
551            .column(0)
552            .as_any()
553            .downcast_ref::<Float64Array>()
554            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
555
556        let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
557        info!("Loaded {} vector values for {}", vector.len(), filename);
558        Ok(vector)
559    }
560
561    async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
562        let path = self.file_path(filename);
563        info!("Loading vector {} from {:?}", filename, path);
564
565        let uri = Self::path_to_uri(&path);
566        let batch = self.read_lance_first_batch_async(uri).await?;
567        let arr = batch
568            .column(0)
569            .as_any()
570            .downcast_ref::<UInt32Array>()
571            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
572
573        let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
574        info!("Loaded {} vector values for {}", vector.len(), filename);
575        Ok(vector)
576    }
577
578    /// Save dense matrix to file in columnar format (col_0, col_1, ..., col_N)
579    ///
580    /// Async test helper that avoids any internal blocking runtimes.
581    async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &Path) -> StorageResult<()> {
582        use tokio::fs as tokio_fs;
583
584        info!("Saving dense matrix to file (async): {:?}", path);
585
586        // Ensure parent dir exists for the test file.
587        if let Some(parent) = path.parent() {
588            tokio_fs::try_exists(parent).await.map_err(|e| {
589                StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
590            })?;
591        }
592
593        // Create a temporary storage only to store the file.
594        let tmp_storage = Self::new(
595            String::from(path.parent().unwrap().to_str().unwrap()),
596            String::from("tmp_storage"),
597        );
598
599        let extension = path
600            .extension()
601            .and_then(|e| e.to_str())
602            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
603
604        let (n_rows, n_cols) = data.shape();
605        info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
606
607        match extension {
608            "lance" => {
609                let batch = tmp_storage.to_dense_record_batch(data)?;
610                debug!(
611                    "Created RecordBatch with {} rows for Lance",
612                    batch.num_rows()
613                );
614
615                // Verify all rows are in the batch
616                if batch.num_rows() != n_rows {
617                    return Err(StorageError::Invalid(format!(
618                        "RecordBatch has {} rows but matrix has {} rows",
619                        batch.num_rows(),
620                        n_rows
621                    )));
622                }
623
624                let uri = Self::path_to_uri(path);
625                tmp_storage.write_lance_batch_async(uri, batch).await?;
626                info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
627                Ok(())
628            }
629            "parquet" => {
630                use parquet::arrow::ArrowWriter;
631                use parquet::file::properties::WriterProperties;
632                use std::fs::File;
633
634                // For tests we still use sync parquet writer; directory was created with tokio_fs.
635                let batch = tmp_storage.to_dense_record_batch(data)?;
636                debug!(
637                    "Created RecordBatch with {} rows for Parquet",
638                    batch.num_rows()
639                );
640
641                if batch.num_rows() != n_rows {
642                    return Err(StorageError::Invalid(format!(
643                        "RecordBatch has {} rows but matrix has {} rows",
644                        batch.num_rows(),
645                        n_rows
646                    )));
647                }
648
649                let file = File::create(path).map_err(|e| {
650                    StorageError::Io(format!("Failed to create parquet file: {}", e))
651                })?;
652
653                let props = WriterProperties::builder()
654                    .set_compression(parquet::basic::Compression::SNAPPY)
655                    .build();
656
657                let mut writer =
658                    ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
659                        StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
660                    })?;
661
662                writer
663                    .write(&batch)
664                    .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
665
666                writer
667                    .close()
668                    .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
669
670                info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
671                Ok(())
672            }
673            _ => Err(StorageError::Invalid(format!(
674                "Unsupported file format: {}. Only .lance and .parquet are supported",
675                extension
676            ))),
677        }
678    }
679
680    /// Save centroid_map (item-to-centroid assignments)
681    async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
682        self.validate_initialized(md_path)?;
683        let key = "centroid_map";
684        let path = self.file_path(key);
685        info!("Saving {} centroid map entries", map.len());
686
687        let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
688        let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
689        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
690            .map_err(|e| StorageError::Lance(e.to_string()))?;
691
692        {
693            let mut metadata = self.load_metadata().await?;
694            metadata = metadata.add_file(
695                key,
696                FileInfo::new(
697                    format!("{}_{}.lance", self.get_name(), key),
698                    "vector",
699                    (map.len(), 1),
700                    None,
701                    None,
702                ),
703            );
704            self.save_metadata(&metadata).await?;
705
706            let uri = Self::path_to_uri(&path);
707            self.write_lance_batch_async(uri, batch).await?;
708        }
709        info!("Centroid map saved successfully");
710        Ok(())
711    }
712
713    /// Load centroid_map
714    async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
715        let path = self.file_path("centroid_map");
716        info!("Loading centroid map from {:?}", path);
717
718        let uri = Self::path_to_uri(&path);
719        let batch = self.read_lance_first_batch_async(uri).await?;
720        let arr = batch
721            .column(0)
722            .as_any()
723            .downcast_ref::<UInt32Array>()
724            .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
725
726        let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
727        info!("Loaded {} centroid map entries", map.len());
728        Ok(map)
729    }
730
731    /// Save subcentroid_lambdas (tau values for subcentroids)
732    async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
733        self.validate_initialized(md_path)?;
734        let key = "subcentroid_lambdas";
735        let path = self.file_path(key);
736        info!("Saving {} subcentroid lambda values", lambdas.len());
737
738        let schema = Schema::new(vec![Field::new(
739            "subcentroid_lambda",
740            DataType::Float64,
741            false,
742        )]);
743        let batch = RecordBatch::try_new(
744            Arc::new(schema),
745            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
746        )
747        .map_err(|e| StorageError::Lance(e.to_string()))?;
748        {
749            let mut metadata = self.load_metadata().await?;
750            metadata = metadata.add_file(
751                key,
752                FileInfo::new(
753                    format!("{}_{}.lance", self.get_name(), key),
754                    "vector",
755                    (lambdas.len(), 1),
756                    None,
757                    None,
758                ),
759            );
760            self.save_metadata(&metadata).await?;
761
762            let uri = Self::path_to_uri(&path);
763            self.write_lance_batch_async(uri, batch).await?;
764        }
765        info!("Subcentroid lambda values saved successfully");
766        Ok(())
767    }
768
769    /// Load subcentroid_lambdas
770    async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
771        let path = self.file_path("subcentroid_lambdas");
772        info!("Loading subcentroid lambda values from {:?}", path);
773
774        let uri = Self::path_to_uri(&path);
775        let batch = self.read_lance_first_batch_async(uri).await?;
776        let arr = batch
777            .column(0)
778            .as_any()
779            .downcast_ref::<Float64Array>()
780            .ok_or_else(|| {
781                StorageError::Invalid("subcentroid_lambda column type mismatch".into())
782            })?;
783
784        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
785        info!("Loaded {} subcentroid lambda values", lambdas.len());
786        Ok(lambdas)
787    }
788
789    /// Save subcentroids (dense matrix)
790    async fn save_subcentroids(
791        &self,
792        subcentroids: &DenseMatrix<f64>,
793        md_path: &Path,
794    ) -> StorageResult<()> {
795        self.validate_initialized(md_path)?;
796        let key = "sub_centroids";
797        let path = self.file_path(key);
798        let (n_rows, n_cols) = subcentroids.shape();
799        info!(
800            "Saving subcentroids matrix {} x {} at {:?}",
801            n_rows, n_cols, path
802        );
803
804        let batch = self.to_dense_record_batch(subcentroids)?;
805        {
806            let mut metadata = self.load_metadata().await?;
807            metadata = metadata.add_file(
808                key,
809                FileInfo::new(
810                    format!("{}_{}.lance", self.get_name(), key),
811                    "vector",
812                    subcentroids.shape(),
813                    None,
814                    None,
815                ),
816            );
817            self.save_metadata(&metadata).await?;
818
819            let uri = Self::path_to_uri(&path);
820            self.write_lance_batch_async(uri, batch).await?;
821        }
822        debug!("Subcentroids matrix saved successfully");
823        Ok(())
824    }
825
826    /// Load subcentroids as Vec<Vec<f64>>
827    async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
828        let path = self.file_path("sub_centroids");
829        info!("Loading sub_centroids from {:?}", path);
830
831        let uri = Self::path_to_uri(&path);
832        let batch = self.read_lance_all_batches_async(uri).await?;
833        let matrix = self.from_dense_record_batch(&batch)?;
834
835        // Convert DenseMatrix to Vec<Vec<f64>>
836        let (n_rows, n_cols) = matrix.shape();
837        let mut result = Vec::with_capacity(n_rows);
838
839        for row_idx in 0..n_rows {
840            let row: Vec<f64> = (0..n_cols)
841                .map(|col_idx| *matrix.get((row_idx, col_idx)))
842                .collect();
843            result.push(row);
844        }
845
846        info!(
847            "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
848            n_rows, n_cols
849        );
850        Ok(result)
851    }
852
853    /// Save item norms vector
854    async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
855        self.validate_initialized(md_path)?;
856        let key = "item_norms";
857        let path = self.file_path(key);
858        info!("Saving {} item norm values", item_norms.len());
859
860        let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
861        let batch = RecordBatch::try_new(
862            Arc::new(schema),
863            vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
864        )
865        .map_err(|e| StorageError::Lance(e.to_string()))?;
866
867        {
868            let mut metadata = self.load_metadata().await?;
869            metadata = metadata.add_file(
870                key,
871                FileInfo::new(
872                    format!("{}_{}.lance", self.get_name(), key),
873                    "vector",
874                    (item_norms.len(), 1),
875                    None,
876                    None,
877                ),
878            );
879            self.save_metadata(&metadata).await?;
880
881            let uri = Self::path_to_uri(&path);
882            self.write_lance_batch_async(uri, batch).await?;
883        }
884        info!("Item norms saved successfully");
885        Ok(())
886    }
887
888    /// Load item norms vector
889    async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
890        let path = self.file_path("item_norms");
891        info!("Loading item norms from {:?}", path);
892
893        let uri = Self::path_to_uri(&path);
894        let batch = self.read_lance_first_batch_async(uri).await?;
895        let arr = batch
896            .column(0)
897            .as_any()
898            .downcast_ref::<Float64Array>()
899            .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
900
901        let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
902        info!("Loaded {} item norm values", norms.len());
903        Ok(norms)
904    }
905
906    async fn save_cluster_assignments(
907        &self,
908        assignments: &[Option<usize>],
909        md_path: &Path,
910    ) -> StorageResult<()> {
911        self.validate_initialized(md_path)?;
912        let key = "cluster_assignments";
913        let path = self.file_path(key);
914        info!("Saving {} cluster assignments", assignments.len());
915
916        // Convert Option<usize> to i64 (-1 for None)
917        let values: Vec<i64> = assignments
918            .iter()
919            .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
920            .collect();
921
922        let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
923        let batch = RecordBatch::try_new(
924            Arc::new(schema),
925            vec![Arc::new(Int64Array::from(values)) as _],
926        )
927        .map_err(|e| StorageError::Lance(e.to_string()))?;
928
929        {
930            let mut metadata = self.load_metadata().await?;
931            metadata = metadata.add_file(
932                key,
933                FileInfo::new(
934                    format!("{}_{}.lance", self.get_name(), key),
935                    "vector",
936                    (assignments.len(), 1),
937                    None,
938                    None,
939                ),
940            );
941            self.save_metadata(&metadata).await?;
942
943            let uri = Self::path_to_uri(&path);
944            self.write_lance_batch_async(uri, batch).await?;
945        }
946        info!("Cluster assignments saved successfully");
947        Ok(())
948    }
949
950    async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
951        let path = self.file_path("cluster_assignments");
952        info!("Loading cluster assignments from {:?}", path);
953
954        let uri = Self::path_to_uri(&path);
955        let batch = self.read_lance_first_batch_async(uri).await?;
956        let arr = batch
957            .column(0)
958            .as_any()
959            .downcast_ref::<Int64Array>()
960            .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
961
962        let assignments: Vec<Option<usize>> = (0..arr.len())
963            .map(|i| {
964                let val = arr.value(i);
965                if val < 0 { None } else { Some(val as usize) }
966            })
967            .collect();
968
969        info!("Loaded {} cluster assignments", assignments.len());
970        Ok(assignments)
971    }
972}