genegraph_storage/
lance_storage_graph.rs

1//! Lance storage backend for graph embeddings.
2//!
3//! Async-first implementation that matches the async `StorageBackend` trait:
4//! - All I/O is async, no internal `block_on` or runtime creation.
5//! - Callers (CLI, tests, services) are responsible for providing a Tokio runtime.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::{Float64Array, Int64Array, UInt32Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow_array::{Array as ArrowArray, RecordBatch};
13use log::{debug, info};
14use smartcore::linalg::basic::arrays::Array;
15use smartcore::linalg::basic::matrix::DenseMatrix;
16use sprs::CsMat;
17
18use crate::metadata::FileInfo;
19use crate::metadata::GeneMetadata;
20use crate::traits::backend::StorageBackend;
21use crate::traits::lance::LanceStorage;
22use crate::traits::metadata::Metadata;
23use crate::{StorageError, StorageResult};
24
/// Lance-based storage backend for ArrowSpace graph embeddings.
///
/// Stores dense and sparse matrices as Lance datasets using a columnar format
/// (`row`, `col`, `value` for sparse; `col_*` for dense) schema for efficient
/// random and columnar access.
#[derive(Debug, Clone)]
pub struct LanceStorageGraph {
    /// Base directory that holds every Lance dataset and the metadata JSON.
    /// NOTE(review): the leading underscore usually marks an unused binding,
    /// but both fields are read throughout this file — consider dropping it.
    pub(crate) _base: String,
    /// Name prefix applied to every file produced by this store.
    pub(crate) _name: String,
}
35
36impl LanceStorageGraph {
37    /// Creates a new Lance storage backend.
38    ///
39    /// This is used for on-the-fly creation. For proper setup use `Genefold<...>::seed`.
40    ///
41    /// # Arguments
42    ///
43    /// * `_base` - Base directory path for all storage files
44    /// * `_name` - Name prefix for this storage instance
45    pub fn new(_base: String, _name: String) -> Self {
46        info!("Creating LanceStorage at base={}, name={}", _base, _name);
47        Self { _base, _name }
48    }
49
50    /// Spawn a LanceStorage from an existing seeded directory (with metadata.json)
51    pub async fn spawn(base_path: String) -> Result<(Self, GeneMetadata), StorageError> {
52        // Reuse the generic `exists` helper from the StorageBackend trait
53        let (exists, md_path) = <Self as StorageBackend>::exists(&base_path);
54        assert!(
55            exists && md_path.is_some(),
56            "Metadata does not exist in this base path"
57        );
58
59        // Load metadata from the discovered metadata.json
60        let metadata = GeneMetadata::read(md_path.unwrap()).await?;
61
62        // Construct the LanceStorage using the metadata-provided nameid
63        let storage = Self::new(base_path.clone(), metadata.name_id.clone());
64
65        Ok((storage, metadata))
66    }
67}
68
// Marker impl: the Lance-specific batch encode/decode and read/write helpers
// used below (e.g. `write_lance_batch_async`) presumably come from this
// trait's default methods in `crate::traits::lance` — confirm there.
impl LanceStorage for LanceStorageGraph {}
70
impl StorageBackend for LanceStorageGraph {
    /// Base directory (as a `String`) that holds all datasets for this store.
    fn get_base(&self) -> String {
        self._base.clone()
    }

    /// Name prefix applied to every file this store creates.
    fn get_name(&self) -> String {
        self._name.clone()
    }

    /// Base directory as a `PathBuf`.
    fn base_path(&self) -> PathBuf {
        PathBuf::from(&self._base)
    }

    /// Path of the metadata JSON: `<base>/<name>_metadata.json`.
    fn metadata_path(&self) -> PathBuf {
        self.base_path()
            .join(format!("{}_metadata.json", self._name))
    }
88
89    /// Converts the base path for the store to a `file://` URI for Lance.
90    fn basepath_to_uri(&self) -> String {
91        Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
92    }
93
    /// Save dense matrix using Lance-optimized vector format.
    ///
    /// Each row of the matrix becomes a FixedSizeList entry for efficient vector operations.
    /// This format is optimized for vector search and enables Lance's full-zip encoding.
    ///
    /// # Arguments
    /// * `key` - Type identifier (e.g., "rawinput", "sub_centroids")
    /// * `matrix` - Dense matrix to save (N rows × F cols)
    /// * `md_path` - Metadata file path used to verify the store is initialized
    async fn save_dense(
        &self,
        key: &str,
        matrix: &DenseMatrix<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        let (n_rows, n_cols) = matrix.shape();

        info!(
            "Saving dense {} matrix: {} x {} at {:?}",
            key, n_rows, n_cols, path
        );

        // Convert to Lance-optimized RecordBatch (FixedSizeList format)
        let batch = self.to_dense_record_batch(matrix)?;

        // Sanity check: the encoder must preserve the row count exactly.
        if batch.num_rows() != n_rows {
            return Err(StorageError::Invalid(format!(
                "RecordBatch has {} rows but matrix has {} rows",
                batch.num_rows(),
                n_rows
            )));
        }

        // Write to Lance
        let uri = Self::path_to_uri(&path);
        self.write_lance_batch_async(uri, batch).await?;

        info!("Dense {} matrix saved successfully", key);
        Ok(())
    }
139
    /// Load dense matrix from Lance-optimized vector format.
    ///
    /// Reads FixedSizeList vectors and reconstructs a column-major DenseMatrix.
    ///
    /// # Arguments
    /// * `key` - Type identifier (e.g., "rawinput", "sub_centroids")
    ///
    /// # Returns
    /// Column-major DenseMatrix matching smartcore conventions
    async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
        let path = self.file_path(key);
        info!("Loading dense {} matrix from {:?}", key, path);

        // Read all batches from Lance (may span multiple batches for large datasets)
        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_all_batches_async(uri).await?;

        // Convert from FixedSizeList format to DenseMatrix
        let matrix = self.from_dense_record_batch(&batch)?;

        let (n_rows, n_cols) = matrix.shape();
        info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);

        Ok(matrix)
    }
165
    /// Load initial data using columnar format from a file path.
    ///
    /// Async test helper that avoids any internal blocking runtimes.
    ///
    /// Supports two on-disk formats (selected by file extension):
    /// - `.lance`: decoded from the FixedSizeList vector layout via the async
    ///   Lance reader.
    /// - `.parquet`: read with the sync `parquet` reader, accepting either the
    ///   vector layout (a single `FixedSizeList<Float64>` column) or the legacy
    ///   wide layout (all-`Float64` columns, at least one named `col_*`).
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                // Use a temporary LanceStorage rooted at the file's parent dir,
                // same pattern as save_dense_to_file_async.
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage = Self::new(parent, String::from("tmp_storage"));

                // Reuse the async Lance reader logic.
                let uri = Self::path_to_uri(path);
                let batch = tmp_storage.read_lance_all_batches_async(uri).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                // 1. Read from Parquet into a single RecordBatch.
                // NOTE(review): this is sync file I/O inside an async fn;
                // acceptable for a test helper, but it blocks the executor.
                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                // All batches are concatenated under the first batch's schema.
                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                // 2. Detect layout: vector (FixedSizeList) vs old wide columnar (col_* Float64)
                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                            if matches!(inner.data_type(), DataType::Float64)
                    );

                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                // 3. Build DenseMatrix from the RecordBatch
                let matrix = if is_vector {
                    // New format already: vector column (FixedSizeList<Float64>)
                    // Reuse the same decoding as Lance.
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage = Self::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    // Old wide columnar: columns like col_0, col_1, ... as Float64
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        // Build column-major storage: all rows for col 0, then col 1, ...
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    // `true` presumably selects column-major layout to match the
                    // buffer built above — TODO confirm against the smartcore
                    // version in use.
                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }
336
337    fn file_path(&self, key: &str) -> PathBuf {
338        self.base_path()
339            .join(format!("{}_{}.lance", self._name, key))
340    }
341
342    // =========
343    // ASYNC API (matches StorageBackend)
344    // =========
345
346    async fn save_sparse(
347        &self,
348        key: &str,
349        matrix: &CsMat<f64>,
350        md_path: &Path,
351    ) -> StorageResult<()> {
352        self.validate_initialized(md_path)?;
353        let path = self.file_path(key);
354        info!(
355            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
356            key,
357            matrix.rows(),
358            matrix.cols(),
359            matrix.nnz(),
360            path
361        );
362
363        let mut metadata = self.load_metadata().await?;
364        let filetype = FileInfo::which_filetype(key);
365        metadata.files.insert(
366            key.to_string(),
367            metadata.new_fileinfo(
368                key,
369                filetype.as_str(),
370                (matrix.rows(), matrix.cols()),
371                Some(matrix.nnz()),
372                None,
373            ),
374        );
375        self.save_metadata(&metadata).await?;
376
377        let batch = self.to_sparse_record_batch(matrix)?;
378        let uri = Self::path_to_uri(&path);
379        self.write_lance_batch_async(uri, batch).await?;
380        info!("Sparse matrix {} saved successfully", filetype);
381        Ok(())
382    }
383
384    async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
385        info!("Loading sparse {} matrix", key);
386
387        let metadata = self.load_metadata().await?;
388        let filetype = FileInfo::which_filetype(key);
389        let file_info = metadata
390            .files
391            .get(key)
392            .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
393
394        let expected_rows = file_info.rows;
395        let expected_cols = file_info.cols;
396        debug!(
397            "Expected dimensions from storage metadata: {} x {}",
398            expected_rows, expected_cols
399        );
400
401        let path = self.file_path(key);
402        let uri = Self::path_to_uri(&path);
403        let batch = self.read_lance_first_batch_async(uri).await?;
404        let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
405        info!(
406            "Sparse {} matrix loaded: {} x {}, nnz={}",
407            filetype,
408            matrix.rows(),
409            matrix.cols(),
410            matrix.nnz()
411        );
412        Ok(matrix)
413    }
414
415    async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
416        self.validate_initialized(md_path)?;
417        let path = self.file_path("lambdas");
418        info!("Saving {} lambda values", lambdas.len());
419
420        let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
421        let batch = RecordBatch::try_new(
422            Arc::new(schema),
423            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
424        )
425        .map_err(|e| StorageError::Lance(e.to_string()))?;
426
427        let uri = Self::path_to_uri(&path);
428        self.write_lance_batch_async(uri, batch).await?;
429        info!("Lambda values saved successfully");
430        Ok(())
431    }
432
433    async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
434        let path = self.file_path("lambdas");
435        info!("Loading lambda values from {:?}", path);
436
437        let uri = Self::path_to_uri(&path);
438        let batch = self.read_lance_first_batch_async(uri).await?;
439        let arr = batch
440            .column(0)
441            .as_any()
442            .downcast_ref::<Float64Array>()
443            .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
444
445        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
446        info!("Loaded {} lambda values", lambdas.len());
447        Ok(lambdas)
448    }
449
450    async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
451        self.validate_initialized(md_path)?;
452        let path = self.file_path(key);
453        info!("Saving {} values for vector {}", vector.len(), key);
454
455        let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
456        let float64_array = Float64Array::from_iter_values::<Vec<f64>>(vector.into());
457        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
458            .map_err(|e| StorageError::Lance(e.to_string()))?;
459
460        let uri = Self::path_to_uri(&path);
461        self.write_lance_batch_async(uri, batch).await?;
462        info!("Index {} saved successfully", key);
463        Ok(())
464    }
465
466    async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
467        self.validate_initialized(md_path)?;
468        let path = self.file_path(key);
469        info!("Saving {} values for index {}", vector.len(), key);
470
471        let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
472        let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
473        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
474            .map_err(|e| StorageError::Lance(e.to_string()))?;
475
476        let uri = Self::path_to_uri(&path);
477        self.write_lance_batch_async(uri, batch).await?;
478        info!("Index {} saved successfully", key);
479        Ok(())
480    }
481
482    async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
483        let path = self.file_path(filename);
484        info!("Loading vector {} from {:?}", filename, path);
485
486        let uri = Self::path_to_uri(&path);
487        let batch = self.read_lance_first_batch_async(uri).await?;
488        let arr = batch
489            .column(0)
490            .as_any()
491            .downcast_ref::<Float64Array>()
492            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
493
494        let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
495        info!("Loaded {} vector values for {}", vector.len(), filename);
496        Ok(vector)
497    }
498
499    async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
500        let path = self.file_path(filename);
501        info!("Loading vector {} from {:?}", filename, path);
502
503        let uri = Self::path_to_uri(&path);
504        let batch = self.read_lance_first_batch_async(uri).await?;
505        let arr = batch
506            .column(0)
507            .as_any()
508            .downcast_ref::<UInt32Array>()
509            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
510
511        let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
512        info!("Loaded {} vector values for {}", vector.len(), filename);
513        Ok(vector)
514    }
515
516    /// Save dense matrix to file in columnar format (col_0, col_1, ..., col_N)
517    ///
518    /// Async test helper that avoids any internal blocking runtimes.
519    async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &Path) -> StorageResult<()> {
520        use tokio::fs as tokio_fs;
521
522        info!("Saving dense matrix to file (async): {:?}", path);
523
524        // Ensure parent dir exists for the test file.
525        if let Some(parent) = path.parent() {
526            tokio_fs::try_exists(parent).await.map_err(|e| {
527                StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
528            })?;
529        }
530
531        // Create a temporary storage only to store the file.
532        let tmp_storage = Self::new(
533            String::from(path.parent().unwrap().to_str().unwrap()),
534            String::from("tmp_storage"),
535        );
536
537        let extension = path
538            .extension()
539            .and_then(|e| e.to_str())
540            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
541
542        let (n_rows, n_cols) = data.shape();
543        info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
544
545        match extension {
546            "lance" => {
547                let batch = tmp_storage.to_dense_record_batch(data)?;
548                debug!(
549                    "Created RecordBatch with {} rows for Lance",
550                    batch.num_rows()
551                );
552
553                // Verify all rows are in the batch
554                if batch.num_rows() != n_rows {
555                    return Err(StorageError::Invalid(format!(
556                        "RecordBatch has {} rows but matrix has {} rows",
557                        batch.num_rows(),
558                        n_rows
559                    )));
560                }
561
562                let uri = Self::path_to_uri(path);
563                tmp_storage.write_lance_batch_async(uri, batch).await?;
564                info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
565                Ok(())
566            }
567            "parquet" => {
568                use parquet::arrow::ArrowWriter;
569                use parquet::file::properties::WriterProperties;
570                use std::fs::File;
571
572                // For tests we still use sync parquet writer; directory was created with tokio_fs.
573                let batch = tmp_storage.to_dense_record_batch(data)?;
574                debug!(
575                    "Created RecordBatch with {} rows for Parquet",
576                    batch.num_rows()
577                );
578
579                if batch.num_rows() != n_rows {
580                    return Err(StorageError::Invalid(format!(
581                        "RecordBatch has {} rows but matrix has {} rows",
582                        batch.num_rows(),
583                        n_rows
584                    )));
585                }
586
587                let file = File::create(path).map_err(|e| {
588                    StorageError::Io(format!("Failed to create parquet file: {}", e))
589                })?;
590
591                let props = WriterProperties::builder()
592                    .set_compression(parquet::basic::Compression::SNAPPY)
593                    .build();
594
595                let mut writer =
596                    ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
597                        StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
598                    })?;
599
600                writer
601                    .write(&batch)
602                    .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
603
604                writer
605                    .close()
606                    .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
607
608                info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
609                Ok(())
610            }
611            _ => Err(StorageError::Invalid(format!(
612                "Unsupported file format: {}. Only .lance and .parquet are supported",
613                extension
614            ))),
615        }
616    }
617
618    /// Save centroid_map (item-to-centroid assignments)
619    async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
620        self.validate_initialized(md_path)?;
621        let path = self.file_path("centroid_map");
622        info!("Saving {} centroid map entries", map.len());
623
624        let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
625        let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
626        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
627            .map_err(|e| StorageError::Lance(e.to_string()))?;
628
629        let uri = Self::path_to_uri(&path);
630        self.write_lance_batch_async(uri, batch).await?;
631        info!("Centroid map saved successfully");
632        Ok(())
633    }
634
635    /// Load centroid_map
636    async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
637        let path = self.file_path("centroid_map");
638        info!("Loading centroid map from {:?}", path);
639
640        let uri = Self::path_to_uri(&path);
641        let batch = self.read_lance_first_batch_async(uri).await?;
642        let arr = batch
643            .column(0)
644            .as_any()
645            .downcast_ref::<UInt32Array>()
646            .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
647
648        let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
649        info!("Loaded {} centroid map entries", map.len());
650        Ok(map)
651    }
652
653    /// Save subcentroid_lambdas (tau values for subcentroids)
654    async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
655        self.validate_initialized(md_path)?;
656        let path = self.file_path("subcentroid_lambdas");
657        info!("Saving {} subcentroid lambda values", lambdas.len());
658
659        let schema = Schema::new(vec![Field::new(
660            "subcentroid_lambda",
661            DataType::Float64,
662            false,
663        )]);
664        let batch = RecordBatch::try_new(
665            Arc::new(schema),
666            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
667        )
668        .map_err(|e| StorageError::Lance(e.to_string()))?;
669
670        let uri = Self::path_to_uri(&path);
671        self.write_lance_batch_async(uri, batch).await?;
672        info!("Subcentroid lambda values saved successfully");
673        Ok(())
674    }
675
676    /// Load subcentroid_lambdas
677    async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
678        let path = self.file_path("subcentroid_lambdas");
679        info!("Loading subcentroid lambda values from {:?}", path);
680
681        let uri = Self::path_to_uri(&path);
682        let batch = self.read_lance_first_batch_async(uri).await?;
683        let arr = batch
684            .column(0)
685            .as_any()
686            .downcast_ref::<Float64Array>()
687            .ok_or_else(|| {
688                StorageError::Invalid("subcentroid_lambda column type mismatch".into())
689            })?;
690
691        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
692        info!("Loaded {} subcentroid lambda values", lambdas.len());
693        Ok(lambdas)
694    }
695
696    /// Save subcentroids (dense matrix)
697    async fn save_subcentroids(
698        &self,
699        subcentroids: &DenseMatrix<f64>,
700        md_path: &Path,
701    ) -> StorageResult<()> {
702        self.validate_initialized(md_path)?;
703        let path = self.file_path("sub_centroids");
704        let (n_rows, n_cols) = subcentroids.shape();
705        info!(
706            "Saving subcentroids matrix {} x {} at {:?}",
707            n_rows, n_cols, path
708        );
709
710        let batch = self.to_dense_record_batch(subcentroids)?;
711        let uri = Self::path_to_uri(&path);
712        self.write_lance_batch_async(uri, batch).await?;
713        debug!("Subcentroids matrix saved successfully");
714        Ok(())
715    }
716
717    /// Load subcentroids as Vec<Vec<f64>>
718    async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
719        let path = self.file_path("sub_centroids");
720        info!("Loading sub_centroids from {:?}", path);
721
722        let uri = Self::path_to_uri(&path);
723        let batch = self.read_lance_all_batches_async(uri).await?;
724        let matrix = self.from_dense_record_batch(&batch)?;
725
726        // Convert DenseMatrix to Vec<Vec<f64>>
727        let (n_rows, n_cols) = matrix.shape();
728        let mut result = Vec::with_capacity(n_rows);
729
730        for row_idx in 0..n_rows {
731            let row: Vec<f64> = (0..n_cols)
732                .map(|col_idx| *matrix.get((row_idx, col_idx)))
733                .collect();
734            result.push(row);
735        }
736
737        info!(
738            "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
739            n_rows, n_cols
740        );
741        Ok(result)
742    }
743
744    /// Save item norms vector
745    async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
746        self.validate_initialized(md_path)?;
747        let path = self.file_path("item_norms");
748        info!("Saving {} item norm values", item_norms.len());
749
750        let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
751        let batch = RecordBatch::try_new(
752            Arc::new(schema),
753            vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
754        )
755        .map_err(|e| StorageError::Lance(e.to_string()))?;
756
757        let uri = Self::path_to_uri(&path);
758        self.write_lance_batch_async(uri, batch).await?;
759        info!("Item norms saved successfully");
760        Ok(())
761    }
762
763    /// Load item norms vector
764    async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
765        let path = self.file_path("item_norms");
766        info!("Loading item norms from {:?}", path);
767
768        let uri = Self::path_to_uri(&path);
769        let batch = self.read_lance_first_batch_async(uri).await?;
770        let arr = batch
771            .column(0)
772            .as_any()
773            .downcast_ref::<Float64Array>()
774            .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
775
776        let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
777        info!("Loaded {} item norm values", norms.len());
778        Ok(norms)
779    }
780
781    async fn save_cluster_assignments(
782        &self,
783        assignments: &[Option<usize>],
784        md_path: &Path,
785    ) -> StorageResult<()> {
786        self.validate_initialized(md_path)?;
787        let path = self.file_path("cluster_assignments");
788        info!("Saving {} cluster assignments", assignments.len());
789
790        // Convert Option<usize> to i64 (-1 for None)
791        let values: Vec<i64> = assignments
792            .iter()
793            .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
794            .collect();
795
796        let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
797        let batch = RecordBatch::try_new(
798            Arc::new(schema),
799            vec![Arc::new(Int64Array::from(values)) as _],
800        )
801        .map_err(|e| StorageError::Lance(e.to_string()))?;
802
803        let uri = Self::path_to_uri(&path);
804        self.write_lance_batch_async(uri, batch).await?;
805        info!("Cluster assignments saved successfully");
806        Ok(())
807    }
808
809    async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
810        let path = self.file_path("cluster_assignments");
811        info!("Loading cluster assignments from {:?}", path);
812
813        let uri = Self::path_to_uri(&path);
814        let batch = self.read_lance_first_batch_async(uri).await?;
815        let arr = batch
816            .column(0)
817            .as_any()
818            .downcast_ref::<Int64Array>()
819            .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
820
821        let assignments: Vec<Option<usize>> = (0..arr.len())
822            .map(|i| {
823                let val = arr.value(i);
824                if val < 0 { None } else { Some(val as usize) }
825            })
826            .collect();
827
828        info!("Loaded {} cluster assignments", assignments.len());
829        Ok(assignments)
830    }
831}