use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use polars::prelude::LazyFrame;
pub use polars;
pub use taxa_core::{
ClosureSource, CsvSource, Dataset, Error, FrameSource, JoinSource, Loaded, ParquetSource,
Result, Source,
};
pub use taxa_sql::SqlSource;
/// A user-supplied frame transform: it receives a map of named input frames and returns
/// the resulting `LazyFrame`.
///
/// Held in an `Arc` so it can be cloned cheaply when the app is built. The `Send + Sync`
/// bounds make the closure shareable across threads.
type TransformFn =
    Arc<dyn Fn(&HashMap<String, LazyFrame>) -> LazyFrame + Sync + Send>;

/// Rust-side facade that binds a taxa manifest to concrete data sources.
///
/// Sources and transforms are attached with the builder methods. The result is then either
/// loaded with [`App::build`] or served over HTTP with [`App::serve`].
pub struct App {
    /// The parsed dataset manifest (frames and the views that reference them).
    manifest: Dataset,
    /// Named sources, referenced by a frame's `source` field or by a transform's deps.
    sources: HashMap<String, Arc<dyn Source>>,
    /// Per-frame code transforms: frame name -> (dependency source names, closure).
    transforms: HashMap<String, (Vec<String>, TransformFn)>,
    /// Sources bound straight to a frame name via `frame_source`.
    direct: HashMap<String, Arc<dyn Source>>,
}
impl App {
    /// Creates an empty facade around an already-parsed manifest.
    ///
    /// Nothing is registered yet. Attach sources with [`App::source`],
    /// [`App::source_arc`], [`App::transform`] and [`App::frame_source`], then call
    /// [`App::build`] or [`App::serve`].
    #[must_use]
    pub fn new(manifest: Dataset) -> Self {
        App {
            manifest,
            sources: HashMap::new(),
            transforms: HashMap::new(),
            direct: HashMap::new(),
        }
    }

    /// Reads a JSON manifest from `path` and wraps it in a new [`App`].
    ///
    /// # Errors
    ///
    /// Returns [`Error::Engine`] if the file cannot be read. Returns [`Error::Schema`] if its
    /// contents do not deserialize into a [`Dataset`].
    pub fn from_manifest(path: impl AsRef<Path>) -> Result<Self> {
        let p = path.as_ref();
        let raw = std::fs::read_to_string(p)
            .map_err(|e| Error::Engine(format!("read manifest {p:?}: {e}")))?;
        let manifest: Dataset = serde_json::from_str(&raw)
            .map_err(|e| Error::Schema(format!("manifest {p:?} is not a valid dataset: {e}")))?;
        Ok(App::new(manifest))
    }

    /// Registers a named source.
    ///
    /// Manifest frames can refer to it through their `source` field, and code transforms
    /// through their dependency list. Registering the same name again replaces the earlier
    /// source.
    #[must_use = "builder methods take `self` by value; use the returned `App`"]
    pub fn source(mut self, name: impl Into<String>, src: impl Source + 'static) -> Self {
        self.sources.insert(name.into(), Arc::new(src));
        self
    }

    /// Same as [`App::source`], but for a source that is already shared behind an `Arc`.
    #[must_use = "builder methods take `self` by value; use the returned `App`"]
    pub fn source_arc(mut self, name: impl Into<String>, src: Arc<dyn Source>) -> Self {
        self.sources.insert(name.into(), src);
        self
    }

    /// Attaches a Rust closure that produces the `LazyFrame` for manifest frame `frame`.
    ///
    /// `deps` must name sources registered with [`App::source`] or [`App::source_arc`].
    /// At build time they are resolved and wrapped, together with `f`, in a
    /// `ClosureSource`. `f` receives its input map from that source (presumably keyed by
    /// the dependency names; see `ClosureSource`).
    ///
    /// A transform takes precedence over [`App::frame_source`] and over the frame's own
    /// `source` field. Registering a second transform for the same frame replaces the first.
    #[must_use = "builder methods take `self` by value; use the returned `App`"]
    pub fn transform<F, I, S>(mut self, frame: impl Into<String>, deps: I, f: F) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
        F: Fn(&HashMap<String, LazyFrame>) -> LazyFrame + Send + Sync + 'static,
    {
        let deps: Vec<String> = deps.into_iter().map(Into::into).collect();
        self.transforms.insert(frame.into(), (deps, Arc::new(f)));
        self
    }

    /// Binds a source directly to manifest frame `frame`.
    ///
    /// This bypasses the frame's `source` field. A transform registered for the same frame
    /// still wins.
    #[must_use = "builder methods take `self` by value; use the returned `App`"]
    pub fn frame_source(mut self, frame: impl Into<String>, src: impl Source + 'static) -> Self {
        self.direct.insert(frame.into(), Arc::new(src));
        self
    }

    /// Resolves a concrete [`Source`] for every frame returned by
    /// `taxa_core::loader::referenced_frames`, i.e. the frames the manifest's views use.
    ///
    /// Each frame is resolved in this order:
    /// 1. a code transform ([`App::transform`]);
    /// 2. a directly bound source ([`App::frame_source`]);
    /// 3. the registered source named by the frame's `source` field.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Schema`] if a referenced frame is missing from the manifest, carries a
    /// serialized `transform`, needs a transform dependency that was never registered, or has
    /// no resolvable source. Propagates any error from `referenced_frames`.
    fn build_frame_sources(&self) -> Result<HashMap<String, Arc<dyn Source>>> {
        let needed = taxa_core::loader::referenced_frames(&self.manifest)?;
        let mut out: HashMap<String, Arc<dyn Source>> = HashMap::new();
        for fname in &needed {
            // Guard against resolving the same frame twice if `needed` repeats a name.
            if out.contains_key(fname) {
                continue;
            }
            let frame = self
                .manifest
                .frame(fname)
                .ok_or_else(|| Error::Schema(format!("view frame {fname:?} not in `frames`")))?;
            // Serialized transforms are rejected even when a code transform is attached,
            // so the manifest cannot silently disagree with the Rust code.
            if frame.transform.is_some() {
                return Err(Error::Schema(format!(
                    "frame {fname:?} carries a serialized `transform` plan-path, which the \
                     Rust facade does not run. Remove it and attach the transform in code \
                     via .transform({fname:?}, …), or serve the manifest with the `taxa` CLI."
                )));
            }
            let src: Arc<dyn Source> = if let Some((deps, f)) = self.transforms.get(fname) {
                let mut dep_map: HashMap<String, Arc<dyn Source>> = HashMap::new();
                for d in deps {
                    let s = self.sources.get(d).cloned().ok_or_else(|| {
                        Error::Schema(format!(
                            "transform for frame {fname:?} needs source {d:?}, which was not \
                             registered via .source()"
                        ))
                    })?;
                    dep_map.insert(d.clone(), s);
                }
                // Clone the `Arc` handle (not the closure) so the new source owns a reference.
                let user_f = Arc::clone(f);
                Arc::new(ClosureSource::new(dep_map, move |inputs| {
                    Ok(user_f(inputs))
                }))
            } else if let Some(direct) = self.direct.get(fname) {
                direct.clone()
            } else if let Some(name) = frame.source.as_deref() {
                self.sources.get(name).cloned().ok_or_else(|| {
                    Error::Schema(format!(
                        "frame {fname:?} source {name:?} was not registered — add \
                         .source({name:?}, …), .frame_source({fname:?}, …), or \
                         .transform({fname:?}, …)"
                    ))
                })?
            } else {
                return Err(Error::Schema(format!(
                    "frame {fname:?} has no source: attach .transform({fname:?}, …), \
                     .frame_source({fname:?}, …), or give it a `source` naming a registered source"
                )));
            };
            out.insert(fname.clone(), src);
        }
        Ok(out)
    }

    /// Resolves all frame sources and loads the manifest through `taxa_core::loader::load`.
    ///
    /// Takes `&self`, so the same `App` can be built more than once.
    ///
    /// # Errors
    ///
    /// Propagates any resolution error (see the private `build_frame_sources`) and any
    /// error returned by the loader.
    pub fn build(&self) -> Result<Loaded> {
        let frame_sources = self.build_frame_sources()?;
        taxa_core::loader::load(&self.manifest, &frame_sources)
    }

    /// Builds the app and serves it on `host:port`.
    ///
    /// A fresh multi-threaded Tokio runtime is created, and the calling thread blocks until
    /// the server future completes.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`App::build`]. Returns [`Error::Engine`] if the Tokio runtime
    /// cannot be created or the server returns an error.
    ///
    /// # Panics
    ///
    /// Tokio's `Runtime::block_on` panics when called from within an asynchronous execution
    /// context. Do not call this from inside an existing Tokio runtime.
    pub fn serve(self, host: &str, port: u16) -> Result<()> {
        let loaded = self.build()?;
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .map_err(|e| Error::Engine(format!("tokio runtime: {e}")))?;
        rt.block_on(taxa_server::serve_ext(
            loaded.main_ds,
            loaded.main_backend,
            loaded.series,
            loaded.series_follows_treemap,
            host,
            port,
        ))
        .map_err(|e| Error::Engine(format!("server error: {e}")))
    }
}