use nadi_plugin::nadi_internal_plugin;
#[nadi_internal_plugin]
mod ts {
use crate::prelude::*;
use crate::timeseries::{CompleteSeries, Series, TimeLineInner, TimeSeries};
use abi_stable::external_types::RMutex;
use abi_stable::std_types::{
RArc,
ROption::{RNone, RSome},
RString,
};
use chrono::NaiveDate;
use nadi_plugin::{env_func, network_func, node_func};
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;
/// Return the timeline labels of a timeseries as a series of strings.
///
/// # Arguments
/// - `ts`: timeseries whose timeline is read
///
/// The timeline lock is released before the resulting series is built.
#[env_func]
fn ts_timeline(ts: TimeSeries) -> Series {
    let timeline = ts.timeline();
    let guard = timeline.lock();
    let labels: Vec<RString> = guard.str_values().map(RString::from).collect();
    // Release the lock as soon as the labels have been copied out.
    drop(guard);
    let strings = CompleteSeries::strings(labels);
    Series::Complete(strings)
}
#[env_func(fmt = "%Y-%m-%d")]
fn timeseries(timeline: &[RString], series: Series, fmt: &str) -> anyhow::Result<TimeSeries> {
if timeline.len() < 2 {
return Err(anyhow::Error::msg("Timeline should be longer than 2"));
}
let start = NaiveDate::parse_from_str(timeline[0].as_str(), fmt)?;
let end = NaiveDate::parse_from_str(timeline[timeline.len() - 1].as_str(), fmt)?;
let st = start.to_epoch_days() * 24 * 60 * 60 * 100;
let en = end.to_epoch_days() * 24 * 60 * 60 * 100;
let step = (en - st) as i64 / (timeline.len() - 1) as i64;
let tl = RArc::new(RMutex::new(TimeLineInner::new(
st as i64,
en as i64,
step,
true,
timeline.to_vec(),
fmt,
)));
Ok(TimeSeries::new(tl, series))
}
/// Number of timeseries stored in the node.
///
/// # Arguments
/// - `node`: node whose timeseries map is inspected
#[node_func]
fn ts_count(node: &NodeInner) -> usize {
    let stored = node.ts_map();
    stored.len()
}
/// Names of all the timeseries stored in the node.
///
/// # Arguments
/// - `node`: node whose timeseries map is inspected
///
/// The names are returned in the iteration order of the node's timeseries map.
#[node_func]
fn ts_list(node: &NodeInner) -> Vec<String> {
    let mut names = Vec::new();
    for key in node.ts_map().keys() {
        names.push(key.to_string());
    }
    names
}
/// Delete the named timeseries from the node.
///
/// # Arguments
/// - `node`: node to modify
/// - `name`: name of the timeseries to delete
///
/// Returns `true` when `del_ts` hands back a value for `name`, `false`
/// otherwise.
#[node_func]
fn ts_delete(node: &mut NodeInner, name: &str) -> bool {
    let removed = node.del_ts(name);
    removed.is_some()
}
/// Type name of the values in the named timeseries.
///
/// # Arguments
/// - `node`: node holding the timeseries
/// - `name`: name of the timeseries
/// - `safe`: when `true`, a failed lookup yields `None` instead of an error
///   (default `false`)
///
/// # Errors
/// When `safe` is `false`, the error from the timeseries lookup is returned
/// unchanged.
#[node_func(safe = false)]
fn ts_dtype(node: &NodeInner, name: &str, safe: bool) -> Result<Option<String>, String> {
    let lookup = node.try_ts(name);
    if safe {
        // Lookup failures are swallowed in safe mode.
        Ok(lookup.ok().map(|ts| ts.values_type().to_string()))
    } else {
        lookup.map(|ts| Some(ts.values_type().to_string()))
    }
}
/// Length of the named timeseries.
///
/// # Arguments
/// - `node`: node holding the timeseries
/// - `name`: name of the timeseries
/// - `safe`: when `true`, a failed lookup yields `None` instead of an error
///   (default `false`)
/// - `valid`: when `true`, report `len_valid()` of the series instead of
///   `len()` (default `false`)
///
/// # Errors
/// When `safe` is `false`, the error from the timeseries lookup is returned
/// unchanged.
#[node_func(safe = false, valid = false)]
fn ts_len(
    node: &NodeInner,
    name: &str,
    safe: bool,
    valid: bool,
) -> Result<Option<usize>, String> {
    let ts = match node.try_ts(name) {
        Ok(found) => found,
        Err(_) if safe => return Ok(None),
        Err(e) => return Err(e),
    };
    let series = ts.series();
    let count = if valid {
        series.len_valid()
    } else {
        series.len()
    };
    Ok(Some(count))
}
/// Replace the named timeseries with the result of calling
/// `maybe_complete()` on it.
///
/// The series is taken out of the node's timeseries map, converted, and
/// inserted back under the same name.
/// NOTE(review): presumably `maybe_complete` turns the series into a complete
/// one when it has no missing values; confirm against its definition.
///
/// # Arguments
/// - `node`: node to modify
/// - `name`: name of the timeseries to convert
///
/// # Errors
/// Returns an error message when the node has no timeseries called `name`.
#[node_func]
fn ts_complete(node: &mut NodeInner, name: &str) -> Result<(), String> {
    match node.ts_map_mut().remove(name) {
        RSome(s) => {
            node.ts_map_mut().insert(name.into(), s.maybe_complete());
            Ok(())
        }
        RNone => Err(format!("Timeseries {name} doesn't exist")),
    }
}
/// Print the named timeseries of a node to stdout as `time,value` rows.
///
/// # Arguments
/// - `node`: node holding the timeseries
/// - `name`: name of the timeseries; also used as the value column header
/// - `header`: print the `time,<name>` header line first (default `true`)
/// - `head`: maximum number of rows to print; all rows when not given
/// - `missing`: text passed to `str_values` for the values (default `""`)
///
/// A blank line is printed after the rows.
///
/// # Errors
/// Returns an error message when the node has no timeseries called `name`.
#[node_func(header = true, missing = "")]
fn ts_print(
    node: &NodeInner,
    name: &str,
    header: bool,
    head: Option<i64>,
    missing: String,
) -> Result<(), RString> {
    let ts = match node.ts(name) {
        Some(found) => found,
        None => {
            let msg = format!(
                "Timeseries `{}` is not available in node `{}`",
                name,
                node.name()
            );
            return Err(msg.into());
        }
    };

    let values = ts.str_values(&missing);
    if header {
        println!("time,{name}");
    }
    // Without an explicit `head`, every row of the series is printed.
    let limit = match head {
        Some(h) => h as usize,
        None => ts.len(),
    };
    ts.timeline()
        .lock()
        .str_values()
        .zip(values)
        .take(limit)
        .for_each(|(t, v)| println!("{t},{v}"));
    println!();
    Ok(())
}
/// Print one timeseries from several nodes to stdout as a CSV table.
///
/// The first column is `datetime`; every node that has a timeseries called
/// `name` adds one column headed by the node name. Nothing is printed when no
/// selected node has the timeseries.
///
/// # Arguments
/// - `net`: network whose nodes are scanned
/// - `name`: name of the timeseries to print
/// - `head`: maximum number of rows to print; all rows when not given
/// - `nodes`: when given, only nodes whose name is in this set are included
/// - `missing`: text passed to `str_values` for the values (default `""`)
///
/// # Errors
/// Returns an error when the matching timeseries do not all share the
/// timeline of the first one found.
#[network_func(missing = "")]
fn ts_print_csv(
    net: &Network,
    name: &str,
    head: Option<usize>,
    nodes: Option<HashSet<String>>,
    missing: &str,
) -> anyhow::Result<()> {
    let mut columns = vec![];
    let mut col_values = vec![];
    let mut timeline = None;

    for node in net.nodes() {
        let node = node.lock();
        let selected = nodes
            .as_ref()
            .map_or(true, |wanted| wanted.contains(node.name()));
        if !selected {
            continue;
        }
        let ts = match node.ts(name) {
            Some(found) => found,
            None => continue,
        };
        // The first timeseries found fixes the timeline; the rest must match it.
        if let Some(tl) = &timeline {
            if !ts.is_timeline(tl) {
                return Err(anyhow::Error::msg("Different Timelines"));
            }
        } else {
            timeline = Some(ts.timeline().clone());
        }
        columns.push(node.name().to_string());
        col_values.push(ts.str_values(&missing).collect::<Vec<String>>());
    }

    let timeline = match timeline {
        Some(tl) => tl,
        None => return Ok(()),
    };
    let guard = timeline.lock();
    let limit = match head {
        Some(rows) => rows,
        None => guard.str_values().count(),
    };
    println!("datetime,{}", columns.join(","));
    for (i, t) in guard.str_values().enumerate().take(limit) {
        let row: Vec<&str> = col_values.iter().map(|v| v[i].as_str()).collect();
        println!("{t},{}", row.join(","));
    }
    Ok(())
}
/// Write node attributes and series of the selected nodes to a CSV file.
///
/// The header line lists `attrs` followed by `series`. For every selected
/// node, one row is written per series index, with the node's attribute
/// values repeated on each row. When `series` is empty, a single row with
/// only the attribute values is written per node.
///
/// # Arguments
/// - `net`: network whose nodes are written
/// - `filter`: one flag per node, zipped with the nodes in order; only nodes
///   whose flag is `true` are written
/// - `outfile`: path of the file to create (truncated if it exists)
/// - `attrs`: attribute names; an attribute the node lacks becomes an empty
///   field
/// - `series`: series names; a series the node lacks counts as length zero
/// - `missing`: text passed to `str_values` for the values (default `""`)
///
/// # Errors
/// Returns an error when the file cannot be created or written, or when the
/// requested series of a node do not all have the same length. Rows written
/// before the error remain in the file.
#[network_func(missing = "")]
fn series_csv(
    net: &Network,
    filter: Vec<bool>,
    outfile: &Path,
    attrs: Vec<String>,
    series: Vec<String>,
    missing: &str,
) -> anyhow::Result<()> {
    let file = File::create(outfile)?;
    let mut w = BufWriter::new(file);

    // A separating comma is needed only when both column groups are present.
    let middle = !attrs.is_empty() && !series.is_empty();
    let separator = if middle { "," } else { "" };
    writeln!(w, "{}{}{}", attrs.join(","), separator, series.join(","))?;

    for (node, _) in net.nodes().zip(filter).filter(|(_, keep)| *keep) {
        let node = node.lock();
        let attr_values: Vec<String> = attrs
            .iter()
            .map(|a| node.attr(a).map(|a| a.to_string()).unwrap_or_default())
            .collect();
        // The attribute columns are the same on every row of this node.
        let attr_cols = attr_values.join(",");
        let columns: Vec<Vec<String>> = series
            .iter()
            .map(|a| {
                node.series(a)
                    .map(|s| s.str_values(&missing).collect())
                    .unwrap_or_default()
            })
            .collect();
        let lengths: Vec<usize> = columns.iter().map(|s| s.len()).collect();
        let rows = match lengths.first() {
            Some(&n) => n,
            None => {
                // No series requested: one attribute-only row per node.
                writeln!(w, "{}", attr_cols)?;
                continue;
            }
        };
        if lengths.iter().any(|l| *l != rows) {
            return Err(anyhow::Error::msg(format!(
                "Node {}: Series lengths don't match: {lengths:?}",
                node.name()
            )));
        }
        for i in 0..rows {
            let values: Vec<&str> = columns.iter().map(|s| s[i].as_str()).collect();
            writeln!(w, "{}{}{}", attr_cols, separator, values.join(","))?;
        }
    }
    // Flush explicitly: dropping a BufWriter ignores errors from its final
    // flush, which would report success for a truncated file.
    w.flush()?;
    Ok(())
}
}