1use std::fs;
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10
11use arrow::array::{Float64Array, Int64Array, UInt32Array};
12use arrow::datatypes::{DataType, Field, Schema};
13use arrow_array::{Array as ArrowArray, FixedSizeListArray, RecordBatch, RecordBatchIterator};
14use futures::StreamExt;
15use lance::dataset::{Dataset, WriteMode, WriteParams};
16use log::{debug, info, trace};
17use smartcore::linalg::basic::arrays::Array;
18use smartcore::linalg::basic::matrix::DenseMatrix;
19use sprs::{CsMat, TriMat};
20
21use crate::metadata::FileInfo;
22use crate::metadata::GeneMetadata;
23use crate::traits::StorageBackend;
24use crate::{StorageError, StorageResult};
25
/// Lance-backed storage handle.
///
/// Holds only the on-disk location: `_base` is the directory containing all
/// datasets and `_name` is the per-dataset filename prefix (see `file_path`
/// and `metadata_path`). All I/O is performed by the async methods below.
#[derive(Debug, Clone)]
pub struct LanceStorage {
    // Root directory under which every file of this storage lives.
    pub(crate) _base: String,
    // Dataset name used as a filename prefix, e.g. "<name>_metadata.json".
    pub(crate) _name: String,
}
36
37impl LanceStorage {
38 pub fn new(_base: String, _name: String) -> Self {
47 info!("Creating LanceStorage at base={}, name={}", _base, _name);
48 Self { _base, _name }
49 }
50
51 fn validate_initialized(&self, md_path: &Path) -> StorageResult<()> {
57 assert_eq!(self.metadata_path(), *md_path);
58 if !md_path.exists() {
59 return Err(StorageError::Invalid(format!(
60 "Storage not initialized: metadata file missing at {:?}. \
61Call save_metadata() or save_eigenmaps_all()/save_energymaps_all() first.",
62 md_path
63 )));
64 }
65 Ok(())
66 }
67
68 fn to_dense_record_batch(
77 &self,
78 matrix: &DenseMatrix<f64>,
79 ) -> Result<RecordBatch, StorageError> {
80 let (rows, cols) = (matrix.shape().0, matrix.shape().1);
81
82 debug!(
83 "Converting dense matrix to RecordBatch (vector format): {}x{}",
84 rows, cols
85 );
86
87 if rows == 0 || cols == 0 {
88 return Err(StorageError::Invalid(
89 "Cannot convert empty matrix to RecordBatch".to_string(),
90 ));
91 }
92
93 let mut values: Vec<f64> = Vec::with_capacity(rows * cols);
95 for r in 0..rows {
96 for c in 0..cols {
97 values.push(*matrix.get((r, c)));
98 }
99 }
100
101 let value_field = Field::new("item", DataType::Float64, false);
103 let list_field = Field::new(
104 "vector",
105 DataType::FixedSizeList(Arc::new(value_field), cols as i32),
106 false,
107 );
108
109 let schema = Schema::new(vec![list_field]);
110
111 let values_array = Float64Array::from(values);
113 let list_array = FixedSizeListArray::new(
114 Arc::new(Field::new("item", DataType::Float64, false)),
115 cols as i32,
116 Arc::new(values_array),
117 None, );
119
120 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)])
121 .map_err(|e| StorageError::Lance(e.to_string()))?;
122
123 trace!(
124 "RecordBatch created with {} rows (vectors of length {})",
125 batch.num_rows(),
126 cols
127 );
128
129 Ok(batch)
130 }
131
    #[allow(clippy::wrong_self_convention)]
    /// Rebuild a `DenseMatrix` from a RecordBatch holding a single
    /// `FixedSizeList<Float64>` column — the row-major layout produced by
    /// `to_dense_record_batch`.
    ///
    /// # Errors
    /// `StorageError::Invalid` when the batch is not in the expected
    /// single-column format, when the size arithmetic overflows, or when the
    /// dense allocation would exceed the 4 GiB safety cap.
    fn from_dense_record_batch(
        &self,
        batch: &RecordBatch,
    ) -> Result<DenseMatrix<f64>, StorageError> {
        use std::mem;

        debug!("Reconstructing dense matrix from RecordBatch (vector format)");
        debug!("Batch has {} columns", batch.num_columns());

        // Exactly one column means "vector" layout; anything else is the wide
        // feature-per-column layout this reader does not support.
        if batch.num_columns() != 1 {
            return Err(StorageError::Invalid(format!(
                "Expected Lance row-major format with 1 FixedSizeList<Float64> column, but found {} columns. \
                 This parquet file appears to be in wide format (feature-per-column). \
                 Convert it first using: \
                 `python -c \"import pyarrow.parquet as pq; import pyarrow.compute as pc; \
                 tbl = pq.read_table('input.parquet'); \
                 import pyarrow as pa; \
                 vectors = pa.array([row.as_py() for row in tbl.to_pylist()], type=pa.list_(pa.float64(), len(tbl.column_names))); \
                 new_tbl = pa.table({{'vector': vectors}}); \
                 pq.write_table(new_tbl, 'output.parquet')\"` \
                 or use a Lance-native writer in your data pipeline.",
                batch.num_columns()
            )));
        }

        debug!("Extracting FixedSizeList column");
        let column = batch.column(0);
        let list_array = column
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .ok_or_else(|| {
                StorageError::Invalid(format!(
                    "Column 0 is not FixedSizeList (found type: {:?}). \
                 Expected Lance row-major format with a single FixedSizeList<Float64> column.",
                    column.data_type()
                ))
            })?;

        // One list per matrix row; every list has the fixed length `cols`.
        let rows = list_array.len();
        let cols = list_array.value_length() as usize;

        debug!("Matrix dimensions: {}x{}", rows, cols);

        // Checked arithmetic: reject overflow before allocating.
        let total = rows
            .checked_mul(cols)
            .ok_or_else(|| StorageError::Invalid("Matrix size overflow (rows*cols)".to_string()))?;
        let bytes = total
            .checked_mul(mem::size_of::<f64>())
            .ok_or_else(|| StorageError::Invalid("Byte size overflow".to_string()))?;

        // Hard cap on the dense allocation to fail fast instead of OOMing.
        const MAX_BYTES: usize = 4usize * 1024 * 1024 * 1024;
        if bytes > MAX_BYTES {
            return Err(StorageError::Invalid(format!(
                "Dense load would allocate {} bytes for {}x{} matrix; exceeds 4GiB cap. \
                 Enable --reduce-dim or shard your input data.",
                bytes, rows, cols
            )));
        }

        let values_array = list_array
            .values()
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| {
                StorageError::Invalid("FixedSizeList values are not Float64Array".to_string())
            })?;

        // NOTE(review): indexing values() directly assumes the list array is
        // unsliced (zero offset). Batches decoded straight from Lance should
        // satisfy this — confirm if sliced batches can ever reach here.
        debug!("Converting row-major to column-major");
        let mut data = vec![0.0f64; total];
        for r in 0..rows {
            for c in 0..cols {
                let row_major_idx = r * cols + c;
                let col_major_idx = c * rows + r;
                data[col_major_idx] = values_array.value(row_major_idx);
            }
        }

        // `true` appears to select column-major interpretation of `data`,
        // matching the transpose above — TODO confirm against smartcore docs.
        debug!("Creating DenseMatrix");
        DenseMatrix::new(rows, cols, data, true).map_err(|e| StorageError::Invalid(e.to_string()))
    }
222
223 pub async fn spawn(base_path: String) -> Result<(LanceStorage, GeneMetadata), StorageError> {
225 let (exists, md_path) = <LanceStorage as StorageBackend>::exists(&base_path);
227 assert!(
228 exists && md_path.is_some(),
229 "Metadata does not exist in this base path"
230 );
231
232 let metadata = GeneMetadata::read(md_path.unwrap()).await?;
234
235 let storage = LanceStorage::new(base_path.clone(), metadata.name_id.clone());
237
238 Ok((storage, metadata))
239 }
240
241 fn to_sparse_record_batch(&self, m: &CsMat<f64>) -> StorageResult<RecordBatch> {
245 debug!(
246 "Converting sparse matrix to RecordBatch: {} x {}, nnz={}",
247 m.rows(),
248 m.cols(),
249 m.nnz()
250 );
251
252 let mut row_idx = Vec::with_capacity(m.nnz());
253 let mut col_idx = Vec::with_capacity(m.nnz());
254 let mut vals = Vec::with_capacity(m.nnz());
255
256 for (v, (r, c)) in m.iter() {
257 row_idx.push(r as u32);
258 col_idx.push(c as u32);
259 vals.push(*v);
260 }
261
262 let mut schema_metadata = std::collections::HashMap::new();
264 schema_metadata.insert("rows".to_string(), m.rows().to_string());
265 schema_metadata.insert("cols".to_string(), m.cols().to_string());
266 schema_metadata.insert("nnz".to_string(), m.nnz().to_string());
267
268 let schema = Schema::new(vec![
269 Field::new("row", DataType::UInt32, false),
270 Field::new("col", DataType::UInt32, false),
271 Field::new("value", DataType::Float64, false),
272 ])
273 .with_metadata(schema_metadata);
274
275 let batch = RecordBatch::try_new(
276 Arc::new(schema),
277 vec![
278 Arc::new(UInt32Array::from(row_idx)) as _,
279 Arc::new(UInt32Array::from(col_idx)) as _,
280 Arc::new(Float64Array::from(vals)) as _,
281 ],
282 )
283 .map_err(|e| StorageError::Lance(e.to_string()))?;
284
285 trace!(
286 "Sparse RecordBatch created with {} entries",
287 batch.num_rows()
288 );
289 Ok(batch)
290 }
291
292 #[allow(clippy::wrong_self_convention)]
297 fn from_sparse_record_batch(
298 &self,
299 batch: RecordBatch,
300 expected_rows: usize,
301 expected_cols: usize,
302 ) -> StorageResult<CsMat<f64>> {
303 use arrow::array::UInt32Array;
304
305 debug!("Reconstructing sparse matrix from RecordBatch");
306
307 let row_arr = batch
308 .column(0)
309 .as_any()
310 .downcast_ref::<UInt32Array>()
311 .ok_or_else(|| StorageError::Invalid("row column type mismatch".into()))?;
312 let col_arr = batch
313 .column(1)
314 .as_any()
315 .downcast_ref::<UInt32Array>()
316 .ok_or_else(|| StorageError::Invalid("col column type mismatch".into()))?;
317 let val_arr = batch
318 .column(2)
319 .as_any()
320 .downcast_ref::<Float64Array>()
321 .ok_or_else(|| StorageError::Invalid("value column type mismatch".into()))?;
322
323 let n = row_arr.len();
324 if n == 0 {
325 debug!(
326 "Empty RecordBatch, returning {}x{} sparse matrix",
327 expected_rows, expected_cols
328 );
329 return Ok(CsMat::zero((expected_rows, expected_cols)));
330 }
331
332 let schema = batch.schema();
334 let schema_metadata = schema.metadata();
335 if let (Some(rows_str), Some(cols_str)) =
336 (schema_metadata.get("rows"), schema_metadata.get("cols"))
337 {
338 let schema_rows = rows_str.parse::<usize>().ok();
339 let schema_cols = cols_str.parse::<usize>().ok();
340 if schema_rows != Some(expected_rows) || schema_cols != Some(expected_cols) {
341 panic!(
342 "Schema metadata dimensions ({:?}x{:?}) don't match storage metadata ({}x{})",
343 schema_rows, schema_cols, expected_rows, expected_cols
344 );
345 } else {
346 debug!(
347 "Schema metadata matches storage metadata: {}x{}",
348 expected_rows, expected_cols
349 );
350 }
351 }
352
353 let rows = expected_rows;
354 let cols = expected_cols;
355 debug!(
356 "Reconstructing {}x{} sparse matrix from {} entries",
357 rows, cols, n
358 );
359
360 let mut trimat = TriMat::new((rows, cols));
361 for i in 0..n {
362 let r = row_arr.value(i) as usize;
363 let c = col_arr.value(i) as usize;
364 let v = val_arr.value(i);
365
366 if r >= rows || c >= cols {
367 return Err(StorageError::Invalid(format!(
368 "Index out of bounds: ({}, {}) in {}x{} matrix",
369 r, c, rows, cols
370 )));
371 }
372 trimat.add_triplet(r, c, v);
373 }
374
375 let result = trimat.to_csr();
376 if result.rows() != rows || result.cols() != cols {
377 return Err(StorageError::Invalid(format!(
378 "Dimension mismatch after reconstruction: expected {}x{}, got {}x{}",
379 rows,
380 cols,
381 result.rows(),
382 result.cols()
383 )));
384 }
385
386 Ok(result)
387 }
388
389 async fn write_lance_batch_async(&self, path: &Path, batch: RecordBatch) -> StorageResult<()> {
391 let uri = Self::path_to_uri(path);
392 info!("Writing Lance dataset to {}", uri);
393
394 let schema = batch.schema();
395 let batches = vec![batch];
396 let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
397
398 let params = WriteParams {
399 mode: WriteMode::Create,
400 ..WriteParams::default()
401 };
402
403 Dataset::write(reader, &uri, Some(params))
404 .await
405 .map_err(|e| StorageError::Lance(e.to_string()))?;
406
407 info!("Successfully wrote Lance dataset to {}", uri);
408 Ok(())
409 }
410
411 async fn read_lance_all_batches_async(&self, path: &Path) -> StorageResult<RecordBatch> {
413 let uri = Self::path_to_uri(path);
414 info!("Reading Lance dataset from {}", uri);
415
416 let dataset = Dataset::open(&uri)
417 .await
418 .map_err(|e| StorageError::Lance(e.to_string()))?;
419 let scanner = dataset.scan();
420 let mut stream = scanner
421 .try_into_stream()
422 .await
423 .map_err(|e| StorageError::Lance(e.to_string()))?;
424
425 let mut batches = Vec::new();
426 while let Some(batch_result) = stream.next().await {
427 let batch = batch_result.map_err(|e| StorageError::Lance(e.to_string()))?;
428 batches.push(batch);
429 }
430
431 if batches.is_empty() {
432 return Err(StorageError::Invalid("Empty Lance dataset".into()));
433 }
434
435 let schema = batches[0].schema();
436 let combined = arrow::compute::concat_batches(&schema, &batches)
437 .map_err(|e| StorageError::Lance(format!("Failed to concatenate batches: {}", e)))?;
438
439 debug!(
440 "Combined Lance batch for {:?} has {} rows",
441 path,
442 combined.num_rows()
443 );
444 Ok(combined)
445 }
446
447 async fn read_lance_first_batch_async(&self, path: &Path) -> StorageResult<RecordBatch> {
449 let uri = Self::path_to_uri(path);
450 info!("Reading first batch from Lance dataset {}", uri);
451
452 let dataset = Dataset::open(&uri)
453 .await
454 .map_err(|e| StorageError::Lance(e.to_string()))?;
455 let scanner = dataset.scan();
456 let mut stream = scanner
457 .try_into_stream()
458 .await
459 .map_err(|e| StorageError::Lance(e.to_string()))?;
460
461 let batch = stream
462 .next()
463 .await
464 .ok_or_else(|| StorageError::Lance("empty Lance dataset".to_string()))?
465 .map_err(|e| StorageError::Lance(e.to_string()))?;
466
467 debug!(
468 "Read first RecordBatch for path {:?} with {} rows",
469 path,
470 batch.num_rows()
471 );
472 Ok(batch)
473 }
474}
475
476impl StorageBackend for LanceStorage {
477 fn get_base(&self) -> String {
478 self._base.clone()
479 }
480
481 fn get_name(&self) -> String {
482 self._name.clone()
483 }
484
485 fn base_path(&self) -> PathBuf {
486 PathBuf::from(&self._base)
487 }
488
489 fn metadata_path(&self) -> PathBuf {
490 self.base_path()
491 .join(format!("{}_metadata.json", self._name))
492 }
493
494 fn basepath_to_uri(&self) -> String {
496 Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
497 }
498
499 fn path_to_uri(path: &Path) -> String {
501 path.canonicalize()
502 .unwrap_or_else(|_| {
503 if path.is_absolute() {
504 path.to_path_buf()
505 } else if path.is_relative() {
506 std::env::current_dir()
507 .unwrap_or_else(|_| PathBuf::from("/"))
508 .join(path)
509 } else {
510 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(path)
511 }
512 })
513 .to_string_lossy()
514 .to_string()
515 }
516
517 async fn save_dense(
529 &self,
530 key: &str,
531 matrix: &DenseMatrix<f64>,
532 md_path: &Path,
533 ) -> StorageResult<()> {
534 self.validate_initialized(md_path)?;
535 let path = self.file_path(key);
536 let (n_rows, n_cols) = matrix.shape();
537
538 info!(
539 "Saving dense {} matrix: {} x {} at {:?}",
540 key, n_rows, n_cols, path
541 );
542
543 let batch = self.to_dense_record_batch(matrix)?;
545
546 if batch.num_rows() != n_rows {
548 return Err(StorageError::Invalid(format!(
549 "RecordBatch has {} rows but matrix has {} rows",
550 batch.num_rows(),
551 n_rows
552 )));
553 }
554
555 self.write_lance_batch_async(&path, batch).await?;
557
558 info!("Dense {} matrix saved successfully", key);
559 Ok(())
560 }
561
562 async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
572 let path = self.file_path(key);
573 info!("Loading dense {} matrix from {:?}", key, path);
574
575 let batch = self.read_lance_all_batches_async(&path).await?;
577
578 let matrix = self.from_dense_record_batch(&batch)?;
580
581 let (n_rows, n_cols) = matrix.shape();
582 info!("Loaded dense {} matrix: {} x {}", key, n_rows, n_cols);
583
584 Ok(matrix)
585 }
586
    /// Load a dense matrix directly from an arbitrary `.lance` or `.parquet`
    /// file, independent of this storage's key/prefix naming scheme.
    ///
    /// Parquet supports two layouts: the "vector" layout (one
    /// `FixedSizeList<Float64>` column) and a wide layout (all-Float64
    /// columns, at least one named `col_*`).
    ///
    /// # Errors
    /// `StorageError::Invalid` for missing files, unsupported extensions or
    /// schemas; `StorageError::Io`/`Parquet` for read failures.
    async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
        info!("Loading dense matrix from file (async): {:?}", path);

        if !path.exists() {
            return Err(StorageError::Invalid(format!(
                "Dense file does not exist: {:?}",
                path
            )));
        }

        // Dispatch on extension; only lance and parquet are supported.
        let extension = path
            .extension()
            .and_then(|e| e.to_str())
            .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;

        match extension {
            "lance" => {
                // A throwaway storage rooted at the file's parent lets us
                // reuse the regular Lance read path for an arbitrary file.
                let parent = path
                    .parent()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Path has no parent: {:?}", path))
                    })?
                    .to_str()
                    .ok_or_else(|| {
                        StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                    })?
                    .to_string();

                let tmp_storage =
                    crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));

                let batch = tmp_storage.read_lance_all_batches_async(path).await?;
                let matrix = tmp_storage.from_dense_record_batch(&batch)?;
                info!(
                    "Loaded dense matrix from Lance: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );
                Ok(matrix)
            }
            "parquet" => {
                use arrow::datatypes::DataType;
                use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
                use std::fs::File;

                // NOTE(review): synchronous parquet read inside an async fn —
                // blocks the executor for large files; confirm acceptable.
                let file = File::open(path)
                    .map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;

                let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
                })?;
                let mut reader = builder.build().map_err(|e| {
                    StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
                })?;

                // Drain all row groups into memory before concatenating.
                let mut batches = Vec::new();
                #[allow(clippy::while_let_on_iterator)]
                while let Some(batch) = reader.next() {
                    let batch = batch.map_err(|e| {
                        StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
                    })?;
                    batches.push(batch);
                }

                if batches.is_empty() {
                    return Err(StorageError::Invalid(format!(
                        "Empty parquet dataset at {:?}",
                        path
                    )));
                }

                let schema = batches[0].schema();
                let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
                    StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
                })?;

                // Schema sniffing: single FixedSizeList<Float64> column
                // means "vector" layout ...
                let fields = schema.fields();
                let is_vector = fields.len() == 1
                    && matches!(
                        fields[0].data_type(),
                        DataType::FixedSizeList(inner, _)
                        if matches!(inner.data_type(), DataType::Float64)
                    );

                // ... otherwise all-Float64 columns with a `col_*` name
                // means the wide feature-per-column layout.
                let is_wide_col = !is_vector
                    && !fields.is_empty()
                    && fields
                        .iter()
                        .all(|f| matches!(f.data_type(), DataType::Float64))
                    && fields.iter().any(|f| f.name().starts_with("col_"));

                let matrix = if is_vector {
                    let parent = path
                        .parent()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Path has no parent: {:?}", path))
                        })?
                        .to_str()
                        .ok_or_else(|| {
                            StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
                        })?
                        .to_string();

                    let tmp_storage =
                        crate::lance::LanceStorage::new(parent, String::from("tmp_storage"));
                    tmp_storage.from_dense_record_batch(&combined)?
                } else if is_wide_col {
                    let n_rows = combined.num_rows();
                    let n_cols = combined.num_columns();
                    if n_rows == 0 || n_cols == 0 {
                        return Err(StorageError::Invalid(format!(
                            "Cannot load empty wide-column parquet at {:?}",
                            path
                        )));
                    }

                    // Columns are appended whole, so `data` ends up
                    // column-major — matching the `true` flag below
                    // (presumably column-major; same as from_dense_record_batch).
                    let mut data = Vec::with_capacity(n_rows * n_cols);
                    for col_idx in 0..n_cols {
                        let col = combined.column(col_idx);
                        let arr = col
                            .as_any()
                            .downcast_ref::<arrow_array::Float64Array>()
                            .ok_or_else(|| {
                                StorageError::Invalid(format!(
                                    "Wide-column parquet expects Float64, got {:?} in column {}",
                                    col.data_type(),
                                    col_idx
                                ))
                            })?;
                        for row_idx in 0..n_rows {
                            data.push(arr.value(row_idx));
                        }
                    }

                    DenseMatrix::new(n_rows, n_cols, data, true)
                        .map_err(|e| StorageError::Invalid(e.to_string()))?
                } else {
                    return Err(StorageError::Invalid(format!(
                        "Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
                         or wide Float64 columns named col_*",
                        path
                    )));
                };

                info!(
                    "Loaded dense matrix from Parquet: {} x {}",
                    matrix.shape().0,
                    matrix.shape().1
                );

                Ok(matrix)
            }
            _ => Err(StorageError::Invalid(format!(
                "Unsupported file format: {}. Only .lance and .parquet are supported",
                extension
            ))),
        }
    }
758
759 fn file_path(&self, key: &str) -> PathBuf {
760 self.base_path()
761 .join(format!("{}_{}.lance", self._name, key))
762 }
763
764 async fn save_sparse(
769 &self,
770 key: &str,
771 matrix: &CsMat<f64>,
772 md_path: &Path,
773 ) -> StorageResult<()> {
774 self.validate_initialized(md_path)?;
775 let path = self.file_path(key);
776 info!(
777 "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
778 key,
779 matrix.rows(),
780 matrix.cols(),
781 matrix.nnz(),
782 path
783 );
784
785 let mut metadata = self.load_metadata().await?;
786 let filetype = FileInfo::which_filetype(key);
787 metadata.files.insert(
788 key.to_string(),
789 FileInfo {
790 filename: format!("{}_{}.lance", self.get_name(), key),
791 filetype: filetype.to_string(),
792 storage_format: FileInfo::which_format(&filetype),
793 rows: matrix.rows(),
794 cols: matrix.cols(),
795 nnz: Some(matrix.nnz()),
796 size_bytes: None,
797 },
798 );
799 self.save_metadata(&metadata).await?;
800
801 let batch = self.to_sparse_record_batch(matrix)?;
802 self.write_lance_batch_async(&path, batch).await?;
803 info!("Sparse matrix {} saved successfully", filetype);
804 Ok(())
805 }
806
807 async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
808 info!("Loading sparse {} matrix", key);
809
810 let metadata = self.load_metadata().await?;
811 let filetype = FileInfo::which_filetype(key);
812 let file_info = metadata
813 .files
814 .get(key)
815 .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
816
817 let expected_rows = file_info.rows;
818 let expected_cols = file_info.cols;
819 debug!(
820 "Expected dimensions from storage metadata: {} x {}",
821 expected_rows, expected_cols
822 );
823
824 let path = self.file_path(key);
825 let batch = self.read_lance_first_batch_async(&path).await?;
826 let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
827 info!(
828 "Sparse {} matrix loaded: {} x {}, nnz={}",
829 filetype,
830 matrix.rows(),
831 matrix.cols(),
832 matrix.nnz()
833 );
834 Ok(matrix)
835 }
836
837 async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
838 self.validate_initialized(md_path)?;
839 let path = self.file_path("lambdas");
840 info!("Saving {} lambda values", lambdas.len());
841
842 let schema = Schema::new(vec![Field::new("lambda", DataType::Float64, false)]);
843 let batch = RecordBatch::try_new(
844 Arc::new(schema),
845 vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
846 )
847 .map_err(|e| StorageError::Lance(e.to_string()))?;
848
849 self.write_lance_batch_async(&path, batch).await?;
850 info!("Lambda values saved successfully");
851 Ok(())
852 }
853
854 async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
855 let path = self.file_path("lambdas");
856 info!("Loading lambda values from {:?}", path);
857
858 let batch = self.read_lance_first_batch_async(&path).await?;
859 let arr = batch
860 .column(0)
861 .as_any()
862 .downcast_ref::<Float64Array>()
863 .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
864
865 let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
866 info!("Loaded {} lambda values", lambdas.len());
867 Ok(lambdas)
868 }
869
870 async fn save_metadata(&self, metadata: &GeneMetadata) -> StorageResult<PathBuf> {
871 let path = self.metadata_path();
872 info!("Saving metadata to {:?}", path);
873 fs::create_dir_all(self.base_path()).map_err(|e| StorageError::Io(e.to_string()))?;
874 let s = serde_json::to_string_pretty(metadata).map_err(StorageError::Serde)?;
875 fs::write(&path, s).map_err(|e| StorageError::Io(e.to_string()))?;
876 info!("Metadata saved successfully");
877 Ok(path)
878 }
879
880 async fn load_metadata(&self) -> StorageResult<GeneMetadata> {
881 let filename = self.metadata_path();
882 info!("Loading metadata from {:?}", filename);
883 let s = fs::read_to_string(filename).map_err(|e| StorageError::Io(e.to_string()))?;
884 let md: GeneMetadata = serde_json::from_str(&s).map_err(StorageError::Serde)?;
885 info!("Metadata loaded successfully");
886 Ok(md)
887 }
888
889 async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
890 self.validate_initialized(md_path)?;
891 let path = self.file_path(key);
892 info!("Saving {} values for vector {}", vector.len(), key);
893
894 let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
895 let float64_array = Float64Array::from_iter_values::<Vec<f64>>(vector.into());
896 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
897 .map_err(|e| StorageError::Lance(e.to_string()))?;
898
899 self.write_lance_batch_async(&path, batch).await?;
900 info!("Index {} saved successfully", key);
901 Ok(())
902 }
903
904 async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
905 self.validate_initialized(md_path)?;
906 let path = self.file_path(key);
907 info!("Saving {} values for index {}", vector.len(), key);
908
909 let schema = Schema::new(vec![Field::new("id", DataType::UInt32, false)]);
910 let uint32_array = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
911 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
912 .map_err(|e| StorageError::Lance(e.to_string()))?;
913
914 self.write_lance_batch_async(&path, batch).await?;
915 info!("Index {} saved successfully", key);
916 Ok(())
917 }
918
919 async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
920 let path = self.file_path(filename);
921 info!("Loading vector {} from {:?}", filename, path);
922
923 let batch = self.read_lance_first_batch_async(&path).await?;
924 let arr = batch
925 .column(0)
926 .as_any()
927 .downcast_ref::<Float64Array>()
928 .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
929
930 let vector: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
931 info!("Loaded {} vector values for {}", vector.len(), filename);
932 Ok(vector)
933 }
934
935 async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
936 let path = self.file_path(filename);
937 info!("Loading vector {} from {:?}", filename, path);
938
939 let batch = self.read_lance_first_batch_async(&path).await?;
940 let arr = batch
941 .column(0)
942 .as_any()
943 .downcast_ref::<UInt32Array>()
944 .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
945
946 let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
947 info!("Loaded {} vector values for {}", vector.len(), filename);
948 Ok(vector)
949 }
950
951 #[cfg(test)]
955 async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &PathBuf) -> StorageResult<()> {
956 use tokio::fs as tokio_fs;
957
958 info!("Saving dense matrix to file (async): {:?}", path);
959
960 if let Some(parent) = path.parent() {
962 tokio_fs::try_exists(parent).await.map_err(|e| {
963 StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e))
964 })?;
965 }
966
967 let tmp_storage = LanceStorage::new(
969 String::from(path.parent().unwrap().to_str().unwrap()),
970 String::from("tmp_storage"),
971 );
972
973 let extension = path
974 .extension()
975 .and_then(|e| e.to_str())
976 .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
977
978 let (n_rows, n_cols) = data.shape();
979 info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
980
981 match extension {
982 "lance" => {
983 let batch = tmp_storage.to_dense_record_batch(data)?;
984 debug!(
985 "Created RecordBatch with {} rows for Lance",
986 batch.num_rows()
987 );
988
989 if batch.num_rows() != n_rows {
991 return Err(StorageError::Invalid(format!(
992 "RecordBatch has {} rows but matrix has {} rows",
993 batch.num_rows(),
994 n_rows
995 )));
996 }
997
998 tmp_storage.write_lance_batch_async(path, batch).await?;
999 info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
1000 Ok(())
1001 }
1002 "parquet" => {
1003 use parquet::arrow::ArrowWriter;
1004 use parquet::file::properties::WriterProperties;
1005 use std::fs::File;
1006
1007 let batch = tmp_storage.to_dense_record_batch(data)?;
1009 debug!(
1010 "Created RecordBatch with {} rows for Parquet",
1011 batch.num_rows()
1012 );
1013
1014 if batch.num_rows() != n_rows {
1015 return Err(StorageError::Invalid(format!(
1016 "RecordBatch has {} rows but matrix has {} rows",
1017 batch.num_rows(),
1018 n_rows
1019 )));
1020 }
1021
1022 let file = File::create(path).map_err(|e| {
1023 StorageError::Io(format!("Failed to create parquet file: {}", e))
1024 })?;
1025
1026 let props = WriterProperties::builder()
1027 .set_compression(parquet::basic::Compression::SNAPPY)
1028 .build();
1029
1030 let mut writer =
1031 ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
1032 StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
1033 })?;
1034
1035 writer
1036 .write(&batch)
1037 .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
1038
1039 writer
1040 .close()
1041 .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
1042
1043 info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
1044 Ok(())
1045 }
1046 _ => Err(StorageError::Invalid(format!(
1047 "Unsupported file format: {}. Only .lance and .parquet are supported",
1048 extension
1049 ))),
1050 }
1051 }
1052
1053 async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
1055 self.validate_initialized(md_path)?;
1056 let path = self.file_path("centroid_map");
1057 info!("Saving {} centroid map entries", map.len());
1058
1059 let schema = Schema::new(vec![Field::new("centroid_id", DataType::UInt32, false)]);
1060 let uint32_array = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
1061 let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(uint32_array) as _])
1062 .map_err(|e| StorageError::Lance(e.to_string()))?;
1063
1064 self.write_lance_batch_async(&path, batch).await?;
1065 info!("Centroid map saved successfully");
1066 Ok(())
1067 }
1068
1069 async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
1071 let path = self.file_path("centroid_map");
1072 info!("Loading centroid map from {:?}", path);
1073
1074 let batch = self.read_lance_first_batch_async(&path).await?;
1075 let arr = batch
1076 .column(0)
1077 .as_any()
1078 .downcast_ref::<UInt32Array>()
1079 .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
1080
1081 let map: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
1082 info!("Loaded {} centroid map entries", map.len());
1083 Ok(map)
1084 }
1085
1086 async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
1088 self.validate_initialized(md_path)?;
1089 let path = self.file_path("subcentroid_lambdas");
1090 info!("Saving {} subcentroid lambda values", lambdas.len());
1091
1092 let schema = Schema::new(vec![Field::new(
1093 "subcentroid_lambda",
1094 DataType::Float64,
1095 false,
1096 )]);
1097 let batch = RecordBatch::try_new(
1098 Arc::new(schema),
1099 vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
1100 )
1101 .map_err(|e| StorageError::Lance(e.to_string()))?;
1102
1103 self.write_lance_batch_async(&path, batch).await?;
1104 info!("Subcentroid lambda values saved successfully");
1105 Ok(())
1106 }
1107
1108 async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
1110 let path = self.file_path("subcentroid_lambdas");
1111 info!("Loading subcentroid lambda values from {:?}", path);
1112
1113 let batch = self.read_lance_first_batch_async(&path).await?;
1114 let arr = batch
1115 .column(0)
1116 .as_any()
1117 .downcast_ref::<Float64Array>()
1118 .ok_or_else(|| {
1119 StorageError::Invalid("subcentroid_lambda column type mismatch".into())
1120 })?;
1121
1122 let lambdas: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
1123 info!("Loaded {} subcentroid lambda values", lambdas.len());
1124 Ok(lambdas)
1125 }
1126
1127 async fn save_subcentroids(
1129 &self,
1130 subcentroids: &DenseMatrix<f64>,
1131 md_path: &Path,
1132 ) -> StorageResult<()> {
1133 self.validate_initialized(md_path)?;
1134 let path = self.file_path("sub_centroids");
1135 let (n_rows, n_cols) = subcentroids.shape();
1136 info!(
1137 "Saving subcentroids matrix {} x {} at {:?}",
1138 n_rows, n_cols, path
1139 );
1140
1141 let batch = self.to_dense_record_batch(subcentroids)?;
1142 self.write_lance_batch_async(&path, batch).await?;
1143 debug!("Subcentroids matrix saved successfully");
1144 Ok(())
1145 }
1146
1147 async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
1149 let path = self.file_path("sub_centroids");
1150 info!("Loading sub_centroids from {:?}", path);
1151
1152 let batch = self.read_lance_all_batches_async(&path).await?;
1153 let matrix = self.from_dense_record_batch(&batch)?;
1154
1155 let (n_rows, n_cols) = matrix.shape();
1157 let mut result = Vec::with_capacity(n_rows);
1158
1159 for row_idx in 0..n_rows {
1160 let row: Vec<f64> = (0..n_cols)
1161 .map(|col_idx| *matrix.get((row_idx, col_idx)))
1162 .collect();
1163 result.push(row);
1164 }
1165
1166 info!(
1167 "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
1168 n_rows, n_cols
1169 );
1170 Ok(result)
1171 }
1172
1173 async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
1175 self.validate_initialized(md_path)?;
1176 let path = self.file_path("item_norms");
1177 info!("Saving {} item norm values", item_norms.len());
1178
1179 let schema = Schema::new(vec![Field::new("norm", DataType::Float64, false)]);
1180 let batch = RecordBatch::try_new(
1181 Arc::new(schema),
1182 vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
1183 )
1184 .map_err(|e| StorageError::Lance(e.to_string()))?;
1185
1186 self.write_lance_batch_async(&path, batch).await?;
1187 info!("Item norms saved successfully");
1188 Ok(())
1189 }
1190
1191 async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
1193 let path = self.file_path("item_norms");
1194 info!("Loading item norms from {:?}", path);
1195
1196 let batch = self.read_lance_first_batch_async(&path).await?;
1197 let arr = batch
1198 .column(0)
1199 .as_any()
1200 .downcast_ref::<Float64Array>()
1201 .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
1202
1203 let norms: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
1204 info!("Loaded {} item norm values", norms.len());
1205 Ok(norms)
1206 }
1207
1208 async fn save_cluster_assignments(
1209 &self,
1210 assignments: &[Option<usize>],
1211 md_path: &Path,
1212 ) -> StorageResult<()> {
1213 self.validate_initialized(md_path)?;
1214 let path = self.file_path("cluster_assignments");
1215 info!("Saving {} cluster assignments", assignments.len());
1216
1217 let values: Vec<i64> = assignments
1219 .iter()
1220 .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
1221 .collect();
1222
1223 let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
1224 let batch = RecordBatch::try_new(
1225 Arc::new(schema),
1226 vec![Arc::new(Int64Array::from(values)) as _],
1227 )
1228 .map_err(|e| StorageError::Lance(e.to_string()))?;
1229
1230 self.write_lance_batch_async(&path, batch).await?;
1231 info!("Cluster assignments saved successfully");
1232 Ok(())
1233 }
1234
1235 async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
1236 let path = self.file_path("cluster_assignments");
1237 info!("Loading cluster assignments from {:?}", path);
1238
1239 let batch = self.read_lance_first_batch_async(&path).await?;
1240 let arr = batch
1241 .column(0)
1242 .as_any()
1243 .downcast_ref::<Int64Array>()
1244 .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
1245
1246 let assignments: Vec<Option<usize>> = (0..arr.len())
1247 .map(|i| {
1248 let val = arr.value(i);
1249 if val < 0 { None } else { Some(val as usize) }
1250 })
1251 .collect();
1252
1253 info!("Loaded {} cluster assignments", assignments.len());
1254 Ok(assignments)
1255 }
1256}