#[cfg(feature = "polars-integration")]
use polars::prelude::*;
use crate::core::{TimeSeries, VisibilityGraph};
use std::collections::HashMap;
/// Errors produced when converting between Polars `DataFrame`s and this
/// crate's [`TimeSeries`] / [`VisibilityGraph`] types.
///
/// `PartialEq`/`Eq` are derived so callers (and tests) can compare errors
/// directly instead of matching on their rendered text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PolarsError {
    /// The named column is not present in the input `DataFrame`.
    ColumnNotFound(String),
    /// A column exists but has an unsupported data type or unusable contents;
    /// the payload describes the problem.
    WrongDataType(String),
    /// Two columns that must be aligned row-by-row have different lengths.
    LengthMismatch,
    /// An error reported by Polars or by a crate constructor, carried as its
    /// rendered (`to_string`) message.
    PolarsError(String),
    /// The input contained no usable rows.
    EmptyDataFrame,
}

impl std::fmt::Display for PolarsError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PolarsError::ColumnNotFound(col) => write!(f, "Column not found: {}", col),
            PolarsError::WrongDataType(msg) => write!(f, "Wrong data type: {}", msg),
            PolarsError::LengthMismatch => write!(f, "Column lengths do not match"),
            PolarsError::PolarsError(msg) => write!(f, "Polars error: {}", msg),
            PolarsError::EmptyDataFrame => write!(f, "DataFrame is empty"),
        }
    }
}

impl std::error::Error for PolarsError {}
#[cfg(feature = "polars-integration")]
impl From<PolarsError> for polars::error::PolarsError {
    /// Wraps this crate's [`PolarsError`] as a Polars `ComputeError`, using the
    /// error's `Display` text as the message. This lets `?` convert it inside
    /// functions that return Polars' own error type.
    fn from(err: PolarsError) -> Self {
        let message = err.to_string();
        Self::ComputeError(message.into())
    }
}
/// Conversion between a [`TimeSeries`] and a Polars [`DataFrame`].
///
/// `T` is the element type of the implementing series (the impls in this
/// module are `TimeSeries<f64>` and `TimeSeries<f32>`).
#[cfg(feature = "polars-integration")]
pub trait TimeSeriesPolarsExt<T> {
    /// Builds a series from the `time_col` and `value_col` columns of `df`.
    fn from_polars_df(df: &DataFrame, time_col: &str, value_col: &str) -> Result<Self, PolarsError>
    where
        Self: Sized;

    /// Exports the series as a `DataFrame`.
    fn to_polars_df(&self) -> Result<DataFrame, PolarsError>;
}
#[cfg(feature = "polars-integration")]
impl TimeSeriesPolarsExt<f64> for TimeSeries<f64> {
    /// Reads `time_col` and `value_col` (both `Float64`) from `df` into a series.
    ///
    /// Nulls in `value_col` are kept as missing observations. Every row must
    /// carry a timestamp.
    ///
    /// # Errors
    /// - [`PolarsError::ColumnNotFound`] if either column is missing.
    /// - [`PolarsError::WrongDataType`] if a column is not `Float64`, or if
    ///   `time_col` contains nulls.
    /// - [`PolarsError::EmptyDataFrame`] if `df` has no rows.
    /// - [`PolarsError::PolarsError`] if `TimeSeries::new` rejects the data.
    fn from_polars_df(
        df: &DataFrame,
        time_col: &str,
        value_col: &str,
    ) -> Result<Self, PolarsError> {
        let raw_times: Vec<Option<f64>> = df
            .column(time_col)
            .map_err(|_| PolarsError::ColumnNotFound(time_col.to_string()))?
            .f64()
            .map_err(|_| PolarsError::WrongDataType(format!("{} must be f64", time_col)))?
            .into_iter()
            .collect();
        let values: Vec<Option<f64>> = df
            .column(value_col)
            .map_err(|_| PolarsError::ColumnNotFound(value_col.to_string()))?
            .f64()
            .map_err(|_| PolarsError::WrongDataType(format!("{} must be f64", value_col)))?
            .into_iter()
            .collect();
        if raw_times.is_empty() || values.is_empty() {
            return Err(PolarsError::EmptyDataFrame);
        }
        // Defensive: both columns come from `df`, so their lengths should agree.
        if raw_times.len() != values.len() {
            return Err(PolarsError::LengthMismatch);
        }
        // Dropping null timestamps would desynchronise them from `values`
        // (previously surfacing as a confusing LengthMismatch), so reject them.
        let times: Vec<f64> = raw_times
            .into_iter()
            .collect::<Option<Vec<f64>>>()
            .ok_or_else(|| {
                PolarsError::WrongDataType(format!("{} must not contain nulls", time_col))
            })?;
        TimeSeries::new(times, values).map_err(|e| PolarsError::PolarsError(e.to_string()))
    }

    /// Exports the series as a two-column `DataFrame`: `time` and `value`.
    /// The `value` column keeps missing observations as nulls.
    fn to_polars_df(&self) -> Result<DataFrame, PolarsError> {
        let times = &self.timestamps;
        let values: Vec<Option<f64>> = self.values.to_vec();
        DataFrame::new(vec![
            Series::new("time".into(), times).into(),
            Series::new("value".into(), values).into(),
        ])
        .map_err(|e| PolarsError::PolarsError(e.to_string()))
    }
}
#[cfg(feature = "polars-integration")]
impl TimeSeriesPolarsExt<f32> for TimeSeries<f32> {
    /// Reads `time_col` and `value_col` from `df` into an `f32` series.
    ///
    /// Each column may be `Float32` (read as-is) or `Float64` (narrowed to
    /// `f32` with `as`). Nulls in `value_col` are kept as missing observations.
    /// Every row must carry a timestamp.
    ///
    /// # Errors
    /// - [`PolarsError::ColumnNotFound`] if either column is missing.
    /// - [`PolarsError::WrongDataType`] if a column is neither `Float32` nor
    ///   `Float64`, or if `time_col` contains nulls.
    /// - [`PolarsError::EmptyDataFrame`] if `df` has no rows.
    /// - [`PolarsError::PolarsError`] if `TimeSeries::new` rejects the data.
    fn from_polars_df(
        df: &DataFrame,
        time_col: &str,
        value_col: &str,
    ) -> Result<Self, PolarsError> {
        // Reads column `name` as `Option<f32>` values, accepting a native
        // `Float32` column directly and narrowing a `Float64` one.
        fn read_f32_column(df: &DataFrame, name: &str) -> Result<Vec<Option<f32>>, PolarsError> {
            let column = df
                .column(name)
                .map_err(|_| PolarsError::ColumnNotFound(name.to_string()))?;
            if let Ok(ca) = column.f32() {
                return Ok(ca.into_iter().collect());
            }
            let ca = column
                .f64()
                .map_err(|_| PolarsError::WrongDataType(format!("{} must be f32 or f64", name)))?;
            Ok(ca.into_iter().map(|v| v.map(|x| x as f32)).collect())
        }

        let raw_times = read_f32_column(df, time_col)?;
        let values = read_f32_column(df, value_col)?;
        if raw_times.is_empty() || values.is_empty() {
            return Err(PolarsError::EmptyDataFrame);
        }
        // Defensive: both columns come from `df`, so their lengths should agree.
        if raw_times.len() != values.len() {
            return Err(PolarsError::LengthMismatch);
        }
        // Dropping null timestamps would desynchronise them from `values`
        // (previously surfacing as a confusing LengthMismatch), so reject them.
        let times: Vec<f32> = raw_times
            .into_iter()
            .collect::<Option<Vec<f32>>>()
            .ok_or_else(|| {
                PolarsError::WrongDataType(format!("{} must not contain nulls", time_col))
            })?;
        TimeSeries::new(times, values).map_err(|e| PolarsError::PolarsError(e.to_string()))
    }

    /// Exports the series as a two-column `DataFrame`: `time` and `value`.
    /// Both columns are widened to `f64`, and missing values stay null.
    fn to_polars_df(&self) -> Result<DataFrame, PolarsError> {
        let times: Vec<f64> = self.timestamps.iter().map(|&x| x as f64).collect();
        let values: Vec<Option<f64>> = self.values.iter().map(|&x| x.map(|v| v as f64)).collect();
        DataFrame::new(vec![
            Series::new("time".into(), times).into(),
            Series::new("value".into(), values).into(),
        ])
        .map_err(|e| PolarsError::PolarsError(e.to_string()))
    }
}
/// Export of a [`VisibilityGraph`] to Polars `DataFrame`s.
#[cfg(feature = "polars-integration")]
pub trait VisibilityGraphPolarsExt<T> {
    /// Exports per-node data as a `DataFrame`.
    fn to_polars_df(&self) -> Result<DataFrame, PolarsError>;
    /// Exports the edge list as a `DataFrame`.
    fn edges_to_polars_df(&self) -> Result<DataFrame, PolarsError>;
}

#[cfg(feature = "polars-integration")]
impl<T> VisibilityGraphPolarsExt<T> for VisibilityGraph<T>
where
    T: Copy + std::fmt::Debug,
{
    /// One row per node, with columns `node_id` (u32, `0..node_count`) and
    /// `degree` (u32, taken from `degree_sequence()` in order).
    fn to_polars_df(&self) -> Result<DataFrame, PolarsError> {
        // NOTE(review): `as u32` truncates silently if the graph ever exceeds
        // u32::MAX nodes; kept for interface compatibility of the u32 columns.
        let node_ids: Vec<u32> = (0..self.node_count as u32).collect();
        let degrees: Vec<u32> = self
            .degree_sequence()
            .iter()
            .map(|&d| d as u32)
            .collect();
        DataFrame::new(vec![
            Series::new("node_id".into(), node_ids).into(),
            Series::new("degree".into(), degrees).into(),
        ])
        .map_err(|e| PolarsError::PolarsError(e.to_string()))
    }

    /// One row per edge, with columns `source`, `target` (u32) and `weight`.
    /// Rows are sorted by `(source, target)`.
    fn edges_to_polars_df(&self) -> Result<DataFrame, PolarsError> {
        let edges = self.edges();
        let mut rows = Vec::with_capacity(edges.len());
        for (&(src, tgt), &weight) in edges {
            rows.push((src as u32, tgt as u32, weight));
        }
        // `edges()` is iterated as a map whose iteration order is not guaranteed
        // to be stable (e.g. a HashMap); sorting makes exports reproducible.
        rows.sort_unstable_by_key(|&(src, tgt, _)| (src, tgt));
        let sources: Vec<u32> = rows.iter().map(|row| row.0).collect();
        let targets: Vec<u32> = rows.iter().map(|row| row.1).collect();
        let weights: Vec<_> = rows.iter().map(|row| row.2).collect();
        DataFrame::new(vec![
            Series::new("source".into(), sources).into(),
            Series::new("target".into(), targets).into(),
            Series::new("weight".into(), weights).into(),
        ])
        .map_err(|e| PolarsError::PolarsError(e.to_string()))
    }
}
/// Splits a long-format `DataFrame` into one [`TimeSeries`] per group and
/// builds a visibility graph for each group.
#[cfg(feature = "polars-integration")]
pub struct BatchProcessor {
    series_map: HashMap<String, TimeSeries<f64>>,
}

#[cfg(feature = "polars-integration")]
impl BatchProcessor {
    /// Groups the rows of `df` by the string column `group_col`.
    ///
    /// `time_col` and `value_col` must be `Float64`. Within each group, rows
    /// keep their `DataFrame` order. Null values are kept as missing
    /// observations. Null group keys and null timestamps are rejected.
    ///
    /// # Errors
    /// - [`PolarsError::ColumnNotFound`] if any of the three columns is missing.
    /// - [`PolarsError::WrongDataType`] if a column has the wrong dtype, or if
    ///   `group_col` / `time_col` contains a null (the row index is reported).
    /// - [`PolarsError::PolarsError`] if `TimeSeries::new` rejects a group.
    pub fn from_polars_df(
        df: &DataFrame,
        time_col: &str,
        value_col: &str,
        group_col: &str,
    ) -> Result<Self, PolarsError> {
        let group_column = df
            .column(group_col)
            .map_err(|_| PolarsError::ColumnNotFound(group_col.to_string()))?;
        let time_column = df
            .column(time_col)
            .map_err(|_| PolarsError::ColumnNotFound(time_col.to_string()))?;
        let value_column = df
            .column(value_col)
            .map_err(|_| PolarsError::ColumnNotFound(value_col.to_string()))?;
        let groups = group_column
            .str()
            .map_err(|_| PolarsError::WrongDataType(format!("{} must be string", group_col)))?;
        let times = time_column
            .f64()
            .map_err(|_| PolarsError::WrongDataType(format!("{} must be f64", time_col)))?;
        let values = value_column
            .f64()
            .map_err(|_| PolarsError::WrongDataType(format!("{} must be f64", value_col)))?;

        let mut grouped: HashMap<String, (Vec<f64>, Vec<Option<f64>>)> = HashMap::new();
        // All columns of a DataFrame share its height, so zipping is lossless.
        let rows = groups.into_iter().zip(times).zip(values).enumerate();
        for (row, ((group, time), value)) in rows {
            // Previously a null here was reported as `EmptyDataFrame`, which
            // pointed users at the wrong problem.
            let group = group.ok_or_else(|| {
                PolarsError::WrongDataType(format!("{} contains a null at row {}", group_col, row))
            })?;
            let time = time.ok_or_else(|| {
                PolarsError::WrongDataType(format!("{} contains a null at row {}", time_col, row))
            })?;
            let (group_times, group_values) = grouped.entry(group.to_owned()).or_default();
            group_times.push(time);
            group_values.push(value);
        }

        let series_map = grouped
            .into_iter()
            .map(|(key, (group_times, group_values))| {
                TimeSeries::new(group_times, group_values)
                    .map(|series| (key, series))
                    .map_err(|e| PolarsError::PolarsError(e.to_string()))
            })
            .collect::<Result<HashMap<_, _>, _>>()?;
        Ok(BatchProcessor { series_map })
    }

    /// Builds a graph with `natural_visibility()` for every group.
    ///
    /// # Errors
    /// Returns [`PolarsError::PolarsError`] for the first group whose graph
    /// construction fails. Groups are visited in HashMap order, so which
    /// failure is reported is unspecified.
    pub fn process_natural(self) -> Result<BatchResults, PolarsError> {
        self.process_with(|series| VisibilityGraph::from_series(series).natural_visibility())
    }

    /// Builds a graph with `horizontal_visibility()` for every group.
    ///
    /// # Errors
    /// Same as [`BatchProcessor::process_natural`].
    pub fn process_horizontal(self) -> Result<BatchResults, PolarsError> {
        self.process_with(|series| VisibilityGraph::from_series(series).horizontal_visibility())
    }

    /// Applies `build` to every grouped series and stops at the first failure,
    /// wrapping that failure's message in [`PolarsError::PolarsError`].
    fn process_with<F, E>(self, build: F) -> Result<BatchResults, PolarsError>
    where
        F: Fn(&TimeSeries<f64>) -> Result<VisibilityGraph<f64>, E>,
        E: ToString,
    {
        let results = self
            .series_map
            .into_iter()
            .map(|(key, series)| {
                build(&series)
                    .map(|graph| (key, graph))
                    .map_err(|e| PolarsError::PolarsError(e.to_string()))
            })
            .collect::<Result<HashMap<_, _>, _>>()?;
        Ok(BatchResults { results })
    }
}
/// Visibility graphs produced by [`BatchProcessor`], keyed by group name.
#[cfg(feature = "polars-integration")]
pub struct BatchResults {
    results: HashMap<String, VisibilityGraph<f64>>,
}

#[cfg(feature = "polars-integration")]
impl BatchResults {
    /// Returns the graph built for group `key`, if that group exists.
    pub fn get(&self, key: &str) -> Option<&VisibilityGraph<f64>> {
        self.results.get(key)
    }

    /// Exports every graph's degree sequence as one long `DataFrame`.
    ///
    /// The columns are `group`, `node_id` (u32) and `degree` (u32), with one row
    /// per node of every graph. Groups appear in ascending name order, and node
    /// ids ascend within each group, so the output is reproducible.
    pub fn to_polars_df(&self) -> Result<DataFrame, PolarsError> {
        let mut all_groups: Vec<String> = Vec::new();
        let mut all_node_ids: Vec<u32> = Vec::new();
        let mut all_degrees: Vec<u32> = Vec::new();
        for (group, graph) in self.sorted_entries() {
            let degrees = graph.degree_sequence();
            for (node_id, &degree) in degrees.iter().enumerate() {
                all_groups.push(group.clone());
                all_node_ids.push(node_id as u32);
                all_degrees.push(degree as u32);
            }
        }
        DataFrame::new(vec![
            Series::new("group".into(), all_groups).into(),
            Series::new("node_id".into(), all_node_ids).into(),
            Series::new("degree".into(), all_degrees).into(),
        ])
        .map_err(|e| PolarsError::PolarsError(e.to_string()))
    }

    /// Returns the group names in ascending order.
    pub fn keys(&self) -> Vec<&String> {
        let mut keys: Vec<&String> = self.results.keys().collect();
        keys.sort_unstable();
        keys
    }

    /// Number of groups.
    pub fn len(&self) -> usize {
        self.results.len()
    }

    /// `true` when no groups were processed.
    pub fn is_empty(&self) -> bool {
        self.results.is_empty()
    }

    /// Entries sorted by group name. The backing HashMap has no stable
    /// iteration order, so this gives exports a deterministic order.
    fn sorted_entries(&self) -> Vec<(&String, &VisibilityGraph<f64>)> {
        let mut entries: Vec<(&String, &VisibilityGraph<f64>)> = self.results.iter().collect();
        entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
        entries
    }
}
#[cfg(all(test, feature = "polars-integration"))]
mod tests {
    use super::*;

    #[test]
    fn test_timeseries_from_polars() {
        let df = df! {
            "time" => &[0.0, 1.0, 2.0, 3.0],
            "value" => &[1.0, 3.0, 2.0, 4.0],
        }
        .unwrap();
        let series = TimeSeries::<f64>::from_polars_df(&df, "time", "value").unwrap();
        assert_eq!(series.len(), 4);
    }

    #[test]
    fn test_timeseries_from_polars_missing_column() {
        let df = df! {
            "time" => &[0.0, 1.0],
        }
        .unwrap();
        match TimeSeries::<f64>::from_polars_df(&df, "time", "value") {
            Err(PolarsError::ColumnNotFound(col)) => assert_eq!(col, "value"),
            _ => panic!("expected ColumnNotFound for the missing value column"),
        }
    }

    #[test]
    fn test_timeseries_from_polars_rejects_null_time() {
        let df = df! {
            "time" => &[Some(0.0), None, Some(2.0)],
            "value" => &[1.0, 3.0, 2.0],
        }
        .unwrap();
        assert!(TimeSeries::<f64>::from_polars_df(&df, "time", "value").is_err());
    }

    #[test]
    fn test_timeseries_to_polars() {
        let series = TimeSeries::from_raw(vec![1.0, 3.0, 2.0, 4.0]).unwrap();
        let df = series.to_polars_df().unwrap();
        assert_eq!(df.shape(), (4, 2));
        assert!(df.column("time").is_ok());
        assert!(df.column("value").is_ok());
    }

    #[test]
    fn test_timeseries_polars_roundtrip() {
        let series = TimeSeries::from_raw(vec![1.0, 3.0, 2.0, 4.0]).unwrap();
        let df = series.to_polars_df().unwrap();
        let back = TimeSeries::<f64>::from_polars_df(&df, "time", "value").unwrap();
        assert_eq!(back.len(), series.len());
    }

    #[test]
    fn test_graph_to_polars() {
        let series = TimeSeries::from_raw(vec![1.0, 3.0, 2.0, 4.0]).unwrap();
        let graph = VisibilityGraph::from_series(&series)
            .natural_visibility()
            .unwrap();
        let df = graph.to_polars_df().unwrap();
        assert_eq!(df.shape().0, 4);
    }

    #[test]
    fn test_edges_to_polars() {
        let series = TimeSeries::from_raw(vec![1.0, 3.0, 2.0]).unwrap();
        let graph = VisibilityGraph::from_series(&series)
            .natural_visibility()
            .unwrap();
        let df = graph.edges_to_polars_df().unwrap();
        assert!(df.column("source").is_ok());
        assert!(df.column("target").is_ok());
        assert!(df.column("weight").is_ok());
    }

    #[test]
    fn test_batch_processor_groups() {
        let df = df! {
            "group" => &["a", "a", "a", "b", "b", "b"],
            "time" => &[0.0, 1.0, 2.0, 0.0, 1.0, 2.0],
            "value" => &[1.0, 3.0, 2.0, 2.0, 1.0, 3.0],
        }
        .unwrap();
        let results = BatchProcessor::from_polars_df(&df, "time", "value", "group")
            .unwrap()
            .process_natural()
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(!results.is_empty());
        assert!(results.get("a").is_some());
        assert!(results.get("b").is_some());
        let out = results.to_polars_df().unwrap();
        assert_eq!(out.shape(), (6, 3));
    }
}