use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use arrow::array::*;
use arrow::datatypes::DataType;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use serde_json::Value;
use crate::error::Error;
/// A row-level predicate, evaluated against one JSON row by `matches_filter`.
///
/// The first field of every variant is the name of the column to look up in
/// the row. A row that lacks the column never matches.
#[derive(Debug, Clone)]
pub enum Filter {
/// `Eq(column, value)`: the row's `column` must equal `value` (JSON equality).
Eq(String, Value),
/// `Near(column, target, tolerance)`: the row's `column`, read as `f64`, must
/// satisfy `|v - target| < tolerance` (strict comparison).
Near(String, f64, f64),
/// `Gte(column, threshold)`: the row's `column`, read as `f64`, must be
/// `>= threshold`.
Gte(String, f64),
}
/// `(column name, data type rendered as text)` pair, as produced by
/// `ParquetStore::schema`.
pub type ColumnInfo = (String, String);
/// Loads Parquet files found under a data directory as JSON rows and keeps the
/// parsed rows in an in-memory cache.
pub struct ParquetStore {
// Directory that the relative paths given to `load` / `schema` are joined onto.
data_dir: PathBuf,
// Parsed rows, keyed by the relative path string passed to `load`.
cache: RwLock<HashMap<String, Arc<Vec<Value>>>>,
}
impl ParquetStore {
    /// Creates a store rooted at `data_dir` with an empty cache.
    ///
    /// No file system access happens here; the directory is only used when a
    /// file is loaded or inspected.
    pub fn new(data_dir: impl AsRef<Path>) -> Self {
        Self {
            data_dir: data_dir.as_ref().to_path_buf(),
            cache: RwLock::new(HashMap::new()),
        }
    }

    /// Returns all rows of the Parquet file at `rel_path` (relative to the data
    /// directory), parsing the file on first use and serving later calls from
    /// the cache.
    ///
    /// With the `fetch` feature enabled, a missing file is first requested via
    /// `DataDir::fetch_file` when the data directory contains a
    /// `.lazy_base_url` marker file.
    ///
    /// # Errors
    ///
    /// * `Error::DataDirNotFound` if the file does not exist (after the
    ///   optional fetch attempt).
    /// * Any error propagated from `fetch_file` or `parse_parquet_file`.
    pub fn load(&self, rel_path: &str) -> crate::Result<Arc<Vec<Value>>> {
        // Fast path under the shared lock. The guard is scoped so it is released
        // before any file I/O. A poisoned lock is recovered rather than
        // propagated as a panic.
        {
            let cache = self.cache.read().unwrap_or_else(|e| e.into_inner());
            if let Some(rows) = cache.get(rel_path) {
                return Ok(Arc::clone(rows));
            }
        }

        let path = self.data_dir.join(rel_path);
        if !path.exists() {
            #[cfg(feature = "fetch")]
            {
                let marker = self.data_dir.join(".lazy_base_url");
                if marker.exists() {
                    crate::DataDir::from_root(&self.data_dir).fetch_file(rel_path)?;
                }
            }
            // Re-check: the fetch may be compiled out, skipped, or may not have
            // produced the file.
            if !path.exists() {
                return Err(Error::DataDirNotFound(path));
            }
        }

        // Parse without holding any lock so other readers are not blocked.
        let rows = Arc::new(parse_parquet_file(&path)?);

        // Another thread may have parsed and inserted the same file in the
        // meantime. Always hand out the entry that actually lives in the cache so
        // every caller shares one allocation; our duplicate is dropped if we lost
        // the race.
        let mut cache = self.cache.write().unwrap_or_else(|e| e.into_inner());
        let cached = cache.entry(rel_path.to_string()).or_insert(rows);
        Ok(Arc::clone(cached))
    }

    /// Loads `rel_path` via [`load`](Self::load) and returns owned copies of
    /// the rows that satisfy every filter in `filters`.
    ///
    /// An empty `filters` slice returns a copy of all rows.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`load`](Self::load).
    pub fn load_filtered(&self, rel_path: &str, filters: &[Filter]) -> crate::Result<Vec<Value>> {
        let all = self.load(rel_path)?;
        if filters.is_empty() {
            // Plain clone; no per-row predicate evaluation needed.
            return Ok((*all).clone());
        }
        Ok(all
            .iter()
            .filter(|row| filters.iter().all(|f| matches_filter(row, f)))
            .cloned()
            .collect())
    }

    /// Returns the `(name, data type)` pair of every column of the Parquet
    /// file at `rel_path`.
    ///
    /// The file is opened directly; this method neither consults nor fills
    /// the row cache.
    ///
    /// # Errors
    ///
    /// Propagates the error from opening the file or from building the
    /// Parquet reader.
    pub fn schema(&self, rel_path: &str) -> crate::Result<Vec<ColumnInfo>> {
        let path = self.data_dir.join(rel_path);
        let file = fs::File::open(&path)?;
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
        Ok(builder
            .schema()
            .fields()
            .iter()
            .map(|f| (f.name().clone(), f.data_type().to_string()))
            .collect())
    }
}
fn matches_filter(row: &Value, filter: &Filter) -> bool {
match filter {
Filter::Eq(col, expected) => row.get(col).is_some_and(|v| v == expected),
Filter::Near(col, target, tolerance) => row
.get(col)
.and_then(|v| v.as_f64())
.is_some_and(|v| (v - target).abs() < *tolerance),
Filter::Gte(col, threshold) => row
.get(col)
.and_then(|v| v.as_f64())
.is_some_and(|v| v >= *threshold),
}
}
/// Reads the Parquet file at `path` and returns one JSON object per row.
///
/// Each object maps column name to the cell converted by
/// `column_value_to_json`. The file is read fully into memory before decoding.
///
/// # Errors
///
/// Propagates errors from reading the file, from building the Parquet reader,
/// and from decoding any record batch.
pub fn parse_parquet_file(path: &Path) -> crate::Result<Vec<Value>> {
    let bytes = bytes::Bytes::from(fs::read(path)?);
    let batches = ParquetRecordBatchReaderBuilder::try_new(bytes)?.build()?;

    let mut out = Vec::new();
    for next in batches {
        let batch = next?;
        let schema = batch.schema();
        let fields = schema.fields();

        // Pivot the columnar batch into row-oriented JSON objects.
        out.extend((0..batch.num_rows()).map(|row| {
            let mut object = serde_json::Map::new();
            for (field, column) in fields.iter().zip(batch.columns()) {
                object.insert(
                    field.name().clone(),
                    column_value_to_json(column.as_ref(), row),
                );
            }
            Value::Object(object)
        }));
    }
    Ok(out)
}
pub fn column_value_to_json(col: &dyn Array, idx: usize) -> Value {
if col.is_null(idx) {
return Value::Null;
}
if let Some(a) = col.as_any().downcast_ref::<Float64Array>() {
let v = a.value(idx);
Value::Number(
serde_json::Number::from_f64(v).unwrap_or_else(|| serde_json::Number::from(0)),
)
} else if let Some(a) = col.as_any().downcast_ref::<Float32Array>() {
let v = a.value(idx) as f64;
Value::Number(
serde_json::Number::from_f64(v).unwrap_or_else(|| serde_json::Number::from(0)),
)
} else if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
Value::Number(a.value(idx).into())
} else if let Some(a) = col.as_any().downcast_ref::<Int32Array>() {
Value::Number(a.value(idx).into())
} else if let Some(a) = col.as_any().downcast_ref::<Int16Array>() {
Value::Number((a.value(idx) as i64).into())
} else if let Some(a) = col.as_any().downcast_ref::<UInt8Array>() {
Value::Number((a.value(idx) as i64).into())
} else if let Some(a) = col.as_any().downcast_ref::<BooleanArray>() {
Value::Bool(a.value(idx))
} else if let Some(a) = col.as_any().downcast_ref::<StringArray>() {
Value::String(a.value(idx).to_string())
} else {
if let Ok(cast) = arrow::compute::cast(col, &DataType::Utf8) {
if let Some(s) = cast.as_any().downcast_ref::<StringArray>() {
return Value::String(s.value(idx).to_string());
}
}
Value::String(format!("{col:?}"))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Relative path of the decay table that most tests read.
    const DECAY: &str = "meta/decay.parquet";

    // Compile-time probes: instantiating these fails to compile unless `T`
    // satisfies the bound.
    fn _assert_send<T: Send>() {}
    fn _assert_sync<T: Sync>() {}

    #[test]
    fn parquet_store_is_send_sync() {
        _assert_send::<ParquetStore>();
        _assert_sync::<ParquetStore>();
    }

    /// Data directory, located three levels above this crate's manifest.
    fn data_dir() -> PathBuf {
        let mut dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        dir.extend(["..", "..", "..", "data"]);
        dir
    }

    /// Store rooted at the shared test data directory.
    fn open_store() -> ParquetStore {
        ParquetStore::new(data_dir())
    }

    #[test]
    #[ignore = "requires nucl-parquet data files"]
    fn load_decay_parquet() {
        let rows = open_store().load(DECAY).unwrap();
        assert!(!rows.is_empty(), "decay.parquet should have rows");

        let first = &rows[0];
        for column in ["Z", "A", "decay_mode"] {
            assert!(first.get(column).is_some());
        }
    }

    #[test]
    #[ignore = "requires nucl-parquet data files"]
    fn filter_by_z() {
        let z_is_27 = [Filter::Eq("Z".into(), serde_json::json!(27))];
        let co = open_store().load_filtered(DECAY, &z_is_27).unwrap();
        assert!(!co.is_empty(), "Co (Z=27) should have decay entries");

        let every_row_is_cobalt = co
            .iter()
            .all(|r| r.get("Z").and_then(|v| v.as_i64()) == Some(27));
        assert!(every_row_is_cobalt);
    }

    #[test]
    #[ignore = "requires nucl-parquet data files"]
    fn schema_introspection() {
        let schema = open_store().schema(DECAY).unwrap();
        let has_column = |wanted: &str| schema.iter().any(|(name, _)| name.as_str() == wanted);

        assert!(has_column("Z"));
        assert!(has_column("decay_mode"));
        assert!(has_column("branching"));
    }

    #[test]
    #[ignore = "requires nucl-parquet data files"]
    fn per_element_file() {
        let a_is_60 = [Filter::Eq("A".into(), serde_json::json!(60))];
        let rows = open_store()
            .load_filtered("meta/ensdf/radiation/Ni.parquet", &a_is_60)
            .unwrap();
        assert!(!rows.is_empty(), "Ni-60 radiation should have rows");
    }

    #[test]
    #[ignore = "requires nucl-parquet data files"]
    fn cache_hit() {
        let store = open_store();
        // Prime the cache, then check that two later loads share one allocation.
        let _ = store.load(DECAY).unwrap();
        let first = store.load(DECAY).unwrap();
        let second = store.load(DECAY).unwrap();
        assert!(Arc::ptr_eq(&first, &second), "cached loads should return same Arc");
    }
}