use arrow::{
array::{Float64Array, RecordBatch, StringArray, UInt64Array},
datatypes::{DataType, Field, Schema},
};
use log::info;
use parquet::{
arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
basic::Compression,
file::properties::WriterProperties,
};
use serde::{Deserialize, Serialize};
use smartcore::linalg::basic::{
arrays::{Array, Array2},
matrix::DenseMatrix,
};
use sprs::CsMat;
use std::{collections::HashMap, fs::File, path::Path, sync::Arc};
use crate::{
builder::{ArrowSpaceBuilder, ConfigValue},
core::ArrowSpace,
storage::StorageError,
};
/// Descriptive metadata stored alongside persisted ArrowSpace artifacts.
///
/// Serialized to and from JSON through serde (see `save_metadata` and
/// `load_metadata` in this module).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArrowSpaceMetadata {
    /// Identifier this metadata was created for.
    pub name_id: String,
    /// Creation time; the constructors fill it with the current UTC time
    /// formatted as RFC 3339.
    pub timestamp: String,
    /// Number of rows recorded via `with_dimensions` (0 until set).
    pub n_rows: usize,
    /// Number of columns recorded via `with_dimensions` (0 until set).
    pub n_cols: usize,
    /// Builder configuration entries keyed by setting name
    /// (e.g. `"lambda_eps"`, `"lambda_k"`, `"synthesis"`).
    pub builder_config: HashMap<String, ConfigValue>,
    /// Stored artifacts keyed by a logical name such as `"matrix"`.
    pub files: HashMap<String, FileInfo>,
}
/// Description of one artifact file referenced from `ArrowSpaceMetadata::files`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileInfo {
    /// File name of the artifact (the writers in this module store it
    /// without the directory part).
    pub filename: String,
    /// Kind of content; this module writes `"dense"`, `"sparse"` and
    /// `"lambda_vector"`.
    pub file_type: String,
    /// Number of rows of the stored object.
    pub rows: usize,
    /// Number of columns of the stored object.
    pub cols: usize,
    /// Number of stored non-zero entries; `None` for non-sparse artifacts.
    pub nnz: Option<usize>,
    /// Size of the file on disk in bytes, when it could be determined.
    pub size_bytes: Option<u64>,
}
impl ArrowSpaceMetadata {
    /// Creates metadata for `name_id` stamped with the current UTC time
    /// (RFC 3339), zero dimensions, and empty configuration and file maps.
    pub fn new(name_id: &str) -> Self {
        Self {
            name_id: name_id.to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
            n_rows: 0,
            n_cols: 0,
            builder_config: HashMap::new(),
            files: HashMap::new(),
        }
    }

    /// Same as [`ArrowSpaceMetadata::new`], except that the configuration map
    /// is taken from `builder.builder_config_typed()`.
    pub fn from_builder(name_id: &str, builder: &ArrowSpaceBuilder) -> Self {
        Self {
            name_id: name_id.to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
            n_rows: 0,
            n_cols: 0,
            builder_config: builder.builder_config_typed(),
            files: HashMap::new(),
        }
    }

    /// Replaces the whole configuration map with `config` (builder style).
    pub fn with_builder_config(mut self, config: HashMap<String, ConfigValue>) -> Self {
        self.builder_config = config;
        self
    }

    /// Records the overall dimensions (builder style).
    pub fn with_dimensions(mut self, rows: usize, cols: usize) -> Self {
        self.n_rows = rows;
        self.n_cols = cols;
        self
    }

    /// Registers `info` under `key`, replacing any entry already stored
    /// under that key (builder style).
    pub fn add_file(mut self, key: &str, info: FileInfo) -> Self {
        self.files.insert(key.to_string(), info);
        self
    }

    /// Looks up a single configuration entry by name.
    pub fn get_config<'a>(&'a self, key: &str) -> Option<&'a ConfigValue> {
        self.builder_config.get(key)
    }

    /// The `"lambda_eps"` entry, if present and convertible via `as_f64`.
    pub fn lambda_eps(&self) -> Option<f64> {
        self.get_config("lambda_eps").and_then(|v| v.as_f64())
    }

    /// The `"lambda_k"` entry, if present and convertible via `as_usize`.
    pub fn lambda_k(&self) -> Option<usize> {
        self.get_config("lambda_k").and_then(|v| v.as_usize())
    }

    /// The `"synthesis"` entry, if present and convertible via `as_tau_mode`.
    pub fn synthesis(&self) -> Option<crate::search::taumode::TauMode> {
        self.get_config("synthesis").and_then(|v| v.as_tau_mode())
    }

    /// Renders the configuration as one `key = value` line per entry.
    ///
    /// Entries are sorted by key: `HashMap` iteration order is unspecified,
    /// so without sorting the same metadata could produce differently ordered
    /// summaries from one run to the next.
    pub fn config_summary(&self) -> String {
        let mut entries: Vec<_> = self.builder_config.iter().collect();
        // Keys are unique, so an unstable sort cannot reorder equal elements.
        entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
        entries
            .into_iter()
            .map(|(key, value)| format!(" {} = {}", key, value))
            .collect::<Vec<_>>()
            .join("\n")
    }
}
/// Serializes `metadata` as pretty-printed JSON into
/// `<path>/<name_id>_metadata.json`.
///
/// # Errors
/// * `StorageError::Invalid` when serialization fails.
/// * `StorageError::Io` when the file cannot be written.
pub fn save_metadata(
    metadata: &ArrowSpaceMetadata,
    path: impl AsRef<Path>,
    name_id: &str,
) -> Result<(), StorageError> {
    let target = path.as_ref().join(format!("{}_metadata.json", name_id));

    let serialized = match serde_json::to_string_pretty(metadata) {
        Ok(text) => text,
        Err(err) => {
            return Err(StorageError::Invalid(format!(
                "Failed to serialize metadata: {}",
                err
            )));
        }
    };

    match std::fs::write(&target, serialized) {
        Ok(()) => Ok(()),
        Err(err) => Err(StorageError::Io(format!(
            "Failed to write metadata: {}",
            err
        ))),
    }
}
/// Reads and parses `<path>/<name_id>_metadata.json`.
///
/// # Errors
/// * `StorageError::Io` when the file cannot be read.
/// * `StorageError::Invalid` when the content is not valid metadata JSON.
pub fn load_metadata(
    path: impl AsRef<Path>,
    name_id: &str,
) -> Result<ArrowSpaceMetadata, StorageError> {
    let metadata_path = path.as_ref().join(format!("{}_metadata.json", name_id));
    info!("loading from {:?}", metadata_path);
    let json = std::fs::read_to_string(&metadata_path)
        .map_err(|e| StorageError::Io(format!("Failed to read metadata: {}", e)))?;
    // This line used to reuse the "loading from" label, which presented the
    // file content as if it were a path.
    info!("loaded metadata JSON: {:?}", json);
    let metadata: ArrowSpaceMetadata = serde_json::from_str(&json)
        .map_err(|e| StorageError::Invalid(format!("Failed to parse metadata: {}", e)))?;
    Ok(metadata)
}
/// Writes `aspace.arrowspace_config_typed()` as pretty-printed JSON into
/// `<path>/<name_id>-arrowspace_metadata.json`.
///
/// Note the `-` separator in the file name; `save_metadata` uses `_`.
///
/// # Errors
/// * `StorageError::Invalid` when serialization fails.
/// * `StorageError::Io` when the file cannot be written.
pub fn save_arrowspace(
    aspace: &ArrowSpace,
    path: impl AsRef<Path>,
    name_id: &str,
) -> Result<(), StorageError> {
    let file_name = format!("{}-arrowspace_metadata.json", name_id);
    let target = path.as_ref().join(file_name);

    let config = aspace.arrowspace_config_typed();
    let serialized = match serde_json::to_string_pretty(&config) {
        Ok(text) => text,
        Err(err) => {
            return Err(StorageError::Invalid(format!(
                "Failed to serialize metadata: {}",
                err
            )));
        }
    };

    match std::fs::write(&target, serialized) {
        Ok(()) => Ok(()),
        Err(err) => Err(StorageError::Io(format!(
            "Failed to write metadata: {}",
            err
        ))),
    }
}
/// Convenience wrapper around [`save_dense_matrix`] that takes the
/// configuration from an optional builder.
///
/// With `Some(builder)`, `builder.builder_config_typed()` is passed on as the
/// configuration; with `None`, no configuration is passed.
pub fn save_dense_matrix_with_builder(
    matrix: &DenseMatrix<f64>,
    path: impl AsRef<Path>,
    name_id: &str,
    builder: Option<&ArrowSpaceBuilder>,
) -> Result<(), StorageError> {
    save_dense_matrix(
        matrix,
        path,
        name_id,
        builder.map(|b| b.builder_config_typed()),
    )
}
/// Writes `matrix` to `<path>/<name_id>.parquet` (Snappy-compressed).
///
/// Layout: every row carries the bookkeeping columns `name_id`, `n_rows` and
/// `n_cols`, followed by one `Float64` column `col_<i>` per matrix column.
/// When `builder_config` is `Some`, a metadata JSON file describing the
/// artifact is written as well via `save_metadata`.
///
/// # Errors
/// `StorageError::Arrow`, `StorageError::Io` or `StorageError::Parquet`
/// depending on the failing step, plus any error from `save_metadata`.
pub fn save_dense_matrix(
    matrix: &DenseMatrix<f64>,
    path: impl AsRef<Path>,
    name_id: &str,
    builder_config: Option<HashMap<String, ConfigValue>>,
) -> Result<(), StorageError> {
    let out_dir = path.as_ref();
    let (n_rows, n_cols) = matrix.shape();

    // Schema: three bookkeeping fields, then one field per matrix column.
    let mut fields = Vec::with_capacity(n_cols + 3);
    fields.push(Field::new("name_id", DataType::Utf8, false));
    fields.push(Field::new("n_rows", DataType::UInt64, false));
    fields.push(Field::new("n_cols", DataType::UInt64, false));
    fields.extend(
        (0..n_cols).map(|i| Field::new(format!("col_{}", i), DataType::Float64, false)),
    );
    let schema = Arc::new(Schema::new(fields));

    // Column data in the same order as the schema.
    let mut columns: Vec<Arc<dyn arrow::array::Array>> = Vec::with_capacity(n_cols + 3);
    columns.push(Arc::new(StringArray::from(vec![name_id; n_rows])));
    columns.push(Arc::new(UInt64Array::from(vec![n_rows as u64; n_rows])));
    columns.push(Arc::new(UInt64Array::from(vec![n_cols as u64; n_rows])));
    for col_idx in 0..n_cols {
        let mut col_data: Vec<f64> = Vec::with_capacity(n_rows);
        for row_idx in 0..n_rows {
            col_data.push(*matrix.get((row_idx, col_idx)));
        }
        columns.push(Arc::new(Float64Array::from(col_data)));
    }

    let batch = RecordBatch::try_new(schema.clone(), columns)
        .map_err(|e| StorageError::Arrow(e.to_string()))?;

    let file_name = format!("{}.parquet", name_id);
    let file_path = out_dir.join(&file_name);
    let file = File::create(&file_path).map_err(|e| StorageError::Io(e.to_string()))?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))
        .map_err(|e| StorageError::Parquet(e.to_string()))?;
    writer
        .write(&batch)
        .map_err(|e| StorageError::Parquet(e.to_string()))?;
    writer
        .close()
        .map_err(|e| StorageError::Parquet(e.to_string()))?;

    // Best effort: a failed stat only leaves `size_bytes` empty.
    let size_bytes = std::fs::metadata(&file_path).map(|m| m.len()).ok();

    let config = match builder_config {
        Some(config) => config,
        None => return Ok(()),
    };
    let description = FileInfo {
        filename: file_name,
        file_type: "dense".to_string(),
        rows: n_rows,
        cols: n_cols,
        nnz: None,
        size_bytes,
    };
    let metadata = ArrowSpaceMetadata::new(name_id)
        .with_builder_config(config)
        .with_dimensions(n_rows, n_cols)
        .add_file("matrix", description);
    save_metadata(&metadata, out_dir, name_id)
}
pub fn load_dense_matrix(path: impl AsRef<Path>) -> Result<DenseMatrix<f64>, StorageError> {
let file = File::open(path.as_ref()).map_err(|e| StorageError::Io(e.to_string()))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| StorageError::Parquet(e.to_string()))?;
let reader = builder
.build()
.map_err(|e| StorageError::Parquet(e.to_string()))?;
let mut n_rows_total: Option<usize> = None;
let mut n_cols_total: Option<usize> = None;
let mut flat_data: Vec<f64> = Vec::new();
let mut current_row_offset = 0;
for batch_result in reader {
let batch = batch_result.map_err(|e| StorageError::Parquet(e.to_string()))?;
if n_rows_total.is_none() {
let n_rows_col = batch
.column_by_name("n_rows")
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| StorageError::Invalid("n_rows column missing".to_string()))?;
let r = n_rows_col.value(0) as usize;
n_rows_total = Some(r);
let n_cols_col = batch
.column_by_name("n_cols")
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| StorageError::Invalid("n_cols column missing".to_string()))?;
let c = n_cols_col.value(0) as usize;
n_cols_total = Some(c);
flat_data = vec![0.0; r * c];
}
let total_rows = n_rows_total.unwrap();
let cols = n_cols_total.unwrap();
let batch_rows = batch.num_rows();
for col_idx in 0..cols {
let col_name = format!("col_{}", col_idx);
let col = batch
.column_by_name(&col_name)
.and_then(|c| c.as_any().downcast_ref::<Float64Array>())
.ok_or_else(|| StorageError::Invalid(format!("Column {} missing", col_name)))?;
let start_idx = (col_idx * total_rows) + current_row_offset;
let end_idx = start_idx + batch_rows;
flat_data[start_idx..end_idx].copy_from_slice(col.values());
}
current_row_offset += batch_rows;
}
if n_rows_total.is_none() {
return Err(StorageError::Invalid("No data in parquet file".to_string()));
}
let n_rows = n_rows_total.unwrap();
let n_cols = n_cols_total.unwrap();
if current_row_offset != n_rows {
return Err(StorageError::Invalid(format!(
"Parquet file contained {} rows, but metadata claimed {}",
current_row_offset, n_rows
)));
}
let matrix = DenseMatrix::from_iterator(flat_data.into_iter(), n_rows, n_cols, 1);
Ok(matrix)
}
/// Convenience wrapper around [`save_sparse_matrix`] that takes the
/// configuration from an optional builder.
///
/// With `Some(builder)`, `builder.builder_config_typed()` is passed on as the
/// configuration; with `None`, no configuration is passed.
pub fn save_sparse_matrix_with_builder(
    matrix: &CsMat<f64>,
    path: impl AsRef<Path>,
    name_id: &str,
    builder: Option<&ArrowSpaceBuilder>,
) -> Result<(), StorageError> {
    save_sparse_matrix(
        matrix,
        path,
        name_id,
        builder.map(|b| b.builder_config_typed()),
    )
}
/// Writes `matrix` to `<path>/<name_id>.parquet` (Snappy-compressed) in
/// coordinate form: one row per stored entry with the columns `row`, `col`
/// and `value`, plus the bookkeeping columns `name_id`, `n_rows`, `n_cols`
/// and `nnz` repeated on every row.
///
/// Both CSR and CSC inputs are handled: the outer/inner indices are mapped to
/// (row, col) according to the matrix's storage order, so a CSC matrix is not
/// written transposed.
///
/// When `builder_config` is `Some`, a metadata JSON file describing the
/// artifact is written as well via `save_metadata`.
///
/// NOTE(review): a matrix without stored entries yields a file with zero
/// rows, so the shape is not recoverable from it; `load_sparse_matrix`
/// presumably reports such a file as "No data in parquet file" — confirm.
///
/// # Errors
/// `StorageError::Arrow`, `StorageError::Io` or `StorageError::Parquet`
/// depending on the failing step, plus any error from `save_metadata`.
pub fn save_sparse_matrix(
    matrix: &CsMat<f64>,
    path: impl AsRef<Path>,
    name_id: &str,
    builder_config: Option<HashMap<String, ConfigValue>>,
) -> Result<(), StorageError> {
    let (n_rows, n_cols) = matrix.shape();
    let nnz = matrix.nnz();

    let mut rows: Vec<u64> = Vec::with_capacity(nnz);
    let mut cols: Vec<u64> = Vec::with_capacity(nnz);
    let mut vals: Vec<f64> = Vec::with_capacity(nnz);
    // The outer dimension is the row only for CSR storage; for CSC it is the
    // column, so the pair has to be swapped.
    let outer_is_row = matrix.is_csr();
    for (outer_idx, lane) in matrix.outer_iterator().enumerate() {
        for (inner_idx, &value) in lane.iter() {
            let (row_idx, col_idx) = if outer_is_row {
                (outer_idx, inner_idx)
            } else {
                (inner_idx, outer_idx)
            };
            rows.push(row_idx as u64);
            cols.push(col_idx as u64);
            vals.push(value);
        }
    }

    let schema = Arc::new(Schema::new(vec![
        Field::new("name_id", DataType::Utf8, false),
        Field::new("n_rows", DataType::UInt64, false),
        Field::new("n_cols", DataType::UInt64, false),
        Field::new("nnz", DataType::UInt64, false),
        Field::new("row", DataType::UInt64, false),
        Field::new("col", DataType::UInt64, false),
        Field::new("value", DataType::Float64, false),
    ]));
    let name_array = StringArray::from(vec![name_id; nnz]);
    let n_rows_array = UInt64Array::from(vec![n_rows as u64; nnz]);
    let n_cols_array = UInt64Array::from(vec![n_cols as u64; nnz]);
    let nnz_array = UInt64Array::from(vec![nnz as u64; nnz]);
    let row_array = UInt64Array::from(rows);
    let col_array = UInt64Array::from(cols);
    let val_array = Float64Array::from(vals);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(name_array),
            Arc::new(n_rows_array),
            Arc::new(n_cols_array),
            Arc::new(nnz_array),
            Arc::new(row_array),
            Arc::new(col_array),
            Arc::new(val_array),
        ],
    )
    .map_err(|e| StorageError::Arrow(e.to_string()))?;

    let file_path = path.as_ref().join(format!("{}.parquet", name_id));
    let file = File::create(&file_path).map_err(|e| StorageError::Io(e.to_string()))?;
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))
        .map_err(|e| StorageError::Parquet(e.to_string()))?;
    writer
        .write(&batch)
        .map_err(|e| StorageError::Parquet(e.to_string()))?;
    writer
        .close()
        .map_err(|e| StorageError::Parquet(e.to_string()))?;

    // Best effort: a failed stat only leaves `size_bytes` empty.
    let file_size = std::fs::metadata(&file_path).map(|m| m.len()).ok();

    if let Some(config) = builder_config {
        let metadata = ArrowSpaceMetadata::new(name_id)
            .with_builder_config(config)
            .with_dimensions(n_rows, n_cols)
            .add_file(
                "matrix",
                FileInfo {
                    filename: format!("{}.parquet", name_id),
                    file_type: "sparse".to_string(),
                    rows: n_rows,
                    cols: n_cols,
                    nnz: Some(nnz),
                    size_bytes: file_size,
                },
            );
        save_metadata(&metadata, path.as_ref(), name_id)?;
    }
    Ok(())
}
pub fn load_sparse_matrix(path: impl AsRef<Path>) -> Result<CsMat<f64>, StorageError> {
let file = File::open(path.as_ref()).map_err(|e| StorageError::Io(e.to_string()))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| StorageError::Parquet(e.to_string()))?;
let reader = builder
.build()
.map_err(|e| StorageError::Parquet(e.to_string()))?;
use sprs::TriMat;
let mut trimat: Option<TriMat<f64>> = None;
for batch_result in reader {
let batch = batch_result.map_err(|e| StorageError::Parquet(e.to_string()))?;
if trimat.is_none() {
let n_rows_col = batch
.column_by_name("n_rows")
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| StorageError::Invalid("n_rows missing".to_string()))?;
let n_rows = n_rows_col.value(0) as usize;
let n_cols_col = batch
.column_by_name("n_cols")
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| StorageError::Invalid("n_cols missing".to_string()))?;
let n_cols = n_cols_col.value(0) as usize;
trimat = Some(TriMat::new((n_rows, n_cols)));
}
let row_col = batch
.column_by_name("row")
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| StorageError::Invalid("row missing".to_string()))?;
let col_col = batch
.column_by_name("col")
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
.ok_or_else(|| StorageError::Invalid("col missing".to_string()))?;
let val_col = batch
.column_by_name("value")
.and_then(|c| c.as_any().downcast_ref::<Float64Array>())
.ok_or_else(|| StorageError::Invalid("value missing".to_string()))?;
if let Some(tm) = &mut trimat {
for i in 0..row_col.len() {
tm.add_triplet(
row_col.value(i) as usize,
col_col.value(i) as usize,
val_col.value(i),
);
}
}
}
trimat
.ok_or_else(|| StorageError::Invalid("No data in parquet file".to_string()))
.map(|tm| tm.to_csr())
}
/// Persists a complete checkpoint into the directory `path`.
///
/// The directory is created if necessary. Five Parquet files named
/// `<checkpoint_name>_<artifact>.parquet` are written, in this order:
/// `raw_data`, `adjacency`, `centroids`, `laplacian`, `signals`. They are
/// followed by one metadata JSON file that carries the builder configuration,
/// the dimensions of `raw_data`, and an entry per artifact.
///
/// # Errors
/// `StorageError::Io` when the directory cannot be created; otherwise the
/// first error returned by the individual save functions.
pub fn save_arrowspace_checkpoint_with_builder(
    path: impl AsRef<Path>,
    checkpoint_name: &str,
    raw_data: &DenseMatrix<f64>,
    adjacency: &CsMat<f64>,
    centroids: &DenseMatrix<f64>,
    laplacian: &CsMat<f64>,
    signals: &CsMat<f64>,
    builder: &ArrowSpaceBuilder,
) -> Result<(), StorageError> {
    let base_path = path.as_ref();
    if let Err(e) = std::fs::create_dir_all(base_path) {
        return Err(StorageError::Io(format!(
            "Failed to create directory: {}",
            e
        )));
    }

    // Each artifact is stored under "<checkpoint_name>_<artifact>".
    let artifact_id = |artifact: &str| format!("{}_{}", checkpoint_name, artifact);

    // No per-artifact metadata (`None`): one combined file is written below.
    save_dense_matrix(raw_data, base_path, &artifact_id("raw_data"), None)?;
    save_sparse_matrix(adjacency, base_path, &artifact_id("adjacency"), None)?;
    save_dense_matrix(centroids, base_path, &artifact_id("centroids"), None)?;
    save_sparse_matrix(laplacian, base_path, &artifact_id("laplacian"), None)?;
    save_sparse_matrix(signals, base_path, &artifact_id("signals"), None)?;

    // (key, file type, shape, stored entries) for every artifact.
    let artifacts: [(&str, &str, (usize, usize), Option<usize>); 5] = [
        ("raw_data", "dense", raw_data.shape(), None),
        ("adjacency", "sparse", adjacency.shape(), Some(adjacency.nnz())),
        ("centroids", "dense", centroids.shape(), None),
        ("laplacian", "sparse", laplacian.shape(), Some(laplacian.nnz())),
        ("signals", "sparse", signals.shape(), Some(signals.nnz())),
    ];

    let (data_rows, data_cols) = raw_data.shape();
    let initial = ArrowSpaceMetadata::from_builder(checkpoint_name, builder)
        .with_dimensions(data_rows, data_cols);

    let metadata = artifacts.iter().fold(
        initial,
        |acc, &(name, file_type, (rows, cols), nnz)| {
            let filename = format!("{}_{}.parquet", checkpoint_name, name);
            // Best effort: a failed stat only leaves `size_bytes` empty.
            let size_bytes = std::fs::metadata(base_path.join(&filename))
                .map(|m| m.len())
                .ok();
            acc.add_file(
                name,
                FileInfo {
                    filename,
                    file_type: file_type.to_string(),
                    rows,
                    cols,
                    nnz,
                    size_bytes,
                },
            )
        },
    );

    save_metadata(&metadata, base_path, checkpoint_name)
}
/// Convenience wrapper around [`save_lambda`] that takes the configuration
/// from an optional builder.
///
/// With `Some(builder)`, `builder.builder_config_typed()` is passed on as the
/// configuration; with `None`, no configuration is passed.
pub fn save_lambda_with_builder(
    lambdas: &[f64],
    path: impl AsRef<Path>,
    name_id: &str,
    builder: Option<&ArrowSpaceBuilder>,
) -> Result<(), StorageError> {
    save_lambda(
        lambdas,
        path,
        name_id,
        builder.map(|b| b.builder_config_typed()),
    )
}
pub fn save_lambda(
lambdas: &[f64],
path: impl AsRef<Path>,
name_id: &str,
builder_config: Option<HashMap<String, ConfigValue>>,
) -> Result<(), StorageError> {
let n_values = lambdas.len();
if n_values == 0 {
return Err(StorageError::Invalid(
"Cannot save empty lambda vector".to_string(),
));
}
let schema = Arc::new(Schema::new(vec![
Field::new("name_id", DataType::Utf8, false),
Field::new("n_values", DataType::UInt64, false),
Field::new("row_index", DataType::UInt64, false),
Field::new("lambda", DataType::Float64, false),
]));
let name_array = StringArray::from(vec![name_id; n_values]);
let n_values_array = UInt64Array::from(vec![n_values as u64; n_values]);
let row_index_array = UInt64Array::from((0..n_values as u64).collect::<Vec<_>>());
let lambda_array = Float64Array::from(lambdas.to_vec());
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(name_array),
Arc::new(n_values_array),
Arc::new(row_index_array),
Arc::new(lambda_array),
],
)
.map_err(|e| StorageError::Arrow(e.to_string()))?;
let file_path = path.as_ref().join(format!("{}.parquet", name_id));
let file = File::create(&file_path).map_err(|e| StorageError::Io(e.to_string()))?;
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let mut writer = ArrowWriter::try_new(file, schema, Some(props))
.map_err(|e| StorageError::Parquet(e.to_string()))?;
writer
.write(&batch)
.map_err(|e| StorageError::Parquet(e.to_string()))?;
writer
.close()
.map_err(|e| StorageError::Parquet(e.to_string()))?;
let file_size = std::fs::metadata(&file_path).map(|m| m.len()).ok();
if let Some(config) = builder_config {
let metadata = ArrowSpaceMetadata::new(name_id)
.with_builder_config(config)
.with_dimensions(n_values, 1) .add_file(
"lambda_vector",
FileInfo {
filename: format!("{}.parquet", name_id),
file_type: "lambda_vector".to_string(),
rows: n_values,
cols: 1,
nnz: None,
size_bytes: file_size,
},
);
save_metadata(&metadata, path.as_ref(), name_id)?;
}
Ok(())
}
pub fn load_lambda(path: impl AsRef<Path>) -> Result<Vec<f64>, StorageError> {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
let file = File::open(path.as_ref()).map_err(|e| StorageError::Io(e.to_string()))?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)
.map_err(|e| StorageError::Parquet(e.to_string()))?;
let reader = builder
.build()
.map_err(|e| StorageError::Parquet(e.to_string()))?;
let mut lambdas = Vec::new();
for batch_result in reader {
let batch = batch_result.map_err(|e| StorageError::Parquet(e.to_string()))?;
if lambdas.is_empty() {
if let Some(n_values_col) = batch
.column_by_name("n_values")
.and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
{
if !n_values_col.is_empty() {
let n_values = n_values_col.value(0) as usize;
lambdas.reserve(n_values);
}
}
}
let lambda_col = batch
.column_by_name("lambda")
.and_then(|c| c.as_any().downcast_ref::<Float64Array>())
.ok_or_else(|| StorageError::Invalid("lambda column missing".to_string()))?;
lambdas.extend_from_slice(lambda_col.values());
}
Ok(lambdas)
}