binlog 0.5.0

A binary data log library
Documentation
use std::borrow::Cow;
use std::ops::Bound;
use std::time::Duration;

use crate::{Error, Range, RangeableStore, Store, SubscribeableStore, Subscription};

use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;

fn map_result<T>(res: Result<T, Error>) -> PyResult<T> {
    res.map_err(|err| match err {
        Error::Database(err) => PyRuntimeError::new_err(format!("{}", err)),
        Error::Io(err) => PyIOError::new_err(err),
        Error::BadRange => PyValueError::new_err("bad range"),
    })
}

#[pyclass]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Entry {
    #[pyo3(get, set)]
    pub timestamp: i64,
    #[pyo3(get, set)]
    pub name: String,
    #[pyo3(get, set)]
    pub value: Vec<u8>,
}

#[pymethods]
impl Entry {
    #[new]
    pub fn new(timestamp: i64, name: String, value: Vec<u8>) -> Self {
        Entry { timestamp, name, value }
    }
}

impl From<Entry> for crate::Entry {
    fn from(entry: Entry) -> crate::Entry {
        crate::Entry::new_with_timestamp(entry.timestamp, entry.name, entry.value)
    }
}

impl From<crate::Entry> for Entry {
    fn from(entry: crate::Entry) -> Entry {
        Entry::new(entry.timestamp, entry.name.to_string(), entry.value)
    }
}

#[pyclass]
pub struct SqliteStore {
    store: crate::SqliteStore,
}

#[pymethods]
impl SqliteStore {
    #[new]
    pub fn new(path: String, compression_level: Option<i32>) -> PyResult<Self> {
        Ok(Self {
            store: map_result(crate::SqliteStore::new(path, compression_level))?,
        })
    }

    pub fn push(&self, py: Python, entry: Entry) -> PyResult<()> {
        let entry = Cow::Owned(entry.into());
        py.allow_threads(move || map_result(self.store.push(entry)))
    }

    pub fn range(
        &self,
        start_bound: Option<i64>,
        end_bound: Option<i64>,
        name: Option<String>,
    ) -> PyResult<SqliteRange> {
        let start_bound = match start_bound {
            Some(ts) => Bound::Included(ts),
            None => Bound::Unbounded,
        };
        let end_bound = match end_bound {
            Some(ts) => Bound::Excluded(ts),
            None => Bound::Unbounded,
        };
        let range = map_result(self.store.range((start_bound, end_bound), name))?;
        Ok(SqliteRange { range: Some(range) })
    }
}

#[pyclass]
pub struct SqliteRange {
    range: Option<crate::SqliteRange>,
}

#[pymethods]
impl SqliteRange {
    pub fn count(&self, py: Python) -> PyResult<u64> {
        if let Some(range) = &self.range {
            py.allow_threads(move || map_result(range.count()))
        } else {
            Err(PyValueError::new_err("range already consumed"))
        }
    }

    pub fn remove(&mut self, py: Python) -> PyResult<()> {
        if let Some(range) = self.range.take() {
            py.allow_threads(move || map_result(range.remove()))
        } else {
            Err(PyValueError::new_err("range already consumed"))
        }
    }

    pub fn iter(&mut self, py: Python) -> PyResult<SqliteRangeIterator> {
        if let Some(range) = self.range.take() {
            py.allow_threads(move || {
                let iter = map_result(range.iter())?;
                Ok(SqliteRangeIterator { iter })
            })
        } else {
            Err(PyValueError::new_err("range already consumed"))
        }
    }
}

#[pyclass]
pub struct SqliteRangeIterator {
    iter: crate::SqliteRangeIterator,
}

#[pymethods]
impl SqliteRangeIterator {
    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
        slf
    }

    fn __next__(mut slf: PyRefMut<'_, Self>, py: Python) -> Option<PyObject> {
        match slf.iter.next() {
            Some(Ok(entry)) => Some(Entry::from(entry).into_py(py)),
            Some(Err(err)) => Some(map_result::<()>(Err(err)).unwrap_err().into_py(py)),
            None => None,
        }
    }
}

#[pyclass]
pub struct RedisStreamStore {
    store: crate::RedisStreamStore,
}

#[pymethods]
impl RedisStreamStore {
    #[new]
    pub fn new(connection_url: String) -> PyResult<Self> {
        Ok(Self {
            store: map_result(crate::RedisStreamStore::new(connection_url))?,
        })
    }

    pub fn push(&self, py: Python, entry: Entry) -> PyResult<()> {
        let entry = Cow::Owned(entry.into());
        py.allow_threads(move || map_result(self.store.push(entry)))
    }

    pub fn subscribe(&self, name: String) -> PyResult<RedisStreamSubscription> {
        let subscription = map_result(self.store.subscribe(name))?;
        Ok(RedisStreamSubscription { subscription })
    }
}

#[pyclass]
pub struct RedisStreamSubscription {
    subscription: crate::RedisStreamSubscription,
}

#[pymethods]
impl RedisStreamSubscription {
    pub fn next(&mut self, py: Python, duration: Option<f32>) -> PyResult<Option<Entry>> {
        let duration = duration.map(Duration::from_secs_f32);
        py.allow_threads(move || {
            let entry = map_result(self.subscription.next(duration))?;
            Ok(entry.map(|e| e.into()))
        })
    }
}

#[pymodule]
fn binlog(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_class::<Entry>()?;
    m.add_class::<SqliteStore>()?;
    m.add_class::<SqliteRange>()?;
    m.add_class::<SqliteRangeIterator>()?;
    m.add_class::<RedisStreamStore>()?;
    m.add_class::<RedisStreamSubscription>()?;
    Ok(())
}