use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use arrow::array::{Float64Array, Int64Array, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::{Array as ArrowArray, FixedSizeListArray, RecordBatch, RecordBatchIterator};
use futures::StreamExt;
use lance::dataset::{Dataset, WriteMode, WriteParams};
use log::{debug, info, trace};
use smartcore::linalg::basic::arrays::Array;
use smartcore::linalg::basic::matrix::DenseMatrix;
use sprs::{CsMat, TriMat};

use crate::metadata::{FileInfo, GeneMetadata};
use crate::traits::StorageBackend;
use crate::{StorageError, StorageResult};
#[derive(Debug, Clone)]
pub struct LanceStorage {
    pub(crate) _base: String,
    pub(crate) _name: String,
}

impl LanceStorage {
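    /// Creates a storage handle rooted at `base` with dataset prefix `name`.
    ///
    /// A minimal usage sketch (the paths, key, and `metadata` value here are
    /// illustrative, not fixtures shipped with this crate):
    ///
    /// ```ignore
    /// let storage = LanceStorage::new("/tmp/lance_demo".into(), "genes".into());
    /// let md_path = storage.metadata_path();
    /// // save_metadata() must run first: save_dense()/save_sparse() refuse
    /// // to write until the metadata file exists on disk.
    /// storage.save_metadata(&metadata).await?;
    /// storage.save_dense("eigenmaps", &matrix, &md_path).await?;
    /// ```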
    pub fn new(_base: String, _name: String) -> Self {
        info!("Creating LanceStorage at base={}, name={}", _base, _name);
        Self { _base, _name }
    }

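    /// Ensures the storage has been initialized: `md_path` must match this
    /// backend's metadata path and the metadata file must already exist.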
    fn validate_initialized(&self, md_path: &Path) -> StorageResult<()> {
        if self.metadata_path() != *md_path {
            return Err(StorageError::Invalid(format!(
                "Metadata path mismatch: expected {:?}, got {:?}",
                self.metadata_path(),
                md_path
            )));
        }
        if !md_path.exists() {
            return Err(StorageError::Invalid(format!(
                "Storage not initialized: metadata file missing at {:?}. \
                 Call save_metadata() or save_eigenmaps_all()/save_energymaps_all() first.",
                md_path
            )));
        }
        Ok(())
    }

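    /// Converts a dense matrix into a single-column `RecordBatch` in Lance's
    /// row-major vector layout: each matrix row becomes one
    /// `FixedSizeList<Float64>` entry of length `cols`.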
    fn to_dense_record_batch(
        &self,
        matrix: &DenseMatrix<f64>,
    ) -> Result<RecordBatch, StorageError> {
        let (rows, cols) = matrix.shape();

        debug!(
            "Converting dense matrix to RecordBatch (vector format): {}x{}",
            rows, cols
        );

        if rows == 0 || cols == 0 {
            return Err(StorageError::Invalid(
                "Cannot convert empty matrix to RecordBatch".to_string(),
            ));
        }

        // Flatten the matrix in row-major order.
        let mut values: Vec<f64> = Vec::with_capacity(rows * cols);
        for r in 0..rows {
            for c in 0..cols {
                values.push(*matrix.get((r, c)));
            }
        }

        // One "vector" column whose entries are fixed-size lists of length `cols`.
        let value_field = Field::new("item", DataType::Float64, false);
        let list_field = Field::new(
            "vector",
            DataType::FixedSizeList(Arc::new(value_field), cols as i32),
            false,
        );

        let schema = Schema::new(vec![list_field]);

        let values_array = Float64Array::from(values);
        let list_array = FixedSizeListArray::new(
            Arc::new(Field::new("item", DataType::Float64, false)),
            cols as i32,
            Arc::new(values_array),
            None,
        );

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)])
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        trace!(
            "RecordBatch created with {} rows (vectors of length {})",
            batch.num_rows(),
            cols
        );

        Ok(batch)
    }

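    /// Inverse of [`Self::to_dense_record_batch`]: rebuilds a `DenseMatrix`
    /// from a single `FixedSizeList<Float64>` column, transposing the
    /// row-major Arrow values into smartcore's column-major layout and
    /// rejecting loads that would allocate more than 4 GiB.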
    fn from_dense_record_batch(
        &self,
        batch: &RecordBatch,
    ) -> Result<DenseMatrix<f64>, StorageError> {
        use std::mem;

        debug!("Reconstructing dense matrix from RecordBatch (vector format)");
        debug!("Batch has {} columns", batch.num_columns());

        if batch.num_columns() != 1 {
            return Err(StorageError::Invalid(format!(
                "Expected Lance row-major format with 1 FixedSizeList<Float64> column, but found {} columns. \
                 This parquet file appears to be in wide format (feature-per-column). \
                 Convert it first using: \
                 `python -c \"import pyarrow.parquet as pq; import pyarrow.compute as pc; \
                 tbl = pq.read_table('input.parquet'); \
                 import pyarrow as pa; \
                 vectors = pa.array([row.as_py() for row in tbl.to_pylist()], type=pa.list_(pa.float64(), len(tbl.column_names))); \
                 new_tbl = pa.table({{'vector': vectors}}); \
                 pq.write_table(new_tbl, 'output.parquet')\"` \
                 or use a Lance-native writer in your data pipeline.",
                batch.num_columns()
            )));
        }

        debug!("Extracting FixedSizeList column");
        let column = batch.column(0);
        let list_array = column
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .ok_or_else(|| {
                StorageError::Invalid(format!(
                    "Column 0 is not FixedSizeList (found type: {:?}). \
                     Expected Lance row-major format with a single FixedSizeList<Float64> column.",
                    column.data_type()
                ))
            })?;

        let rows = list_array.len();
        let cols = list_array.value_length() as usize;

        debug!("Matrix dimensions: {}x{}", rows, cols);

        // Guard against pathological sizes before allocating.
        let total = rows
            .checked_mul(cols)
            .ok_or_else(|| StorageError::Invalid("Matrix size overflow (rows*cols)".to_string()))?;
        let bytes = total
            .checked_mul(mem::size_of::<f64>())
            .ok_or_else(|| StorageError::Invalid("Byte size overflow".to_string()))?;

        const MAX_BYTES: usize = 4usize * 1024 * 1024 * 1024; // 4 GiB cap
        if bytes > MAX_BYTES {
            return Err(StorageError::Invalid(format!(
                "Dense load would allocate {} bytes for {}x{} matrix; exceeds 4GiB cap. \
                 Enable --reduce-dim or shard your input data.",
                bytes, rows, cols
            )));
        }

        let values_array = list_array
            .values()
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| {
                StorageError::Invalid("FixedSizeList values are not Float64Array".to_string())
            })?;

        debug!("Converting row-major to column-major");
        let mut data = vec![0.0f64; total];
        for r in 0..rows {
            for c in 0..cols {
                let row_major_idx = r * cols + c;
                let col_major_idx = c * rows + r;
                data[col_major_idx] = values_array.value(row_major_idx);
            }
        }

        debug!("Creating DenseMatrix");
        DenseMatrix::new(rows, cols, data, true).map_err(|e| StorageError::Invalid(e.to_string()))
    }

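    /// Serializes a sparse matrix as COO triplets in three columns
    /// (`row: UInt32`, `col: UInt32`, `value: Float64`), with the logical
    /// dimensions recorded in the schema metadata.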
    fn to_sparse_record_batch(&self, m: &CsMat<f64>) -> StorageResult<RecordBatch> {
        debug!(
            "Converting sparse matrix to RecordBatch: {} x {}, nnz={}",
            m.rows(),
            m.cols(),
            m.nnz()
        );

        let mut row_idx = Vec::with_capacity(m.nnz());
        let mut col_idx = Vec::with_capacity(m.nnz());
        let mut vals = Vec::with_capacity(m.nnz());

        for (v, (r, c)) in m.iter() {
            row_idx.push(r as u32);
            col_idx.push(c as u32);
            vals.push(*v);
        }

        // Record the logical dimensions in the schema metadata so loads can
        // cross-check them against the storage metadata.
        let mut schema_metadata = std::collections::HashMap::new();
        schema_metadata.insert("rows".to_string(), m.rows().to_string());
        schema_metadata.insert("cols".to_string(), m.cols().to_string());
        schema_metadata.insert("nnz".to_string(), m.nnz().to_string());

        let schema = Schema::new(vec![
            Field::new("row", DataType::UInt32, false),
            Field::new("col", DataType::UInt32, false),
            Field::new("value", DataType::Float64, false),
        ])
        .with_metadata(schema_metadata);

        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![
                Arc::new(UInt32Array::from(row_idx)) as _,
                Arc::new(UInt32Array::from(col_idx)) as _,
                Arc::new(Float64Array::from(vals)) as _,
            ],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        trace!(
            "Sparse RecordBatch created with {} entries",
            batch.num_rows()
        );
        Ok(batch)
    }

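    /// Rebuilds a CSR matrix from COO triplets, validating indices against
    /// the dimensions recorded in the storage metadata (and cross-checking
    /// any dimensions embedded in the batch's schema metadata).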
    #[allow(clippy::wrong_self_convention)]
    fn from_sparse_record_batch(
        &self,
        batch: RecordBatch,
        expected_rows: usize,
        expected_cols: usize,
    ) -> StorageResult<CsMat<f64>> {
        use arrow::array::UInt32Array;

        debug!("Reconstructing sparse matrix from RecordBatch");

        let row_arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .ok_or_else(|| StorageError::Invalid("row column type mismatch".into()))?;
        let col_arr = batch
            .column(1)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .ok_or_else(|| StorageError::Invalid("col column type mismatch".into()))?;
        let val_arr = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| StorageError::Invalid("value column type mismatch".into()))?;

        let n = row_arr.len();
        if n == 0 {
            debug!(
                "Empty RecordBatch, returning {}x{} sparse matrix",
                expected_rows, expected_cols
            );
            return Ok(CsMat::zero((expected_rows, expected_cols)));
        }

        // Cross-check any dimensions embedded in the schema metadata.
        let schema = batch.schema();
        let schema_metadata = schema.metadata();
        if let (Some(rows_str), Some(cols_str)) =
            (schema_metadata.get("rows"), schema_metadata.get("cols"))
        {
            let schema_rows = rows_str.parse::<usize>().ok();
            let schema_cols = cols_str.parse::<usize>().ok();
            if schema_rows != Some(expected_rows) || schema_cols != Some(expected_cols) {
                return Err(StorageError::Invalid(format!(
                    "Schema metadata dimensions ({:?}x{:?}) don't match storage metadata ({}x{})",
                    schema_rows, schema_cols, expected_rows, expected_cols
                )));
            }
            debug!(
                "Schema metadata matches storage metadata: {}x{}",
                expected_rows, expected_cols
            );
        }

        let rows = expected_rows;
        let cols = expected_cols;
        debug!(
            "Reconstructing {}x{} sparse matrix from {} entries",
            rows, cols, n
        );

        let mut trimat = TriMat::new((rows, cols));
        for i in 0..n {
            let r = row_arr.value(i) as usize;
            let c = col_arr.value(i) as usize;
            let v = val_arr.value(i);

            if r >= rows || c >= cols {
                return Err(StorageError::Invalid(format!(
                    "Index out of bounds: ({}, {}) in {}x{} matrix",
                    r, c, rows, cols
                )));
            }
            trimat.add_triplet(r, c, v);
        }

        let result = trimat.to_csr();
        if result.rows() != rows || result.cols() != cols {
            return Err(StorageError::Invalid(format!(
                "Dimension mismatch after reconstruction: expected {}x{}, got {}x{}",
                rows,
                cols,
                result.rows(),
                result.cols()
            )));
        }

        Ok(result)
    }

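    /// Writes a single `RecordBatch` to `path` as a Lance dataset using
    /// `WriteMode::Create`.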
    async fn write_lance_batch_async(&self, path: &Path, batch: RecordBatch) -> StorageResult<()> {
        let uri = Self::path_to_uri(path);
        info!("Writing Lance dataset to {}", uri);

        let schema = batch.schema();
        let batches = vec![batch];
        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);

        let params = WriteParams {
            mode: WriteMode::Create,
            ..WriteParams::default()
        };

        Dataset::write(reader, &uri, Some(params))
            .await
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        info!("Successfully wrote Lance dataset to {}", uri);
        Ok(())
    }

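    /// Reads every fragment of the Lance dataset at `path` and concatenates
    /// the resulting batches into one `RecordBatch`.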
    async fn read_lance_all_batches_async(&self, path: &Path) -> StorageResult<RecordBatch> {
        let uri = Self::path_to_uri(path);
        info!("Reading Lance dataset from {}", uri);

        let dataset = Dataset::open(&uri)
            .await
            .map_err(|e| StorageError::Lance(e.to_string()))?;
        let scanner = dataset.scan();
        let mut stream = scanner
            .try_into_stream()
            .await
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        let mut batches = Vec::new();
        while let Some(batch_result) = stream.next().await {
            let batch = batch_result.map_err(|e| StorageError::Lance(e.to_string()))?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Err(StorageError::Invalid("Empty Lance dataset".into()));
        }

        let schema = batches[0].schema();
        let combined = arrow::compute::concat_batches(&schema, &batches)
            .map_err(|e| StorageError::Lance(format!("Failed to concatenate batches: {}", e)))?;

        debug!(
            "Combined Lance batch for {:?} has {} rows",
            path,
            combined.num_rows()
        );
        Ok(combined)
    }

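    /// Reads only the first `RecordBatch` from the Lance dataset at `path`,
    /// which suffices for the single-batch datasets this backend writes.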
    async fn read_lance_first_batch_async(&self, path: &Path) -> StorageResult<RecordBatch> {
        let uri = Self::path_to_uri(path);
        info!("Reading first batch from Lance dataset {}", uri);

        let dataset = Dataset::open(&uri)
            .await
            .map_err(|e| StorageError::Lance(e.to_string()))?;
        let scanner = dataset.scan();
        let mut stream = scanner
            .try_into_stream()
            .await
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        let batch = stream
            .next()
            .await
            .ok_or_else(|| StorageError::Lance("empty Lance dataset".to_string()))?
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        debug!(
            "Read first RecordBatch for path {:?} with {} rows",
            path,
            batch.num_rows()
        );
        Ok(batch)
    }
}

impl StorageBackend for LanceStorage {
    fn get_base(&self) -> String {
        self._base.clone()
    }

    fn get_name(&self) -> String {
        self._name.clone()
    }

    fn base_path(&self) -> PathBuf {
        PathBuf::from(&self._base)
    }

    fn metadata_path(&self) -> PathBuf {
        self.base_path()
            .join(format!("{}_metadata.json", self._name))
    }

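    /// Converts a filesystem path to a `file://` URI, falling back to joining
    /// the current working directory when the path cannot be canonicalized
    /// (e.g. because it does not exist yet).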
    fn path_to_uri(path: &Path) -> String {
        let abs_path = path.canonicalize().unwrap_or_else(|_| {
            if path.is_absolute() {
                path.to_path_buf()
            } else {
                std::env::current_dir()
                    .unwrap_or_else(|_| PathBuf::from("/"))
                    .join(path)
            }
        });
        format!("file://{}", abs_path.display())
    }

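    /// Saves a dense matrix under `key` after verifying that the storage is
    /// initialized and that the converted batch preserved the row count.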
    async fn save_dense(
        &self,
        key: &str,
        matrix: &DenseMatrix<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        let (n_rows, n_cols) = matrix.shape();

        info!(
            "Saving dense {} matrix: {} x {} at {:?}",
            key, n_rows, n_cols, path
        );

        let batch = self.to_dense_record_batch(matrix)?;

        // Sanity check: the batch must contain one vector per matrix row.
        if batch.num_rows() != n_rows {
            return Err(StorageError::Invalid(format!(
                "RecordBatch has {} rows but matrix has {} rows",
                batch.num_rows(),
                n_rows
            )));
        }

        self.write_lance_batch_async(&path, batch).await?;

        info!("Dense {} matrix saved successfully", key);
        Ok(())
    }

    async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
        let path = self.file_path(key);
        info!("Loading dense {} matrix from {:?}", key, path);

        let batch = self.read_lance_all_batches_async(&path).await?;

        let matrix = self.from_dense_record_batch(&batch)?;

        let (n_rows, n_cols) = matrix.shape();
        info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);

        Ok(matrix)
    }

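    /// Loads a dense matrix from a standalone `.lance` or `.parquet` file.
    /// Parquet inputs may use either the vector layout (a single
    /// `FixedSizeList<Float64>` column) or the wide layout (one `Float64`
    /// column per feature, named `col_*`).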
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage =
                    crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));

                let batch = tmp_storage.read_lance_all_batches_async(path).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                // Detect the schema variant: a single FixedSizeList<Float64>
                // column ("vector" layout) or all-Float64 wide columns.
                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                            if matches!(inner.data_type(), DataType::Float64)
                    );

                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                let matrix = if is_vector {
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage =
                        crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    // Columns are appended one after another, so `data` ends
                    // up column-major (hence `column_major = true` below).
                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }

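    /// Path of the Lance dataset backing `key`: `<base>/<name>_<key>.lance`.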
    fn file_path(&self, key: &str) -> PathBuf {
        self.base_path()
            .join(format!("{}_{}.lance", self._name, key))
    }

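    /// Saves a sparse matrix under `key`, first recording its `FileInfo`
    /// (dimensions, nnz, storage format) in the metadata file, then writing
    /// the COO triplets as a Lance dataset.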
    async fn save_sparse(
        &self,
        key: &str,
        matrix: &CsMat<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        info!(
            "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
            key,
            matrix.rows(),
            matrix.cols(),
            matrix.nnz(),
            path
        );

        let mut metadata = self.load_metadata().await?;
        let filetype = FileInfo::which_filetype(key);
        metadata.files.insert(
            key.to_string(),
            FileInfo {
                filename: format!("{}_{}.lance", self.get_name(), key),
                filetype: filetype.to_string(),
                storage_format: FileInfo::which_format(&filetype),
                rows: matrix.rows(),
                cols: matrix.cols(),
                nnz: Some(matrix.nnz()),
                size_bytes: None,
            },
        );
        self.save_metadata(&metadata).await?;

        let batch = self.to_sparse_record_batch(matrix)?;
        self.write_lance_batch_async(&path, batch).await?;
        info!("Sparse matrix {} saved successfully", filetype);
        Ok(())
    }

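    /// Loads a sparse matrix for `key`, using the dimensions recorded in the
    /// metadata file to validate the reconstructed CSR matrix.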
    async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
        info!("Loading sparse {} matrix", key);

        let metadata = self.load_metadata().await?;
        let filetype = FileInfo::which_filetype(key);
        let file_info = metadata
            .files
            .get(key)
            .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;

        let expected_rows = file_info.rows;
        let expected_cols = file_info.cols;
        debug!(
            "Expected dimensions from storage metadata: {} x {}",
            expected_rows, expected_cols
        );

        let path = self.file_path(key);
        let batch = self.read_lance_first_batch_async(&path).await?;
        let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
        info!(
            "Sparse {} matrix loaded: {} x {}, nnz={}",
            filetype,
            matrix.rows(),
            matrix.cols(),
            matrix.nnz()
        );
        Ok(matrix)
    }

    async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path("lambdas");
        info!("Saving {} lambda values", lambdas.len());

        let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        self.write_lance_batch_async(&path, batch).await?;
        info!("Lambda values saved successfully");
        Ok(())
    }

    async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
        let path = self.file_path("lambdas");
        info!("Loading lambda values from {:?}", path);

        let batch = self.read_lance_first_batch_async(&path).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;

        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} lambda values", lambdas.len());
        Ok(lambdas)
    }

    async fn save_metadata(&self, metadata: &GeneMetadata) -> StorageResult<PathBuf> {
        let path = self.metadata_path();
        info!("Saving metadata to {:?}", path);
        fs::create_dir_all(self.base_path()).map_err(|e| StorageError::Io(e.to_string()))?;
        let s = serde_json::to_string_pretty(metadata).map_err(StorageError::Serde)?;
        fs::write(&path, s).map_err(|e| StorageError::Io(e.to_string()))?;
        info!("Metadata saved successfully");
        Ok(path)
    }

    async fn load_metadata(&self) -> StorageResult<GeneMetadata> {
        let filename = self.metadata_path();
        info!("Loading metadata from {:?}", filename);
        let s = fs::read_to_string(filename).map_err(|e| StorageError::Io(e.to_string()))?;
        let md: GeneMetadata = serde_json::from_str(&s).map_err(StorageError::Serde)?;
        info!("Metadata loaded successfully");
        Ok(md)
    }

    async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        info!("Saving {} values for vector {}", vector.len(), key);

        let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
        let float64_array = Float64Array::from_iter_values(vector.iter().copied());
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        self.write_lance_batch_async(&path, batch).await?;
        info!("Vector {} saved successfully", key);
        Ok(())
    }

    async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path(key);
        info!("Saving {} values for index {}", vector.len(), key);

        let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
        let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        self.write_lance_batch_async(&path, batch).await?;
        info!("Index {} saved successfully", key);
        Ok(())
    }

    async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
        let path = self.file_path(filename);
        info!("Loading vector {} from {:?}", filename, path);

        let batch = self.read_lance_first_batch_async(&path).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;

        let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} vector values for {}", vector.len(), filename);
        Ok(vector)
    }

    async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
        let path = self.file_path(filename);
        info!("Loading index {} from {:?}", filename, path);

        let batch = self.read_lance_first_batch_async(&path).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;

        let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
        info!("Loaded {} index values for {}", vector.len(), filename);
        Ok(vector)
    }

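    /// Test-only helper that writes a dense matrix straight to a standalone
    /// `.lance` or `.parquet` file, bypassing the metadata bookkeeping.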
    #[cfg(test)]
    async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &PathBuf) -> StorageResult<()> {
        use tokio::fs as tokio_fs;

        info!("Saving dense matrix to file (async): {:?}", path);

        if let Some(parent) = path.parent() {
            tokio_fs::create_dir_all(parent).await.map_err(|e| {
                StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
            })?;
        }

        // Temporary storage handle used only for its RecordBatch conversions.
        let tmp_storage = LanceStorage::new(
            String::from(path.parent().unwrap().to_str().unwrap()),
            String::from("tmp_storage"),
        );

        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        let (n_rows, n_cols) = data.shape();
        info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);

        match extension {
            "lance" => {
                let batch = tmp_storage.to_dense_record_batch(data)?;
                debug!(
                    "Created RecordBatch with {} rows for Lance",
                    batch.num_rows()
                );

                if batch.num_rows() != n_rows {
                    return Err(StorageError::Invalid(format!(
                        "RecordBatch has {} rows but matrix has {} rows",
                        batch.num_rows(),
                        n_rows
                    )));
                }

                tmp_storage.write_lance_batch_async(path, batch).await?;
                info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
                Ok(())
            }
            "parquet" => {
                use parquet::arrow::ArrowWriter;
                use parquet::file::properties::WriterProperties;
                use std::fs::File;

                let batch = tmp_storage.to_dense_record_batch(data)?;
                debug!(
                    "Created RecordBatch with {} rows for Parquet",
                    batch.num_rows()
                );

                if batch.num_rows() != n_rows {
                    return Err(StorageError::Invalid(format!(
                        "RecordBatch has {} rows but matrix has {} rows",
                        batch.num_rows(),
                        n_rows
                    )));
                }

                let file = File::create(path).map_err(|e| {
                    StorageError::Io(format!("Failed to create parquet file: {}", e))
                })?;

                let props = WriterProperties::builder()
                    .set_compression(parquet::basic::Compression::SNAPPY)
                    .build();

                let mut writer =
                    ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
                        StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
                    })?;

                writer
                    .write(&batch)
                    .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;

                writer
                    .close()
                    .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;

                info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
                Ok(())
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }

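    /// Persists the centroid map as a single `UInt32` column.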
    async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path("centroid_map");
        info!("Saving {} centroid map entries", map.len());

        let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
        let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
            .map_err(|e| StorageError::Lance(e.to_string()))?;

        self.write_lance_batch_async(&path, batch).await?;
        info!("Centroid map saved successfully");
        Ok(())
    }

    async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
        let path = self.file_path("centroid_map");
        info!("Loading centroid map from {:?}", path);

        let batch = self.read_lance_first_batch_async(&path).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;

        let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
        info!("Loaded {} centroid map entries", map.len());
        Ok(map)
    }

    async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path("subcentroid_lambdas");
        info!("Saving {} subcentroid lambda values", lambdas.len());

        let schema = Schema::new(vec![Field::new(
            "subcentroid_lambda",
            DataType::Float64,
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        self.write_lance_batch_async(&path, batch).await?;
        info!("Subcentroid lambda values saved successfully");
        Ok(())
    }

    async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
        let path = self.file_path("subcentroid_lambdas");
        info!("Loading subcentroid lambda values from {:?}", path);

        let batch = self.read_lance_first_batch_async(&path).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| {
                StorageError::Invalid("subcentroid_lambda column type mismatch".into())
            })?;

        let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} subcentroid lambda values", lambdas.len());
        Ok(lambdas)
    }

    async fn save_subcentroids(
        &self,
        subcentroids: &DenseMatrix<f64>,
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path("sub_centroids");
        let (n_rows, n_cols) = subcentroids.shape();
        info!(
            "Saving subcentroids matrix {} x {} at {:?}",
            n_rows, n_cols, path
        );

        let batch = self.to_dense_record_batch(subcentroids)?;
        self.write_lance_batch_async(&path, batch).await?;
        debug!("Subcentroids matrix saved successfully");
        Ok(())
    }

    async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
        let path = self.file_path("sub_centroids");
        info!("Loading sub_centroids from {:?}", path);

        let batch = self.read_lance_all_batches_async(&path).await?;
        let matrix = self.from_dense_record_batch(&batch)?;

        let (n_rows, n_cols) = matrix.shape();
        let mut result = Vec::with_capacity(n_rows);

        for row_idx in 0..n_rows {
            let row: Vec<f64> = (0..n_cols)
                .map(|col_idx| *matrix.get((row_idx, col_idx)))
                .collect();
            result.push(row);
        }

        info!(
            "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
            n_rows, n_cols
        );
        Ok(result)
    }

    async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path("item_norms");
        info!("Saving {} item norm values", item_norms.len());

        let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        self.write_lance_batch_async(&path, batch).await?;
        info!("Item norms saved successfully");
        Ok(())
    }

    async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
        let path = self.file_path("item_norms");
        info!("Loading item norms from {:?}", path);

        let batch = self.read_lance_first_batch_async(&path).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;

        let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
        info!("Loaded {} item norm values", norms.len());
        Ok(norms)
    }

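    /// Saves optional cluster assignments as an `Int64` column, encoding
    /// `None` as the sentinel value `-1` (cluster ids are non-negative).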
    async fn save_cluster_assignments(
        &self,
        assignments: &[Option<usize>],
        md_path: &Path,
    ) -> StorageResult<()> {
        self.validate_initialized(md_path)?;
        let path = self.file_path("cluster_assignments");
        info!("Saving {} cluster assignments", assignments.len());

        // Encode None as -1 so the column can stay non-nullable.
        let values: Vec<i64> = assignments
            .iter()
            .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
            .collect();

        let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(Int64Array::from(values)) as _],
        )
        .map_err(|e| StorageError::Lance(e.to_string()))?;

        self.write_lance_batch_async(&path, batch).await?;
        info!("Cluster assignments saved successfully");
        Ok(())
    }

    async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
        let path = self.file_path("cluster_assignments");
        info!("Loading cluster assignments from {:?}", path);

        let batch = self.read_lance_first_batch_async(&path).await?;
        let arr = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;

        // Decode the -1 sentinel back into None.
        let assignments: Vec<Option<usize>> = (0..arr.len())
            .map(|i| {
                let val = arr.value(i);
                if val < 0 { None } else { Some(val as usize) }
            })
            .collect();

        info!("Loaded {} cluster assignments", assignments.len());
        Ok(assignments)
    }
}
1229}