1use std::path::{Path, PathBuf};
8use std::sync::Arc;
9
10use arrow::array::{Float64Array, Int64Array, UInt32Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow_array::{Array as ArrowArray, RecordBatch};
13use log::{debug, info};
14use smartcore::linalg::basic::arrays::Array;
15use smartcore::linalg::basic::matrix::DenseMatrix;
16use sprs::CsMat;
17
18use crate::metadata::FileInfo;
19use crate::metadata::GeneMetadata;
20use crate::traits::backend::StorageBackend;
21use crate::traits::lance::LanceStorage;
22use crate::traits::metadata::Metadata;
23use crate::{StorageError, StorageResult};
24
/// Lance-backed storage handle for graph data.
///
/// Thin value type pairing a filesystem base directory with a dataset
/// name; all persistence goes through the `StorageBackend` impl below.
#[derive(Debug, Clone)]
pub struct LanceStorageGraph {
    // Root directory holding the `.lance` datasets and the metadata JSON.
    pub(crate) _base: String,
    // Dataset name, used as the filename prefix (see `file_path`).
    pub(crate) _name: String,
}
35
36impl LanceStorageGraph {
37 pub fn new(_base: String, _name: String) -> Self {
46 info!("Creating LanceStorage at base={}, name={}", _base, _name);
47 Self { _base, _name }
48 }
49
50 pub async fn spawn(base_path: String) -> Result<(Self, GeneMetadata), StorageError> {
52 let (exists, md_path) = Self::exists(&base_path);
54
55 if !exists || md_path.is_none() {
57 return Err(StorageError::Invalid(format!(
58 "Metadata does not exist in base path: {}",
59 base_path
60 )));
61 }
62
63 let metadata = GeneMetadata::read(md_path.unwrap()).await?;
65
66 let storage = Self::new(base_path.clone(), metadata.name_id.clone());
68 Ok((storage, metadata))
69 }
70}
71
// Marker impl: presumably pulls in `LanceStorage`'s default helpers used
// below (e.g. `path_to_uri`, the Lance batch read/write methods) without
// overriding anything — confirm in `traits::lance`.
impl LanceStorage for LanceStorageGraph {}
73
74impl StorageBackend for LanceStorageGraph {
75 fn get_base(&self) -> String {
76 self._base.clone()
77 }
78
79 fn get_name(&self) -> String {
80 self._name.clone()
81 }
82
83 fn base_path(&self) -> PathBuf {
84 PathBuf::from(&self._base)
85 }
86
87 fn metadata_path(&self) -> PathBuf {
88 self.base_path()
89 .join(format!("{}_metadata.json", self._name))
90 }
91
92 fn basepath_to_uri(&self) -> String {
94 Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
95 }
96
97 async fn save_dense(
107 &self,
108 key: &str,
109 matrix: &DenseMatrix<f64>,
110 md_path: &Path,
111 ) -> StorageResult<()> {
112 self.validate_initialized(md_path)?;
113 let path = self.file_path(key);
114 let (n_rows, n_cols) = matrix.shape();
115
116 info!(
117 "Saving dense {} matrix: {} x {} at {:?}",
118 key, n_rows, n_cols, path
119 );
120
121 let batch = self.to_dense_record_batch(matrix)?;
123
124 if batch.num_rows() != n_rows {
126 return Err(StorageError::Invalid(format!(
127 "RecordBatch has {} rows but matrix has {} rows",
128 batch.num_rows(),
129 n_rows
130 )));
131 }
132
133 let uri = Self::path_to_uri(&path);
135 self.write_lance_batch_async(uri, batch).await?;
136
137 info!("Dense {} matrix saved successfully", key);
138 Ok(())
139 }
140
141 async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
151 let path = self.file_path(key);
152 info!("Loading dense {} matrix from {:?}", key, path);
153
154 let uri = Self::path_to_uri(&path);
156 let batch = self.read_lance_all_batches_async(uri).await?;
157
158 let matrix = self.from_dense_record_batch(&batch)?;
160
161 let (n_rows, n_cols) = matrix.shape();
162 info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);
163
164 Ok(matrix)
165 }
166
167 async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
171 info!("Loading dense matrix from file (async): {:?}", path);
172
173 if !path.exists() {
174 return Err(StorageError::Invalid(format!(
175 "Dense file does not exist: {:?}",
176 path
177 )));
178 }
179
180 let extension = path
181 .extension()
182 .and_then(|e| e.to_str())
183 .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
184
185 match extension {
186 "lance" => {
187 let parent = path
190 .parent()
191 .ok_or_else(|| {
192 StorageError::Invalid(format!("Path has no parent: {:?}", path))
193 })?
194 .to_str()
195 .ok_or_else(|| {
196 StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
197 })?
198 .to_string();
199
200 let tmp_storage = Self::new(parent, String::from("tmp_storage"));
201
202 let uri = Self::path_to_uri(path);
204 let batch = tmp_storage.read_lance_all_batches_async(uri).await?;
205 let matrix = tmp_storage.from_dense_record_batch(&batch)?;
206 info!(
207 "Loaded dense matrix from Lance: {} x {}",
208 matrix.shape().0,
209 matrix.shape().1
210 );
211 Ok(matrix)
212 }
213 "parquet" => {
214 use arrow::datatypes::DataType;
215 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
216 use std::fs::File;
217
218 let file = File::open(path)
220 .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;
221
222 let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
223 StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
224 })?;
225 let mut reader = builder.build().map_err(|e| {
226 StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
227 })?;
228
229 let mut batches = Vec::new();
230 #[allow(clippy::while_let_on_iterator)]
231 while let Some(batch) = reader.next() {
232 let batch = batch.map_err(|e| {
233 StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
234 })?;
235 batches.push(batch);
236 }
237
238 if batches.is_empty() {
239 return Err(StorageError::Invalid(format!(
240 "Empty parquet dataset at {:?}",
241 path
242 )));
243 }
244
245 let schema = batches[0].schema();
246 let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
247 StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
248 })?;
249
250 let fields = schema.fields();
252 let is_vector = fields.len() == 1
253 && matches!(
254 fields[0].data_type(),
255 DataType::FixedSizeList(inner, _)
256 if matches!(inner.data_type(), DataType::Float64)
257 );
258
259 let is_wide_col = !is_vector
260 && !fields.is_empty()
261 && fields
262 .iter()
263 .all(|f| matches!(f.data_type(), DataType::Float64))
264 && fields.iter().any(|f| f.name().starts_with("col_"));
265
266 let matrix = if is_vector {
268 let parent = path
271 .parent()
272 .ok_or_else(|| {
273 StorageError::Invalid(format!("Path has no parent: {:?}", path))
274 })?
275 .to_str()
276 .ok_or_else(|| {
277 StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
278 })?
279 .to_string();
280
281 let tmp_storage = Self::new(parent, String::from("tmp_storage"));
282 tmp_storage.from_dense_record_batch(&combined)?
283 } else if is_wide_col {
284 let n_rows = combined.num_rows();
286 let n_cols = combined.num_columns();
287 if n_rows == 0 || n_cols == 0 {
288 return Err(StorageError::Invalid(format!(
289 "Cannot load empty wide-column parquet at {:?}",
290 path
291 )));
292 }
293
294 let mut data = Vec::with_capacity(n_rows * n_cols);
295 for col_idx in 0..n_cols {
296 let col = combined.column(col_idx);
297 let arr = col
298 .as_any()
299 .downcast_ref::<arrow_array::Float64Array>()
300 .ok_or_else(|| {
301 StorageError::Invalid(format!(
302 "Wide-column parquet expects Float64, got {:?} in column {}",
303 col.data_type(),
304 col_idx
305 ))
306 })?;
307 for row_idx in 0..n_rows {
309 data.push(arr.value(row_idx));
310 }
311 }
312
313 DenseMatrix::new(n_rows, n_cols, data, true)
314 .map_err(|e| StorageError::Invalid(e.to_string()))?
315 } else {
316 return Err(StorageError::Invalid(format!(
317 "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
318 or wide Float64 columns named col_*",
319 path
320 )));
321 };
322
323 info!(
324 "Loaded dense matrix from Parquet: {} x {}",
325 matrix.shape().0,
326 matrix.shape().1
327 );
328
329 Ok(matrix)
330 }
331 _ => Err(StorageError::Invalid(format!(
332 "Unsupported file format: {}. Only .lance and .parquet are supported",
333 extension
334 ))),
335 }
336 }
337
338 fn file_path(&self, key: &str) -> PathBuf {
339 self.base_path()
340 .join(format!("{}_{}.lance", self._name, key))
341 }
342
343 async fn save_sparse(
348 &self,
349 key: &str,
350 matrix: &CsMat<f64>,
351 md_path: &Path,
352 ) -> StorageResult<()> {
353 self.validate_initialized(md_path)?;
354 let path = self.file_path(key);
355 info!(
356 "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
357 key,
358 matrix.rows(),
359 matrix.cols(),
360 matrix.nnz(),
361 path
362 );
363
364 let mut metadata = self.load_metadata().await?;
365 let filetype = FileInfo::which_filetype(key);
366 metadata.files.insert(
367 key.to_string(),
368 metadata.new_fileinfo(
369 key,
370 filetype.as_str(),
371 (matrix.rows(), matrix.cols()),
372 Some(matrix.nnz()),
373 None,
374 ),
375 );
376 self.save_metadata(&metadata).await?;
377
378 let batch = self.to_sparse_record_batch(matrix)?;
379 let uri = Self::path_to_uri(&path);
380 self.write_lance_batch_async(uri, batch).await?;
381 info!("Sparse matrix {} saved successfully", filetype);
382 Ok(())
383 }
384
385 async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
386 info!("Loading sparse {} matrix", key);
387
388 let metadata = self.load_metadata().await?;
389 let filetype = FileInfo::which_filetype(key);
390 let file_info = metadata
391 .files
392 .get(key)
393 .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
394
395 let expected_rows = file_info.rows;
396 let expected_cols = file_info.cols;
397 debug!(
398 "Expected dimensions from storage metadata: {} x {}",
399 expected_rows, expected_cols
400 );
401
402 let path = self.file_path(key);
403 let uri = Self::path_to_uri(&path);
404 let batch = self.read_lance_first_batch_async(uri).await?;
405 let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
406 info!(
407 "Sparse {} matrix loaded: {} x {}, nnz={}",
408 filetype,
409 matrix.rows(),
410 matrix.cols(),
411 matrix.nnz()
412 );
413 Ok(matrix)
414 }
415
416 async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
417 self.validate_initialized(md_path)?;
418 let path = self.file_path("lambdas");
419 info!("Saving {} lambda values", lambdas.len());
420
421 let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
422 let batch = RecordBatch::try_new(
423 Arc::new(schema),
424 vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
425 )
426 .map_err(|e| StorageError::Lance(e.to_string()))?;
427
428 let uri = Self::path_to_uri(&path);
429 self.write_lance_batch_async(uri, batch).await?;
430 info!("Lambda values saved successfully");
431 Ok(())
432 }
433
434 async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
435 let path = self.file_path("lambdas");
436 info!("Loading lambda values from {:?}", path);
437
438 let uri = Self::path_to_uri(&path);
439 let batch = self.read_lance_first_batch_async(uri).await?;
440 let arr = batch
441 .column(0)
442 .as_any()
443 .downcast_ref::<Float64Array>()
444 .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
445
446 let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
447 info!("Loaded {} lambda values", lambdas.len());
448 Ok(lambdas)
449 }
450
451 async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
452 self.validate_initialized(md_path)?;
453 let path = self.file_path(key);
454 info!("Saving {} values for vector {}", vector.len(), key);
455
456 let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
457 let float64_array = Float64Array::from_iter_values::<Vec<f64>>(vector.into());
458 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
459 .map_err(|e| StorageError::Lance(e.to_string()))?;
460
461 let uri = Self::path_to_uri(&path);
462 self.write_lance_batch_async(uri, batch).await?;
463 info!("Index {} saved successfully", key);
464 Ok(())
465 }
466
467 async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
468 self.validate_initialized(md_path)?;
469 let path = self.file_path(key);
470 info!("Saving {} values for index {}", vector.len(), key);
471
472 let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
473 let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
474 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
475 .map_err(|e| StorageError::Lance(e.to_string()))?;
476
477 let uri = Self::path_to_uri(&path);
478 self.write_lance_batch_async(uri, batch).await?;
479 info!("Index {} saved successfully", key);
480 Ok(())
481 }
482
483 async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
484 let path = self.file_path(filename);
485 info!("Loading vector {} from {:?}", filename, path);
486
487 let uri = Self::path_to_uri(&path);
488 let batch = self.read_lance_first_batch_async(uri).await?;
489 let arr = batch
490 .column(0)
491 .as_any()
492 .downcast_ref::<Float64Array>()
493 .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
494
495 let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
496 info!("Loaded {} vector values for {}", vector.len(), filename);
497 Ok(vector)
498 }
499
500 async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
501 let path = self.file_path(filename);
502 info!("Loading vector {} from {:?}", filename, path);
503
504 let uri = Self::path_to_uri(&path);
505 let batch = self.read_lance_first_batch_async(uri).await?;
506 let arr = batch
507 .column(0)
508 .as_any()
509 .downcast_ref::<UInt32Array>()
510 .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
511
512 let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
513 info!("Loaded {} vector values for {}", vector.len(), filename);
514 Ok(vector)
515 }
516
517 async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &Path) -> StorageResult<()> {
521 use tokio::fs as tokio_fs;
522
523 info!("Saving dense matrix to file (async): {:?}", path);
524
525 if let Some(parent) = path.parent() {
527 tokio_fs::try_exists(parent).await.map_err(|e| {
528 StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
529 })?;
530 }
531
532 let tmp_storage = Self::new(
534 String::from(path.parent().unwrap().to_str().unwrap()),
535 String::from("tmp_storage"),
536 );
537
538 let extension = path
539 .extension()
540 .and_then(|e| e.to_str())
541 .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
542
543 let (n_rows, n_cols) = data.shape();
544 info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
545
546 match extension {
547 "lance" => {
548 let batch = tmp_storage.to_dense_record_batch(data)?;
549 debug!(
550 "Created RecordBatch with {} rows for Lance",
551 batch.num_rows()
552 );
553
554 if batch.num_rows() != n_rows {
556 return Err(StorageError::Invalid(format!(
557 "RecordBatch has {} rows but matrix has {} rows",
558 batch.num_rows(),
559 n_rows
560 )));
561 }
562
563 let uri = Self::path_to_uri(path);
564 tmp_storage.write_lance_batch_async(uri, batch).await?;
565 info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
566 Ok(())
567 }
568 "parquet" => {
569 use parquet::arrow::ArrowWriter;
570 use parquet::file::properties::WriterProperties;
571 use std::fs::File;
572
573 let batch = tmp_storage.to_dense_record_batch(data)?;
575 debug!(
576 "Created RecordBatch with {} rows for Parquet",
577 batch.num_rows()
578 );
579
580 if batch.num_rows() != n_rows {
581 return Err(StorageError::Invalid(format!(
582 "RecordBatch has {} rows but matrix has {} rows",
583 batch.num_rows(),
584 n_rows
585 )));
586 }
587
588 let file = File::create(path).map_err(|e| {
589 StorageError::Io(format!("Failed to create parquet file: {}", e))
590 })?;
591
592 let props = WriterProperties::builder()
593 .set_compression(parquet::basic::Compression::SNAPPY)
594 .build();
595
596 let mut writer =
597 ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
598 StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
599 })?;
600
601 writer
602 .write(&batch)
603 .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
604
605 writer
606 .close()
607 .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
608
609 info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
610 Ok(())
611 }
612 _ => Err(StorageError::Invalid(format!(
613 "Unsupported file format: {}. Only .lance and .parquet are supported",
614 extension
615 ))),
616 }
617 }
618
619 async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
621 self.validate_initialized(md_path)?;
622 let path = self.file_path("centroid_map");
623 info!("Saving {} centroid map entries", map.len());
624
625 let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
626 let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
627 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
628 .map_err(|e| StorageError::Lance(e.to_string()))?;
629
630 let uri = Self::path_to_uri(&path);
631 self.write_lance_batch_async(uri, batch).await?;
632 info!("Centroid map saved successfully");
633 Ok(())
634 }
635
636 async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
638 let path = self.file_path("centroid_map");
639 info!("Loading centroid map from {:?}", path);
640
641 let uri = Self::path_to_uri(&path);
642 let batch = self.read_lance_first_batch_async(uri).await?;
643 let arr = batch
644 .column(0)
645 .as_any()
646 .downcast_ref::<UInt32Array>()
647 .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
648
649 let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
650 info!("Loaded {} centroid map entries", map.len());
651 Ok(map)
652 }
653
654 async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
656 self.validate_initialized(md_path)?;
657 let path = self.file_path("subcentroid_lambdas");
658 info!("Saving {} subcentroid lambda values", lambdas.len());
659
660 let schema = Schema::new(vec![Field::new(
661 "subcentroid_lambda",
662 DataType::Float64,
663 false,
664 )]);
665 let batch = RecordBatch::try_new(
666 Arc::new(schema),
667 vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
668 )
669 .map_err(|e| StorageError::Lance(e.to_string()))?;
670
671 let uri = Self::path_to_uri(&path);
672 self.write_lance_batch_async(uri, batch).await?;
673 info!("Subcentroid lambda values saved successfully");
674 Ok(())
675 }
676
677 async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
679 let path = self.file_path("subcentroid_lambdas");
680 info!("Loading subcentroid lambda values from {:?}", path);
681
682 let uri = Self::path_to_uri(&path);
683 let batch = self.read_lance_first_batch_async(uri).await?;
684 let arr = batch
685 .column(0)
686 .as_any()
687 .downcast_ref::<Float64Array>()
688 .ok_or_else(|| {
689 StorageError::Invalid("subcentroid_lambda column type mismatch".into())
690 })?;
691
692 let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
693 info!("Loaded {} subcentroid lambda values", lambdas.len());
694 Ok(lambdas)
695 }
696
697 async fn save_subcentroids(
699 &self,
700 subcentroids: &DenseMatrix<f64>,
701 md_path: &Path,
702 ) -> StorageResult<()> {
703 self.validate_initialized(md_path)?;
704 let path = self.file_path("sub_centroids");
705 let (n_rows, n_cols) = subcentroids.shape();
706 info!(
707 "Saving subcentroids matrix {} x {} at {:?}",
708 n_rows, n_cols, path
709 );
710
711 let batch = self.to_dense_record_batch(subcentroids)?;
712 let uri = Self::path_to_uri(&path);
713 self.write_lance_batch_async(uri, batch).await?;
714 debug!("Subcentroids matrix saved successfully");
715 Ok(())
716 }
717
718 async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
720 let path = self.file_path("sub_centroids");
721 info!("Loading sub_centroids from {:?}", path);
722
723 let uri = Self::path_to_uri(&path);
724 let batch = self.read_lance_all_batches_async(uri).await?;
725 let matrix = self.from_dense_record_batch(&batch)?;
726
727 let (n_rows, n_cols) = matrix.shape();
729 let mut result = Vec::with_capacity(n_rows);
730
731 for row_idx in 0..n_rows {
732 let row: Vec<f64> = (0..n_cols)
733 .map(|col_idx| *matrix.get((row_idx, col_idx)))
734 .collect();
735 result.push(row);
736 }
737
738 info!(
739 "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
740 n_rows, n_cols
741 );
742 Ok(result)
743 }
744
745 async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
747 self.validate_initialized(md_path)?;
748 let path = self.file_path("item_norms");
749 info!("Saving {} item norm values", item_norms.len());
750
751 let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
752 let batch = RecordBatch::try_new(
753 Arc::new(schema),
754 vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
755 )
756 .map_err(|e| StorageError::Lance(e.to_string()))?;
757
758 let uri = Self::path_to_uri(&path);
759 self.write_lance_batch_async(uri, batch).await?;
760 info!("Item norms saved successfully");
761 Ok(())
762 }
763
764 async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
766 let path = self.file_path("item_norms");
767 info!("Loading item norms from {:?}", path);
768
769 let uri = Self::path_to_uri(&path);
770 let batch = self.read_lance_first_batch_async(uri).await?;
771 let arr = batch
772 .column(0)
773 .as_any()
774 .downcast_ref::<Float64Array>()
775 .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
776
777 let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
778 info!("Loaded {} item norm values", norms.len());
779 Ok(norms)
780 }
781
782 async fn save_cluster_assignments(
783 &self,
784 assignments: &[Option<usize>],
785 md_path: &Path,
786 ) -> StorageResult<()> {
787 self.validate_initialized(md_path)?;
788 let path = self.file_path("cluster_assignments");
789 info!("Saving {} cluster assignments", assignments.len());
790
791 let values: Vec<i64> = assignments
793 .iter()
794 .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
795 .collect();
796
797 let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
798 let batch = RecordBatch::try_new(
799 Arc::new(schema),
800 vec![Arc::new(Int64Array::from(values)) as _],
801 )
802 .map_err(|e| StorageError::Lance(e.to_string()))?;
803
804 let uri = Self::path_to_uri(&path);
805 self.write_lance_batch_async(uri, batch).await?;
806 info!("Cluster assignments saved successfully");
807 Ok(())
808 }
809
810 async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
811 let path = self.file_path("cluster_assignments");
812 info!("Loading cluster assignments from {:?}", path);
813
814 let uri = Self::path_to_uri(&path);
815 let batch = self.read_lance_first_batch_async(uri).await?;
816 let arr = batch
817 .column(0)
818 .as_any()
819 .downcast_ref::<Int64Array>()
820 .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
821
822 let assignments: Vec<Option<usize>> = (0..arr.len())
823 .map(|i| {
824 let val = arr.value(i);
825 if val < 0 { None } else { Some(val as usize) }
826 })
827 .collect();
828
829 info!("Loaded {} cluster assignments", assignments.len());
830 Ok(assignments)
831 }
832}