use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::time::SystemTime;
mod tsdb;
pub use tsdb::TimescaleDbDispatcher;
mod df;
pub use df::{DataFrameDispatcher, DataFrameHandle};
mod latest;
pub use latest::{LatestValueDispatcher, LatestValueHandle, RowCell};
mod channel_filter;
pub use channel_filter::ChannelFilter;
mod decimation;
pub use decimation::DecimationDispatcher;
mod low_pass;
pub use low_pass::LowPassDispatcher;
mod csv;
pub use csv::CsvDispatcher;
mod reporting;
pub use reporting::{ReportingDispatcher, ReportingMessage};
use crate::controller::context::ControllerCtx;
#[cfg(feature = "python")]
use pyo3::prelude::*;
/// Selects how an output reacts when it overflows.
///
/// NOTE(review): the exact meaning of each variant is defined by the
/// dispatchers that consume this enum, which are not visible in this file;
/// the per-variant notes below are inferred from the names — confirm.
#[cfg_attr(feature = "python", pyclass(from_py_object))]
#[derive(Serialize, Deserialize, Default, Clone, Copy, Debug)]
pub enum Overflow {
/// The default variant. Presumably wraps around and overwrites old data — confirm.
#[default]
Wrap,
/// Presumably continues in a new file — confirm.
NewFile,
/// Presumably reports an error — confirm.
Error,
}
/// Python-facing constructors, one static method per [`Overflow`] variant.
///
/// Only compiled when the `python` feature is enabled.
#[cfg(feature = "python")]
#[pymethods]
impl Overflow {
    /// Returns [`Overflow::Wrap`].
    #[staticmethod]
    pub fn wrap() -> Self {
        Overflow::Wrap
    }

    /// Returns [`Overflow::NewFile`].
    #[staticmethod]
    pub fn new_file() -> Self {
        Overflow::NewFile
    }

    /// Returns [`Overflow::Error`].
    #[staticmethod]
    pub fn error() -> Self {
        Overflow::Error
    }
}
/// One serializable record: a textual time, an integer timestamp, and the
/// values of all channels.
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct Row {
// Time as text. Presumably produced by `fmt_time` — verify against callers.
pub system_time: String,
// Integer timestamp. NOTE(review): units and epoch are not shown here — confirm.
pub timestamp: i64,
// One value per channel. Presumably in the same order as the channel names
// passed to `Dispatcher::init` — confirm.
pub channel_values: Vec<f64>,
}
/// Interface implemented by every data sink in this module.
///
/// Implementors must be `Send + Sync` and are (de)serialized through
/// `typetag` using a `"type"` tag. Every method reports failure as a
/// `String` error message.
#[typetag::serde(tag = "type")]
pub trait Dispatcher: Send + Sync {
/// Sets the dispatcher up from the controller context, the channel names,
/// and a `core_assignment` index.
///
/// NOTE(review): presumably called once before the first `consume`, with
/// `core_assignment` naming a CPU core for worker threads — confirm
/// against the implementors.
fn init(
&mut self,
ctx: &ControllerCtx,
channel_names: &[String],
core_assignment: usize,
) -> Result<(), String>;
/// Hands one sample to the dispatcher: a wall-clock `time`, an integer
/// `timestamp`, and the channel values (taken by value).
///
/// NOTE(review): `channel_values` presumably matches the order of the
/// `channel_names` given to `init` — confirm.
fn consume(
&mut self,
time: SystemTime,
timestamp: i64,
channel_values: Vec<f64>,
) -> Result<(), String>;
/// Shuts the dispatcher down.
///
/// NOTE(review): presumably flushes and releases resources — confirm
/// against the implementors.
fn terminate(&mut self) -> Result<(), String>;
}
/// Builds the list of column names for tabular output.
///
/// The result always starts with `"timestamp"` and `"time"`, followed by a
/// copy of every entry of `channel_names` in the given order.
pub fn header_columns(channel_names: &[String]) -> Vec<String> {
    // Two fixed leading columns plus one per channel.
    let mut columns = Vec::with_capacity(channel_names.len() + 2);
    columns.push(String::from("timestamp"));
    columns.push(String::from("time"));
    columns.extend_from_slice(channel_names);
    columns
}
/// Builds the CSV header line for the given channels.
///
/// The columns returned by [`header_columns`] are joined with `,` and the
/// line is terminated with a single `\n`. Names are written verbatim, with
/// no quoting or escaping.
pub fn csv_header(channel_names: &[String]) -> String {
    let columns = header_columns(channel_names);
    format!("{}\n", columns.join(","))
}
/// Derives a resource name from an operation name and an optional suffix.
///
/// Returns `"{op_name}_{suffix}"` when a non-empty suffix is supplied, and a
/// plain copy of `op_name` when the suffix is `None` or empty.
pub(crate) fn resource_name_with_suffix(op_name: &str, suffix: Option<&str>) -> String {
    // Treat a missing suffix exactly like an empty one.
    let suffix = suffix.unwrap_or("");
    if suffix.is_empty() {
        return op_name.to_owned();
    }
    format!("{op_name}_{suffix}")
}
pub fn fmt_time(time: SystemTime) -> String {
DateTime::<Utc>::from(time).to_rfc3339_opts(chrono::SecondsFormat::Nanos, true)
}
/// Writes one fixed-width CSV row into `stringbuf`, replacing its contents.
///
/// `vals` is `(time, timestamp, channel_values)`. The row is laid out as:
/// the timestamp formatted by [`fmt_i64`], the time formatted by
/// [`fmt_time`], then every channel value formatted by [`fmt_f64`], all
/// separated by `,` and terminated by a single `\n`.
///
/// The buffer is cleared first, so callers can reuse one `String` across
/// rows and keep its allocation.
pub fn csv_row_fixed_width(stringbuf: &mut String, vals: (SystemTime, i64, &[f64])) {
    stringbuf.clear();
    let (time, timestamp, channel_values) = vals;

    // Append pieces directly instead of building a temporary `String` and
    // re-feeding it character by character.
    stringbuf.push_str(&fmt_i64(timestamp));
    stringbuf.push(',');
    stringbuf.push_str(&fmt_time(time));

    for &value in channel_values {
        stringbuf.push(',');
        stringbuf.push_str(&fmt_f64(value));
    }
    stringbuf.push('\n');
}
/// Writes one variable-width CSV row into `stringbuf`, replacing its contents.
///
/// `vals` is `(time, timestamp, channel_values)`. The row is laid out as:
/// the timestamp in its default `Display` form, the time formatted by
/// [`fmt_time`], then every channel value in its default `Display` form,
/// all separated by `,` and terminated by a single `\n`.
///
/// The buffer is cleared first, so callers can reuse one `String` across
/// rows and keep its allocation.
pub fn csv_row(stringbuf: &mut String, vals: (SystemTime, i64, &[f64])) {
    stringbuf.clear();
    let (time, timestamp, channel_values) = vals;

    // Append pieces directly instead of building a temporary `String` and
    // re-feeding it character by character.
    stringbuf.push_str(&timestamp.to_string());
    stringbuf.push(',');
    stringbuf.push_str(&fmt_time(time));

    for value in channel_values {
        stringbuf.push(',');
        stringbuf.push_str(&value.to_string());
    }
    stringbuf.push('\n');
}
/// Formats an `f64` as fixed-width scientific notation.
///
/// Finite values are written as an explicit sign, one leading digit, `.`,
/// 17 fractional digits, `e`, an explicit exponent sign, and the exponent
/// zero-padded to 3 digits, e.g. `+1.00000000000000000e+000`.
///
/// Non-finite values (`inf`, `NaN`) keep their textual form, prefixed with
/// `+` when the sign bit is clear, and are right-aligned with spaces.
///
/// Every result is at least 25 characters long; shorter text is left-padded
/// with spaces to that width.
pub fn fmt_f64(num: f64) -> String {
    /// Digits written after the decimal point.
    const PRECISION: usize = 17;
    /// Minimum number of exponent digits.
    const EXP_PAD: usize = 3;
    /// Mantissa sign + leading digit + '.' + 'e' + exponent sign account for the 5.
    const WIDTH: usize = PRECISION + EXP_PAD + 5;

    // Negative values already carry their '-' from the formatter.
    let prefix = if num.is_sign_positive() { "+" } else { "" };
    let raw = format!("{}{:.prec$e}", prefix, num, prec = PRECISION);

    // `inf` / `NaN` have no exponent to normalise; just pad them.
    if !num.is_finite() {
        return format!("{:>width$}", raw, width = WIDTH);
    }

    let (mantissa, exponent) = raw
        .split_once('e')
        .expect("scientific formatting of a finite f64 always contains 'e'");

    // The formatter omits '+' on non-negative exponents; make the sign explicit.
    let (exp_sign, exp_digits) = match exponent.strip_prefix('-') {
        Some(digits) => ('-', digits),
        None => ('+', exponent),
    };

    let fixed = format!(
        "{}e{}{:0>pad$}",
        mantissa,
        exp_sign,
        exp_digits,
        pad = EXP_PAD
    );
    format!("{:>width$}", fixed, width = WIDTH)
}
/// Formats an `i64` as a fixed-width decimal string.
///
/// The result always carries an explicit sign and is zero-padded to 20
/// characters, which is enough for both `i64::MIN` and `i64::MAX`.
pub fn fmt_i64(num: i64) -> String {
    /// Sign character plus the 19 digits of the largest-magnitude `i64`.
    const FIXED_WIDTH: usize = 20;
    format!("{:+0width$}", num, width = FIXED_WIDTH)
}
// Unit tests for the fixed-width formatters and the resource-name helper.
#[cfg(test)]
mod tests {
use super::{fmt_f64, fmt_i64, resource_name_with_suffix};
// Every formatted f64 must have the same length as the first sample and
// must parse back to the original value (NaN must parse back to NaN).
#[test]
fn fmt_f64_has_consistent_width() {
// Covers non-finite values, both zeros, small magnitudes, and the extremes.
let values = [
f64::INFINITY,
f64::NEG_INFINITY,
f64::NAN,
0.0,
-0.0,
1.0,
-1.0,
10.0,
-10.0,
f64::MIN,
f64::MAX,
];
// The first sample's length is the reference width for all others.
let expected_len = fmt_f64(values[0]).len();
for value in values {
let formatted = fmt_f64(value);
assert_eq!(
formatted.len(),
expected_len,
"length of `{value}` -> `{formatted}` should be {expected_len} but is {}",
formatted.len()
);
// Non-finite values are space-padded, so trim before parsing.
let parsed: f64 = formatted
.trim()
.parse()
.unwrap_or_else(|_| panic!("Failed to parse `{formatted}` to `{value}`"));
// NaN never compares equal to itself, so it is checked separately.
if !value.is_nan() {
assert_eq!(
value, parsed,
"{value} was serialized as `{formatted}` and parsed as `{parsed}`"
);
} else {
assert!(parsed.is_nan(), "Failed to parse NaN value as NaN");
}
}
}
// Every formatted i64 must have the same length as the first sample and
// must parse back to the original value without trimming.
#[test]
fn fmt_i64_has_consistent_width() {
let values = [0, i64::MIN, i64::MAX, -1, 1, -10, 10];
// The first sample's length is the reference width for all others.
let expected_len = fmt_i64(values[0]).len();
for value in values {
let formatted = fmt_i64(value);
assert_eq!(
formatted.len(),
expected_len,
"length of `{value}` -> `{formatted}` should be {expected_len} but is {}",
formatted.len()
);
let parsed: i64 = formatted
.parse()
.unwrap_or_else(|_| panic!("Failed to parse `{formatted}` to `{value}`"));
assert_eq!(
value, parsed,
"{value} was serialized as {formatted} and parsed as {parsed}"
);
}
}
// A non-empty suffix is joined with `_`; a missing or empty suffix leaves
// the operation name unchanged.
#[test]
fn resource_name_suffix_is_appended_with_separator() {
assert_eq!(resource_name_with_suffix("demo", Some("fast")), "demo_fast");
assert_eq!(resource_name_with_suffix("demo", None), "demo");
assert_eq!(resource_name_with_suffix("demo", Some("")), "demo");
}
}