wingfoil-python 3.3.0

python bindings for wingfoil - graph based stream processing framework
Documentation
use anyhow::anyhow;
use pyo3::exceptions::PyException;
use std::time::SystemTime;

use ::wingfoil::{NanoTime, RunFor, RunMode};

use pyo3::prelude::*;

use std::time::Duration;
use std::time::UNIX_EPOCH;

pub trait ToPyResult<T> {
    fn to_pyresult(self) -> PyResult<T>;
}

impl<T> ToPyResult<T> for anyhow::Result<T> {
    fn to_pyresult(self) -> PyResult<T> {
        self.map_err(|e| PyException::new_err(e.to_string()))
    }
}

pub fn parse_run_args(
    py: Python<'_>,
    realtime: Option<bool>,
    start: Option<Py<PyAny>>,
    duration: Option<Py<PyAny>>,
    cycles: Option<u32>,
) -> anyhow::Result<(RunMode, RunFor)> {
    if duration.is_some() && cycles.is_some() {
        panic!("Cannot specify both duration and cycles");
    }
    let realtime = realtime.unwrap_or(false);
    if realtime && start.is_some() {
        panic!("Cannot specify start in realtime mode");
    }
    let run_mode = if realtime {
        RunMode::RealTime
    } else {
        let t = match start {
            Some(start) => to_nano_time(py, start)?,
            None => NanoTime::ZERO,
        };
        RunMode::HistoricalFrom(t)
    };
    let run_for = if let Some(cycles) = cycles {
        RunFor::Cycles(cycles)
    } else {
        match duration {
            Some(duration) => {
                let duration = to_duration(py, duration)?;
                RunFor::Duration(duration)
            }
            None => RunFor::Forever,
        }
    };
    Ok((run_mode, run_for))
}

fn to_duration(py: Python<'_>, obj: Py<PyAny>) -> anyhow::Result<Duration> {
    if let Ok(f) = obj.extract::<f64>(py) {
        if f < 0.0 {
            anyhow::bail!("duration can not be negative");
        }
        let nanos = (f * 1e9) as u64;
        Ok(Duration::from_nanos(nanos))
    } else if let Ok(td) = obj.extract::<Duration>(py) {
        Ok(td)
    } else {
        anyhow::bail!("failed to convert duration");
    }
}

fn f64_secs_to_nanos(ts: f64) -> anyhow::Result<u64> {
    if ts.is_sign_negative() {
        return Err(anyhow!("Negative timestamps not supported"));
    }
    let secs = ts.trunc() as u64;
    let frac_nanos = (ts.fract() * 1e9) as u64;
    let nanos = secs
        .checked_mul(1_000_000_000)
        .ok_or_else(|| anyhow!("Overflow when converting seconds to nanoseconds"))?;
    let total_nanos = nanos
        .checked_add(frac_nanos)
        .ok_or_else(|| anyhow!("Overflow when adding fractional nanoseconds"))?;
    Ok(total_nanos)
}

fn to_nano_time(py: Python<'_>, obj: Py<PyAny>) -> anyhow::Result<NanoTime> {
    if let Ok(dt) = obj.extract::<SystemTime>(py) {
        let nanos = dt.duration_since(UNIX_EPOCH)?.as_nanos();
        Ok(nanos.into())
    } else if let Ok(ts) = obj.extract::<f64>(py) {
        Ok(f64_secs_to_nanos(ts)?.into())
    } else {
        Err(anyhow!("failed to convert to NanoTime"))
    }
}