use std::sync::Arc;
use memuse::DynamicUsage;
use polars::prelude::*;
use serde::Serialize;
use crate::column::{CanonicalColumnName, SourceColumnName};
/// A labelled `LazyFrame -> LazyFrame` function that `FragmentRef::as_lazy` applies
/// after projecting a reference fragment's columns.
///
/// Cloning copies the label and shares the closure through its `Arc`.
#[derive(Clone)]
pub struct DeferredTransform {
    /// Label returned by `label()` and printed by the `Debug` impl.
    label: String,
    /// The transform itself; `Send + Sync` so the shared closure can cross threads.
    apply: Arc<dyn Fn(LazyFrame) -> LazyFrame + Send + Sync>,
}
impl DeferredTransform {
    /// Wraps `apply` under a human-readable `label`.
    ///
    /// The closure is stored behind an `Arc`, so clones of the returned value share it.
    pub fn new(
        label: impl Into<String>,
        apply: impl Fn(LazyFrame) -> LazyFrame + Send + Sync + 'static,
    ) -> Self {
        let label: String = label.into();
        let apply: Arc<dyn Fn(LazyFrame) -> LazyFrame + Send + Sync> = Arc::new(apply);
        Self { label, apply }
    }

    /// Runs the wrapped transform on `lf` and returns the resulting plan.
    pub fn apply(&self, lf: LazyFrame) -> LazyFrame {
        let transform = &*self.apply;
        transform(lf)
    }

    /// The label given at construction.
    pub fn label(&self) -> &str {
        self.label.as_str()
    }
}
impl std::fmt::Debug for DeferredTransform {
    /// Renders as `DeferredTransform(<label>)`; the closure itself is opaque and is
    /// not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = &self.label;
        write!(f, "DeferredTransform({label})")
    }
}
/// Name of a data source (e.g. `"scada"`).
///
/// `#[serde(transparent)]` makes it (de)serialize as a plain string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct DataSourceName(String);
impl DataSourceName {
pub fn new(name: impl Into<String>) -> Self {
Self(name.into())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for DataSourceName {
    /// Writes the bare name.
    ///
    /// Uses `Formatter::pad`, so width, fill and alignment flags behave exactly as
    /// they do for `str` (e.g. `{:<12}` pads the name). With no flags the output is
    /// just the name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(&self.0)
    }
}
impl From<&str> for DataSourceName {
fn from(s: &str) -> Self {
Self(s.to_string())
}
}
impl From<String> for DataSourceName {
fn from(s: String) -> Self {
Self(s)
}
}
/// Storage for a fragment of column data: either a reference into shared source
/// frames, or a frame owned outright.
///
/// Use `as_lazy` / `as_dataframe` to obtain the data with canonical column names.
#[derive(Debug, Clone)]
pub enum FragmentRef {
    /// A projection of one shared source frame (held via `Arc`), built on evaluation.
    ColumnRef(ColumnRefData),
    /// Several projections that are concatenated vertically on evaluation.
    Stacked(Vec<StackedFragment>),
    /// An owned frame that is returned as-is on evaluation.
    Materialized(DataFrame),
}
/// A projection of one value column, plus any mapped companion columns, out of a
/// shared source frame.
#[derive(Debug, Clone)]
pub struct ColumnRefData {
    /// Source frame; the `Arc` may be shared with other fragments reading the same data.
    pub source: Arc<DataFrame>,
    /// Physical name of the value column inside `source`.
    pub value_column: SourceColumnName,
    /// Name the value column is aliased to when the fragment is evaluated.
    pub canonical_name: CanonicalColumnName,
    /// Identifier of the data source this fragment came from.
    pub source_name: DataSourceName,
    /// Extra `(physical, canonical)` column pairs selected and renamed alongside the
    /// value column.
    pub column_mappings: Vec<(SourceColumnName, CanonicalColumnName)>,
    /// Optional transform applied to the projected (already renamed) columns.
    pub transform: Option<DeferredTransform>,
}
/// One piece of a `FragmentRef::Stacked`; it has the same fields as `ColumnRefData`
/// and is projected the same way before the pieces are concatenated.
#[derive(Debug, Clone)]
pub struct StackedFragment {
    /// Source frame; the `Arc` may be shared with other fragments reading the same data.
    pub source: Arc<DataFrame>,
    /// Physical name of the value column inside `source`.
    pub value_column: SourceColumnName,
    /// Name the value column is aliased to when the fragment is evaluated.
    pub canonical_name: CanonicalColumnName,
    /// Identifier of the data source this piece came from.
    pub source_name: DataSourceName,
    /// Extra `(physical, canonical)` column pairs selected and renamed alongside the
    /// value column.
    pub column_mappings: Vec<(SourceColumnName, CanonicalColumnName)>,
    /// Optional transform applied to this piece's projected columns before stacking.
    pub transform: Option<DeferredTransform>,
}
impl FragmentRef {
pub fn as_dataframe(&self) -> Result<DataFrame, PolarsError> {
self.as_lazy()?.collect()
}
pub fn as_lazy(&self) -> Result<LazyFrame, PolarsError> {
match self {
FragmentRef::ColumnRef(ref_data) => {
let exprs = build_select_exprs(
&ref_data.column_mappings,
&ref_data.value_column,
&ref_data.canonical_name,
);
let mut lf = (*ref_data.source).clone().lazy().select(exprs);
if let Some(ref transform) = ref_data.transform {
lf = transform.apply(lf);
}
Ok(lf)
}
FragmentRef::Stacked(fragments) => {
if fragments.is_empty() {
return Ok(DataFrame::empty().lazy());
}
if fragments.len() == 1 {
let f = &fragments[0];
let exprs =
build_select_exprs(&f.column_mappings, &f.value_column, &f.canonical_name);
let mut lf = (*f.source).clone().lazy().select(exprs);
if let Some(ref transform) = f.transform {
lf = transform.apply(lf);
}
return Ok(lf);
}
let mut lazy_frames = Vec::with_capacity(fragments.len());
for frag in fragments {
let exprs = build_select_exprs(
&frag.column_mappings,
&frag.value_column,
&frag.canonical_name,
);
let mut lf = (*frag.source).clone().lazy().select(exprs);
if let Some(ref transform) = frag.transform {
lf = transform.apply(lf);
}
lazy_frames.push(lf);
}
concat(lazy_frames, UnionArgs::default())
}
FragmentRef::Materialized(df) => Ok(df.clone().lazy()),
}
}
pub fn height(&self) -> usize {
match self {
FragmentRef::ColumnRef(r) => r.source.height(),
FragmentRef::Stacked(frags) => frags.iter().map(|f| f.source.height()).sum(),
FragmentRef::Materialized(df) => df.height(),
}
}
pub fn is_materialized(&self) -> bool {
matches!(self, FragmentRef::Materialized(_))
}
pub fn owned_bytes(&self) -> usize {
match self {
FragmentRef::ColumnRef(_) => 0,
FragmentRef::Stacked(_) => 0,
FragmentRef::Materialized(df) => df.estimated_size(),
}
}
pub fn shared_source_bytes(&self) -> usize {
match self {
FragmentRef::ColumnRef(r) => r.source.estimated_size(),
FragmentRef::Stacked(frags) => frags.iter().map(|f| f.source.estimated_size()).sum(),
FragmentRef::Materialized(_) => 0,
}
}
pub fn source_arc_ptrs(&self) -> Vec<usize> {
match self {
FragmentRef::ColumnRef(r) => vec![Arc::as_ptr(&r.source) as usize],
FragmentRef::Stacked(frags) => frags
.iter()
.map(|f| Arc::as_ptr(&f.source) as usize)
.collect(),
FragmentRef::Materialized(_) => vec![],
}
}
pub fn shared_source_bytes_for_ptr(&self, ptr: usize) -> usize {
match self {
FragmentRef::ColumnRef(r) => {
if Arc::as_ptr(&r.source) as usize == ptr {
r.source.estimated_size()
} else {
0
}
}
FragmentRef::Stacked(frags) => frags
.iter()
.find(|f| Arc::as_ptr(&f.source) as usize == ptr)
.map(|f| f.source.estimated_size())
.unwrap_or(0),
FragmentRef::Materialized(_) => 0,
}
}
pub fn raw_source_dataframe(&self) -> Option<&DataFrame> {
match self {
FragmentRef::ColumnRef(r) => Some(&*r.source),
FragmentRef::Materialized(df) => Some(df),
FragmentRef::Stacked(_) => None,
}
}
pub fn source_name(&self) -> Option<&DataSourceName> {
match self {
FragmentRef::ColumnRef(r) => Some(&r.source_name),
FragmentRef::Stacked(frags) => frags.first().map(|f| &f.source_name),
FragmentRef::Materialized(_) => None,
}
}
pub fn physical_column_name(&self, canonical: &str) -> Option<String> {
match self {
FragmentRef::ColumnRef(r) => {
if r.canonical_name.as_str() == canonical {
return Some(r.value_column.as_str().to_string());
}
r.column_mappings
.iter()
.find(|(_, canon)| canon.as_str() == canonical)
.map(|(src, _)| src.as_str().to_string())
}
_ => Some(canonical.to_string()),
}
}
pub fn has_transform(&self) -> bool {
match self {
FragmentRef::ColumnRef(r) => r.transform.is_some(),
FragmentRef::Stacked(frags) => frags.iter().any(|f| f.transform.is_some()),
FragmentRef::Materialized(_) => false,
}
}
pub fn storage_description(&self) -> StorageDescription {
match self {
FragmentRef::ColumnRef(r) => StorageDescription {
kind: "column_ref".to_string(),
source_count: 1,
sources: vec![r.source_name.to_string()],
rows: r.source.height(),
},
FragmentRef::Stacked(frags) => StorageDescription {
kind: "stacked".to_string(),
source_count: frags.len(),
sources: frags.iter().map(|f| f.source_name.to_string()).collect(),
rows: frags.iter().map(|f| f.source.height()).sum(),
},
FragmentRef::Materialized(df) => StorageDescription {
kind: "materialized".to_string(),
source_count: 1,
sources: vec![],
rows: df.height(),
},
}
}
}
impl DynamicUsage for FragmentRef {
    /// Approximate heap usage attributed to this fragment.
    ///
    /// Reference variants count only their bookkeeping: the byte lengths of the
    /// value, canonical and source names, plus the inline size of each mapping pair.
    /// They exclude the shared source frame. A materialized fragment reports the
    /// `estimated_size()` of the frame it owns.
    fn dynamic_usage(&self) -> usize {
        let pair_size = std::mem::size_of::<(SourceColumnName, CanonicalColumnName)>();
        match self {
            FragmentRef::Materialized(df) => df.estimated_size(),
            // Identical accounting to the `DynamicUsage` impl for `ColumnRefData`.
            FragmentRef::ColumnRef(r) => r.dynamic_usage(),
            FragmentRef::Stacked(frags) => frags.iter().fold(0usize, |total, f| {
                total
                    + f.column_mappings.len() * pair_size
                    + f.value_column.as_str().len()
                    + f.canonical_name.as_str().len()
                    + f.source_name.as_str().len()
            }),
        }
    }

    /// The estimate is treated as exact, so the lower and upper bounds coincide.
    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
        let exact = self.dynamic_usage();
        (exact, Some(exact))
    }
}
impl DynamicUsage for ColumnRefData {
    /// Approximate bookkeeping bytes for this reference.
    ///
    /// This is the inline size of each mapping pair plus the byte lengths of the
    /// value, canonical and source names. The shared source frame is not included.
    fn dynamic_usage(&self) -> usize {
        let mapping_bytes = self.column_mappings.len()
            * std::mem::size_of::<(SourceColumnName, CanonicalColumnName)>();
        let name_bytes: usize = [
            self.value_column.as_str(),
            self.canonical_name.as_str(),
            self.source_name.as_str(),
        ]
        .iter()
        .map(|name| name.len())
        .sum();
        mapping_bytes + name_bytes
    }

    /// The estimate is treated as exact, so the lower and upper bounds coincide.
    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
        let exact = self.dynamic_usage();
        (exact, Some(exact))
    }
}
/// Serializable summary of how a `FragmentRef` is stored, as built by
/// `FragmentRef::storage_description`.
#[derive(Debug, Clone, Serialize)]
pub struct StorageDescription {
    /// One of `"column_ref"`, `"stacked"` or `"materialized"`.
    pub kind: String,
    /// Number of underlying sources (the number of pieces for stacked storage).
    pub source_count: usize,
    /// Names of the underlying data sources; empty for materialized storage.
    pub sources: Vec<String>,
    /// Row count of the underlying source data, taken before any deferred transform.
    pub rows: usize,
}
/// Builds the projection for a reference fragment.
///
/// Each `(source, canonical)` mapping becomes `col(source).alias(canonical)`, in
/// order. The value column, aliased to `value_canonical`, is always the last
/// expression.
fn build_select_exprs(
    column_mappings: &[(SourceColumnName, CanonicalColumnName)],
    value_source: &SourceColumnName,
    value_canonical: &CanonicalColumnName,
) -> Vec<Expr> {
    let rename = |from: &str, to: &str| col(from).alias(to);
    let value_expr = rename(value_source.as_str(), value_canonical.as_str());
    column_mappings
        .iter()
        .map(|(src, canon)| rename(src.as_str(), canon.as_str()))
        .chain(std::iter::once(value_expr))
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Four rows (stations A and B, two timestamps each) with two value columns.
    fn make_source_df() -> DataFrame {
        DataFrame::new(vec![
            Column::new("station_id".into(), &["A", "A", "B", "B"]),
            Column::new("obs_time".into(), &[1i64, 2, 1, 2]),
            Column::new("sump_reading".into(), &[1.0f64, 2.0, 3.0, 4.0]),
            Column::new("discharge_reading".into(), &[10.0f64, 20.0, 30.0, 40.0]),
        ])
        .unwrap()
    }

    /// Mappings shared by most tests: `station_id -> station_name`, `obs_time -> timestamp`.
    fn standard_mappings() -> Vec<(SourceColumnName, CanonicalColumnName)> {
        vec![
            (
                SourceColumnName::new("station_id"),
                CanonicalColumnName::new("station_name"),
            ),
            (
                SourceColumnName::new("obs_time"),
                CanonicalColumnName::new("timestamp"),
            ),
        ]
    }

    /// A `ColumnRef` over `make_source_df()` exposing `sump_reading` as `sump`.
    fn sump_column_ref(transform: Option<DeferredTransform>) -> FragmentRef {
        FragmentRef::ColumnRef(ColumnRefData {
            source: Arc::new(make_source_df()),
            value_column: SourceColumnName::new("sump_reading"),
            canonical_name: CanonicalColumnName::new("sump"),
            source_name: DataSourceName::new("scada"),
            column_mappings: standard_mappings(),
            transform,
        })
    }

    #[test]
    fn test_column_ref_selects_and_renames() {
        let source = Arc::new(make_source_df());
        let storage = FragmentRef::ColumnRef(ColumnRefData {
            source: source.clone(),
            value_column: SourceColumnName::new("sump_reading"),
            canonical_name: CanonicalColumnName::new("sump"),
            source_name: DataSourceName::new("scada"),
            column_mappings: standard_mappings(),
            transform: None,
        });
        let df = storage.as_dataframe().unwrap();
        assert_eq!(df.width(), 3);
        assert_eq!(df.height(), 4);
        assert!(df.column("sump").is_ok());
        assert!(df.column("station_name").is_ok());
        assert!(df.column("timestamp").is_ok());
        assert!(df.column("station_id").is_err());
        assert!(df.column("sump_reading").is_err());
    }

    #[test]
    fn test_stacked_combines_fragments() {
        let source_a = Arc::new(
            DataFrame::new(vec![
                Column::new("station_id".into(), &["A", "A"]),
                Column::new("obs_time".into(), &[1i64, 2]),
                Column::new("sump_reading".into(), &[1.0f64, 2.0]),
            ])
            .unwrap(),
        );
        let source_b = Arc::new(
            DataFrame::new(vec![
                Column::new("station_id".into(), &["B", "B"]),
                Column::new("obs_time".into(), &[1i64, 2]),
                Column::new("sump_reading".into(), &[3.0f64, 4.0]),
            ])
            .unwrap(),
        );
        let mappings = standard_mappings();
        let storage = FragmentRef::Stacked(vec![
            StackedFragment {
                source: source_a,
                value_column: SourceColumnName::new("sump_reading"),
                canonical_name: CanonicalColumnName::new("sump"),
                source_name: DataSourceName::new("store"),
                column_mappings: mappings.clone(),
                transform: None,
            },
            StackedFragment {
                source: source_b,
                value_column: SourceColumnName::new("sump_reading"),
                canonical_name: CanonicalColumnName::new("sump"),
                source_name: DataSourceName::new("adhoc"),
                column_mappings: mappings,
                transform: None,
            },
        ]);
        let df = storage.as_dataframe().unwrap();
        assert_eq!(df.height(), 4);
        assert_eq!(df.width(), 3);
        assert!(df.column("sump").is_ok());
        assert!(df.column("station_name").is_ok());

        let desc = storage.storage_description();
        assert_eq!(desc.kind, "stacked");
        assert_eq!(desc.source_count, 2);
        assert_eq!(desc.sources, vec!["store".to_string(), "adhoc".to_string()]);
        assert_eq!(desc.rows, 4);
        assert_eq!(storage.source_name().map(|n| n.as_str()), Some("store"));
    }

    #[test]
    fn test_empty_stack_yields_empty_frame() {
        let storage = FragmentRef::Stacked(Vec::new());
        let df = storage.as_dataframe().unwrap();
        assert_eq!(df.height(), 0);
        assert_eq!(df.width(), 0);
        assert_eq!(storage.height(), 0);
        assert!(storage.source_name().is_none());
        assert!(!storage.has_transform());
    }

    #[test]
    fn test_materialized_returns_as_is() {
        let df = DataFrame::new(vec![
            Column::new("station_name".into(), &["A"]),
            Column::new("timestamp".into(), &[1i64]),
            Column::new("engine".into(), &["1"]),
            Column::new("engines_on_count".into(), &[1.0f64]),
        ])
        .unwrap();
        let storage = FragmentRef::Materialized(df.clone());
        let result = storage.as_dataframe().unwrap();
        assert_eq!(result.height(), 1);
        assert_eq!(result.width(), 4);
        assert!(storage.is_materialized());
    }

    #[test]
    fn test_column_ref_shared_across_measurements() {
        let source = Arc::new(make_source_df());
        let mappings = standard_mappings();
        let sump_ref = FragmentRef::ColumnRef(ColumnRefData {
            source: source.clone(),
            value_column: SourceColumnName::new("sump_reading"),
            canonical_name: CanonicalColumnName::new("sump"),
            source_name: DataSourceName::new("scada"),
            column_mappings: mappings.clone(),
            transform: None,
        });
        let discharge_ref = FragmentRef::ColumnRef(ColumnRefData {
            source: source.clone(),
            value_column: SourceColumnName::new("discharge_reading"),
            canonical_name: CanonicalColumnName::new("discharge"),
            source_name: DataSourceName::new("scada"),
            column_mappings: mappings,
            transform: None,
        });
        // The test's own handle plus one per fragment: no frame was copied.
        assert_eq!(Arc::strong_count(&source), 3);
        let sump_df = sump_ref.as_dataframe().unwrap();
        let discharge_df = discharge_ref.as_dataframe().unwrap();
        assert!(sump_df.column("sump").is_ok());
        assert!(discharge_df.column("discharge").is_ok());
        assert!(sump_df.column("discharge").is_err());
        assert!(discharge_df.column("sump").is_err());
    }

    #[test]
    fn test_lazy_produces_same_result() {
        let storage = sump_column_ref(None);
        let eager = storage.as_dataframe().unwrap();
        let lazy = storage.as_lazy().unwrap().collect().unwrap();
        assert_eq!(eager.height(), lazy.height());
        assert_eq!(eager.width(), lazy.width());
        assert_eq!(eager.get_column_names(), lazy.get_column_names());
    }

    #[test]
    fn test_transform_applied_after_projection() {
        // The transform runs after renaming, so it filters on the canonical `sump`.
        let storage = sump_column_ref(Some(DeferredTransform::new(
            "sump_gt_1_5",
            |lf: LazyFrame| lf.filter(col("sump").gt(lit(1.5))),
        )));
        assert!(storage.has_transform());
        let df = storage.as_dataframe().unwrap();
        assert_eq!(df.height(), 3);
        // `height()` reports the untransformed source row count.
        assert_eq!(storage.height(), 4);
    }

    #[test]
    fn test_physical_column_name_resolves_mappings() {
        let storage = sump_column_ref(None);
        assert_eq!(
            storage.physical_column_name("sump").as_deref(),
            Some("sump_reading")
        );
        assert_eq!(
            storage.physical_column_name("timestamp").as_deref(),
            Some("obs_time")
        );
        assert_eq!(storage.physical_column_name("missing"), None);
    }
}