use std::collections::HashSet;
use std::sync::Arc;
use polars::prelude::*;
use crate::error::Result;
/// A provider of tabular data expressed as a polars [`LazyFrame`].
///
/// Implementors only supply [`Source::frame`]; [`Source::columns`] and
/// [`Source::schema`] are derived by resolving the schema of the plan it returns.
pub trait Source: Send + Sync {
    /// Builds the lazy query plan for this source.
    fn frame(&self) -> Result<LazyFrame>;

    /// Returns the set of column names in the schema of [`Source::frame`].
    ///
    /// # Errors
    ///
    /// Propagates any error from building the frame or resolving its schema.
    fn columns(&self) -> Result<HashSet<String>> {
        let resolved = self.frame()?.collect_schema()?;
        Ok(resolved
            .iter_names()
            .map(|name| name.to_string())
            .collect())
    }

    /// Returns `(column name, data type)` pairs in schema order, with each data
    /// type rendered through its `Display` implementation.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the frame or resolving its schema.
    fn schema(&self) -> Result<Vec<(String, String)>> {
        let resolved = self.frame()?.collect_schema()?;
        let pairs = resolved
            .iter()
            .map(|(name, dtype)| (name.to_string(), dtype.to_string()))
            .collect();
        Ok(pairs)
    }
}
/// A [`Source`] backed by an in-memory `DataFrame`.
pub struct FrameSource {
    /// The frame handed out (as a clone) on every [`Source::frame`] call.
    df: DataFrame,
}

impl FrameSource {
    /// Wraps `df` so it can be used wherever a [`Source`] is expected.
    pub fn new(df: DataFrame) -> Self {
        Self { df }
    }
}
impl Source for FrameSource {
    /// Clones the wrapped `DataFrame` and exposes the copy as a lazy plan.
    fn frame(&self) -> Result<LazyFrame> {
        let snapshot = self.df.clone();
        Ok(snapshot.lazy())
    }
}
/// A [`Source`] that lazily scans Parquet data at a path.
pub struct ParquetSource {
    /// Location converted with `PlPath::new` each time a scan is built.
    path: String,
}

impl ParquetSource {
    /// Records `path`; no I/O happens until [`Source::frame`] is called.
    pub fn new(path: impl Into<String>) -> Self {
        let path = path.into();
        Self { path }
    }
}
impl Source for ParquetSource {
    /// Builds a lazy Parquet scan of `self.path` using default scan arguments.
    fn frame(&self) -> Result<LazyFrame> {
        let target = PlPath::new(&self.path);
        let scan = LazyFrame::scan_parquet(target, ScanArgsParquet::default())?;
        Ok(scan)
    }
}
/// A [`Source`] that lazily reads a delimited text file with a header row.
pub struct CsvSource {
    /// Location converted with `PlPath::new` each time a reader is built.
    path: String,
}

impl CsvSource {
    /// Records `path`; no I/O happens until [`Source::frame`] is called.
    pub fn new(path: impl Into<String>) -> Self {
        let path = path.into();
        Self { path }
    }
}
impl Source for CsvSource {
    /// Builds a lazy CSV reader over `self.path`, treating the first row as the header.
    ///
    /// Apart from the header flag, the reader keeps polars' defaults.
    fn frame(&self) -> Result<LazyFrame> {
        let reader = LazyCsvReader::new(PlPath::new(&self.path)).with_has_header(true);
        let plan = reader.finish()?;
        Ok(plan)
    }
}
/// A [`Source`] that left-joins a fixed dimension table onto every frame of a base source.
pub struct JoinSource {
    /// Source whose frames form the left side of the join.
    base: Arc<dyn Source>,
    /// Dimension rows joined onto `base`. When built by [`JoinSource::from_snapshot`]
    /// it holds at most one row per `on` key.
    dims: DataFrame,
    /// Name of the join-key column, used on both sides of the join.
    on: String,
}

impl JoinSource {
    /// Wraps an already materialised dimension table.
    ///
    /// No validation is performed: `dims` is joined as-is on the column `on`.
    pub fn new(base: Arc<dyn Source>, dims: DataFrame, on: impl Into<String>) -> Self {
        let on = on.into();
        Self { base, dims, on }
    }

    /// Builds the dimension table by projecting `snapshot` onto the join key plus
    /// the requested `dim_cols`, then de-duplicating on the key.
    ///
    /// * The key column `on` is always selected exactly once.
    /// * Entries of `dim_cols` equal to `on` are ignored.
    /// * Entries not present on `snapshot` are skipped.
    /// * Duplicate entries are selected once.
    /// * If `snapshot` holds several rows for a key, which one survives is
    ///   unspecified (`UniqueKeepStrategy::Any`).
    ///
    /// # Errors
    ///
    /// Returns `Error::Schema` when a requested column exists on both `snapshot`
    /// and `base`, because the join would be ambiguous. Also propagates any error
    /// raised while resolving either schema or collecting the snapshot.
    pub fn from_snapshot(
        base: Arc<dyn Source>,
        snapshot: &dyn Source,
        on: impl Into<String>,
        dim_cols: &[String],
    ) -> Result<Self> {
        let on: String = on.into();
        let snapshot_cols = snapshot.columns()?;
        let series_cols = base.columns()?;

        // The key is pre-registered so a repeated mention in `dim_cols` is not projected twice.
        let mut picked: HashSet<String> = HashSet::from([on.clone()]);
        let mut projection: Vec<Expr> = Vec::with_capacity(dim_cols.len() + 1);
        projection.push(col(on.as_str()));

        for c in dim_cols.iter().filter(|name| **name != on) {
            if !snapshot_cols.contains(c) {
                continue;
            }
            if series_cols.contains(c) {
                return Err(crate::error::Error::Schema(format!(
                    "dims_from column {c:?} exists on BOTH the series frame and the \
                     snapshot — joining it would silently bind a filter/row_filter to \
                     the series' time-varying value, not the snapshot dimension. Use a \
                     dedicated level column (e.g. a bucketed `{c}_bucket`) or rename one \
                     side so the dim is unambiguous."
                )));
            }
            if picked.insert(c.clone()) {
                projection.push(col(c.as_str()));
            }
        }

        let projected = snapshot.frame()?.select(projection);
        let deduped = projected.unique(Some(by_name([on.as_str()], true)), UniqueKeepStrategy::Any);
        let dims = deduped.collect()?;
        Ok(Self { base, dims, on })
    }
}
impl Source for JoinSource {
    /// Left-joins the dimension table onto a fresh frame from `base`, matching on `on`.
    ///
    /// Left-join semantics apply: `base` rows without a matching key get nulls in the
    /// dimension columns.
    fn frame(&self) -> Result<LazyFrame> {
        let key = || col(self.on.as_str());
        let series = self.base.frame()?;
        let dimensions = self.dims.clone().lazy();
        Ok(series.join(
            dimensions,
            [key()],
            [key()],
            JoinArgs::new(JoinType::Left),
        ))
    }
}
/// Closure signature used by [`ClosureSource`]: derives one frame from named input frames.
type ClosureFn =
    dyn Fn(&std::collections::HashMap<String, LazyFrame>) -> PolarsResult<LazyFrame> + Send + Sync;

/// A [`Source`] computed by a user closure from the frames of other named sources.
pub struct ClosureSource {
    /// Named upstream sources whose frames are passed to `f`.
    deps: std::collections::HashMap<String, Arc<dyn Source>>,
    /// Shared closure that combines the dependency frames into the output frame.
    f: Arc<ClosureFn>,
}

impl ClosureSource {
    /// Creates a source whose frame is produced by `f`.
    ///
    /// `f` receives a map keyed by the names in `deps`. Each value is the
    /// [`LazyFrame`] built from the matching dependency.
    pub fn new<F>(deps: std::collections::HashMap<String, Arc<dyn Source>>, f: F) -> Self
    where
        F: Fn(&std::collections::HashMap<String, LazyFrame>) -> PolarsResult<LazyFrame>
            + Send
            + Sync
            + 'static,
    {
        let f: Arc<ClosureFn> = Arc::new(f);
        Self { deps, f }
    }
}
impl Source for ClosureSource {
    /// Builds every dependency's frame, then passes the name → frame map to the closure.
    ///
    /// Stops at the first dependency whose frame fails to build.
    fn frame(&self) -> Result<LazyFrame> {
        let inputs = self
            .deps
            .iter()
            .map(|(name, dep)| dep.frame().map(|lf| (name.clone(), lf)))
            .collect::<Result<std::collections::HashMap<String, LazyFrame>>>()?;
        Ok((self.f)(&inputs)?)
    }
}
pub fn file_source(path: &str) -> Box<dyn Source> {
let ext = path.rsplit('.').next().unwrap_or("").to_lowercase();
match ext.as_str() {
"csv" | "tsv" | "txt" => Box::new(CsvSource::new(path)),
_ => Box::new(ParquetSource::new(path)),
}
}