use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
time::SystemTime,
};
use tracing::info;
#[cfg(feature = "python")]
use pyo3::prelude::*;
use crate::controller::context::ControllerCtx;
use crate::py_json_methods;
use super::{Dispatcher, Overflow, Row, fmt_time, header_columns};
/// In-memory table of timestamped rows.
///
/// NOTE(review): each row is presumably expected to carry one value per entry in
/// `channel_names`; the code only checks (in debug builds) that all rows share
/// the same width — confirm against the producer.
#[derive(Serialize, Deserialize, Default, Debug)]
pub struct SimpleDataFrame {
    /// Channel names; `headers()` derives the full header row from these.
    channel_names: Vec<String>,
    /// Stored rows in slot order; a slot may be overwritten in place via `put`.
    rows: Vec<Row>,
}
impl SimpleDataFrame {
    /// Create an empty frame for `channel_names`, reserving room for `capacity` rows.
    pub fn new(channel_names: Vec<String>, capacity: usize) -> Self {
        Self {
            channel_names,
            rows: Vec::with_capacity(capacity),
        }
    }

    /// Debug-only invariant: `row` has as many channel values as the rows already
    /// stored. Vacuously true for an empty frame.
    fn row_width_matches(&self, row: &Row) -> bool {
        match self.rows.first() {
            Some(first) => first.channel_values.len() == row.channel_values.len(),
            None => true,
        }
    }

    /// Append `row` to the end of the frame.
    ///
    /// In debug builds, asserts that `row` has the same width as existing rows.
    pub fn push(&mut self, row: Row) {
        debug_assert!(self.row_width_matches(&row));
        self.rows.push(row);
    }

    /// Overwrite the row at slot `loc`, or append `row` when `loc` is past the end.
    ///
    /// Note that an out-of-range `loc` appends at the current end, not at `loc`
    /// itself. In debug builds, asserts that `row` has the same width as existing
    /// rows (checked exactly once on either path).
    pub fn put(&mut self, row: Row, loc: usize) {
        if loc < self.rows.len() {
            debug_assert!(self.row_width_matches(&row));
            self.rows[loc] = row;
        } else {
            self.push(row);
        }
    }

    /// Number of stored rows.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// `true` when no rows are stored (companion to [`Self::len`]).
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Full header row, built from the channel names by `header_columns`.
    pub fn headers(&self) -> Vec<String> {
        header_columns(&self.channel_names)
    }

    /// Borrow the stored rows in slot order.
    pub fn rows(&self) -> &[Row] {
        &self.rows
    }
}
/// Dispatcher that writes incoming rows into a shared in-memory
/// `SimpleDataFrame`, sized from a megabyte budget in `init`. With
/// `Overflow::Wrap` it goes back to slot 0 and overwrites earlier rows once the
/// budget is used.
#[derive(Serialize, Deserialize, Default)]
#[cfg_attr(feature = "python", pyclass)]
pub struct DataFrameDispatcher {
    /// Memory budget in MiB; `init` converts it into a row count (`nrows`).
    max_size_megabytes: usize,
    /// Behavior when the row budget is exhausted. `new` only accepts
    /// `Overflow::Wrap`, but a deserialized value does not pass through `new`.
    overflow_behavior: Overflow,
    /// Row budget computed by `init`; not serialized.
    #[serde(skip)]
    nrows: usize,
    /// Index of the next row slot to write (reset to 0 on wrap); not serialized.
    #[serde(skip)]
    row_index: usize,
    /// Frame shared with every `DataFrameHandle`; not serialized, so a
    /// deserialized dispatcher starts with a default (empty) frame.
    #[serde(skip)]
    df: Arc<RwLock<SimpleDataFrame>>,
}
/// Cloneable read-only view onto the frame a `DataFrameDispatcher` writes into.
///
/// Every accessor takes the shared read lock for the duration of the copy and
/// returns owned data, so no lock guard escapes to the caller.
#[cfg_attr(feature = "python", pyclass(from_py_object))]
#[derive(Clone, Default)]
pub struct DataFrameHandle {
    df: Arc<RwLock<SimpleDataFrame>>,
}

impl DataFrameHandle {
    /// Wrap an already-shared frame.
    fn new(df: Arc<RwLock<SimpleDataFrame>>) -> Self {
        DataFrameHandle { df }
    }

    /// Take the shared read lock, turning a poisoned lock into a `String` error.
    fn read(&self) -> Result<RwLockReadGuard<'_, SimpleDataFrame>, String> {
        self.df
            .read()
            .map_err(|poisoned| format!("Unable to lock dataframe: {poisoned}"))
    }

    /// Copy the frame out column-wise, keyed by channel name.
    ///
    /// Each row's values are paired positionally with the channel names. A row
    /// shorter than the name list contributes only to its leading columns.
    pub fn columns(&self) -> Result<HashMap<String, Vec<f64>>, String> {
        let frame = self.read()?;
        let height = frame.rows.len();
        let mut series: Vec<Vec<f64>> = (0..frame.channel_names.len())
            .map(|_| Vec::with_capacity(height))
            .collect();
        for row in &frame.rows {
            for (column, &value) in series.iter_mut().zip(&row.channel_values) {
                column.push(value);
            }
        }
        let by_name: HashMap<String, Vec<f64>> = frame
            .channel_names
            .iter()
            .cloned()
            .zip(series)
            .collect();
        Ok(by_name)
    }

    /// Copy out each row's formatted system time, in slot order.
    pub fn time(&self) -> Result<Vec<String>, String> {
        let frame = self.read()?;
        let stamps: Vec<String> = frame
            .rows
            .iter()
            .map(|row| row.system_time.clone())
            .collect();
        Ok(stamps)
    }

    /// Copy out each row's integer timestamp, in slot order.
    pub fn timestamp(&self) -> Result<Vec<i64>, String> {
        let frame = self.read()?;
        let stamps: Vec<i64> = frame.rows.iter().map(|row| row.timestamp).collect();
        Ok(stamps)
    }
}
impl DataFrameDispatcher {
    /// Build a boxed dispatcher together with the shared frame it writes into.
    ///
    /// Pass `Some(df)` to reuse an existing frame; its contents are replaced on
    /// the next `init`. Pass `None` to allocate a fresh empty frame. The returned
    /// `Arc` points at the same frame the dispatcher holds.
    ///
    /// # Panics
    ///
    /// Panics (`unimplemented!`) if `overflow_behavior` is not `Overflow::Wrap`.
    pub fn new(
        max_size_megabytes: usize,
        overflow_behavior: Overflow,
        df: Option<Arc<RwLock<SimpleDataFrame>>>,
    ) -> (Box<Self>, Arc<RwLock<SimpleDataFrame>>) {
        match overflow_behavior {
            Overflow::Wrap => (),
            x => unimplemented!("Overflow behavior {x:?} is not available for DataFrameDispatcher"),
        }
        let df = df.unwrap_or_default();
        let df_handle = Arc::clone(&df);
        (
            Box::new(Self {
                max_size_megabytes,
                overflow_behavior,
                df,
                ..Default::default()
            }),
            df_handle,
        )
    }

    /// Acquire the exclusive write lock on the shared frame.
    ///
    /// This blocks until any in-progress `DataFrameHandle` read finishes rather
    /// than failing. Handle reads only hold the lock while copying data out, so
    /// the wait is short and cannot self-deadlock. Only a poisoned lock is
    /// reported as an error.
    fn write(&self) -> Result<RwLockWriteGuard<'_, SimpleDataFrame>, String> {
        self.df
            .write()
            .map_err(|e| format!("Unable to lock dataframe: {e}"))
    }

    /// Return a new read-only handle sharing this dispatcher's frame.
    pub fn handle(&self) -> DataFrameHandle {
        DataFrameHandle::new(self.df.clone())
    }
}
// Python/JSON glue for `DataFrameDispatcher`, generated by the project macro
// `py_json_methods!`.
// NOTE(review): the generated items depend on the macro definition (not visible
// here). The trailing items are presumably spliced into a Python-facing methods
// block; confirm against the macro.
py_json_methods!(
    DataFrameDispatcher,
    Dispatcher,
    // Python constructor. Delegates to `DataFrameDispatcher::new` with no
    // pre-existing frame and discards the returned frame `Arc`; Python code
    // reaches the data through `handle()` instead. Like `new`, this panics
    // (`unimplemented!`) for any overflow behavior other than `Overflow::Wrap`.
    #[new]
    fn py_new(max_size_megabytes: usize, overflow_behavior: Overflow) -> PyResult<Self> {
        let (dispatcher, _df_handle) = Self::new(max_size_megabytes, overflow_behavior, None);
        Ok(*dispatcher)
    },
    // Exposed to Python as `handle`: returns a new `DataFrameHandle` that shares
    // this dispatcher's frame.
    #[pyo3(name = "handle")]
    fn py_handle(&self) -> DataFrameHandle {
        self.handle()
    }
);
/// Convert a dataframe access error message into a Python `RuntimeError`.
#[cfg(feature = "python")]
fn dataframe_error_to_py(message: String) -> pyo3::PyErr {
    pyo3::exceptions::PyRuntimeError::new_err(message)
}

/// Python-facing accessors for `DataFrameHandle`. Each forwards to the Rust
/// method of the same name and raises `RuntimeError` if it fails.
#[cfg(feature = "python")]
#[pymethods]
impl DataFrameHandle {
    /// Column data keyed by channel name.
    #[pyo3(name = "columns")]
    fn py_columns(&self) -> PyResult<HashMap<String, Vec<f64>>> {
        let outcome = self.columns();
        outcome.map_err(dataframe_error_to_py)
    }

    /// Formatted system time of each row, in slot order.
    #[pyo3(name = "time")]
    fn py_time(&self) -> PyResult<Vec<String>> {
        let outcome = self.time();
        outcome.map_err(dataframe_error_to_py)
    }

    /// Integer timestamp of each row, in slot order.
    #[pyo3(name = "timestamp")]
    fn py_timestamp(&self) -> PyResult<Vec<i64>> {
        let outcome = self.timestamp();
        outcome.map_err(dataframe_error_to_py)
    }
}
#[typetag::serde]
impl Dispatcher for DataFrameDispatcher {
    /// Size the row budget from `max_size_megabytes` and replace the shared frame
    /// with an empty one for `channel_names`.
    ///
    /// The per-row size is an estimate: the length of one formatted time string
    /// plus 8 bytes for each of `1 + channel_names.len()` numeric fields
    /// (presumably the `i64` timestamp plus one `f64` per channel).
    fn init(
        &mut self,
        _ctx: &ControllerCtx,
        channel_names: &[String],
        _core_assignment: usize,
    ) -> Result<(), String> {
        info!("Initializing dataframe dispatcher");
        self.row_index = 0;
        let time_size = fmt_time(SystemTime::now()).len();
        // Never zero: the channel term contributes at least 8 bytes.
        let row_size = time_size + (1 + channel_names.len()) * 8;
        self.nrows = (self.max_size_megabytes * 1024 * 1024) / row_size;
        *self.write()? = SimpleDataFrame::new(channel_names.to_vec(), self.nrows);
        Ok(())
    }

    /// Store one row in the next slot, handling overflow of the row budget.
    ///
    /// Valid slots are `0..nrows`. Overflow is resolved *before* the write, so
    /// the frame never holds more than `nrows` rows (the capacity reserved in
    /// `init`). If `nrows` is 0, `Overflow::Wrap` keeps a single row.
    ///
    /// # Errors
    ///
    /// Returns an error if the frame lock is poisoned, or with `Overflow::Error`
    /// when a row would not fit. In that case the row is not stored.
    ///
    /// # Panics
    ///
    /// Panics (`unimplemented!`) for any other overflow behavior once the budget
    /// is exhausted. `new` rejects those up front, but a deserialized dispatcher
    /// does not go through `new`.
    fn consume(
        &mut self,
        time: SystemTime,
        timestamp: i64,
        channel_values: Vec<f64>,
    ) -> Result<(), String> {
        if self.row_index >= self.nrows {
            match self.overflow_behavior {
                Overflow::Wrap => self.row_index = 0,
                Overflow::Error => return Err("DataFrame out of memory".to_string()),
                x => unimplemented!(
                    "Overflow behavior {x:?} is not available for DataFrameDispatcher"
                ),
            }
        }
        let slot = self.row_index;
        let row = Row {
            system_time: fmt_time(time),
            timestamp,
            channel_values,
        };
        // Blocking write: waits out any in-progress handle read rather than
        // dropping the row.
        self.df
            .write()
            .map_err(|e| format!("Failed to write dataframe row: {e}"))?
            .put(row, slot);
        self.row_index += 1;
        Ok(())
    }

    /// Reset per-run state while keeping the shared frame, so existing handles
    /// can still read the rows collected during this run.
    ///
    /// Resets the fields directly instead of rebuilding through `new`. `new`
    /// panics for non-`Wrap` overflow behaviors, which a deserialized dispatcher
    /// may carry.
    fn terminate(&mut self) -> Result<(), String> {
        self.nrows = 0;
        self.row_index = 0;
        Ok(())
    }
}