use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use crate::bundle::{Receiver, Sender};
use crate::cspcl_sys;
use crate::error::{Error, Result};
use crate::interface::Interface;
/// Owner of the native `cspcl_t` handle plus the Rust-side state used to
/// coordinate access to it.
pub(crate) struct RawCspcl {
/// The native handle. Kept in an `UnsafeCell` so a `*mut cspcl_t` can be
/// produced from `&self` for FFI calls.
inner: UnsafeCell<cspcl_sys::cspcl_t>,
/// Lock handed out through the module-level `recv_lock` helper.
/// NOTE(review): presumably serializes receive calls in `crate::bundle`;
/// nothing in this file acquires it — confirm against the callers.
recv_lock: Mutex<()>,
/// Held for the duration of `shutdown` so concurrent shutdowns are serialized.
lifecycle_lock: Mutex<()>,
/// Set to `true` by `shutdown`; once set, `is_initialized` reports `false`.
closed: AtomicBool,
}
/// Plain-data copy of the counters reported by the native
/// `cspcl_conn_pool_get_stats` call.
///
/// Each field mirrors the same-named field of `cspcl_conn_pool_stats_t`; the
/// exact semantics of every counter are defined by the C library.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ConnectionStats {
    /// Native `hits` counter.
    pub hits: u32,
    /// Native `misses` counter.
    pub misses: u32,
    /// Native `evictions` counter.
    pub evictions: u32,
    /// Native `connect_failures` counter.
    pub connect_failures: u32,
    /// Native `invalidations` counter.
    pub invalidations: u32,
}
impl RawCspcl {
fn new(local_addr: u8, local_port: u8, interface: Interface) -> Result<Self> {
let mut cspcl = cspcl_sys::cspcl_t {
local_addr,
csp_port: local_port,
iface_type: interface.clone().into(),
..Default::default()
};
match interface {
Interface::Zmq(interface_name) => cspcl.zmqhub_addr = interface_name.into(),
Interface::Can(interface_name) => cspcl.can_iface = interface_name.into(),
Interface::Loopback(_) => {}
}
unsafe {
Error::from_code(cspcl_sys::cspcl_init(&mut cspcl))?;
}
Ok(Self {
inner: UnsafeCell::new(cspcl),
recv_lock: Mutex::new(()),
lifecycle_lock: Mutex::new(()),
closed: AtomicBool::new(false),
})
}
fn as_mut_ptr(&self) -> *mut cspcl_sys::cspcl_t {
self.inner.get()
}
fn local_addr(&self) -> u8 {
unsafe { (*self.as_mut_ptr()).local_addr }
}
fn local_port(&self) -> u8 {
unsafe { (*self.as_mut_ptr()).csp_port }
}
fn is_initialized(&self) -> bool {
!self.closed.load(Ordering::Acquire) && unsafe { (*self.as_mut_ptr()).initialized }
}
fn shutdown(&self) {
let _guard = self.lifecycle_lock.lock().expect("lifecycle lock poisoned");
if self.closed.swap(true, Ordering::AcqRel) {
return;
}
unsafe {
cspcl_sys::cspcl_cleanup(self.as_mut_ptr());
}
}
fn ensure_initialized(&self) -> Result<()> {
if self.is_initialized() {
Ok(())
} else {
Err(Error::from_code(cspcl_sys::cspcl_error_t_CSPCL_ERR_NOT_INITIALIZED).unwrap_err())
}
}
pub(crate) fn connection_stats(&self) -> ConnectionStats {
let mut stats = cspcl_sys::cspcl_conn_pool_stats_t::default();
unsafe {
cspcl_sys::cspcl_conn_pool_get_stats(&(*self.as_mut_ptr()).conn_pool, &mut stats);
}
ConnectionStats {
hits: stats.hits,
misses: stats.misses,
evictions: stats.evictions,
connect_failures: stats.connect_failures,
invalidations: stats.invalidations,
}
}
}
impl Drop for RawCspcl {
fn drop(&mut self) {
self.shutdown();
}
}
// SAFETY — NOTE(review): not verifiable from this file alone. `UnsafeCell`
// makes `RawCspcl` `!Sync` by default; these impls assert that sharing the
// native handle across threads is sound. Within this file only `shutdown` is
// serialized (via `lifecycle_lock`), and `recv_lock` is merely exposed to
// callers. Confirm that the C library and the send/receive code paths in
// `crate::bundle` provide the remaining synchronization.
unsafe impl Send for RawCspcl {}
unsafe impl Sync for RawCspcl {}
/// Settings used to construct a [`Cspcl`] through [`Cspcl::from_config`].
///
/// Built with [`CspclConfig::new`] and refined with the `with_*` methods.
#[derive(Clone, Debug)]
pub struct CspclConfig {
    /// Address stored in the native handle's `local_addr` field.
    local_addr: u8,
    /// Port stored in the native handle's `csp_port` field.
    local_port: u8,
    /// Interface selection forwarded to the native handle.
    interface: Interface,
}
impl CspclConfig {
    /// Starts a configuration for `local_addr`, using `CSPCL_PORT_BP` as the
    /// port and `Interface::default()` as the interface.
    pub fn new(local_addr: u8) -> Self {
        let local_port = cspcl_sys::CSPCL_PORT_BP as u8;
        let interface = Interface::default();
        Self {
            local_addr,
            local_port,
            interface,
        }
    }

    /// Returns the configuration with its port replaced by `local_port`.
    pub fn with_port(self, local_port: u8) -> Self {
        Self { local_port, ..self }
    }

    /// Returns the configuration with its interface replaced by `interface`.
    pub fn with_interface(self, interface: Interface) -> Self {
        Self { interface, ..self }
    }
}
/// Cloneable handle to a CSPCL instance; every clone shares the same
/// underlying `RawCspcl` through an `Arc`.
#[derive(Clone)]
pub struct Cspcl {
/// Shared native state, also handed to the `Sender`/`Receiver` values
/// created from this handle.
raw: Arc<RawCspcl>,
}
impl Cspcl {
pub fn new(local_addr: u8, local_port: u8, interface: Interface) -> Result<Self> {
Self::from_config(
CspclConfig::new(local_addr)
.with_port(local_port)
.with_interface(interface),
)
}
pub fn from_config(config: CspclConfig) -> Result<Self> {
Ok(Self {
raw: Arc::new(RawCspcl::new(
config.local_addr,
config.local_port,
config.interface,
)?),
})
}
pub fn sender(&self) -> Sender {
Sender::new(Arc::clone(&self.raw))
}
pub fn receiver(&self) -> Receiver {
Receiver::new(Arc::clone(&self.raw))
}
pub fn split(&self) -> (Sender, Receiver) {
(self.sender(), self.receiver())
}
pub fn send_bundle(&self, bundle: &[u8], dest_addr: u8, dest_port: u8) -> Result<()> {
self.sender().send_bundle(bundle, dest_addr, dest_port)
}
pub fn recv_bundle(&self, timeout_ms: u32) -> Result<crate::bundle::ReceivedBundle> {
self.receiver().recv_bundle(timeout_ms)
}
pub fn recv_bundle_into(
&self,
buffer: &mut [u8],
timeout_ms: u32,
) -> Result<crate::bundle::ReceivedBundleView> {
self.receiver().recv_bundle_into(buffer, timeout_ms)
}
pub fn shutdown(&self) -> Result<()> {
self.raw.shutdown();
Ok(())
}
pub fn connection_stats(&self) -> ConnectionStats {
self.raw.connection_stats()
}
pub fn local_addr(&self) -> u8 {
self.raw.local_addr()
}
pub fn local_port(&self) -> u8 {
self.raw.local_port()
}
pub fn is_initialized(&self) -> bool {
self.raw.is_initialized()
}
}
pub(crate) fn recv_lock(raw: &Arc<RawCspcl>) -> &Mutex<()> {
&raw.recv_lock
}
pub(crate) fn ensure_initialized(raw: &Arc<RawCspcl>) -> Result<()> {
raw.ensure_initialized()
}
pub(crate) fn raw_ptr(raw: &Arc<RawCspcl>) -> *mut cspcl_sys::cspcl_t {
raw.as_mut_ptr()
}
/// Crate-internal shorthand for the reference-counted native handle.
pub(crate) type SharedRawCspcl = Arc<RawCspcl>;