use super::endpoint::EndpointUri;
use super::rendezvous::RendezvousDir;
use super::transport::{IpcError, IpcResult, IpcTransport, IpcTransportHandle};
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::time::{Duration, Instant};
pub type IpcRingPair<T> = IpcResult<(Option<IpcRingWriter<T>>, Option<IpcRingReader<T>>)>;
#[derive(Debug, Clone)]
pub struct ConnectOptions {
pub connect_timeout: Option<Duration>,
pub retry_interval: Duration,
}
impl ConnectOptions {
pub const fn non_blocking() -> Self {
Self {
connect_timeout: Some(Duration::from_millis(0)),
retry_interval: Duration::from_millis(0),
}
}
pub const fn wait_forever() -> Self {
Self {
connect_timeout: None,
retry_interval: Duration::from_millis(500),
}
}
pub const fn with_timeout(connect_timeout: Duration) -> Self {
Self {
connect_timeout: Some(connect_timeout),
retry_interval: Duration::from_millis(500),
}
}
}
impl Default for ConnectOptions {
fn default() -> Self {
Self::with_timeout(Duration::from_secs(30))
}
}
#[derive(Debug, Clone)]
pub struct TcpOptions {
pub high_watermark: Option<usize>,
pub overflow: OverflowPolicy,
}
impl Default for TcpOptions {
fn default() -> Self {
Self { high_watermark: None, overflow: OverflowPolicy::DropOldest }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
DropOldest,
DropNewest,
Error,
}
#[derive(Debug, Clone, Default)]
pub struct IpcOptions {
pub connect: ConnectOptions,
pub tcp: TcpOptions,
}
pub struct IpcRingWriter<T: Copy> {
transport: IpcTransportHandle,
_marker: PhantomData<T>,
}
impl<T: Copy> IpcRingWriter<T> {
pub fn from_transport(transport: IpcTransportHandle) -> Self {
Self { transport, _marker: PhantomData }
}
pub fn try_push(&self, value: &T) -> bool {
let bytes = value_to_bytes(value);
self.transport.publish(bytes).is_ok()
}
pub fn is_connected(&self) -> bool {
self.transport.is_ready()
}
pub fn transport(&self) -> &IpcTransportHandle {
&self.transport
}
}
pub struct IpcRingReader<T: Copy> {
transport: IpcTransportHandle,
_marker: PhantomData<T>,
}
impl<T: Copy> IpcRingReader<T> {
pub fn from_transport(transport: IpcTransportHandle) -> Self {
Self { transport, _marker: PhantomData }
}
pub fn try_pop(&self) -> IpcResult<Option<T>> {
match self.transport.try_recv()? {
None => Ok(None),
Some(bytes) => {
if bytes.len() != std::mem::size_of::<T>() {
return Ok(None);
}
let mut v: MaybeUninit<T> = MaybeUninit::uninit();
unsafe {
std::ptr::copy_nonoverlapping(
bytes.as_ptr(),
v.as_mut_ptr() as *mut u8,
bytes.len(),
);
Ok(Some(v.assume_init()))
}
}
}
}
pub fn is_connected(&self) -> bool {
self.transport.is_ready()
}
pub fn transport(&self) -> &IpcTransportHandle {
&self.transport
}
}
pub fn create_ipc_ring_buffer<T: Copy>(
uri: &EndpointUri,
role: super::endpoint::Role,
rdv: RendezvousDir,
) -> IpcRingPair<T> {
create_ipc_ring_buffer_with_opts(uri, role, rdv, &IpcOptions::default())
}
pub fn create_ipc_ring_buffer_with_opts<T: Copy>(
uri: &EndpointUri,
role: super::endpoint::Role,
rdv: RendezvousDir,
opts: &IpcOptions,
) -> IpcRingPair<T> {
use super::backend::TcpTransport;
use super::endpoint::Role;
match role {
Role::Publisher => {
let transport: Arc<dyn IpcTransport> = Arc::new(
TcpTransport::bind_publisher_with_opts(uri, rdv, opts.tcp.clone())?,
);
Ok((Some(IpcRingWriter::from_transport(transport)), None))
}
Role::Subscriber => {
let transport = connect_subscriber_retry::<TcpTransport>(uri, &rdv, &opts.connect)?;
Ok((None, Some(IpcRingReader::from_transport(transport))))
}
}
}
fn connect_subscriber_retry<B: 'static + ConnectSubscriber>(
uri: &EndpointUri,
rdv: &RendezvousDir,
opts: &ConnectOptions,
) -> IpcResult<IpcTransportHandle> {
let deadline = opts.connect_timeout.map(|d| Instant::now() + d);
loop {
match B::connect_subscriber(uri, rdv) {
Ok(t) => return Ok(Arc::new(t)),
Err(e @ (IpcError::NotReady | IpcError::Io(_))) => {
if let Some(dl) = deadline
&& Instant::now() >= dl
{
return Err(if matches!(e, IpcError::NotReady) {
IpcError::NotReady
} else {
e
});
}
std::thread::sleep(opts.retry_interval);
}
Err(e) => return Err(e),
}
}
}
pub(crate) trait ConnectSubscriber: Sized + IpcTransport {
fn connect_subscriber(uri: &EndpointUri, rdv: &RendezvousDir) -> IpcResult<Self>;
}
impl ConnectSubscriber for super::backend::TcpTransport {
fn connect_subscriber(uri: &EndpointUri, rdv: &RendezvousDir) -> IpcResult<Self> {
Self::connect_subscriber(uri, rdv)
}
}
#[inline]
fn value_to_bytes<T: Copy>(value: &T) -> &[u8] {
unsafe { std::slice::from_raw_parts(value as *const T as *const u8, std::mem::size_of::<T>()) }
}
#[cfg(test)]
mod tests {
use super::super::backend::loopback::LoopbackTransport;
use super::*;
#[repr(C)]
#[derive(Copy, Clone, PartialEq, Debug)]
struct Pose {
x: f32,
y: f32,
z: f32,
}
#[test]
fn loopback_push_pop() {
let t: Arc<dyn IpcTransport> = Arc::new(LoopbackTransport::new());
let w = IpcRingWriter::<Pose>::from_transport(t.clone());
let r = IpcRingReader::<Pose>::from_transport(t);
let p = Pose { x: 1.0, y: 2.0, z: 3.0 };
assert!(w.try_push(&p));
assert_eq!(r.try_pop().unwrap(), Some(p));
assert_eq!(r.try_pop().unwrap(), None);
}
}