use crate::bindings::json_utils::{json_value_to_py, py_to_json_value};
use crate::event_stream::{
Event, EventBus, EventBusConfig, EventFilter, EventPublisher, EventSubscriber,
SlowConsumerPolicy,
};
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use std::time::{Duration, UNIX_EPOCH};
#[pyclass(name = "Event")]
#[derive(Clone)]
pub struct PyEvent {
inner: Event,
}
#[pymethods]
impl PyEvent {
#[new]
#[pyo3(signature = (event_type, data=None))]
fn new(py: Python<'_>, event_type: &str, data: Option<Py<PyAny>>) -> PyResult<Self> {
let json_data = match data {
Some(obj) => py_to_json_value(&obj.bind(py).clone())?,
None => serde_json::json!({}),
};
Ok(Self {
inner: Event::new(event_type, json_data),
})
}
#[staticmethod]
#[pyo3(signature = (event_type, resource_id, data=None))]
fn with_resource(
py: Python<'_>,
event_type: &str,
resource_id: &str,
data: Option<Py<PyAny>>,
) -> PyResult<Self> {
let json_data = match data {
Some(obj) => py_to_json_value(&obj.bind(py).clone())?,
None => serde_json::json!({}),
};
Ok(Self {
inner: Event::with_resource(event_type, resource_id, json_data),
})
}
#[staticmethod]
fn progress(resource_id: &str, current: u64, total: u64, message: &str) -> Self {
Self {
inner: Event::progress(resource_id, current, total, message),
}
}
#[staticmethod]
fn log(resource_id: &str, level: &str, message: &str) -> Self {
Self {
inner: Event::log(resource_id, level, message),
}
}
#[staticmethod]
fn stdout(resource_id: &str, line: &str) -> Self {
Self {
inner: Event::stdout(resource_id, line),
}
}
#[staticmethod]
fn stderr(resource_id: &str, line: &str) -> Self {
Self {
inner: Event::stderr(resource_id, line),
}
}
#[getter]
fn id(&self) -> u64 {
self.inner.id
}
#[getter]
fn timestamp(&self) -> f64 {
self.inner
.timestamp
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_secs_f64()
}
#[getter]
fn event_type(&self) -> &str {
&self.inner.event_type
}
#[getter]
fn resource_id(&self) -> Option<&str> {
self.inner.resource_id.as_deref()
}
#[getter]
fn data(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
json_value_to_py(py, &self.inner.data)
}
fn to_json(&self) -> PyResult<String> {
serde_json::to_string(&self.inner).map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
fn __repr__(&self) -> String {
format!(
"Event(id={}, type='{}', resource_id={:?})",
self.inner.id, self.inner.event_type, self.inner.resource_id
)
}
}
#[pyclass(name = "EventFilter")]
#[derive(Clone)]
pub struct PyEventFilter {
inner: EventFilter,
}
#[pymethods]
impl PyEventFilter {
#[new]
fn new() -> Self {
Self {
inner: EventFilter::new(),
}
}
fn event_type(&self, pattern: &str) -> Self {
Self {
inner: self.inner.clone().event_type(pattern),
}
}
fn resource(&self, id: &str) -> Self {
Self {
inner: self.inner.clone().resource(id),
}
}
fn since(&self, timestamp: f64) -> Self {
let time = UNIX_EPOCH + Duration::from_secs_f64(timestamp);
Self {
inner: self.inner.clone().since(time),
}
}
fn until(&self, timestamp: f64) -> Self {
let time = UNIX_EPOCH + Duration::from_secs_f64(timestamp);
Self {
inner: self.inner.clone().until(time),
}
}
fn matches(&self, event: &PyEvent) -> bool {
self.inner.matches(&event.inner)
}
fn __repr__(&self) -> String {
format!(
"EventFilter(event_types={:?}, resource_ids={:?})",
self.inner.event_types, self.inner.resource_ids
)
}
}
#[pyclass(name = "EventBusConfig")]
#[derive(Clone)]
pub struct PyEventBusConfig {
inner: EventBusConfig,
}
#[pymethods]
impl PyEventBusConfig {
#[new]
#[pyo3(signature = (history_size=1000, subscriber_buffer=256, slow_consumer="drop_oldest"))]
fn new(history_size: usize, subscriber_buffer: usize, slow_consumer: &str) -> PyResult<Self> {
let policy =
match slow_consumer {
"drop_oldest" => SlowConsumerPolicy::DropOldest,
"drop_newest" => SlowConsumerPolicy::DropNewest,
"block" => SlowConsumerPolicy::Block,
_ => return Err(PyRuntimeError::new_err(
"Invalid slow_consumer policy. Use 'drop_oldest', 'drop_newest', or 'block'",
)),
};
Ok(Self {
inner: EventBusConfig {
history_size,
subscriber_buffer,
slow_consumer: policy,
},
})
}
#[getter]
fn history_size(&self) -> usize {
self.inner.history_size
}
#[getter]
fn subscriber_buffer(&self) -> usize {
self.inner.subscriber_buffer
}
fn __repr__(&self) -> String {
format!(
"EventBusConfig(history_size={}, subscriber_buffer={})",
self.inner.history_size, self.inner.subscriber_buffer
)
}
}
#[pyclass(name = "EventPublisher")]
pub struct PyEventPublisher {
inner: EventPublisher,
}
#[pymethods]
impl PyEventPublisher {
fn publish(&self, event: &PyEvent) {
self.inner.publish(event.inner.clone());
}
fn progress(&self, resource_id: &str, current: u64, total: u64, message: &str) {
self.inner.progress(resource_id, current, total, message);
}
fn log(&self, resource_id: &str, level: &str, message: &str) {
self.inner.log(resource_id, level, message);
}
fn stdout(&self, resource_id: &str, line: &str) {
self.inner.stdout(resource_id, line);
}
fn stderr(&self, resource_id: &str, line: &str) {
self.inner.stderr(resource_id, line);
}
fn task_started(&self, py: Python<'_>, task_id: &str, data: Option<Py<PyAny>>) -> PyResult<()> {
let json_data = match data {
Some(obj) => py_to_json_value(&obj.bind(py).clone())?,
None => serde_json::json!({}),
};
self.inner.task_started(task_id, json_data);
Ok(())
}
fn task_completed(
&self,
py: Python<'_>,
task_id: &str,
result: Option<Py<PyAny>>,
) -> PyResult<()> {
let json_result = match result {
Some(obj) => py_to_json_value(&obj.bind(py).clone())?,
None => serde_json::json!({}),
};
self.inner.task_completed(task_id, json_result);
Ok(())
}
fn task_failed(&self, task_id: &str, error: &str) {
self.inner.task_failed(task_id, error);
}
fn task_cancelled(&self, task_id: &str) {
self.inner.task_cancelled(task_id);
}
fn __repr__(&self) -> String {
"EventPublisher()".to_string()
}
}
#[pyclass(name = "EventSubscriber")]
pub struct PyEventSubscriber {
inner: EventSubscriber,
}
#[pymethods]
impl PyEventSubscriber {
fn recv(&self, py: Python<'_>) -> Option<PyEvent> {
let inner = &self.inner;
py.detach(|| inner.recv().map(|e| PyEvent { inner: e }))
}
fn try_recv(&self) -> Option<PyEvent> {
self.inner.try_recv().map(|e| PyEvent { inner: e })
}
fn recv_timeout(&self, py: Python<'_>, timeout_ms: u64) -> PyResult<PyEvent> {
let timeout = Duration::from_millis(timeout_ms);
let inner = &self.inner;
py.detach(|| {
inner
.recv_timeout(timeout)
.map(|e| PyEvent { inner: e })
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
})
}
fn drain(&self) -> Vec<PyEvent> {
self.inner
.try_iter()
.map(|e| PyEvent { inner: e })
.collect()
}
fn __repr__(&self) -> String {
format!("EventSubscriber(filter={:?})", self.inner.filter())
}
}
#[pyclass(name = "EventBus")]
pub struct PyEventBus {
inner: EventBus,
}
#[pymethods]
impl PyEventBus {
#[new]
#[pyo3(signature = (config=None))]
fn new(config: Option<PyEventBusConfig>) -> Self {
let cfg = config.map(|c| c.inner).unwrap_or_default();
Self {
inner: EventBus::new(cfg),
}
}
fn publisher(&self) -> PyEventPublisher {
PyEventPublisher {
inner: self.inner.publisher(),
}
}
#[pyo3(signature = (filter=None))]
fn subscribe(&self, filter: Option<PyEventFilter>) -> PyEventSubscriber {
let f = filter.map(|f| f.inner).unwrap_or_default();
PyEventSubscriber {
inner: self.inner.subscribe(f),
}
}
#[pyo3(signature = (filter=None))]
fn history(&self, filter: Option<PyEventFilter>) -> Vec<PyEvent> {
let f = filter.map(|f| f.inner).unwrap_or_default();
self.inner
.history(&f)
.into_iter()
.map(|e| PyEvent { inner: e })
.collect()
}
fn clear_history(&self) {
self.inner.clear_history();
}
fn publish(&self, event: &PyEvent) {
self.inner.publish(event.inner.clone());
}
fn __repr__(&self) -> String {
"EventBus()".to_string()
}
}