#[path = "minimal_fixtures/mod.rs"]
mod minimal_fixtures;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::json;
use pondrs::datasets::{JsonDataset, MemoryDataset, Param, PolarsCsvDataset};
use pondrs::error::PondError;
use pondrs::{Node, Steps};
#[derive(Serialize, Deserialize)]
struct Catalog {
readings: PolarsCsvDataset,
summary: MemoryDataset<f64>,
report: JsonDataset,
}
#[derive(Serialize, Deserialize)]
struct Params {
threshold: Param<f64>,
}
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(
Node {
name: "summarize",
func: |df: DataFrame| {
let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
(mean,)
},
input: (&cat.readings,),
output: (&cat.summary,),
},
Node {
name: "report",
func: |mean: f64, threshold: f64| {
(json!({ "mean": mean, "passed": mean >= threshold }),)
},
input: (&cat.summary, ¶ms.threshold),
output: (&cat.report,),
},
)
}
fn main() -> Result<(), PondError> {
let dir = data_dir();
write_fixtures(&dir);
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_args(std::env::args_os())?
.dispatch(pipeline)
}
fn data_dir() -> std::path::PathBuf {
let manifest = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
manifest.join("examples").join("minimal_data")
}
fn write_fixtures(dir: &std::path::Path) {
std::fs::create_dir_all(dir).unwrap();
minimal_fixtures::write_readings_csv(dir);
minimal_fixtures::write_catalog_yml(dir);
std::fs::write(dir.join("params.yml"), "threshold: 0.5\n").unwrap();
}