wingfoil-python 4.0.1

python bindings for wingfoil - graph based stream processing framework
Documentation
use anyhow::anyhow;
use pyo3::exceptions::PyException;
use std::time::SystemTime;

use ::wingfoil::{NanoTime, RunFor, RunMode};

#[cfg(feature = "iceoryx2-beta")]
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;

use std::time::Duration;
use std::time::UNIX_EPOCH;

pub trait ToPyResult<T> {
    fn to_pyresult(self) -> PyResult<T>;
}

#[cfg(feature = "iceoryx2-beta")]
#[derive(Debug, thiserror::Error)]
pub enum PyBindingError {
    #[error("{0}")]
    TypeError(String),
}

#[cfg(feature = "iceoryx2-beta")]
pub fn py_type_error(msg: impl Into<String>) -> anyhow::Error {
    anyhow!(PyBindingError::TypeError(msg.into()))
}

impl<T> ToPyResult<T> for anyhow::Result<T> {
    fn to_pyresult(self) -> PyResult<T> {
        self.map_err(|e| {
            #[cfg(feature = "iceoryx2-beta")]
            if let Some(PyBindingError::TypeError(msg)) = e.downcast_ref::<PyBindingError>() {
                return PyTypeError::new_err(msg.clone());
            }

            PyException::new_err(e.to_string())
        })
    }
}

pub fn parse_run_args(
    py: Python<'_>,
    realtime: bool,
    start: Option<Py<PyAny>>,
    duration: Option<Py<PyAny>>,
    cycles: Option<u32>,
) -> anyhow::Result<(RunMode, RunFor)> {
    if duration.is_some() && cycles.is_some() {
        panic!("Cannot specify both duration and cycles");
    }
    if realtime && start.is_some() {
        panic!("Cannot specify start in realtime mode");
    }
    let run_mode = if realtime {
        RunMode::RealTime
    } else {
        let t = match start {
            Some(start) => to_nano_time(py, start)?,
            None => NanoTime::ZERO,
        };
        RunMode::HistoricalFrom(t)
    };
    let run_for = if let Some(cycles) = cycles {
        RunFor::Cycles(cycles)
    } else {
        match duration {
            Some(duration) => {
                let duration = to_duration(py, duration)?;
                RunFor::Duration(duration)
            }
            None => RunFor::Forever,
        }
    };
    Ok((run_mode, run_for))
}

fn to_duration(py: Python<'_>, obj: Py<PyAny>) -> anyhow::Result<Duration> {
    if let Ok(f) = obj.extract::<f64>(py) {
        if f < 0.0 {
            anyhow::bail!("duration can not be negative");
        }
        let nanos = (f * 1e9) as u64;
        Ok(Duration::from_nanos(nanos))
    } else if let Ok(td) = obj.extract::<Duration>(py) {
        Ok(td)
    } else {
        anyhow::bail!("failed to convert duration");
    }
}

fn f64_secs_to_nanos(ts: f64) -> anyhow::Result<u64> {
    if ts.is_sign_negative() {
        return Err(anyhow!("Negative timestamps not supported"));
    }
    let secs = ts.trunc() as u64;
    let frac_nanos = (ts.fract() * 1e9) as u64;
    let nanos = secs
        .checked_mul(1_000_000_000)
        .ok_or_else(|| anyhow!("Overflow when converting seconds to nanoseconds"))?;
    let total_nanos = nanos
        .checked_add(frac_nanos)
        .ok_or_else(|| anyhow!("Overflow when adding fractional nanoseconds"))?;
    Ok(total_nanos)
}

fn to_nano_time(py: Python<'_>, obj: Py<PyAny>) -> anyhow::Result<NanoTime> {
    if let Ok(dt) = obj.extract::<SystemTime>(py) {
        let nanos = dt.duration_since(UNIX_EPOCH)?.as_nanos();
        Ok(nanos.into())
    } else if let Ok(ts) = obj.extract::<f64>(py) {
        Ok(f64_secs_to_nanos(ts)?.into())
    } else {
        Err(anyhow!("failed to convert to NanoTime"))
    }
}