genegraph_storage/
lance_storage_graph.rs

1//! Lance storage backend for graph embeddings.
2//!
3//! Async-first implementation that matches the async `StorageBackend` trait:
4//! - All I/O is async, no internal `block_on` or runtime creation.
5//! - Callers (CLI, tests, services) are responsible for providing a Tokio runtime.
6
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::{Float64Array, Int64Array, UInt32Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow_array::{Array as ArrowArray, RecordBatch};
13use log::{debug, info};
14use smartcore::linalg::basic::arrays::Array;
15use smartcore::linalg::basic::matrix::DenseMatrix;
16use sprs::CsMat;
17
18use crate::metadata::FileInfo;
19use crate::metadata::GeneMetadata;
20use crate::traits::backend::StorageBackend;
21use crate::traits::lance::LanceStorage;
22use crate::traits::metadata::Metadata;
23use crate::{StorageError, StorageResult};
24
/// Lance-based storage backend for ArrowSpace graph embeddings.
///
/// Stores dense and sparse matrices as Lance datasets using a columnar format
/// (`row`, `col`, `value` for sparse; `col_*` for dense) schema for efficient
/// random and columnar access.
#[derive(Debug, Clone)]
pub struct LanceStorageGraph {
    // Base directory that holds every Lance dataset and the metadata JSON.
    pub(crate) _base: String,
    // Name prefix applied to every file this instance creates
    // (see `file_path`: "<name>_<key>.lance").
    pub(crate) _name: String,
}
35
36impl LanceStorageGraph {
37    /// Creates a new Lance storage backend.
38    ///
39    /// This is used for on-the-fly creation. For proper setup use `Genefold<...>::seed`.
40    ///
41    /// # Arguments
42    ///
43    /// * `_base` - Base directory path for all storage files
44    /// * `_name` - Name prefix for this storage instance
45    pub fn new(_base: String, _name: String) -> Self {
46        info!("Creating LanceStorage at base={}, name={}", _base, _name);
47        Self { _base, _name }
48    }
49
50    /// Spawn a LanceStorage from an existing seeded directory (with metadata.json)
51    pub async fn spawn(base_path: String) -> Result<(Self, GeneMetadata), StorageError> {
52        // Reuse the generic `exists` helper from the StorageBackend trait
53        let (exists, md_path) = Self::exists(&base_path);
54
55        // Replace assert! with proper error handling
56        if !exists || md_path.is_none() {
57            return Err(StorageError::Invalid(format!(
58                "Metadata does not exist in base path: {}",
59                base_path
60            )));
61        }
62
63        // Load metadata from the discovered metadata.json
64        let metadata = GeneMetadata::read(md_path.unwrap()).await?;
65
66        // Construct the LanceStorage using the metadata-provided nameid
67        let storage = Self::new(base_path.clone(), metadata.name_id.clone());
68        Ok((storage, metadata))
69    }
70}
71
// Marker impl: the empty body means every `LanceStorage` helper used below
// (record-batch codecs, async Lance readers/writers) comes from the trait's
// provided methods — no overrides are needed here.
impl LanceStorage for LanceStorageGraph {}
73
74impl StorageBackend for LanceStorageGraph {
75    fn get_base(&self) -> String {
76        self._base.clone()
77    }
78
79    fn get_name(&self) -> String {
80        self._name.clone()
81    }
82
83    fn base_path(&self) -> PathBuf {
84        PathBuf::from(&self._base)
85    }
86
87    fn metadata_path(&self) -> PathBuf {
88        self.base_path()
89            .join(format!("{}_metadata.json", self._name))
90    }
91
92    /// Converts the base path for the store to a `file://` URI for Lance.
93    fn basepath_to_uri(&self) -> String {
94        Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
95    }
96
97    /// Save dense matrix using Lance-optimized vector format.
98    ///
99    /// Each row of the matrix becomes a FixedSizeList entry for efficient vector operations.
100    /// This format is optimized for vector search and enables Lance's full-zip encoding.
101    ///
102    /// # Arguments
103    /// * `filename` - Type identifier (e.g., "rawinput", "sub_centroids")
104    /// * `matrix` - Dense matrix to save (N rows × F cols)
105    /// * `md_path` - Metadata file path for validation
106    async fn save_dense(
107        &self,
108        key: &str,
109        matrix: &DenseMatrix<f64>,
110        md_path: &Path,
111    ) -> StorageResult<()> {
112        self.validate_initialized(md_path)?;
113        let path = self.file_path(key);
114        let (n_rows, n_cols) = matrix.shape();
115
116        info!(
117            "Saving dense {} matrix: {} x {} at {:?}",
118            key, n_rows, n_cols, path
119        );
120
121        // Convert to Lance-optimized RecordBatch (FixedSizeList format)
122        let batch = self.to_dense_record_batch(matrix)?;
123
124        // Verify batch has correct number of rows
125        if batch.num_rows() != n_rows {
126            return Err(StorageError::Invalid(format!(
127                "RecordBatch has {} rows but matrix has {} rows",
128                batch.num_rows(),
129                n_rows
130            )));
131        }
132
133        // Write to Lance
134        let uri = Self::path_to_uri(&path);
135        self.write_lance_batch_async(uri, batch).await?;
136
137        info!("Dense {} matrix saved successfully", key);
138        Ok(())
139    }
140
141    /// Load dense matrix from Lance-optimized vector format.
142    ///
143    /// Reads FixedSizeList vectors and reconstructs a column-major DenseMatrix.
144    ///
145    /// # Arguments
146    /// * `filename` - Type identifier (e.g., "rawinput", "sub_centroids")
147    ///
148    /// # Returns
149    /// Column-major DenseMatrix matching smartcore conventions
150    async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
151        let path = self.file_path(key);
152        info!("Loading dense {} matrix from {:?}", key, path);
153
154        // Read all batches from Lance (may span multiple batches for large datasets)
155        let uri = Self::path_to_uri(&path);
156        let batch = self.read_lance_all_batches_async(uri).await?;
157
158        // Convert from FixedSizeList format to DenseMatrix
159        let matrix = self.from_dense_record_batch(&batch)?;
160
161        let (n_rows, n_cols) = matrix.shape();
162        info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);
163
164        Ok(matrix)
165    }
166
    /// Load initial data using columnar format from a file path.
    ///
    /// Async test helper that avoids any internal blocking runtimes.
    ///
    /// # Arguments
    /// * `path` - Source file; `.lance` and `.parquet` extensions are supported
    ///
    /// # Errors
    /// `StorageError::Invalid` for missing files, unsupported extensions or
    /// schemas, and empty datasets; `StorageError::Io`/`StorageError::Parquet`
    /// for underlying read failures.
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        // Dispatch on the file extension; only lance/parquet are handled.
        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                // Use a temporary LanceStorage rooted at the file's parent dir,
                // same pattern as save_dense_to_file_async.
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage = Self::new(parent, String::from("tmp_storage"));

                // Reuse the async Lance reader logic.
                let uri = Self::path_to_uri(path);
                let batch = tmp_storage.read_lance_all_batches_async(uri).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                // 1. Read from Parquet into a single RecordBatch
                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                // Drain every batch; each item is a Result that must be checked.
                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                // 2. Detect layout: vector (FixedSizeList) vs old wide columnar (col_* Float64)
                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                            if matches!(inner.data_type(), DataType::Float64)
                    );

                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                // 3. Build DenseMatrix from the RecordBatch
                let matrix = if is_vector {
                    // New format already: vector column (FixedSizeList<Float64>)
                    // Reuse the same decoding as Lance.
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage = Self::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    // Old wide columnar: columns like col_0, col_1, ... as Float64
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        // Build column-major storage: all rows for col 0, then col 1, ...
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    // `true` flags the data as column-major, matching the
                    // fill order of the loop above.
                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }
337
338    fn file_path(&self, key: &str) -> PathBuf {
339        self.base_path()
340            .join(format!("{}_{}.lance", self._name, key))
341    }
342
343    // =========
344    // ASYNC API (matches StorageBackend)
345    // =========
346
347    async fn save_sparse(
348        &self,
349        key: &str,
350        matrix: &CsMat<f64>,
351        md_path: &Path,
352    ) -> StorageResult<()> {
353        self.validate_initialized(md_path)?;
354        let path = self.file_path(key);
355        info!(
356            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
357            key,
358            matrix.rows(),
359            matrix.cols(),
360            matrix.nnz(),
361            path
362        );
363
364        let mut metadata = self.load_metadata().await?;
365        let filetype = FileInfo::which_filetype(key);
366        metadata.files.insert(
367            key.to_string(),
368            metadata.new_fileinfo(
369                key,
370                filetype.as_str(),
371                (matrix.rows(), matrix.cols()),
372                Some(matrix.nnz()),
373                None,
374            ),
375        );
376        self.save_metadata(&metadata).await?;
377
378        let batch = self.to_sparse_record_batch(matrix)?;
379        let uri = Self::path_to_uri(&path);
380        self.write_lance_batch_async(uri, batch).await?;
381        info!("Sparse matrix {} saved successfully", filetype);
382        Ok(())
383    }
384
385    async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
386        info!("Loading sparse {} matrix", key);
387
388        let metadata = self.load_metadata().await?;
389        let filetype = FileInfo::which_filetype(key);
390        let file_info = metadata
391            .files
392            .get(key)
393            .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
394
395        let expected_rows = file_info.rows;
396        let expected_cols = file_info.cols;
397        debug!(
398            "Expected dimensions from storage metadata: {} x {}",
399            expected_rows, expected_cols
400        );
401
402        let path = self.file_path(key);
403        let uri = Self::path_to_uri(&path);
404        let batch = self.read_lance_first_batch_async(uri).await?;
405        let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
406        info!(
407            "Sparse {} matrix loaded: {} x {}, nnz={}",
408            filetype,
409            matrix.rows(),
410            matrix.cols(),
411            matrix.nnz()
412        );
413        Ok(matrix)
414    }
415
416    async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
417        self.validate_initialized(md_path)?;
418        let path = self.file_path("lambdas");
419        info!("Saving {} lambda values", lambdas.len());
420
421        let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
422        let batch = RecordBatch::try_new(
423            Arc::new(schema),
424            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
425        )
426        .map_err(|e| StorageError::Lance(e.to_string()))?;
427
428        let uri = Self::path_to_uri(&path);
429        self.write_lance_batch_async(uri, batch).await?;
430        info!("Lambda values saved successfully");
431        Ok(())
432    }
433
434    async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
435        let path = self.file_path("lambdas");
436        info!("Loading lambda values from {:?}", path);
437
438        let uri = Self::path_to_uri(&path);
439        let batch = self.read_lance_first_batch_async(uri).await?;
440        let arr = batch
441            .column(0)
442            .as_any()
443            .downcast_ref::<Float64Array>()
444            .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
445
446        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
447        info!("Loaded {} lambda values", lambdas.len());
448        Ok(lambdas)
449    }
450
451    async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
452        self.validate_initialized(md_path)?;
453        let path = self.file_path(key);
454        info!("Saving {} values for vector {}", vector.len(), key);
455
456        let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
457        let float64_array = Float64Array::from_iter_values::<Vec<f64>>(vector.into());
458        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
459            .map_err(|e| StorageError::Lance(e.to_string()))?;
460
461        let uri = Self::path_to_uri(&path);
462        self.write_lance_batch_async(uri, batch).await?;
463        info!("Index {} saved successfully", key);
464        Ok(())
465    }
466
467    async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
468        self.validate_initialized(md_path)?;
469        let path = self.file_path(key);
470        info!("Saving {} values for index {}", vector.len(), key);
471
472        let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
473        let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
474        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
475            .map_err(|e| StorageError::Lance(e.to_string()))?;
476
477        let uri = Self::path_to_uri(&path);
478        self.write_lance_batch_async(uri, batch).await?;
479        info!("Index {} saved successfully", key);
480        Ok(())
481    }
482
483    async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
484        let path = self.file_path(filename);
485        info!("Loading vector {} from {:?}", filename, path);
486
487        let uri = Self::path_to_uri(&path);
488        let batch = self.read_lance_first_batch_async(uri).await?;
489        let arr = batch
490            .column(0)
491            .as_any()
492            .downcast_ref::<Float64Array>()
493            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
494
495        let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
496        info!("Loaded {} vector values for {}", vector.len(), filename);
497        Ok(vector)
498    }
499
500    async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
501        let path = self.file_path(filename);
502        info!("Loading vector {} from {:?}", filename, path);
503
504        let uri = Self::path_to_uri(&path);
505        let batch = self.read_lance_first_batch_async(uri).await?;
506        let arr = batch
507            .column(0)
508            .as_any()
509            .downcast_ref::<UInt32Array>()
510            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
511
512        let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
513        info!("Loaded {} vector values for {}", vector.len(), filename);
514        Ok(vector)
515    }
516
517    /// Save dense matrix to file in columnar format (col_0, col_1, ..., col_N)
518    ///
519    /// Async test helper that avoids any internal blocking runtimes.
520    async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &Path) -> StorageResult<()> {
521        use tokio::fs as tokio_fs;
522
523        info!("Saving dense matrix to file (async): {:?}", path);
524
525        // Ensure parent dir exists for the test file.
526        if let Some(parent) = path.parent() {
527            tokio_fs::try_exists(parent).await.map_err(|e| {
528                StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
529            })?;
530        }
531
532        // Create a temporary storage only to store the file.
533        let tmp_storage = Self::new(
534            String::from(path.parent().unwrap().to_str().unwrap()),
535            String::from("tmp_storage"),
536        );
537
538        let extension = path
539            .extension()
540            .and_then(|e| e.to_str())
541            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
542
543        let (n_rows, n_cols) = data.shape();
544        info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
545
546        match extension {
547            "lance" => {
548                let batch = tmp_storage.to_dense_record_batch(data)?;
549                debug!(
550                    "Created RecordBatch with {} rows for Lance",
551                    batch.num_rows()
552                );
553
554                // Verify all rows are in the batch
555                if batch.num_rows() != n_rows {
556                    return Err(StorageError::Invalid(format!(
557                        "RecordBatch has {} rows but matrix has {} rows",
558                        batch.num_rows(),
559                        n_rows
560                    )));
561                }
562
563                let uri = Self::path_to_uri(path);
564                tmp_storage.write_lance_batch_async(uri, batch).await?;
565                info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
566                Ok(())
567            }
568            "parquet" => {
569                use parquet::arrow::ArrowWriter;
570                use parquet::file::properties::WriterProperties;
571                use std::fs::File;
572
573                // For tests we still use sync parquet writer; directory was created with tokio_fs.
574                let batch = tmp_storage.to_dense_record_batch(data)?;
575                debug!(
576                    "Created RecordBatch with {} rows for Parquet",
577                    batch.num_rows()
578                );
579
580                if batch.num_rows() != n_rows {
581                    return Err(StorageError::Invalid(format!(
582                        "RecordBatch has {} rows but matrix has {} rows",
583                        batch.num_rows(),
584                        n_rows
585                    )));
586                }
587
588                let file = File::create(path).map_err(|e| {
589                    StorageError::Io(format!("Failed to create parquet file: {}", e))
590                })?;
591
592                let props = WriterProperties::builder()
593                    .set_compression(parquet::basic::Compression::SNAPPY)
594                    .build();
595
596                let mut writer =
597                    ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
598                        StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
599                    })?;
600
601                writer
602                    .write(&batch)
603                    .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
604
605                writer
606                    .close()
607                    .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
608
609                info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
610                Ok(())
611            }
612            _ => Err(StorageError::Invalid(format!(
613                "Unsupported file format: {}. Only .lance and .parquet are supported",
614                extension
615            ))),
616        }
617    }
618
619    /// Save centroid_map (item-to-centroid assignments)
620    async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
621        self.validate_initialized(md_path)?;
622        let path = self.file_path("centroid_map");
623        info!("Saving {} centroid map entries", map.len());
624
625        let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
626        let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
627        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
628            .map_err(|e| StorageError::Lance(e.to_string()))?;
629
630        let uri = Self::path_to_uri(&path);
631        self.write_lance_batch_async(uri, batch).await?;
632        info!("Centroid map saved successfully");
633        Ok(())
634    }
635
636    /// Load centroid_map
637    async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
638        let path = self.file_path("centroid_map");
639        info!("Loading centroid map from {:?}", path);
640
641        let uri = Self::path_to_uri(&path);
642        let batch = self.read_lance_first_batch_async(uri).await?;
643        let arr = batch
644            .column(0)
645            .as_any()
646            .downcast_ref::<UInt32Array>()
647            .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
648
649        let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
650        info!("Loaded {} centroid map entries", map.len());
651        Ok(map)
652    }
653
654    /// Save subcentroid_lambdas (tau values for subcentroids)
655    async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
656        self.validate_initialized(md_path)?;
657        let path = self.file_path("subcentroid_lambdas");
658        info!("Saving {} subcentroid lambda values", lambdas.len());
659
660        let schema = Schema::new(vec![Field::new(
661            "subcentroid_lambda",
662            DataType::Float64,
663            false,
664        )]);
665        let batch = RecordBatch::try_new(
666            Arc::new(schema),
667            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
668        )
669        .map_err(|e| StorageError::Lance(e.to_string()))?;
670
671        let uri = Self::path_to_uri(&path);
672        self.write_lance_batch_async(uri, batch).await?;
673        info!("Subcentroid lambda values saved successfully");
674        Ok(())
675    }
676
677    /// Load subcentroid_lambdas
678    async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
679        let path = self.file_path("subcentroid_lambdas");
680        info!("Loading subcentroid lambda values from {:?}", path);
681
682        let uri = Self::path_to_uri(&path);
683        let batch = self.read_lance_first_batch_async(uri).await?;
684        let arr = batch
685            .column(0)
686            .as_any()
687            .downcast_ref::<Float64Array>()
688            .ok_or_else(|| {
689                StorageError::Invalid("subcentroid_lambda column type mismatch".into())
690            })?;
691
692        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
693        info!("Loaded {} subcentroid lambda values", lambdas.len());
694        Ok(lambdas)
695    }
696
697    /// Save subcentroids (dense matrix)
698    async fn save_subcentroids(
699        &self,
700        subcentroids: &DenseMatrix<f64>,
701        md_path: &Path,
702    ) -> StorageResult<()> {
703        self.validate_initialized(md_path)?;
704        let path = self.file_path("sub_centroids");
705        let (n_rows, n_cols) = subcentroids.shape();
706        info!(
707            "Saving subcentroids matrix {} x {} at {:?}",
708            n_rows, n_cols, path
709        );
710
711        let batch = self.to_dense_record_batch(subcentroids)?;
712        let uri = Self::path_to_uri(&path);
713        self.write_lance_batch_async(uri, batch).await?;
714        debug!("Subcentroids matrix saved successfully");
715        Ok(())
716    }
717
718    /// Load subcentroids as Vec<Vec<f64>>
719    async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
720        let path = self.file_path("sub_centroids");
721        info!("Loading sub_centroids from {:?}", path);
722
723        let uri = Self::path_to_uri(&path);
724        let batch = self.read_lance_all_batches_async(uri).await?;
725        let matrix = self.from_dense_record_batch(&batch)?;
726
727        // Convert DenseMatrix to Vec<Vec<f64>>
728        let (n_rows, n_cols) = matrix.shape();
729        let mut result = Vec::with_capacity(n_rows);
730
731        for row_idx in 0..n_rows {
732            let row: Vec<f64> = (0..n_cols)
733                .map(|col_idx| *matrix.get((row_idx, col_idx)))
734                .collect();
735            result.push(row);
736        }
737
738        info!(
739            "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
740            n_rows, n_cols
741        );
742        Ok(result)
743    }
744
745    /// Save item norms vector
746    async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
747        self.validate_initialized(md_path)?;
748        let path = self.file_path("item_norms");
749        info!("Saving {} item norm values", item_norms.len());
750
751        let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
752        let batch = RecordBatch::try_new(
753            Arc::new(schema),
754            vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
755        )
756        .map_err(|e| StorageError::Lance(e.to_string()))?;
757
758        let uri = Self::path_to_uri(&path);
759        self.write_lance_batch_async(uri, batch).await?;
760        info!("Item norms saved successfully");
761        Ok(())
762    }
763
764    /// Load item norms vector
765    async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
766        let path = self.file_path("item_norms");
767        info!("Loading item norms from {:?}", path);
768
769        let uri = Self::path_to_uri(&path);
770        let batch = self.read_lance_first_batch_async(uri).await?;
771        let arr = batch
772            .column(0)
773            .as_any()
774            .downcast_ref::<Float64Array>()
775            .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
776
777        let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
778        info!("Loaded {} item norm values", norms.len());
779        Ok(norms)
780    }
781
782    async fn save_cluster_assignments(
783        &self,
784        assignments: &[Option<usize>],
785        md_path: &Path,
786    ) -> StorageResult<()> {
787        self.validate_initialized(md_path)?;
788        let path = self.file_path("cluster_assignments");
789        info!("Saving {} cluster assignments", assignments.len());
790
791        // Convert Option<usize> to i64 (-1 for None)
792        let values: Vec<i64> = assignments
793            .iter()
794            .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
795            .collect();
796
797        let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
798        let batch = RecordBatch::try_new(
799            Arc::new(schema),
800            vec![Arc::new(Int64Array::from(values)) as _],
801        )
802        .map_err(|e| StorageError::Lance(e.to_string()))?;
803
804        let uri = Self::path_to_uri(&path);
805        self.write_lance_batch_async(uri, batch).await?;
806        info!("Cluster assignments saved successfully");
807        Ok(())
808    }
809
810    async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
811        let path = self.file_path("cluster_assignments");
812        info!("Loading cluster assignments from {:?}", path);
813
814        let uri = Self::path_to_uri(&path);
815        let batch = self.read_lance_first_batch_async(uri).await?;
816        let arr = batch
817            .column(0)
818            .as_any()
819            .downcast_ref::<Int64Array>()
820            .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
821
822        let assignments: Vec<Option<usize>> = (0..arr.len())
823            .map(|i| {
824                let val = arr.value(i);
825                if val < 0 { None } else { Some(val as usize) }
826            })
827            .collect();
828
829        info!("Loaded {} cluster assignments", assignments.len());
830        Ok(assignments)
831    }
832}