1use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::{Float64Array, Int64Array, UInt32Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow_array::{Array as ArrowArray, RecordBatch};
13use log::{debug, info};
14use smartcore::linalg::basic::arrays::Array;
15use smartcore::linalg::basic::matrix::DenseMatrix;
16use sprs::CsMat;
17
18use crate::metadata::FileInfo;
19use crate::metadata::GeneMetadata;
20use crate::traits::backend::StorageBackend;
21use crate::traits::lance::LanceStorage;
22use crate::traits::metadata::Metadata;
23use crate::{StorageError, StorageResult};
24
/// Lance-backed storage handle for graph data.
///
/// `_base` is the root directory that holds all datasets; `_name` is the
/// dataset-name prefix used when building on-disk file names
/// (see `file_path` / `metadata_path` below).
#[derive(Debug, Clone)]
pub struct LanceStorageGraph {
    // Root directory containing every dataset of this storage.
    pub(crate) _base: String,
    // Name prefix: datasets are stored as `<_name>_<key>.lance`.
    pub(crate) _name: String,
}
35
36impl LanceStorageGraph {
37 pub fn new(_base: String, _name: String) -> Self {
46 info!("Creating LanceStorage at base={}, name={}", _base, _name);
47 Self { _base, _name }
48 }
49
50 pub async fn spawn(base_path: String) -> Result<(Self, GeneMetadata), StorageError> {
52 let (exists, md_path) = <Self as StorageBackend>::exists(&base_path);
54 assert!(
55 exists && md_path.is_some(),
56 "Metadata does not exist in this base path"
57 );
58
59 let metadata = GeneMetadata::read(md_path.unwrap()).await?;
61
62 let storage = Self::new(base_path.clone(), metadata.name_id.clone());
64
65 Ok((storage, metadata))
66 }
67}
68
// Empty marker impl: no methods are overridden here. The Lance helpers used
// throughout this file (e.g. `to_dense_record_batch`, `write_lance_batch_async`)
// are presumably supplied as default methods by this trait — see
// crate::traits::lance to confirm.
impl LanceStorage for LanceStorageGraph {}
70
71impl StorageBackend for LanceStorageGraph {
72 fn get_base(&self) -> String {
73 self._base.clone()
74 }
75
76 fn get_name(&self) -> String {
77 self._name.clone()
78 }
79
80 fn base_path(&self) -> PathBuf {
81 PathBuf::from(&self._base)
82 }
83
84 fn metadata_path(&self) -> PathBuf {
85 self.base_path()
86 .join(format!("{}_metadata.json", self._name))
87 }
88
89 fn basepath_to_uri(&self) -> String {
91 Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
92 }
93
94 async fn save_dense(
106 &self,
107 key: &str,
108 matrix: &DenseMatrix<f64>,
109 md_path: &Path,
110 ) -> StorageResult<()> {
111 self.validate_initialized(md_path)?;
112 let path = self.file_path(key);
113 let (n_rows, n_cols) = matrix.shape();
114
115 info!(
116 "Saving dense {} matrix: {} x {} at {:?}",
117 key, n_rows, n_cols, path
118 );
119
120 let batch = self.to_dense_record_batch(matrix)?;
122
123 if batch.num_rows() != n_rows {
125 return Err(StorageError::Invalid(format!(
126 "RecordBatch has {} rows but matrix has {} rows",
127 batch.num_rows(),
128 n_rows
129 )));
130 }
131
132 let uri = Self::path_to_uri(&path);
134 self.write_lance_batch_async(uri, batch).await?;
135
136 info!("Dense {} matrix saved successfully", key);
137 Ok(())
138 }
139
140 async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
150 let path = self.file_path(key);
151 info!("Loading dense {} matrix from {:?}", key, path);
152
153 let uri = Self::path_to_uri(&path);
155 let batch = self.read_lance_all_batches_async(uri).await?;
156
157 let matrix = self.from_dense_record_batch(&batch)?;
159
160 let (n_rows, n_cols) = matrix.shape();
161 info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);
162
163 Ok(matrix)
164 }
165
    /// Loads a dense matrix directly from a standalone file, dispatching on
    /// the file extension.
    ///
    /// Supported formats:
    /// - `.lance`  — read via a throwaway storage handle rooted at the file's
    ///   parent directory, then decoded with `from_dense_record_batch`.
    /// - `.parquet` — two accepted schemas: a single `FixedSizeList<Float64>`
    ///   column (decoded with `from_dense_record_batch`), or all-Float64
    ///   columns with at least one named `col_*` ("wide-column" layout).
    ///
    /// # Errors
    /// `StorageError::Invalid` for missing files, malformed paths, empty
    /// datasets, or unsupported schemas/extensions; `StorageError::Io` /
    /// `StorageError::Parquet` for underlying read failures.
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        // The extension must exist and be valid UTF-8 to pick a format.
        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                // The Lance readers are instance methods, so construct a
                // temporary handle rooted at the file's parent directory.
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage = Self::new(parent, String::from("tmp_storage"));

                let uri = Self::path_to_uri(path);
                let batch = tmp_storage.read_lance_all_batches_async(uri).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                // Drain every batch from the reader; they are concatenated
                // into one batch below before decoding.
                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                // Schema sniffing: exactly one FixedSizeList<Float64> column
                // selects the "vector" layout handled by from_dense_record_batch.
                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                        if matches!(inner.data_type(), DataType::Float64)
                    );

                // Otherwise: all columns Float64 and at least one named
                // "col_*" selects the wide-column layout.
                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                let matrix = if is_vector {
                    // Reuse the shared record-batch decoder via a throwaway
                    // handle, same trick as the "lance" arm above.
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage = Self::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    // Values are gathered column-by-column; the `true` flag
                    // passed to DenseMatrix::new below matches that ordering.
                    // NOTE(review): assumes `true` selects column-major layout
                    // in smartcore's DenseMatrix::new — confirm against its docs.
                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }
336
337 fn file_path(&self, key: &str) -> PathBuf {
338 self.base_path()
339 .join(format!("{}_{}.lance", self._name, key))
340 }
341
    /// Saves a sparse matrix under `key` as a Lance dataset.
    ///
    /// The storage metadata is updated with the matrix shape and nnz *before*
    /// the data batch is written — `load_sparse` reads those recorded
    /// dimensions back to reconstruct the matrix.
    ///
    /// NOTE(review): if the batch write below fails, the metadata entry has
    /// already been persisted, leaving metadata ahead of the data — confirm
    /// this partial state is acceptable to callers.
    async fn save_sparse(
        &self,
        key: &str,
        matrix: &CsMat<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        info!(
            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
            key,
            matrix.rows(),
            matrix.cols(),
            matrix.nnz(),
            path
        );

        // Record shape/nnz in metadata first; load_sparse depends on it.
        let mut metadata = self.load_metadata().await?;
        let filetype = FileInfo::which_filetype(key);
        metadata.files.insert(
            key.to_string(),
            metadata.new_fileinfo(
                key,
                filetype.as_str(),
                (matrix.rows(), matrix.cols()),
                Some(matrix.nnz()),
                None,
            ),
        );
        self.save_metadata(&metadata).await?;

        // Then write the actual sparse data batch.
        let batch = self.to_sparse_record_batch(matrix)?;
        let uri = Self::path_to_uri(&path);
        self.write_lance_batch_async(uri, batch).await?;
        info!("Sparse matrix {} saved successfully", filetype);
        Ok(())
    }
383
384 async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
385 info!("Loading sparse {} matrix", key);
386
387 let metadata = self.load_metadata().await?;
388 let filetype = FileInfo::which_filetype(key);
389 let file_info = metadata
390 .files
391 .get(key)
392 .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
393
394 let expected_rows = file_info.rows;
395 let expected_cols = file_info.cols;
396 debug!(
397 "Expected dimensions from storage metadata: {} x {}",
398 expected_rows, expected_cols
399 );
400
401 let path = self.file_path(key);
402 let uri = Self::path_to_uri(&path);
403 let batch = self.read_lance_first_batch_async(uri).await?;
404 let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
405 info!(
406 "Sparse {} matrix loaded: {} x {}, nnz={}",
407 filetype,
408 matrix.rows(),
409 matrix.cols(),
410 matrix.nnz()
411 );
412 Ok(matrix)
413 }
414
415 async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
416 self.validate_initialized(md_path)?;
417 let path = self.file_path("lambdas");
418 info!("Saving {} lambda values", lambdas.len());
419
420 let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
421 let batch = RecordBatch::try_new(
422 Arc::new(schema),
423 vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
424 )
425 .map_err(|e| StorageError::Lance(e.to_string()))?;
426
427 let uri = Self::path_to_uri(&path);
428 self.write_lance_batch_async(uri, batch).await?;
429 info!("Lambda values saved successfully");
430 Ok(())
431 }
432
433 async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
434 let path = self.file_path("lambdas");
435 info!("Loading lambda values from {:?}", path);
436
437 let uri = Self::path_to_uri(&path);
438 let batch = self.read_lance_first_batch_async(uri).await?;
439 let arr = batch
440 .column(0)
441 .as_any()
442 .downcast_ref::<Float64Array>()
443 .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
444
445 let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
446 info!("Loaded {} lambda values", lambdas.len());
447 Ok(lambdas)
448 }
449
450 async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
451 self.validate_initialized(md_path)?;
452 let path = self.file_path(key);
453 info!("Saving {} values for vector {}", vector.len(), key);
454
455 let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
456 let float64_array = Float64Array::from_iter_values::<Vec<f64>>(vector.into());
457 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
458 .map_err(|e| StorageError::Lance(e.to_string()))?;
459
460 let uri = Self::path_to_uri(&path);
461 self.write_lance_batch_async(uri, batch).await?;
462 info!("Index {} saved successfully", key);
463 Ok(())
464 }
465
466 async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
467 self.validate_initialized(md_path)?;
468 let path = self.file_path(key);
469 info!("Saving {} values for index {}", vector.len(), key);
470
471 let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
472 let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
473 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
474 .map_err(|e| StorageError::Lance(e.to_string()))?;
475
476 let uri = Self::path_to_uri(&path);
477 self.write_lance_batch_async(uri, batch).await?;
478 info!("Index {} saved successfully", key);
479 Ok(())
480 }
481
482 async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
483 let path = self.file_path(filename);
484 info!("Loading vector {} from {:?}", filename, path);
485
486 let uri = Self::path_to_uri(&path);
487 let batch = self.read_lance_first_batch_async(uri).await?;
488 let arr = batch
489 .column(0)
490 .as_any()
491 .downcast_ref::<Float64Array>()
492 .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
493
494 let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
495 info!("Loaded {} vector values for {}", vector.len(), filename);
496 Ok(vector)
497 }
498
499 async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
500 let path = self.file_path(filename);
501 info!("Loading vector {} from {:?}", filename, path);
502
503 let uri = Self::path_to_uri(&path);
504 let batch = self.read_lance_first_batch_async(uri).await?;
505 let arr = batch
506 .column(0)
507 .as_any()
508 .downcast_ref::<UInt32Array>()
509 .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
510
511 let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
512 info!("Loaded {} vector values for {}", vector.len(), filename);
513 Ok(vector)
514 }
515
516 async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &Path) -> StorageResult<()> {
520 use tokio::fs as tokio_fs;
521
522 info!("Saving dense matrix to file (async): {:?}", path);
523
524 if let Some(parent) = path.parent() {
526 tokio_fs::try_exists(parent).await.map_err(|e| {
527 StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
528 })?;
529 }
530
531 let tmp_storage = Self::new(
533 String::from(path.parent().unwrap().to_str().unwrap()),
534 String::from("tmp_storage"),
535 );
536
537 let extension = path
538 .extension()
539 .and_then(|e| e.to_str())
540 .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
541
542 let (n_rows, n_cols) = data.shape();
543 info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
544
545 match extension {
546 "lance" => {
547 let batch = tmp_storage.to_dense_record_batch(data)?;
548 debug!(
549 "Created RecordBatch with {} rows for Lance",
550 batch.num_rows()
551 );
552
553 if batch.num_rows() != n_rows {
555 return Err(StorageError::Invalid(format!(
556 "RecordBatch has {} rows but matrix has {} rows",
557 batch.num_rows(),
558 n_rows
559 )));
560 }
561
562 let uri = Self::path_to_uri(path);
563 tmp_storage.write_lance_batch_async(uri, batch).await?;
564 info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
565 Ok(())
566 }
567 "parquet" => {
568 use parquet::arrow::ArrowWriter;
569 use parquet::file::properties::WriterProperties;
570 use std::fs::File;
571
572 let batch = tmp_storage.to_dense_record_batch(data)?;
574 debug!(
575 "Created RecordBatch with {} rows for Parquet",
576 batch.num_rows()
577 );
578
579 if batch.num_rows() != n_rows {
580 return Err(StorageError::Invalid(format!(
581 "RecordBatch has {} rows but matrix has {} rows",
582 batch.num_rows(),
583 n_rows
584 )));
585 }
586
587 let file = File::create(path).map_err(|e| {
588 StorageError::Io(format!("Failed to create parquet file: {}", e))
589 })?;
590
591 let props = WriterProperties::builder()
592 .set_compression(parquet::basic::Compression::SNAPPY)
593 .build();
594
595 let mut writer =
596 ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
597 StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
598 })?;
599
600 writer
601 .write(&batch)
602 .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
603
604 writer
605 .close()
606 .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
607
608 info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
609 Ok(())
610 }
611 _ => Err(StorageError::Invalid(format!(
612 "Unsupported file format: {}. Only .lance and .parquet are supported",
613 extension
614 ))),
615 }
616 }
617
618 async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
620 self.validate_initialized(md_path)?;
621 let path = self.file_path("centroid_map");
622 info!("Saving {} centroid map entries", map.len());
623
624 let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
625 let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
626 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
627 .map_err(|e| StorageError::Lance(e.to_string()))?;
628
629 let uri = Self::path_to_uri(&path);
630 self.write_lance_batch_async(uri, batch).await?;
631 info!("Centroid map saved successfully");
632 Ok(())
633 }
634
635 async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
637 let path = self.file_path("centroid_map");
638 info!("Loading centroid map from {:?}", path);
639
640 let uri = Self::path_to_uri(&path);
641 let batch = self.read_lance_first_batch_async(uri).await?;
642 let arr = batch
643 .column(0)
644 .as_any()
645 .downcast_ref::<UInt32Array>()
646 .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
647
648 let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
649 info!("Loaded {} centroid map entries", map.len());
650 Ok(map)
651 }
652
653 async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
655 self.validate_initialized(md_path)?;
656 let path = self.file_path("subcentroid_lambdas");
657 info!("Saving {} subcentroid lambda values", lambdas.len());
658
659 let schema = Schema::new(vec![Field::new(
660 "subcentroid_lambda",
661 DataType::Float64,
662 false,
663 )]);
664 let batch = RecordBatch::try_new(
665 Arc::new(schema),
666 vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
667 )
668 .map_err(|e| StorageError::Lance(e.to_string()))?;
669
670 let uri = Self::path_to_uri(&path);
671 self.write_lance_batch_async(uri, batch).await?;
672 info!("Subcentroid lambda values saved successfully");
673 Ok(())
674 }
675
676 async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
678 let path = self.file_path("subcentroid_lambdas");
679 info!("Loading subcentroid lambda values from {:?}", path);
680
681 let uri = Self::path_to_uri(&path);
682 let batch = self.read_lance_first_batch_async(uri).await?;
683 let arr = batch
684 .column(0)
685 .as_any()
686 .downcast_ref::<Float64Array>()
687 .ok_or_else(|| {
688 StorageError::Invalid("subcentroid_lambda column type mismatch".into())
689 })?;
690
691 let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
692 info!("Loaded {} subcentroid lambda values", lambdas.len());
693 Ok(lambdas)
694 }
695
696 async fn save_subcentroids(
698 &self,
699 subcentroids: &DenseMatrix<f64>,
700 md_path: &Path,
701 ) -> StorageResult<()> {
702 self.validate_initialized(md_path)?;
703 let path = self.file_path("sub_centroids");
704 let (n_rows, n_cols) = subcentroids.shape();
705 info!(
706 "Saving subcentroids matrix {} x {} at {:?}",
707 n_rows, n_cols, path
708 );
709
710 let batch = self.to_dense_record_batch(subcentroids)?;
711 let uri = Self::path_to_uri(&path);
712 self.write_lance_batch_async(uri, batch).await?;
713 debug!("Subcentroids matrix saved successfully");
714 Ok(())
715 }
716
717 async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
719 let path = self.file_path("sub_centroids");
720 info!("Loading sub_centroids from {:?}", path);
721
722 let uri = Self::path_to_uri(&path);
723 let batch = self.read_lance_all_batches_async(uri).await?;
724 let matrix = self.from_dense_record_batch(&batch)?;
725
726 let (n_rows, n_cols) = matrix.shape();
728 let mut result = Vec::with_capacity(n_rows);
729
730 for row_idx in 0..n_rows {
731 let row: Vec<f64> = (0..n_cols)
732 .map(|col_idx| *matrix.get((row_idx, col_idx)))
733 .collect();
734 result.push(row);
735 }
736
737 info!(
738 "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
739 n_rows, n_cols
740 );
741 Ok(result)
742 }
743
744 async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
746 self.validate_initialized(md_path)?;
747 let path = self.file_path("item_norms");
748 info!("Saving {} item norm values", item_norms.len());
749
750 let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
751 let batch = RecordBatch::try_new(
752 Arc::new(schema),
753 vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
754 )
755 .map_err(|e| StorageError::Lance(e.to_string()))?;
756
757 let uri = Self::path_to_uri(&path);
758 self.write_lance_batch_async(uri, batch).await?;
759 info!("Item norms saved successfully");
760 Ok(())
761 }
762
763 async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
765 let path = self.file_path("item_norms");
766 info!("Loading item norms from {:?}", path);
767
768 let uri = Self::path_to_uri(&path);
769 let batch = self.read_lance_first_batch_async(uri).await?;
770 let arr = batch
771 .column(0)
772 .as_any()
773 .downcast_ref::<Float64Array>()
774 .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
775
776 let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
777 info!("Loaded {} item norm values", norms.len());
778 Ok(norms)
779 }
780
781 async fn save_cluster_assignments(
782 &self,
783 assignments: &[Option<usize>],
784 md_path: &Path,
785 ) -> StorageResult<()> {
786 self.validate_initialized(md_path)?;
787 let path = self.file_path("cluster_assignments");
788 info!("Saving {} cluster assignments", assignments.len());
789
790 let values: Vec<i64> = assignments
792 .iter()
793 .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
794 .collect();
795
796 let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
797 let batch = RecordBatch::try_new(
798 Arc::new(schema),
799 vec![Arc::new(Int64Array::from(values)) as _],
800 )
801 .map_err(|e| StorageError::Lance(e.to_string()))?;
802
803 let uri = Self::path_to_uri(&path);
804 self.write_lance_batch_async(uri, batch).await?;
805 info!("Cluster assignments saved successfully");
806 Ok(())
807 }
808
809 async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
810 let path = self.file_path("cluster_assignments");
811 info!("Loading cluster assignments from {:?}", path);
812
813 let uri = Self::path_to_uri(&path);
814 let batch = self.read_lance_first_batch_async(uri).await?;
815 let arr = batch
816 .column(0)
817 .as_any()
818 .downcast_ref::<Int64Array>()
819 .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
820
821 let assignments: Vec<Option<usize>> = (0..arr.len())
822 .map(|i| {
823 let val = arr.value(i);
824 if val < 0 { None } else { Some(val as usize) }
825 })
826 .collect();
827
828 info!("Loaded {} cluster assignments", assignments.len());
829 Ok(assignments)
830 }
831}