use super::endpoint::{EndpointUri, Role};
use super::rendezvous::RendezvousDir;
use super::ring_buffer::{IpcOptions, create_ipc_ring_buffer_with_opts};
use super::transport::{IpcResult, IpcTransport, IpcTransportHandle};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::sync::{Arc, Mutex};
pub type IpcTriplePair<T> = IpcResult<(Option<IpcTripleWriter<T>>, Option<IpcTripleReader<T>>)>;
pub struct IpcTripleWriter<T: Copy> {
transport: IpcTransportHandle,
_marker: PhantomData<T>,
}
impl<T: Copy> IpcTripleWriter<T> {
pub fn from_transport(transport: IpcTransportHandle) -> Self {
Self { transport, _marker: PhantomData }
}
pub fn publish(&self, value: &T) -> bool {
let bytes = unsafe {
std::slice::from_raw_parts(value as *const T as *const u8, std::mem::size_of::<T>())
};
self.transport.publish(bytes).is_ok()
}
pub fn is_connected(&self) -> bool {
self.transport.is_ready()
}
}
pub struct IpcTripleReader<T: Copy> {
transport: IpcTransportHandle,
latest: Mutex<Option<T>>,
_marker: PhantomData<T>,
}
impl<T: Copy> IpcTripleReader<T> {
pub fn from_transport(transport: IpcTransportHandle) -> Self {
Self { transport, latest: Mutex::new(None), _marker: PhantomData }
}
pub fn get_latest(&self) -> IpcResult<Option<T>> {
let mut last_bytes: Option<Vec<u8>> = None;
while let Some(b) = self.transport.try_recv()? {
last_bytes = Some(b);
}
if let Some(bytes) = last_bytes
&& bytes.len() == std::mem::size_of::<T>()
{
let mut v: MaybeUninit<T> = MaybeUninit::uninit();
unsafe {
std::ptr::copy_nonoverlapping(
bytes.as_ptr(),
v.as_mut_ptr() as *mut u8,
bytes.len(),
);
let value = v.assume_init();
let mut guard = self.latest.lock().expect("latest poisoned");
*guard = Some(value);
}
}
Ok(*self.latest.lock().expect("latest poisoned"))
}
pub fn is_connected(&self) -> bool {
self.transport.is_ready()
}
}
pub fn create_ipc_triple_buffer<T: Copy>(
uri: &EndpointUri,
role: Role,
rdv: RendezvousDir,
) -> IpcTriplePair<T> {
create_ipc_triple_buffer_with_opts(uri, role, rdv, &IpcOptions::default())
}
pub fn create_ipc_triple_buffer_with_opts<T: Copy>(
uri: &EndpointUri,
role: Role,
rdv: RendezvousDir,
opts: &IpcOptions,
) -> IpcTriplePair<T> {
let (w, r) = create_ipc_ring_buffer_with_opts::<T>(uri, role, rdv, opts)?;
match role {
Role::Publisher => {
let t: Arc<dyn IpcTransport> = w.expect("publisher writer").transport().clone();
Ok((Some(IpcTripleWriter::from_transport(t)), None))
}
Role::Subscriber => {
let t: Arc<dyn IpcTransport> = r.expect("subscriber reader").transport().clone();
Ok((None, Some(IpcTripleReader::from_transport(t))))
}
}
}
#[cfg(test)]
mod tests {
use super::super::backend::loopback::LoopbackTransport;
use super::*;
#[repr(C)]
#[derive(Copy, Clone, PartialEq, Debug)]
struct Pose {
seq: u64,
x: f32,
}
#[test]
fn loopback_latest_wins() {
let t: Arc<dyn IpcTransport> = Arc::new(LoopbackTransport::new());
let w = IpcTripleWriter::<Pose>::from_transport(t.clone());
let r = IpcTripleReader::<Pose>::from_transport(t);
assert_eq!(r.get_latest().unwrap(), None);
assert!(w.publish(&Pose { seq: 1, x: 0.1 }));
assert!(w.publish(&Pose { seq: 2, x: 0.2 }));
assert!(w.publish(&Pose { seq: 3, x: 0.3 }));
assert_eq!(r.get_latest().unwrap(), Some(Pose { seq: 3, x: 0.3 }));
assert_eq!(r.get_latest().unwrap(), Some(Pose { seq: 3, x: 0.3 }));
assert!(w.publish(&Pose { seq: 4, x: 0.4 }));
assert_eq!(r.get_latest().unwrap(), Some(Pose { seq: 4, x: 0.4 }));
}
}