use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::array::{Float64Array, Int64Array, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::{Array as ArrowArray, RecordBatch};
use log::{debug, info};
use smartcore::linalg::basic::arrays::Array;
use smartcore::linalg::basic::matrix::DenseMatrix;
use sprs::CsMat;

use crate::metadata::FileInfo;
use crate::metadata::GeneMetadata;
use crate::traits::backend::StorageBackend;
use crate::traits::lance::LanceStorage;
use crate::traits::metadata::Metadata;
use crate::{StorageError, StorageResult};

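/// Lance-backed storage handle for graph data: `_base` is the directory that
/// holds the Lance datasets and the `<name>_metadata.json` file, and `_name`
/// is the dataset name used as a file-name prefix.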
#[derive(Debug, Clone)]
pub struct LanceStorageGraph {
    pub(crate) _base: String,
    pub(crate) _name: String,
}

impl LanceStorageGraph {
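    /// Build a storage handle for `_base`/`_name`; this only logs and stores
    /// the fields and does not touch the filesystem.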
    pub fn new(_base: String, _name: String) -> Self {
        info!("Creating LanceStorage at base={}, name={}", _base, _name);
        Self { _base, _name }
    }

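    /// Open an existing storage location: verify that metadata exists under
    /// `base_path`, read it, and name the handle after `metadata.name_id`.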
    pub async fn spawn(base_path: String) -> Result<(Self, GeneMetadata), StorageError> {
        let (exists, md_path) = Self::exists(&base_path);

        if !exists || md_path.is_none() {
            return Err(StorageError::Invalid(format!(
                "Metadata does not exist in base path: {}",
                base_path
            )));
        }

        let metadata = GeneMetadata::read(md_path.unwrap()).await?;

        let storage = Self::new(base_path.clone(), metadata.name_id.clone());
        Ok((storage, metadata))
    }
}

impl LanceStorage for LanceStorageGraph {}

impl StorageBackend for LanceStorageGraph {
    fn get_base(&self) -> String {
        self._base.clone()
    }

    fn get_name(&self) -> String {
        self._name.clone()
    }

    fn base_path(&self) -> PathBuf {
        PathBuf::from(&self._base)
    }

    fn metadata_path(&self) -> PathBuf {
        self.base_path()
            .join(format!("{}_metadata.json", self._name))
    }

    fn basepath_to_uri(&self) -> String {
        Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
    }

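    /// Persist a dense matrix under `key` as a Lance dataset and register it
    /// in the storage metadata.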
    async fn save_dense(
        &self,
        key: &str,
        matrix: &DenseMatrix<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        let (n_rows, n_cols) = matrix.shape();

        info!(
            "Saving dense {} matrix: {} x {} at {:?}",
            key, n_rows, n_cols, path
        );

        let batch = self.to_dense_record_batch(matrix)?;

        if batch.num_rows() != n_rows {
            return Err(StorageError::Invalid(format!(
                "RecordBatch has {} rows but matrix has {} rows",
                batch.num_rows(),
                n_rows
            )));
        }

        {
            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
            let mut md = self.load_metadata().await?;
            md = md.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "dense",
                    matrix.shape(),
                    None,
                    None,
                ),
            );
            self.save_metadata(&md).await?;
            info!("Dense {} matrix saved successfully", key);
        }
        Ok(())
    }

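    /// Load the dense matrix stored under `key`.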
    async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
        let path = self.file_path(key);
        info!("Loading dense {} matrix from {:?}", key, path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_all_batches_async(uri).await?;

        let matrix = self.from_dense_record_batch(&batch)?;

        let (n_rows, n_cols) = matrix.shape();
        info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);

        Ok(matrix)
    }

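    /// Load a dense matrix directly from a `.lance` or `.parquet` file,
    /// bypassing the storage metadata.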
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage = Self::new(parent, String::from("tmp_storage"));

                let uri = Self::path_to_uri(path);
                let batch = tmp_storage.read_lance_all_batches_async(uri).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                        if matches!(inner.data_type(), DataType::Float64)
                    );

                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                let matrix = if is_vector {
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage = Self::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }

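    /// Path of the Lance dataset for `key`: `<base>/<name>_<key>.lance`.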
    fn file_path(&self, key: &str) -> PathBuf {
        self.base_path()
            .join(format!("{}_{}.lance", self._name, key))
    }

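    /// Persist a sparse matrix under `key` and record its shape and nnz in
    /// the storage metadata.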
    async fn save_sparse(
        &self,
        key: &str,
        matrix: &CsMat<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        info!(
            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
            key,
            matrix.rows(),
            matrix.cols(),
            matrix.nnz(),
            path
        );

        let filetype = FileInfo::which_filetype(key);
        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    filetype.as_str(),
                    (matrix.rows(), matrix.cols()),
                    Some(matrix.nnz()),
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let batch = self.to_sparse_record_batch(matrix)?;
            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Sparse matrix {} saved successfully", filetype);
        Ok(())
    }

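    /// Load the sparse matrix stored under `key`, using the dimensions
    /// recorded in the storage metadata.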
    async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
        info!("Loading sparse {} matrix", key);

        let metadata = self.load_metadata().await?;
        let filetype = FileInfo::which_filetype(key);
        let file_info = metadata
            .files
            .get(key)
            .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;

        let expected_rows = file_info.rows;
        let expected_cols = file_info.cols;
        debug!(
            "Expected dimensions from storage metadata: {} x {}",
            expected_rows, expected_cols
        );

        let path = self.file_path(key);
        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
        info!(
            "Sparse {} matrix loaded: {} x {}, nnz={}",
            filetype,
            matrix.rows(),
            matrix.cols(),
            matrix.nnz()
        );
        Ok(matrix)
    }

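    /// Persist lambda values as a single-column Float64 Lance dataset.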
    async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let key = "lambdas";
        let path = self.file_path(key);
        info!("Saving {} lambda values", lambdas.len());

        let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    (lambdas.len(), 1),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Lambda values saved successfully");
        Ok(())
    }

    async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
        let path = self.file_path("lambdas");
        info!("Loading lambda values from {:?}", path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;

        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} lambda values", lambdas.len());
        Ok(lambdas)
    }

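    /// Persist a float vector under `key` as a single-column Float64 dataset.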
    async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        info!("Saving {} values for vector {}", vector.len(), key);

        let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
        let float64_array = Float64Array::from_iter_values(vector.iter().copied());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    (vector.len(), 1),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Vector {} saved successfully", key);
        Ok(())
    }

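    /// Persist an index vector under `key`, narrowing `usize` values to `u32`.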
    async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        info!("Saving {} values for index {}", vector.len(), key);

        let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
        let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    (vector.len(), 1),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Index {} saved successfully", key);
        Ok(())
    }

    async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
        let path = self.file_path(filename);
        info!("Loading vector {} from {:?}", filename, path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;

        let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} vector values for {}", vector.len(), filename);
        Ok(vector)
    }

    async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
        let path = self.file_path(filename);
        info!("Loading index {} from {:?}", filename, path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;

        let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
        info!("Loaded {} index values for {}", vector.len(), filename);
        Ok(vector)
    }

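    /// Save a dense matrix directly to a `.lance` or `.parquet` file,
    /// independent of any storage metadata.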
    async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &Path) -> StorageResult<()> {
        use tokio::fs as tokio_fs;

        info!("Saving dense matrix to file (async): {:?}", path);

        // Make sure the parent directory exists before writing.
        let parent = path
            .parent()
            .ok_or_else(|| StorageError::Invalid(format!("Path has no parent: {:?}", path)))?;
        tokio_fs::create_dir_all(parent)
            .await
            .map_err(|e| StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e)))?;

        let tmp_storage = Self::new(
            parent
                .to_str()
                .ok_or_else(|| {
                    StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                })?
                .to_string(),
            String::from("tmp_storage"),
        );

        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        let (n_rows, n_cols) = data.shape();
        info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);

        match extension {
            "lance" => {
                let batch = tmp_storage.to_dense_record_batch(data)?;
                debug!(
                    "Created RecordBatch with {} rows for Lance",
                    batch.num_rows()
                );

                if batch.num_rows() != n_rows {
                    return Err(StorageError::Invalid(format!(
                        "RecordBatch has {} rows but matrix has {} rows",
                        batch.num_rows(),
                        n_rows
                    )));
                }

                let uri = Self::path_to_uri(path);
                tmp_storage.write_lance_batch_async(uri, batch).await?;
                info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
                Ok(())
            }
            "parquet" => {
                use parquet::arrow::ArrowWriter;
                use parquet::file::properties::WriterProperties;
                use std::fs::File;

                let batch = tmp_storage.to_dense_record_batch(data)?;
                debug!(
                    "Created RecordBatch with {} rows for Parquet",
                    batch.num_rows()
                );

                if batch.num_rows() != n_rows {
                    return Err(StorageError::Invalid(format!(
                        "RecordBatch has {} rows but matrix has {} rows",
                        batch.num_rows(),
                        n_rows
                    )));
                }

                let file = File::create(path).map_err(|e| {
                    StorageError::Io(format!("Failed to create parquet file: {}", e))
                })?;

                let props = WriterProperties::builder()
                    .set_compression(parquet::basic::Compression::SNAPPY)
                    .build();

                let mut writer =
                    ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
                        StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
                    })?;

                writer
                    .write(&batch)
                    .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;

                writer
                    .close()
                    .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;

                info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
                Ok(())
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }

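    /// Persist the centroid map as a single `centroid_id` (UInt32) column.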
    async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let key = "centroid_map";
        let path = self.file_path(key);
        info!("Saving {} centroid map entries", map.len());

        let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
        let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    (map.len(), 1),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Centroid map saved successfully");
        Ok(())
    }

    async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
        let path = self.file_path("centroid_map");
        info!("Loading centroid map from {:?}", path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;

        let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
        info!("Loaded {} centroid map entries", map.len());
        Ok(map)
    }

    async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let key = "subcentroid_lambdas";
        let path = self.file_path(key);
        info!("Saving {} subcentroid lambda values", lambdas.len());

        let schema = Schema::new(vec![Field::new(
            "subcentroid_lambda",
            DataType::Float64,
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;
        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    (lambdas.len(), 1),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Subcentroid lambda values saved successfully");
        Ok(())
    }

    async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
        let path = self.file_path("subcentroid_lambdas");
        info!("Loading subcentroid lambda values from {:?}", path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| {
                StorageError::Invalid("subcentroid_lambda column type mismatch".into())
            })?;

        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} subcentroid lambda values", lambdas.len());
        Ok(lambdas)
    }

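    /// Persist the subcentroids matrix as a Lance dataset and register it in
    /// the storage metadata.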
    async fn save_subcentroids(
        &self,
        subcentroids: &DenseMatrix<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let key = "sub_centroids";
        let path = self.file_path(key);
        let (n_rows, n_cols) = subcentroids.shape();
        info!(
            "Saving subcentroids matrix {} x {} at {:?}",
            n_rows, n_cols, path
        );

        let batch = self.to_dense_record_batch(subcentroids)?;
        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    subcentroids.shape(),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        debug!("Subcentroids matrix saved successfully");
        Ok(())
    }

    async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
        let path = self.file_path("sub_centroids");
        info!("Loading sub_centroids from {:?}", path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_all_batches_async(uri).await?;
        let matrix = self.from_dense_record_batch(&batch)?;

        let (n_rows, n_cols) = matrix.shape();
        let mut result = Vec::with_capacity(n_rows);

        for row_idx in 0..n_rows {
            let row: Vec<f64> = (0..n_cols)
                .map(|col_idx| *matrix.get((row_idx, col_idx)))
                .collect();
            result.push(row);
        }

        info!(
            "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
            n_rows, n_cols
        );
        Ok(result)
    }

    async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let key = "item_norms";
        let path = self.file_path(key);
        info!("Saving {} item norm values", item_norms.len());

        let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    (item_norms.len(), 1),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Item norms saved successfully");
        Ok(())
    }

    async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
        let path = self.file_path("item_norms");
        info!("Loading item norms from {:?}", path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;

        let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} item norm values", norms.len());
        Ok(norms)
    }

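    /// Persist cluster assignments, encoding `None` as the sentinel value `-1`
    /// in an Int64 column.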
    async fn save_cluster_assignments(
        &self,
        assignments: &[Option<usize>],
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let key = "cluster_assignments";
        let path = self.file_path(key);
        info!("Saving {} cluster assignments", assignments.len());

        let values: Vec<i64> = assignments
            .iter()
            .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
            .collect();

        let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(values)) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        {
            let mut metadata = self.load_metadata().await?;
            metadata = metadata.add_file(
                key,
                FileInfo::new(
                    format!("{}_{}.lance", self.get_name(), key),
                    "vector",
                    (assignments.len(), 1),
                    None,
                    None,
                ),
            );
            self.save_metadata(&metadata).await?;

            let uri = Self::path_to_uri(&path);
            self.write_lance_batch_async(uri, batch).await?;
        }
        info!("Cluster assignments saved successfully");
        Ok(())
    }

    async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
        let path = self.file_path("cluster_assignments");
        info!("Loading cluster assignments from {:?}", path);

        let uri = Self::path_to_uri(&path);
        let batch = self.read_lance_first_batch_async(uri).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;

        let assignments: Vec<Option<usize>> = (0..arr.len())
            .map(|i| {
                let val = arr.value(i);
                if val < 0 { None } else { Some(val as usize) }
            })
            .collect();

        info!("Loaded {} cluster assignments", assignments.len());
        Ok(assignments)
    }
}