#[cfg(feature = "python-bindings")]
use pyo3::prelude::*;
#[cfg(feature = "python-bindings")]
use pyo3::types::PyModule;
#[cfg(feature = "python-bindings")]
use numpy::{PyArray1, PyArray2, PyArrayMethods};
#[cfg(feature = "python-bindings")]
use crate::{
TimeSeries as RustTimeSeries,
VisibilityGraph as RustVisibilityGraph,
FeatureSet as RustFeatureSet,
BuiltinFeature as RustBuiltinFeature,
MissingDataStrategy as RustMissingDataStrategy,
};
/// Time series exposed to Python as `TimeSeries`.
///
/// Thin wrapper around the crate's `TimeSeries<f64>`; every method delegates
/// to the wrapped value and converts Rust errors into Python exceptions.
#[cfg(feature = "python-bindings")]
#[pyclass(name = "TimeSeries")]
pub struct PyTimeSeries {
    /// The wrapped Rust series.
    inner: RustTimeSeries<f64>,
}

#[cfg(feature = "python-bindings")]
#[pymethods]
impl PyTimeSeries {
    /// Create a series from raw values.
    ///
    /// Raises `ValueError` if the underlying `TimeSeries::from_raw` rejects
    /// the input.
    #[new]
    fn new(values: Vec<f64>) -> PyResult<Self> {
        match RustTimeSeries::from_raw(values) {
            Ok(inner) => Ok(Self { inner }),
            Err(err) => Err(py_error_helpers::to_value_error(err)),
        }
    }

    /// Length of the series as reported by the wrapped series' `len()`.
    fn __len__(&self) -> usize {
        self.inner.len()
    }

    /// Build the natural visibility graph of this series.
    ///
    /// Raises `RuntimeError` if graph construction fails.
    fn natural_visibility(&self) -> PyResult<PyVisibilityGraph> {
        RustVisibilityGraph::from_series(&self.inner)
            .natural_visibility()
            .map(|inner| PyVisibilityGraph { inner })
            .map_err(py_error_helpers::to_runtime_error)
    }

    /// Build the horizontal visibility graph of this series.
    ///
    /// Raises `RuntimeError` if graph construction fails.
    fn horizontal_visibility(&self) -> PyResult<PyVisibilityGraph> {
        RustVisibilityGraph::from_series(&self.inner)
            .horizontal_visibility()
            .map(|inner| PyVisibilityGraph { inner })
            .map_err(py_error_helpers::to_runtime_error)
    }

    /// Build the natural visibility graph and attach the given feature set.
    ///
    /// NOTE: the configured set is moved out of `features`; the Python
    /// `FeatureSet` object is left holding a fresh `FeatureSet::new()`, so it
    /// presumably has to be re-populated before being passed in again.
    ///
    /// Raises `RuntimeError` if graph construction fails.
    fn natural_visibility_with_features(&self, mut features: PyRefMut<'_, PyFeatureSet>) -> PyResult<PyVisibilityGraph> {
        // The builder consumes the set by value, so swap a fresh one into the Python object.
        let taken = std::mem::replace(&mut features.inner, RustFeatureSet::new());
        RustVisibilityGraph::from_series(&self.inner)
            .with_features(taken)
            .natural_visibility()
            .map(|inner| PyVisibilityGraph { inner })
            .map_err(py_error_helpers::to_runtime_error)
    }

    /// Build the horizontal visibility graph and attach the given feature set.
    ///
    /// NOTE: the configured set is moved out of `features`; the Python
    /// `FeatureSet` object is left holding a fresh `FeatureSet::new()`, so it
    /// presumably has to be re-populated before being passed in again.
    ///
    /// Raises `RuntimeError` if graph construction fails.
    fn horizontal_visibility_with_features(&self, mut features: PyRefMut<'_, PyFeatureSet>) -> PyResult<PyVisibilityGraph> {
        // The builder consumes the set by value, so swap a fresh one into the Python object.
        let taken = std::mem::replace(&mut features.inner, RustFeatureSet::new());
        RustVisibilityGraph::from_series(&self.inner)
            .with_features(taken)
            .horizontal_visibility()
            .map(|inner| PyVisibilityGraph { inner })
            .map_err(py_error_helpers::to_runtime_error)
    }

    /// Copy the series values into a new 1-D NumPy array.
    ///
    /// Entries stored as `None` (missing) are skipped, so the array can be
    /// shorter than the series when values are missing.
    fn to_numpy<'py>(&self, py: Python<'py>) -> Bound<'py, PyArray1<f64>> {
        let present: Vec<f64> = self.inner.values.iter().copied().flatten().collect();
        PyArray1::from_vec(py, present)
    }

    /// Return a new series produced by applying `strategy` to this one.
    ///
    /// The strategy is cloned, so the Python object passed in is untouched.
    /// Raises `RuntimeError` if the underlying `handle_missing` fails.
    fn handle_missing(&self, strategy: &PyMissingDataStrategy) -> PyResult<Self> {
        let chosen = strategy.inner.clone();
        self.inner
            .handle_missing(chosen)
            .map(|inner| Self { inner })
            .map_err(py_error_helpers::to_runtime_error)
    }

    /// Create a series from timestamps and values where `None` marks a
    /// missing value.
    ///
    /// Raises `ValueError` if the underlying `TimeSeries::new` rejects the
    /// input.
    #[staticmethod]
    fn with_missing(timestamps: Vec<f64>, values: Vec<Option<f64>>) -> PyResult<Self> {
        RustTimeSeries::new(timestamps, values)
            .map(|inner| Self { inner })
            .map_err(py_error_helpers::to_value_error)
    }
}
/// Missing-data strategy exposed to Python as `MissingDataStrategy`.
///
/// Wraps one value of the crate's `MissingDataStrategy`; instances are
/// created through the static constructors below.
#[cfg(feature = "python-bindings")]
#[pyclass(name = "MissingDataStrategy")]
#[derive(Clone)]
pub struct PyMissingDataStrategy {
    /// The wrapped Rust strategy.
    inner: RustMissingDataStrategy,
}

// Rust-only helper; kept outside `#[pymethods]` so it is not exported to Python.
#[cfg(feature = "python-bindings")]
impl PyMissingDataStrategy {
    /// Wrap a Rust strategy value in the Python-facing type.
    fn wrap(inner: RustMissingDataStrategy) -> Self {
        Self { inner }
    }
}

#[cfg(feature = "python-bindings")]
#[pymethods]
impl PyMissingDataStrategy {
    /// Select the `LinearInterpolation` strategy.
    #[staticmethod]
    fn linear_interpolation() -> Self {
        Self::wrap(RustMissingDataStrategy::LinearInterpolation)
    }

    /// Select the `ForwardFill` strategy.
    #[staticmethod]
    fn forward_fill() -> Self {
        Self::wrap(RustMissingDataStrategy::ForwardFill)
    }

    /// Select the `BackwardFill` strategy.
    #[staticmethod]
    fn backward_fill() -> Self {
        Self::wrap(RustMissingDataStrategy::BackwardFill)
    }

    /// Select the `NearestNeighbor` strategy.
    #[staticmethod]
    fn nearest_neighbor() -> Self {
        Self::wrap(RustMissingDataStrategy::NearestNeighbor)
    }

    /// Select the `MeanImputation` strategy; `window_size` is passed through
    /// unchanged to the Rust variant.
    #[staticmethod]
    fn mean_imputation(window_size: usize) -> Self {
        Self::wrap(RustMissingDataStrategy::MeanImputation { window_size })
    }

    /// Select the `MedianImputation` strategy; `window_size` is passed through
    /// unchanged to the Rust variant.
    #[staticmethod]
    fn median_imputation(window_size: usize) -> Self {
        Self::wrap(RustMissingDataStrategy::MedianImputation { window_size })
    }

    /// Select the `ZeroFill` strategy.
    #[staticmethod]
    fn zero_fill() -> Self {
        Self::wrap(RustMissingDataStrategy::ZeroFill)
    }

    /// Select the `Drop` strategy.
    #[staticmethod]
    fn drop() -> Self {
        Self::wrap(RustMissingDataStrategy::Drop)
    }

    /// Return a new strategy combining this one with `fallback` via the Rust
    /// `with_fallback` builder. Both inputs are cloned and left unchanged.
    fn with_fallback(&self, fallback: &PyMissingDataStrategy) -> Self {
        let primary = self.inner.clone();
        let secondary = fallback.inner.clone();
        Self::wrap(primary.with_fallback(secondary))
    }

    /// Debug-style representation of the wrapped strategy.
    fn __repr__(&self) -> String {
        let strategy = &self.inner;
        format!("MissingDataStrategy({:?})", strategy)
    }
}
/// Visibility graph exposed to Python as `VisibilityGraph`.
///
/// Thin wrapper around the crate's `VisibilityGraph<f64>`; instances are
/// produced by the `TimeSeries` visibility methods or the module-level
/// `natural_visibility` / `horizontal_visibility` functions.
#[cfg(feature = "python-bindings")]
#[pyclass(name = "VisibilityGraph")]
pub struct PyVisibilityGraph {
    /// The wrapped Rust graph.
    inner: RustVisibilityGraph<f64>,
}

// Rust-only helpers. They live outside `#[pymethods]` so they are not
// exported to Python as methods.
#[cfg(feature = "python-bindings")]
impl PyVisibilityGraph {
    /// Message raised when a feature accessor is used on a graph without features.
    const NO_FEATURES_MSG: &'static str = "No features computed. Use natural_visibility_with_features() or horizontal_visibility_with_features()";

    /// Validate `node` and borrow its feature map.
    ///
    /// Fails with `ValueError` when `node` is not a valid node index or when
    /// no feature map is stored for it.
    fn features_of(&self, node: usize) -> PyResult<&std::collections::HashMap<String, f64>> {
        if node >= self.inner.node_count {
            return Err(py_error_helpers::to_value_error(format!(
                "Node {} out of range (graph has {} nodes)",
                node, self.inner.node_count
            )));
        }
        self.inner
            .node_features
            .get(node)
            .ok_or_else(|| py_error_helpers::to_value_error(Self::NO_FEATURES_MSG))
    }

    /// Write `contents` to `path`, creating or truncating the file.
    ///
    /// I/O failures are reported as `RuntimeError`.
    fn write_text(path: &str, contents: &str) -> PyResult<()> {
        std::fs::write(path, contents).map_err(py_error_helpers::to_runtime_error)
    }
}

#[cfg(feature = "python-bindings")]
#[pymethods]
impl PyVisibilityGraph {
    /// Number of nodes in the graph.
    fn node_count(&self) -> usize {
        self.inner.node_count
    }

    /// Number of entries in the graph's edge collection.
    fn edge_count(&self) -> usize {
        self.inner.edges().len()
    }

    /// Graph density as computed by the Rust graph.
    fn density(&self) -> f64 {
        self.inner.density()
    }

    /// Average clustering coefficient as computed by the Rust graph.
    fn clustering_coefficient(&self) -> f64 {
        self.inner.average_clustering_coefficient()
    }

    /// Graph diameter as computed by the Rust graph.
    fn diameter(&self) -> usize {
        self.inner.diameter()
    }

    /// Degree sequence as computed by the Rust graph.
    fn degree_sequence(&self) -> Vec<usize> {
        self.inner.degree_sequence()
    }

    /// All edges as `(source, target, weight)` tuples, in the iteration order
    /// of the underlying edge collection.
    fn edges(&self) -> Vec<(usize, usize, f64)> {
        self.inner
            .edges()
            .iter()
            .map(|(&(src, dst), &weight)| (src, dst, weight))
            .collect()
    }

    /// Adjacency matrix as a 2-D NumPy array of shape `(node_count, node_count)`.
    ///
    /// Raises an error if the flattened matrix cannot be reshaped to that shape.
    fn adjacency_matrix<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyArray2<f64>>> {
        let n = self.inner.node_count;
        let flat: Vec<f64> = self
            .inner
            .to_adjacency_matrix()
            .into_iter()
            .flatten()
            .collect();
        let array = PyArray1::from_vec(py, flat);
        array.reshape([n, n])
    }

    /// Serialize the graph to JSON using the default export options.
    fn to_json(&self) -> String {
        self.inner.to_json(crate::io::ExportOptions::default())
    }

    /// Serialize the edge list to CSV; `include_weights` is forwarded to the
    /// Rust exporter.
    fn to_edge_list_csv(&self, include_weights: bool) -> String {
        self.inner.to_edge_list_csv(include_weights)
    }

    /// Serialize the adjacency matrix to CSV.
    fn to_adjacency_csv(&self) -> String {
        self.inner.to_adjacency_matrix_csv()
    }

    /// Serialize the node features to CSV.
    fn to_features_csv(&self) -> String {
        self.inner.features_to_csv()
    }

    /// Serialize the graph to DOT.
    fn to_dot(&self) -> String {
        self.inner.to_dot()
    }

    /// Serialize the graph to GraphML.
    fn to_graphml(&self) -> String {
        self.inner.to_graphml()
    }

    /// Write the edge-list CSV to `path`, creating or truncating the file.
    ///
    /// Raises `RuntimeError` on I/O failure.
    fn save_edge_list_csv(&self, path: &str, include_weights: bool) -> PyResult<()> {
        Self::write_text(path, &self.inner.to_edge_list_csv(include_weights))
    }

    /// Write the adjacency-matrix CSV to `path`, creating or truncating the file.
    ///
    /// Raises `RuntimeError` on I/O failure.
    fn save_adjacency_csv(&self, path: &str) -> PyResult<()> {
        Self::write_text(path, &self.inner.to_adjacency_matrix_csv())
    }

    /// Write the DOT export to `path`, creating or truncating the file.
    ///
    /// Raises `RuntimeError` on I/O failure.
    fn save_dot(&self, path: &str) -> PyResult<()> {
        Self::write_text(path, &self.inner.to_dot())
    }

    /// Write the GraphML export to `path`, creating or truncating the file.
    ///
    /// Raises `RuntimeError` on I/O failure.
    fn save_graphml(&self, path: &str) -> PyResult<()> {
        Self::write_text(path, &self.inner.to_graphml())
    }

    /// Names of the features stored for `node`, sorted alphabetically.
    ///
    /// The names come from a hash map whose iteration order is arbitrary, so
    /// they are sorted to give a stable order that matches the columns of
    /// `get_all_features`.
    ///
    /// Raises `ValueError` if `node` is out of range or no features were computed.
    fn get_node_feature_names(&self, node: usize) -> PyResult<Vec<String>> {
        let mut names: Vec<String> = self.features_of(node)?.keys().cloned().collect();
        names.sort_unstable();
        Ok(names)
    }

    /// Feature name -> value mapping stored for `node`.
    ///
    /// Raises `ValueError` if `node` is out of range or no features were computed.
    fn get_node_features(&self, node: usize) -> PyResult<std::collections::HashMap<String, f64>> {
        Ok(self.features_of(node)?.clone())
    }

    /// All node features as a 2-D NumPy array of shape
    /// `(node_count, feature_count)`.
    ///
    /// Columns follow the feature names of node 0 in alphabetical order (the
    /// same order `get_node_feature_names` returns). A feature missing for a
    /// node is filled with `0.0`.
    ///
    /// Raises `ValueError` if no features were computed.
    fn get_all_features<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyArray2<f64>>> {
        let first = self
            .inner
            .node_features
            .first()
            .ok_or_else(|| py_error_helpers::to_value_error(Self::NO_FEATURES_MSG))?;

        // Hash-map key order is arbitrary; sort so the column layout is stable.
        let mut names: Vec<&String> = first.keys().collect();
        names.sort_unstable();

        let n = self.inner.node_count;
        let feature_count = names.len();
        let mut flat: Vec<f64> = Vec::with_capacity(n * feature_count);
        for i in 0..n {
            let row = self.inner.node_features.get(i);
            flat.extend(
                names
                    .iter()
                    .map(|name| row.and_then(|f| f.get(*name)).copied().unwrap_or(0.0)),
            );
        }

        let array = PyArray1::from_vec(py, flat);
        array.reshape([n, feature_count])
    }

    /// Whether any node features are stored on the graph.
    fn has_features(&self) -> bool {
        !self.inner.node_features.is_empty()
    }

    /// Number of features stored for the first node, or 0 when there are none.
    fn feature_count(&self) -> usize {
        self.inner.node_features.first().map_or(0, |f| f.len())
    }

    /// Betweenness centrality of `node`.
    ///
    /// Raises `ValueError` if the Rust graph returns nothing for `node`.
    fn betweenness_centrality(&self, node: usize) -> PyResult<f64> {
        self.inner.betweenness_centrality(node).ok_or_else(|| {
            py_error_helpers::to_value_error(format!("Node {} not found in graph", node))
        })
    }

    /// The `node_communities` vector produced by the Rust community detection.
    fn detect_communities(&self) -> Vec<usize> {
        self.inner.detect_communities().node_communities
    }

    /// Shortest path length between `source` and `target`, or `None` when the
    /// Rust graph reports none.
    fn shortest_path_length(&self, source: usize, target: usize) -> Option<usize> {
        self.inner.shortest_path_length(source, target)
    }

    /// Average path length as computed by the Rust graph.
    fn average_path_length(&self) -> f64 {
        self.inner.average_path_length()
    }

    /// Graph radius as computed by the Rust graph.
    fn radius(&self) -> usize {
        self.inner.radius()
    }

    /// Whether the Rust graph reports itself as connected.
    fn is_connected(&self) -> bool {
        self.inner.is_connected()
    }

    /// Number of components as computed by the Rust graph.
    fn count_components(&self) -> usize {
        self.inner.count_components()
    }

    /// Size of the largest component as computed by the Rust graph.
    fn largest_component_size(&self) -> usize {
        self.inner.largest_component_size()
    }

    /// Assortativity as computed by the Rust graph.
    fn assortativity(&self) -> f64 {
        self.inner.assortativity()
    }

    /// Degree variance as computed by the Rust graph.
    fn degree_variance(&self) -> f64 {
        self.inner.degree_variance()
    }

    /// Degree standard deviation as computed by the Rust graph.
    fn degree_std_dev(&self) -> f64 {
        self.inner.degree_std_dev()
    }

    /// Degree distribution as computed by the Rust graph.
    fn degree_distribution(&self) -> std::collections::HashMap<usize, usize> {
        self.inner.degree_distribution()
    }

    /// Degree entropy as computed by the Rust graph.
    fn degree_entropy(&self) -> f64 {
        self.inner.degree_entropy()
    }

    /// Clustering coefficient of a single node, or `None` when the Rust graph
    /// returns nothing for `node`.
    fn node_clustering_coefficient(&self, node: usize) -> Option<f64> {
        self.inner.clustering_coefficient(node)
    }

    /// Global clustering coefficient as computed by the Rust graph.
    fn global_clustering_coefficient(&self) -> f64 {
        self.inner.global_clustering_coefficient()
    }

    /// Betweenness centrality values for all nodes, as computed by the Rust graph.
    fn betweenness_centrality_all(&self) -> Vec<f64> {
        self.inner.betweenness_centrality_all()
    }

    /// Degree centrality values as computed by the Rust graph.
    fn degree_centrality(&self) -> Vec<f64> {
        self.inner.degree_centrality()
    }

    /// Compute summary statistics and return them as a `GraphStatistics` object.
    fn compute_statistics(&self) -> PyGraphStatistics {
        PyGraphStatistics {
            inner: self.inner.compute_statistics(),
        }
    }

    /// Run the Rust 3-node motif detection and return a `MotifCounts` object.
    fn detect_motifs(&self) -> PyMotifCounts {
        PyMotifCounts {
            inner: self.inner.detect_3node_motifs(),
        }
    }

    /// Copy of what the Rust graph's `neighbors(node)` returns, or `None`
    /// when it returns nothing.
    ///
    /// NOTE(review): the elements are `f64`, so these are presumably neighbor
    /// weights/values rather than indices - confirm against the Rust API.
    /// Use `neighbor_indices` for indices.
    fn neighbors(&self, node: usize) -> Option<Vec<f64>> {
        self.inner.neighbors(node).map(|n| n.to_vec())
    }

    /// Whether the Rust graph reports an edge from `from_node` to `to_node`.
    fn has_edge(&self, from_node: usize, to_node: usize) -> bool {
        self.inner.has_edge(from_node, to_node)
    }

    /// Neighbor indices of `node` as computed by the Rust graph.
    fn neighbor_indices(&self, node: usize) -> Vec<usize> {
        self.inner.neighbor_indices(node)
    }

    /// Degree of `node`, or `None` when the Rust graph returns nothing for it.
    fn degree(&self, node: usize) -> Option<usize> {
        self.inner.degree(node)
    }

    /// Feature mapping of `node`, or `None` when the Rust graph returns
    /// nothing for it. Unlike `get_node_features`, this never raises.
    fn node_features(&self, node: usize) -> Option<std::collections::HashMap<String, f64>> {
        self.inner.node_features(node).map(|f| f.clone())
    }

    /// Value of the graph's `directed` flag.
    fn is_directed(&self) -> bool {
        self.inner.directed
    }

    /// Short summary with node count, edge count and density.
    fn __repr__(&self) -> String {
        format!(
            "VisibilityGraph(nodes={}, edges={}, density={:.4})",
            self.inner.node_count,
            self.inner.edges().len(),
            self.inner.density()
        )
    }
}
/// Read-only view of a computed statistics record, exposed to Python as
/// `GraphStatistics`.
///
/// Every property returns the field of the same name from the wrapped Rust
/// `GraphStatistics` value.
#[cfg(feature = "python-bindings")]
#[pyclass(name = "GraphStatistics")]
pub struct PyGraphStatistics {
    /// The wrapped Rust statistics record.
    inner: crate::analysis::statistics::GraphStatistics,
}

#[cfg(feature = "python-bindings")]
#[pymethods]
impl PyGraphStatistics {
    /// The `node_count` field of the statistics record.
    #[getter]
    fn node_count(&self) -> usize {
        self.inner.node_count
    }

    /// The `edge_count` field of the statistics record.
    #[getter]
    fn edge_count(&self) -> usize {
        self.inner.edge_count
    }

    /// The `is_directed` field of the statistics record.
    #[getter]
    fn is_directed(&self) -> bool {
        self.inner.is_directed
    }

    /// The `average_degree` field of the statistics record.
    #[getter]
    fn average_degree(&self) -> f64 {
        self.inner.average_degree
    }

    /// The `min_degree` field of the statistics record.
    #[getter]
    fn min_degree(&self) -> usize {
        self.inner.min_degree
    }

    /// The `max_degree` field of the statistics record.
    #[getter]
    fn max_degree(&self) -> usize {
        self.inner.max_degree
    }

    /// The `degree_std_dev` field of the statistics record.
    #[getter]
    fn degree_std_dev(&self) -> f64 {
        self.inner.degree_std_dev
    }

    /// The `degree_variance` field of the statistics record.
    #[getter]
    fn degree_variance(&self) -> f64 {
        self.inner.degree_variance
    }

    /// The `average_clustering` field of the statistics record.
    #[getter]
    fn average_clustering(&self) -> f64 {
        self.inner.average_clustering
    }

    /// The `global_clustering` field of the statistics record.
    #[getter]
    fn global_clustering(&self) -> f64 {
        self.inner.global_clustering
    }

    /// The `average_path_length` field of the statistics record.
    #[getter]
    fn average_path_length(&self) -> f64 {
        self.inner.average_path_length
    }

    /// The `diameter` field of the statistics record.
    #[getter]
    fn diameter(&self) -> usize {
        self.inner.diameter
    }

    /// The `radius` field of the statistics record.
    #[getter]
    fn radius(&self) -> usize {
        self.inner.radius
    }

    /// The `density` field of the statistics record.
    #[getter]
    fn density(&self) -> f64 {
        self.inner.density
    }

    /// The `is_connected` field of the statistics record.
    #[getter]
    fn is_connected(&self) -> bool {
        self.inner.is_connected
    }

    /// The `num_components` field of the statistics record.
    #[getter]
    fn num_components(&self) -> usize {
        self.inner.num_components
    }

    /// The `largest_component_size` field of the statistics record.
    #[getter]
    fn largest_component_size(&self) -> usize {
        self.inner.largest_component_size
    }

    /// The `assortativity` field of the statistics record.
    #[getter]
    fn assortativity(&self) -> f64 {
        self.inner.assortativity
    }

    /// The `feature_count` field of the statistics record.
    #[getter]
    fn feature_count(&self) -> usize {
        self.inner.feature_count
    }

    /// The record rendered through its Rust `Display` implementation.
    fn __repr__(&self) -> String {
        self.inner.to_string()
    }

    /// Same text as `__repr__`.
    fn __str__(&self) -> String {
        self.__repr__()
    }
}
/// Motif detection result exposed to Python as `MotifCounts`.
#[cfg(feature = "python-bindings")]
#[pyclass(name = "MotifCounts")]
pub struct PyMotifCounts {
    /// The wrapped Rust motif-count record.
    inner: crate::analysis::motifs::MotifCounts,
}

#[cfg(feature = "python-bindings")]
#[pymethods]
impl PyMotifCounts {
    /// Copy of the full motif name -> count mapping.
    fn counts(&self) -> std::collections::HashMap<String, usize> {
        let table = &self.inner.counts;
        table.clone()
    }

    /// The `total_subgraphs` field of the result.
    #[getter]
    fn total_subgraphs(&self) -> usize {
        self.inner.total_subgraphs
    }

    /// Count recorded for `motif_name`, or `None` if that name is not present.
    fn get(&self, motif_name: &str) -> Option<usize> {
        let found = self.inner.counts.get(motif_name)?;
        Some(*found)
    }

    /// Debug-style summary of the counts and the subgraph total.
    fn __repr__(&self) -> String {
        let motifs = &self.inner;
        format!("MotifCounts(counts={:?}, total={})", motifs.counts, motifs.total_subgraphs)
    }
}
/// Built-in node feature selector exposed to Python as `BuiltinFeature`.
///
/// Constructed from one of the feature names published as class attributes.
#[cfg(feature = "python-bindings")]
#[pyclass(name = "BuiltinFeature")]
#[derive(Clone)]
pub struct PyBuiltinFeature {
    /// The wrapped Rust feature variant.
    inner: RustBuiltinFeature,
}

#[cfg(feature = "python-bindings")]
#[pymethods]
impl PyBuiltinFeature {
    /// Constructor name for the `DeltaForward` feature.
    #[classattr]
    const DELTA_FORWARD: &'static str = "DeltaForward";
    /// Constructor name for the `DeltaBackward` feature.
    #[classattr]
    const DELTA_BACKWARD: &'static str = "DeltaBackward";
    /// Constructor name for the `DeltaSymmetric` feature.
    #[classattr]
    const DELTA_SYMMETRIC: &'static str = "DeltaSymmetric";
    /// Constructor name for the `LocalSlope` feature.
    #[classattr]
    const LOCAL_SLOPE: &'static str = "LocalSlope";
    /// Constructor name for the `Acceleration` feature.
    #[classattr]
    const ACCELERATION: &'static str = "Acceleration";
    /// Constructor name for the `LocalMean` feature.
    #[classattr]
    const LOCAL_MEAN: &'static str = "LocalMean";
    /// Constructor name for the `LocalVariance` feature.
    #[classattr]
    const LOCAL_VARIANCE: &'static str = "LocalVariance";
    /// Constructor name for the `IsLocalMax` feature.
    #[classattr]
    const IS_LOCAL_MAX: &'static str = "IsLocalMax";
    /// Constructor name for the `IsLocalMin` feature.
    #[classattr]
    const IS_LOCAL_MIN: &'static str = "IsLocalMin";
    /// Constructor name for the `ZScore` feature.
    #[classattr]
    const ZSCORE: &'static str = "ZScore";

    /// Create a feature selector from its name.
    ///
    /// The name must match one of the class-attribute strings exactly
    /// (case-sensitive). Raises `ValueError` for any other name.
    #[new]
    fn new(name: &str) -> PyResult<Self> {
        // Step 1: look the name up; unknown names yield `None`.
        let parsed = match name {
            "DeltaForward" => Some(RustBuiltinFeature::DeltaForward),
            "DeltaBackward" => Some(RustBuiltinFeature::DeltaBackward),
            "DeltaSymmetric" => Some(RustBuiltinFeature::DeltaSymmetric),
            "LocalSlope" => Some(RustBuiltinFeature::LocalSlope),
            "Acceleration" => Some(RustBuiltinFeature::Acceleration),
            "LocalMean" => Some(RustBuiltinFeature::LocalMean),
            "LocalVariance" => Some(RustBuiltinFeature::LocalVariance),
            "IsLocalMax" => Some(RustBuiltinFeature::IsLocalMax),
            "IsLocalMin" => Some(RustBuiltinFeature::IsLocalMin),
            "ZScore" => Some(RustBuiltinFeature::ZScore),
            _ => None,
        };

        // Step 2: wrap the variant or report the offending name.
        match parsed {
            Some(inner) => Ok(Self { inner }),
            None => Err(py_error_helpers::to_value_error(format!(
                "Unknown feature: {}",
                name
            ))),
        }
    }

    /// Debug-style representation of the wrapped feature variant.
    fn __repr__(&self) -> String {
        let variant = &self.inner;
        format!("BuiltinFeature({:?})", variant)
    }
}
/// Collection of node features to compute, exposed to Python as `FeatureSet`.
#[cfg(feature = "python-bindings")]
#[pyclass(name = "FeatureSet")]
pub struct PyFeatureSet {
    /// The wrapped Rust feature set.
    inner: RustFeatureSet<f64>,
}

#[cfg(feature = "python-bindings")]
#[pymethods]
impl PyFeatureSet {
    /// Create a feature set via the Rust `FeatureSet::new()`.
    #[new]
    fn new() -> Self {
        let inner = RustFeatureSet::new();
        Self { inner }
    }

    /// Add a built-in feature and return this same object, so calls can be
    /// chained from Python.
    fn add_builtin(mut slf: PyRefMut<'_, Self>, feature: PyBuiltinFeature) -> PyRefMut<'_, Self> {
        let PyBuiltinFeature { inner: builtin } = feature;
        // The Rust builder takes the set by value, so move it out (leaving a
        // fresh placeholder behind) and store the extended set back.
        let previous = std::mem::replace(&mut slf.inner, RustFeatureSet::new());
        let extended = previous.add_builtin(builtin);
        slf.inner = extended;
        slf
    }

    /// Fixed placeholder representation; the contents are not listed.
    fn __repr__(&self) -> String {
        String::from("FeatureSet(...)")
    }
}
/// Build the natural visibility graph directly from raw values.
///
/// Shortcut for `TimeSeries(values).natural_visibility()`: raises `ValueError`
/// if the series cannot be created and `RuntimeError` if graph construction
/// fails.
#[cfg(feature = "python-bindings")]
#[pyfunction]
fn natural_visibility(values: Vec<f64>) -> PyResult<PyVisibilityGraph> {
    let series = PyTimeSeries::new(values)?;
    series.natural_visibility()
}
/// Build the horizontal visibility graph directly from raw values.
///
/// Shortcut for `TimeSeries(values).horizontal_visibility()`: raises
/// `ValueError` if the series cannot be created and `RuntimeError` if graph
/// construction fails.
#[cfg(feature = "python-bindings")]
#[pyfunction]
fn horizontal_visibility(values: Vec<f64>) -> PyResult<PyVisibilityGraph> {
    let series = PyTimeSeries::new(values)?;
    series.horizontal_visibility()
}
/// Native extension module: registers the binding classes and the two
/// module-level convenience functions.
#[cfg(feature = "python-bindings")]
#[pymodule]
fn _rustygraph(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Classes, in the same order as before.
    m.add_class::<PyTimeSeries>()?;
    m.add_class::<PyVisibilityGraph>()?;
    m.add_class::<PyBuiltinFeature>()?;
    m.add_class::<PyFeatureSet>()?;
    m.add_class::<PyMissingDataStrategy>()?;
    m.add_class::<PyGraphStatistics>()?;
    m.add_class::<PyMotifCounts>()?;

    // Free functions: wrap each one, then attach it to the module.
    let natural = wrap_pyfunction!(natural_visibility, m)?;
    m.add_function(natural)?;
    let horizontal = wrap_pyfunction!(horizontal_visibility, m)?;
    m.add_function(horizontal)?;

    Ok(())
}
/// Conversions from displayable Rust errors to Python exceptions.
#[cfg(feature = "python-bindings")]
mod py_error_helpers {
    use pyo3::exceptions::{PyRuntimeError, PyValueError};
    use pyo3::PyErr;

    /// Build a Python `ValueError` whose message is `error`'s `Display` text.
    pub fn to_value_error<E: std::fmt::Display>(error: E) -> PyErr {
        let message = error.to_string();
        PyValueError::new_err(message)
    }

    /// Build a Python `RuntimeError` whose message is `error`'s `Display` text.
    pub fn to_runtime_error<E: std::fmt::Display>(error: E) -> PyErr {
        let message = error.to_string();
        PyRuntimeError::new_err(message)
    }
}
#[cfg(all(test, feature = "python-bindings"))]
mod tests {
    use super::*;

    /// A series built from four raw values reports a length of four.
    #[test]
    fn test_py_timeseries_creation() {
        let raw = vec![1.0, 2.0, 3.0, 4.0];
        let created = PyTimeSeries::new(raw).unwrap();
        assert_eq!(created.__len__(), 4);
    }
}