#![allow(clippy::missing_errors_doc)]
#![allow(unsafe_code)]
#![allow(unsafe_op_in_unsafe_fn)]
#![allow(clippy::useless_conversion)]
#![allow(clippy::needless_lifetimes)]
#![allow(clippy::new_without_default)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::expect_used)]
use std::sync::Mutex;
use pyo3::IntoPyObjectExt;
use pyo3::prelude::*;
use zerodds_dcps::InstanceHandle;
use zerodds_dcps::listener::{DataReaderListener, DataWriterListener};
use zerodds_dcps::status::{
LivelinessChangedStatus, LivelinessLostStatus, OfferedDeadlineMissedStatus,
PublicationMatchedStatus, RequestedDeadlineMissedStatus, SampleLostStatus,
SampleRejectedStatus, SubscriptionMatchedStatus,
};
#[derive(Default)]
struct PyListenerSlots {
on_offered_deadline_missed: Option<Py<PyAny>>,
on_liveliness_lost: Option<Py<PyAny>>,
on_publication_matched: Option<Py<PyAny>>,
on_data_available: Option<Py<PyAny>>,
on_sample_lost: Option<Py<PyAny>>,
on_sample_rejected: Option<Py<PyAny>>,
on_requested_deadline_missed: Option<Py<PyAny>>,
on_liveliness_changed: Option<Py<PyAny>>,
on_subscription_matched: Option<Py<PyAny>>,
}
#[pyclass(name = "DataWriterListener", module = "zerodds_py")]
pub struct PyDataWriterListener {
slots: Mutex<PyListenerSlots>,
}
#[pymethods]
impl PyDataWriterListener {
#[new]
fn new() -> Self {
Self {
slots: Mutex::new(PyListenerSlots::default()),
}
}
fn on_offered_deadline_missed(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_offered_deadline_missed = Some(callback);
}
fn on_liveliness_lost(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_liveliness_lost = Some(callback);
}
fn on_publication_matched(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_publication_matched = Some(callback);
}
}
pub struct PyDataWriterListenerBridge {
slots: std::sync::Arc<Mutex<PyListenerSlots>>,
}
impl PyDataWriterListenerBridge {
pub fn from_pyclass(listener: &PyDataWriterListener) -> std::sync::Arc<Self> {
let snapshot = Python::with_gil(|py| {
let g = listener.slots.lock().unwrap();
PyListenerSlots {
on_offered_deadline_missed: g
.on_offered_deadline_missed
.as_ref()
.map(|c| c.clone_ref(py)),
on_liveliness_lost: g.on_liveliness_lost.as_ref().map(|c| c.clone_ref(py)),
on_publication_matched: g.on_publication_matched.as_ref().map(|c| c.clone_ref(py)),
on_data_available: None,
on_sample_lost: None,
on_sample_rejected: None,
on_requested_deadline_missed: None,
on_liveliness_changed: None,
on_subscription_matched: None,
}
});
std::sync::Arc::new(Self {
slots: std::sync::Arc::new(Mutex::new(snapshot)),
})
}
}
fn call_with_handle_and_status<F>(callback: &Py<PyAny>, build_args: F)
where
F: for<'py> FnOnce(Python<'py>) -> PyResult<Bound<'py, pyo3::types::PyTuple>>,
{
Python::with_gil(|py| match build_args(py) {
Ok(args) => {
if let Err(e) = callback.call1(py, args) {
e.print(py);
}
}
Err(e) => e.print(py),
});
}
impl DataWriterListener for PyDataWriterListenerBridge {
fn on_offered_deadline_missed(
&self,
writer: InstanceHandle,
status: OfferedDeadlineMissedStatus,
) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_offered_deadline_missed {
let handle: u64 = writer.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (status.total_count, status.total_count_change);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
fn on_liveliness_lost(&self, writer: InstanceHandle, status: LivelinessLostStatus) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_liveliness_lost {
let handle: u64 = writer.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (status.total_count, status.total_count_change);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
fn on_publication_matched(&self, writer: InstanceHandle, status: PublicationMatchedStatus) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_publication_matched {
let handle: u64 = writer.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (
status.total_count,
status.total_count_change,
status.current_count,
status.current_count_change,
);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
}
#[pyclass(name = "DataReaderListener", module = "zerodds_py")]
pub struct PyDataReaderListener {
slots: Mutex<PyListenerSlots>,
}
#[pymethods]
impl PyDataReaderListener {
#[new]
fn new() -> Self {
Self {
slots: Mutex::new(PyListenerSlots::default()),
}
}
fn on_data_available(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_data_available = Some(callback);
}
fn on_sample_lost(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_sample_lost = Some(callback);
}
fn on_sample_rejected(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_sample_rejected = Some(callback);
}
fn on_requested_deadline_missed(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_requested_deadline_missed = Some(callback);
}
fn on_liveliness_changed(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_liveliness_changed = Some(callback);
}
fn on_subscription_matched(&self, callback: Py<PyAny>) {
self.slots.lock().unwrap().on_subscription_matched = Some(callback);
}
}
pub struct PyDataReaderListenerBridge {
slots: std::sync::Arc<Mutex<PyListenerSlots>>,
}
impl PyDataReaderListenerBridge {
pub fn from_pyclass(listener: &PyDataReaderListener) -> std::sync::Arc<Self> {
let snapshot = Python::with_gil(|py| {
let g = listener.slots.lock().unwrap();
PyListenerSlots {
on_offered_deadline_missed: None,
on_liveliness_lost: None,
on_publication_matched: None,
on_data_available: g.on_data_available.as_ref().map(|c| c.clone_ref(py)),
on_sample_lost: g.on_sample_lost.as_ref().map(|c| c.clone_ref(py)),
on_sample_rejected: g.on_sample_rejected.as_ref().map(|c| c.clone_ref(py)),
on_requested_deadline_missed: g
.on_requested_deadline_missed
.as_ref()
.map(|c| c.clone_ref(py)),
on_liveliness_changed: g.on_liveliness_changed.as_ref().map(|c| c.clone_ref(py)),
on_subscription_matched: g
.on_subscription_matched
.as_ref()
.map(|c| c.clone_ref(py)),
}
});
std::sync::Arc::new(Self {
slots: std::sync::Arc::new(Mutex::new(snapshot)),
})
}
}
impl DataReaderListener for PyDataReaderListenerBridge {
fn on_data_available(&self, reader: InstanceHandle) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_data_available {
let handle: u64 = reader.as_raw();
call_with_handle_and_status(cb, move |py| {
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?])
});
}
}
fn on_sample_lost(&self, reader: InstanceHandle, status: SampleLostStatus) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_sample_lost {
let handle: u64 = reader.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (status.total_count, status.total_count_change);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
fn on_sample_rejected(&self, reader: InstanceHandle, status: SampleRejectedStatus) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_sample_rejected {
let handle: u64 = reader.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (status.total_count, status.total_count_change);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
fn on_requested_deadline_missed(
&self,
reader: InstanceHandle,
status: RequestedDeadlineMissedStatus,
) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_requested_deadline_missed {
let handle: u64 = reader.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (status.total_count, status.total_count_change);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
fn on_liveliness_changed(&self, reader: InstanceHandle, status: LivelinessChangedStatus) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_liveliness_changed {
let handle: u64 = reader.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (
status.alive_count,
status.not_alive_count,
status.alive_count_change,
status.not_alive_count_change,
);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
fn on_subscription_matched(&self, reader: InstanceHandle, status: SubscriptionMatchedStatus) {
let g = self.slots.lock().unwrap();
if let Some(cb) = &g.on_subscription_matched {
let handle: u64 = reader.as_raw();
call_with_handle_and_status(cb, move |py| {
let s = (
status.total_count,
status.total_count_change,
status.current_count,
status.current_count_change,
);
pyo3::types::PyTuple::new(py, [handle.into_py_any(py)?, s.into_py_any(py)?])
});
}
}
}