use std::path::{Path, PathBuf};
use std::sync::Arc;
use arrow::array::{Float64Array, Int64Array, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::{Array as ArrowArray, RecordBatch};
use log::{debug, info};
use smartcore::linalg::basic::arrays::Array;
use smartcore::linalg::basic::matrix::DenseMatrix;
use sprs::CsMat;
use crate::metadata::FileInfo;
use crate::metadata::GeneMetadata;
use crate::traits::backend::StorageBackend;
use crate::traits::lance::LanceStorage;
use crate::traits::metadata::Metadata;
use crate::{StorageError, StorageResult};
/// Lance-backed storage handle for graph data.
///
/// Holds the filesystem base directory and the dataset name used to derive
/// per-key dataset paths (`{name}_{key}.lance`) and the metadata JSON path.
#[derive(Debug, Clone)]
pub struct LanceStorageGraph {
    // Base directory containing all .lance datasets and the metadata JSON.
    pub(crate) _base: String,
    // Dataset name; used as the filename prefix for every stored artifact.
    pub(crate) _name: String,
}
impl LanceStorageGraph {
    /// Creates a new storage handle rooted at `base` with dataset name `name`.
    ///
    /// Does not touch the filesystem; to open an existing, initialized store
    /// use [`LanceStorageGraph::spawn`].
    ///
    /// Parameters were previously named `_base`/`_name`, whose leading
    /// underscore conventionally marks *unused* bindings — but they are used.
    pub fn new(base: String, name: String) -> Self {
        info!("Creating LanceStorage at base={}, name={}", base, name);
        Self {
            _base: base,
            _name: name,
        }
    }

    /// Opens an existing store at `base_path` and loads its metadata.
    ///
    /// # Errors
    /// Returns `StorageError::Invalid` when no metadata file exists under
    /// `base_path`; propagates any error from reading the metadata itself.
    pub async fn spawn(base_path: String) -> Result<(Self, GeneMetadata), StorageError> {
        // Bind the metadata path in one step instead of checking
        // `is_none()` and then calling `unwrap()`.
        let md_path = match Self::exists(&base_path) {
            (true, Some(p)) => p,
            _ => {
                return Err(StorageError::Invalid(format!(
                    "Metadata does not exist in base path: {}",
                    base_path
                )))
            }
        };
        let metadata = GeneMetadata::read(md_path).await?;
        // `base_path` is no longer needed afterwards, so it can be moved
        // instead of cloned.
        let storage = Self::new(base_path, metadata.name_id.clone());
        Ok((storage, metadata))
    }
}
// Marker impl: LanceStorageGraph relies entirely on the default method
// implementations provided by the LanceStorage trait.
impl LanceStorage for LanceStorageGraph {}
impl StorageBackend for LanceStorageGraph {
fn get_base(&self) -> String {
self._base.clone()
}
fn get_name(&self) -> String {
self._name.clone()
}
fn base_path(&self) -> PathBuf {
PathBuf::from(&self._base)
}
fn metadata_path(&self) -> PathBuf {
self.base_path()
.join(format!("{}_metadata.json", self._name))
}
fn basepath_to_uri(&self) -> String {
Self::path_to_uri(PathBuf::from(self._base.clone()).as_path())
}
/// Persists a dense matrix under `key` as a Lance dataset, then records
/// the file in the storage metadata.
///
/// # Errors
/// Fails when the store is not initialized, when the record-batch
/// conversion loses rows, or when the write / metadata update fails.
async fn save_dense(
    &self,
    key: &str,
    matrix: &DenseMatrix<f64>,
    md_path: &Path,
) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let target = self.file_path(key);
    let (rows, cols) = matrix.shape();
    info!(
        "Saving dense {} matrix: {} x {} at {:?}",
        key, rows, cols, target
    );
    let record_batch = self.to_dense_record_batch(matrix)?;
    // Sanity check: the conversion must preserve the row count.
    if record_batch.num_rows() != rows {
        return Err(StorageError::Invalid(format!(
            "RecordBatch has {} rows but matrix has {} rows",
            record_batch.num_rows(),
            rows
        )));
    }
    // Write the data first, then register it in the metadata.
    self.write_lance_batch_async(Self::path_to_uri(&target), record_batch)
        .await?;
    let entry = FileInfo::new(
        format!("{}_{}.lance", self.get_name(), key),
        "dense",
        matrix.shape(),
        None,
        None,
    );
    let updated = self.load_metadata().await?.add_file(key, entry);
    self.save_metadata(&updated).await?;
    info!("Dense {} matrix saved successfully", key);
    Ok(())
}
/// Loads the dense matrix previously stored under `key`.
async fn load_dense(&self, key: &str) -> StorageResult<DenseMatrix<f64>> {
    let source = self.file_path(key);
    info!("Loading dense {} matrix from {:?}", key, source);
    let batch = self
        .read_lance_all_batches_async(Self::path_to_uri(&source))
        .await?;
    let matrix = self.from_dense_record_batch(&batch)?;
    let (rows, cols) = matrix.shape();
    info!("Loaded dense {} matrix: {} x {}", key, rows, cols);
    Ok(matrix)
}
async fn load_dense_from_file(&self, path: &Path) -> StorageResult<DenseMatrix<f64>> {
info!("Loading dense matrix from file (async): {:?}", path);
if !path.exists() {
return Err(StorageError::Invalid(format!(
"Dense file does not exist: {:?}",
path
)));
}
let extension = path
.extension()
.and_then(|e| e.to_str())
.ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
match extension {
"lance" => {
let parent = path
.parent()
.ok_or_else(|| {
StorageError::Invalid(format!("Path has no parent: {:?}", path))
})?
.to_str()
.ok_or_else(|| {
StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
})?
.to_string();
let tmp_storage = Self::new(parent, String::from("tmp_storage"));
let uri = Self::path_to_uri(path);
let batch = tmp_storage.read_lance_all_batches_async(uri).await?;
let matrix = tmp_storage.from_dense_record_batch(&batch)?;
info!(
"Loaded dense matrix from Lance: {} x {}",
matrix.shape().0,
matrix.shape().1
);
Ok(matrix)
}
"parquet" => {
use arrow::datatypes::DataType;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;
let file = File::open(path)
.map_err(|e| StorageError::Io(format!("Failed to open parquet file: {}", e)))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
StorageError::Parquet(format!("Failed to create parquet reader: {}", e))
})?;
let mut reader = builder.build().map_err(|e| {
StorageError::Parquet(format!("Failed to build parquet reader: {}", e))
})?;
let mut batches = Vec::new();
#[allow(clippy::while_let_on_iterator)]
while let Some(batch) = reader.next() {
let batch = batch.map_err(|e| {
StorageError::Parquet(format!("Failed to read parquet batch: {}", e))
})?;
batches.push(batch);
}
if batches.is_empty() {
return Err(StorageError::Invalid(format!(
"Empty parquet dataset at {:?}",
path
)));
}
let schema = batches[0].schema();
let combined = arrow::compute::concat_batches(&schema, &batches).map_err(|e| {
StorageError::Parquet(format!("Failed to concatenate parquet batches: {}", e))
})?;
let fields = schema.fields();
let is_vector = fields.len() == 1
&& matches!(
fields[0].data_type(),
DataType::FixedSizeList(inner, _)
if matches!(inner.data_type(), DataType::Float64)
);
let is_wide_col = !is_vector
&& !fields.is_empty()
&& fields
.iter()
.all(|f| matches!(f.data_type(), DataType::Float64))
&& fields.iter().any(|f| f.name().starts_with("col_"));
let matrix = if is_vector {
let parent = path
.parent()
.ok_or_else(|| {
StorageError::Invalid(format!("Path has no parent: {:?}", path))
})?
.to_str()
.ok_or_else(|| {
StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path))
})?
.to_string();
let tmp_storage = Self::new(parent, String::from("tmp_storage"));
tmp_storage.from_dense_record_batch(&combined)?
} else if is_wide_col {
let n_rows = combined.num_rows();
let n_cols = combined.num_columns();
if n_rows == 0 || n_cols == 0 {
return Err(StorageError::Invalid(format!(
"Cannot load empty wide-column parquet at {:?}",
path
)));
}
let mut data = Vec::with_capacity(n_rows * n_cols);
for col_idx in 0..n_cols {
let col = combined.column(col_idx);
let arr = col
.as_any()
.downcast_ref::<arrow_array::Float64Array>()
.ok_or_else(|| {
StorageError::Invalid(format!(
"Wide-column parquet expects Float64, got {:?} in column {}",
col.data_type(),
col_idx
))
})?;
for row_idx in 0..n_rows {
data.push(arr.value(row_idx));
}
}
DenseMatrix::new(n_rows, n_cols, data, true)
.map_err(|e| StorageError::Invalid(e.to_string()))?
} else {
return Err(StorageError::Invalid(format!(
"Unsupported Parquet schema at {:?}: expected FixedSizeList<Float64> \
or wide Float64 columns named col_*",
path
)));
};
info!(
"Loaded dense matrix from Parquet: {} x {}",
matrix.shape().0,
matrix.shape().1
);
Ok(matrix)
}
_ => Err(StorageError::Invalid(format!(
"Unsupported file format: {}. Only .lance and .parquet are supported",
extension
))),
}
}
/// Full path of the Lance dataset for `key`: `{base}/{name}_{key}.lance`.
fn file_path(&self, key: &str) -> PathBuf {
    let filename = format!("{}_{}.lance", self._name, key);
    self.base_path().join(filename)
}
/// Persists a sparse matrix under `key` and records it in the storage
/// metadata.
///
/// The Lance dataset is now written *before* the metadata entry is saved
/// (previously the order was reversed), so a failed write can no longer
/// leave metadata pointing at a missing file — matching the ordering used
/// by `save_dense`.
async fn save_sparse(
    &self,
    key: &str,
    matrix: &CsMat<f64>,
    md_path: &Path,
) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let path = self.file_path(key);
    info!(
        "Saving sparse {} matrix: {} x {}, nnz={} at {:?}",
        key,
        matrix.rows(),
        matrix.cols(),
        matrix.nnz(),
        path
    );
    let filetype = FileInfo::which_filetype(key);
    // Write the data first ...
    let batch = self.to_sparse_record_batch(matrix)?;
    let uri = Self::path_to_uri(&path);
    self.write_lance_batch_async(uri, batch).await?;
    // ... then register it in the metadata.
    let mut metadata = self.load_metadata().await?;
    metadata = metadata.add_file(
        key,
        FileInfo::new(
            format!("{}_{}.lance", self.get_name(), key),
            filetype.as_str(),
            (matrix.rows(), matrix.cols()),
            Some(matrix.nnz()),
            None,
        ),
    );
    self.save_metadata(&metadata).await?;
    info!("Sparse matrix {} saved successfully", filetype);
    Ok(())
}
/// Loads the sparse matrix stored under `key`.
///
/// The expected dimensions come from the metadata entry written by
/// `save_sparse`; they are passed to the decoder so the reconstructed
/// matrix keeps its full shape even when trailing rows/cols hold no
/// non-zeros.
async fn load_sparse(&self, key: &str) -> StorageResult<CsMat<f64>> {
    info!("Loading sparse {} matrix", key);
    let metadata = self.load_metadata().await?;
    let filetype = FileInfo::which_filetype(key);
    // Fail early if the key was never registered in the metadata.
    let file_info = metadata
        .files
        .get(key)
        .ok_or_else(|| StorageError::Invalid(format!("{key} not found in metadata")))?;
    let expected_rows = file_info.rows;
    let expected_cols = file_info.cols;
    debug!(
        "Expected dimensions from storage metadata: {} x {}",
        expected_rows, expected_cols
    );
    let path = self.file_path(key);
    let uri = Self::path_to_uri(&path);
    // Only the first batch is read; presumably sparse payloads are
    // written as a single batch — TODO confirm against the writer.
    let batch = self.read_lance_first_batch_async(uri).await?;
    let matrix = self.from_sparse_record_batch(batch, expected_rows, expected_cols)?;
    info!(
        "Sparse {} matrix loaded: {} x {}, nnz={}",
        filetype,
        matrix.rows(),
        matrix.cols(),
        matrix.nnz()
    );
    Ok(matrix)
}
/// Persists the lambda vector as a single Float64 column named "lambda".
async fn save_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let key = "lambdas";
    let path = self.file_path(key);
    info!("Saving {} lambda values", lambdas.len());
    let schema = Arc::new(Schema::new(vec![Field::new(
        "lambda",
        DataType::Float64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
    )
    .map_err(|e| StorageError::Lance(e.to_string()))?;
    // Register the file in the metadata, then write the data
    // (same ordering as the original implementation).
    let entry = FileInfo::new(
        format!("{}_{}.lance", self.get_name(), key),
        "vector",
        (lambdas.len(), 1),
        None,
        None,
    );
    let metadata = self.load_metadata().await?.add_file(key, entry);
    self.save_metadata(&metadata).await?;
    self.write_lance_batch_async(Self::path_to_uri(&path), batch)
        .await?;
    info!("Lambda values saved successfully");
    Ok(())
}
/// Loads the lambda vector written by `save_lambdas`.
async fn load_lambdas(&self) -> StorageResult<Vec<f64>> {
    let source = self.file_path("lambdas");
    info!("Loading lambda values from {:?}", source);
    let batch = self
        .read_lance_first_batch_async(Self::path_to_uri(&source))
        .await?;
    let arr = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| StorageError::Invalid("lambda column type mismatch".into()))?;
    // Non-nullable column: copying the raw values buffer is equivalent
    // to reading value(i) for every index.
    let lambdas: Vec<f64> = arr.values().to_vec();
    info!("Loaded {} lambda values", lambdas.len());
    Ok(lambdas)
}
/// Persists a float vector under `key` as a single "element" column.
async fn save_vector(&self, key: &str, vector: &[f64], md_path: &Path) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let path = self.file_path(key);
    info!("Saving {} values for vector {}", vector.len(), key);
    let schema = Schema::new(vec![Field::new("element", DataType::Float64, false)]);
    // Iterate over the borrowed slice directly instead of materializing
    // an intermediate Vec<f64> copy (`vector.into()` allocated one).
    let float64_array = Float64Array::from_iter_values(vector.iter().copied());
    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(float64_array) as _])
        .map_err(|e| StorageError::Lance(e.to_string()))?;
    {
        let mut metadata = self.load_metadata().await?;
        metadata = metadata.add_file(
            key,
            FileInfo::new(
                format!("{}_{}.lance", self.get_name(), key),
                "vector",
                (vector.len(), 1),
                None,
                None,
            ),
        );
        self.save_metadata(&metadata).await?;
        let uri = Self::path_to_uri(&path);
        self.write_lance_batch_async(uri, batch).await?;
    }
    // Fixed: previously logged "Index {} saved successfully" — a
    // copy-paste from save_index; this method saves a vector.
    info!("Vector {} saved successfully", key);
    Ok(())
}
/// Persists an index vector under `key`, narrowing each usize to u32.
async fn save_index(&self, key: &str, vector: &[usize], md_path: &Path) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let target = self.file_path(key);
    info!("Saving {} values for index {}", vector.len(), key);
    // NOTE: indices are stored as u32; values above u32::MAX would be
    // truncated by this cast.
    let ids = UInt32Array::from_iter_values(vector.iter().map(|&x| x as u32));
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids) as _])
        .map_err(|e| StorageError::Lance(e.to_string()))?;
    // Metadata first, then the data write — same ordering as the
    // original implementation.
    let entry = FileInfo::new(
        format!("{}_{}.lance", self.get_name(), key),
        "vector",
        (vector.len(), 1),
        None,
        None,
    );
    let metadata = self.load_metadata().await?.add_file(key, entry);
    self.save_metadata(&metadata).await?;
    self.write_lance_batch_async(Self::path_to_uri(&target), batch)
        .await?;
    info!("Index {} saved successfully", key);
    Ok(())
}
/// Loads a float vector previously written by `save_vector`.
async fn load_vector(&self, filename: &str) -> StorageResult<Vec<f64>> {
    let source = self.file_path(filename);
    info!("Loading vector {} from {:?}", filename, source);
    let batch = self
        .read_lance_first_batch_async(Self::path_to_uri(&source))
        .await?;
    let arr = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
    // Non-nullable column: copying the raw values buffer is equivalent
    // to reading value(i) for every index.
    let vector: Vec<f64> = arr.values().to_vec();
    info!("Loaded {} vector values for {}", vector.len(), filename);
    Ok(vector)
}
/// Loads an index vector previously written by `save_index`, widening
/// each stored u32 back to usize.
async fn load_index(&self, filename: &str) -> StorageResult<Vec<usize>> {
    let path = self.file_path(filename);
    // Fixed: both log messages previously said "vector" — a copy-paste
    // from load_vector; this method loads an index.
    info!("Loading index {} from {:?}", filename, path);
    let uri = Self::path_to_uri(&path);
    let batch = self.read_lance_first_batch_async(uri).await?;
    let arr = batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| StorageError::Invalid("column type mismatch".into()))?;
    let vector: Vec<usize> = (0..arr.len()).map(|i| arr.value(i) as usize).collect();
    info!("Loaded {} index values for {}", vector.len(), filename);
    Ok(vector)
}
/// Saves `data` to `path` as either a Lance dataset or a Parquet file,
/// chosen by the file extension. Associated-function form: no storage
/// metadata is updated.
///
/// # Errors
/// `StorageError::Invalid` for pathological paths, unsupported
/// extensions, or a row-count mismatch; `Io`/`Parquet`/`Lance` for
/// write failures.
async fn save_dense_to_file(data: &DenseMatrix<f64>, path: &Path) -> StorageResult<()> {
    use tokio::fs as tokio_fs;
    info!("Saving dense matrix to file (async): {:?}", path);
    // Fixed: replaced `parent().unwrap().to_str().unwrap()` further down
    // with proper error propagation for rootless or non-UTF8 paths.
    let parent = path
        .parent()
        .ok_or_else(|| StorageError::Invalid(format!("Path has no parent: {:?}", path)))?;
    // Fixed: the original called tokio_fs::try_exists — a pure existence
    // check — while its error message claimed to create the directory.
    // create_dir_all actually creates it and is a no-op when it exists.
    tokio_fs::create_dir_all(parent)
        .await
        .map_err(|e| StorageError::Io(format!("Failed to create dir {:?}: {}", parent, e)))?;
    let parent_str = parent
        .to_str()
        .ok_or_else(|| StorageError::Invalid(format!("Non-UTF8 parent path for {:?}", path)))?;
    // Throwaway handle: only the record-batch codec and writers are used.
    let tmp_storage = Self::new(parent_str.to_string(), String::from("tmp_storage"));
    let extension = path
        .extension()
        .and_then(|e| e.to_str())
        .ok_or_else(|| StorageError::Invalid(format!("Invalid file path: {:?}", path)))?;
    let (n_rows, n_cols) = data.shape();
    info!("Saving matrix: {} rows x {} cols", n_rows, n_cols);
    match extension {
        "lance" => {
            let batch = tmp_storage.to_dense_record_batch(data)?;
            debug!(
                "Created RecordBatch with {} rows for Lance",
                batch.num_rows()
            );
            // Sanity check: the conversion must preserve the row count.
            if batch.num_rows() != n_rows {
                return Err(StorageError::Invalid(format!(
                    "RecordBatch has {} rows but matrix has {} rows",
                    batch.num_rows(),
                    n_rows
                )));
            }
            let uri = Self::path_to_uri(path);
            tmp_storage.write_lance_batch_async(uri, batch).await?;
            info!("Saved dense matrix to Lance: {} x {}", n_rows, n_cols);
            Ok(())
        }
        "parquet" => {
            use parquet::arrow::ArrowWriter;
            use parquet::file::properties::WriterProperties;
            use std::fs::File;
            let batch = tmp_storage.to_dense_record_batch(data)?;
            debug!(
                "Created RecordBatch with {} rows for Parquet",
                batch.num_rows()
            );
            if batch.num_rows() != n_rows {
                return Err(StorageError::Invalid(format!(
                    "RecordBatch has {} rows but matrix has {} rows",
                    batch.num_rows(),
                    n_rows
                )));
            }
            let file = File::create(path).map_err(|e| {
                StorageError::Io(format!("Failed to create parquet file: {}", e))
            })?;
            let props = WriterProperties::builder()
                .set_compression(parquet::basic::Compression::SNAPPY)
                .build();
            let mut writer =
                ArrowWriter::try_new(file, batch.schema(), Some(props)).map_err(|e| {
                    StorageError::Parquet(format!("Failed to create parquet writer: {}", e))
                })?;
            writer
                .write(&batch)
                .map_err(|e| StorageError::Parquet(format!("Failed to write batch: {}", e)))?;
            writer
                .close()
                .map_err(|e| StorageError::Parquet(format!("Failed to close writer: {}", e)))?;
            info!("Saved dense matrix to Parquet: {} x {}", n_rows, n_cols);
            Ok(())
        }
        _ => Err(StorageError::Invalid(format!(
            "Unsupported file format: {}. Only .lance and .parquet are supported",
            extension
        ))),
    }
}
/// Persists the centroid map as a UInt32 "centroid_id" column.
async fn save_centroid_map(&self, map: &[usize], md_path: &Path) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let key = "centroid_map";
    let target = self.file_path(key);
    info!("Saving {} centroid map entries", map.len());
    let ids = UInt32Array::from_iter_values(map.iter().map(|&x| x as u32));
    let schema = Arc::new(Schema::new(vec![Field::new(
        "centroid_id",
        DataType::UInt32,
        false,
    )]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids) as _])
        .map_err(|e| StorageError::Lance(e.to_string()))?;
    // Metadata first, then the data write — same ordering as the
    // original implementation.
    let entry = FileInfo::new(
        format!("{}_{}.lance", self.get_name(), key),
        "vector",
        (map.len(), 1),
        None,
        None,
    );
    let metadata = self.load_metadata().await?.add_file(key, entry);
    self.save_metadata(&metadata).await?;
    self.write_lance_batch_async(Self::path_to_uri(&target), batch)
        .await?;
    info!("Centroid map saved successfully");
    Ok(())
}
/// Loads the centroid map written by `save_centroid_map`.
async fn load_centroid_map(&self) -> StorageResult<Vec<usize>> {
    let source = self.file_path("centroid_map");
    info!("Loading centroid map from {:?}", source);
    let batch = self
        .read_lance_first_batch_async(Self::path_to_uri(&source))
        .await?;
    let ids = batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt32Array>()
        .ok_or_else(|| StorageError::Invalid("centroid_id column type mismatch".into()))?;
    // Widen each stored u32 back to usize via the raw values buffer.
    let map: Vec<usize> = ids.values().iter().map(|&v| v as usize).collect();
    info!("Loaded {} centroid map entries", map.len());
    Ok(map)
}
/// Persists subcentroid lambda values as a Float64 "subcentroid_lambda"
/// column, registering the file in the storage metadata.
async fn save_subcentroid_lambdas(&self, lambdas: &[f64], md_path: &Path) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let key = "subcentroid_lambdas";
    let path = self.file_path(key);
    info!("Saving {} subcentroid lambda values", lambdas.len());
    let schema = Schema::new(vec![Field::new(
        "subcentroid_lambda",
        DataType::Float64,
        false,
    )]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(Float64Array::from(lambdas.to_vec())) as _],
    )
    .map_err(|e| StorageError::Lance(e.to_string()))?;
    {
        // NOTE(review): metadata is saved before the batch is written, so
        // a failed write leaves a metadata entry for a missing file
        // (save_dense uses the opposite order) — confirm intended.
        let mut metadata = self.load_metadata().await?;
        metadata = metadata.add_file(
            key,
            FileInfo::new(
                format!("{}_{}.lance", self.get_name(), key),
                "vector",
                (lambdas.len(), 1),
                None,
                None,
            ),
        );
        self.save_metadata(&metadata).await?;
        let uri = Self::path_to_uri(&path);
        self.write_lance_batch_async(uri, batch).await?;
    }
    info!("Subcentroid lambda values saved successfully");
    Ok(())
}
/// Loads the subcentroid lambda vector.
async fn load_subcentroid_lambdas(&self) -> StorageResult<Vec<f64>> {
    let source = self.file_path("subcentroid_lambdas");
    info!("Loading subcentroid lambda values from {:?}", source);
    let batch = self
        .read_lance_first_batch_async(Self::path_to_uri(&source))
        .await?;
    let arr = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| {
            StorageError::Invalid("subcentroid_lambda column type mismatch".into())
        })?;
    // Non-nullable column: copying the raw values buffer is equivalent
    // to reading value(i) for every index.
    let lambdas: Vec<f64> = arr.values().to_vec();
    info!("Loaded {} subcentroid lambda values", lambdas.len());
    Ok(lambdas)
}
/// Persists the sub-centroids dense matrix under key "sub_centroids".
async fn save_subcentroids(
    &self,
    subcentroids: &DenseMatrix<f64>,
    md_path: &Path,
) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let key = "sub_centroids";
    let path = self.file_path(key);
    let (n_rows, n_cols) = subcentroids.shape();
    info!(
        "Saving subcentroids matrix {} x {} at {:?}",
        n_rows, n_cols, path
    );
    let batch = self.to_dense_record_batch(subcentroids)?;
    {
        let mut metadata = self.load_metadata().await?;
        metadata = metadata.add_file(
            key,
            FileInfo::new(
                format!("{}_{}.lance", self.get_name(), key),
                // NOTE(review): filetype is recorded as "vector" although
                // this stores a dense matrix ("dense" in save_dense) —
                // confirm whether any reader depends on this value before
                // changing it.
                "vector",
                subcentroids.shape(),
                None,
                None,
            ),
        );
        self.save_metadata(&metadata).await?;
        let uri = Self::path_to_uri(&path);
        self.write_lance_batch_async(uri, batch).await?;
    }
    debug!("Subcentroids matrix saved successfully");
    Ok(())
}
/// Loads the sub-centroids matrix and converts it to per-row vectors.
async fn load_subcentroids(&self) -> StorageResult<Vec<Vec<f64>>> {
    let source = self.file_path("sub_centroids");
    info!("Loading sub_centroids from {:?}", source);
    let batch = self
        .read_lance_all_batches_async(Self::path_to_uri(&source))
        .await?;
    let matrix = self.from_dense_record_batch(&batch)?;
    let (n_rows, n_cols) = matrix.shape();
    // Materialize each matrix row as its own Vec<f64>.
    let result: Vec<Vec<f64>> = (0..n_rows)
        .map(|row| {
            (0..n_cols)
                .map(|col| *matrix.get((row, col)))
                .collect()
        })
        .collect();
    info!(
        "Loaded sub_centroids: {} x {} as Vec<Vec<f64>>",
        n_rows, n_cols
    );
    Ok(result)
}
/// Persists item norms as a Float64 "norm" column.
async fn save_item_norms(&self, item_norms: &[f64], md_path: &Path) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let key = "item_norms";
    let target = self.file_path(key);
    info!("Saving {} item norm values", item_norms.len());
    let schema = Arc::new(Schema::new(vec![Field::new(
        "norm",
        DataType::Float64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Float64Array::from(item_norms.to_vec())) as _],
    )
    .map_err(|e| StorageError::Lance(e.to_string()))?;
    // Metadata first, then the data write — same ordering as the
    // original implementation.
    let entry = FileInfo::new(
        format!("{}_{}.lance", self.get_name(), key),
        "vector",
        (item_norms.len(), 1),
        None,
        None,
    );
    let metadata = self.load_metadata().await?.add_file(key, entry);
    self.save_metadata(&metadata).await?;
    self.write_lance_batch_async(Self::path_to_uri(&target), batch)
        .await?;
    info!("Item norms saved successfully");
    Ok(())
}
/// Loads the item norms vector.
async fn load_item_norms(&self) -> StorageResult<Vec<f64>> {
    let source = self.file_path("item_norms");
    info!("Loading item norms from {:?}", source);
    let batch = self
        .read_lance_first_batch_async(Self::path_to_uri(&source))
        .await?;
    let arr = batch
        .column(0)
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| StorageError::Invalid("norm column type mismatch".into()))?;
    // Non-nullable column: copying the raw values buffer is equivalent
    // to reading value(i) for every index.
    let norms: Vec<f64> = arr.values().to_vec();
    info!("Loaded {} item norm values", norms.len());
    Ok(norms)
}
/// Persists cluster assignments as an Int64 "cluster_id" column.
///
/// The column is declared non-nullable, so `None` is encoded with the
/// sentinel value -1; `load_cluster_assignments` maps negative values
/// back to `None`.
async fn save_cluster_assignments(
    &self,
    assignments: &[Option<usize>],
    md_path: &Path,
) -> StorageResult<()> {
    self.validate_initialized(md_path)?;
    let key = "cluster_assignments";
    let path = self.file_path(key);
    info!("Saving {} cluster assignments", assignments.len());
    // Encode Option<usize> -> i64 with -1 standing in for None.
    let values: Vec<i64> = assignments
        .iter()
        .map(|opt| opt.map(|v| v as i64).unwrap_or(-1))
        .collect();
    let schema = Schema::new(vec![Field::new("cluster_id", DataType::Int64, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(Int64Array::from(values)) as _],
    )
    .map_err(|e| StorageError::Lance(e.to_string()))?;
    {
        let mut metadata = self.load_metadata().await?;
        metadata = metadata.add_file(
            key,
            FileInfo::new(
                format!("{}_{}.lance", self.get_name(), key),
                "vector",
                (assignments.len(), 1),
                None,
                None,
            ),
        );
        self.save_metadata(&metadata).await?;
        let uri = Self::path_to_uri(&path);
        self.write_lance_batch_async(uri, batch).await?;
    }
    info!("Cluster assignments saved successfully");
    Ok(())
}
/// Loads cluster assignments, decoding the -1 sentinel back to `None`.
async fn load_cluster_assignments(&self) -> StorageResult<Vec<Option<usize>>> {
    let source = self.file_path("cluster_assignments");
    info!("Loading cluster assignments from {:?}", source);
    let batch = self
        .read_lance_first_batch_async(Self::path_to_uri(&source))
        .await?;
    let ids = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| StorageError::Invalid("cluster_id column type mismatch".into()))?;
    // Negative entries are the writer's encoding of None.
    let assignments: Vec<Option<usize>> = ids
        .values()
        .iter()
        .map(|&v| if v < 0 { None } else { Some(v as usize) })
        .collect();
    info!("Loaded {} cluster assignments", assignments.len());
    Ok(assignments)
}
}