use crate::{addr::Endpoint, auth::*, core::*, error::*, Ctx, GroupOwned};
use libzmq_sys as sys;
use sys::errno;
use serde::{Deserialize, Serialize};
use std::{
ffi::{c_void, CString},
str,
sync::{Arc, Mutex},
time::Duration,
};
fn join(socket_mut_ptr: *mut c_void, group: &GroupOwned) -> Result<(), Error> {
let c_str = CString::new(group.as_str()).unwrap();
let rc = unsafe { sys::zmq_join(socket_mut_ptr, c_str.as_ptr()) };
if rc == -1 {
let errno = unsafe { sys::zmq_errno() };
let err = {
match errno {
errno::EINVAL => Error::new(ErrorKind::InvalidInput {
msg: "cannot join group twice",
}),
errno::ETERM => Error::new(ErrorKind::CtxTerminated),
errno::EINTR => Error::new(ErrorKind::Interrupted),
errno::ENOTSOCK => panic!("invalid socket"),
errno::EMTHREAD => panic!("no i/o thread available"),
_ => panic!(msg_from_errno(errno)),
}
};
Err(err)
} else {
Ok(())
}
}
fn leave(socket_mut_ptr: *mut c_void, group: &GroupOwned) -> Result<(), Error> {
let c_str = CString::new(group.as_str()).unwrap();
let rc = unsafe { sys::zmq_leave(socket_mut_ptr, c_str.as_ptr()) };
if rc == -1 {
let errno = unsafe { sys::zmq_errno() };
let err = {
match errno {
errno::EINVAL => Error::new(ErrorKind::InvalidInput {
msg: "cannot leave a group that wasn't joined",
}),
errno::ETERM => Error::new(ErrorKind::CtxTerminated),
errno::EINTR => Error::new(ErrorKind::Interrupted),
errno::ENOTSOCK => panic!("invalid socket"),
errno::EMTHREAD => panic!("no i/o thread available"),
_ => panic!(msg_from_errno(errno)),
}
};
Err(err)
} else {
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct Dish {
inner: Arc<RawSocket>,
groups: Arc<Mutex<Vec<GroupOwned>>>,
}
impl Dish {
pub fn new() -> Result<Self, Error> {
let inner = Arc::new(RawSocket::new(RawSocketType::Dish)?);
Ok(Self {
inner,
groups: Arc::default(),
})
}
pub fn with_ctx<C>(ctx: C) -> Result<Self, Error>
where
C: Into<Ctx>,
{
let ctx: Ctx = ctx.into();
let inner = Arc::new(RawSocket::with_ctx(RawSocketType::Dish, ctx)?);
Ok(Self {
inner,
groups: Arc::default(),
})
}
pub fn ctx(&self) -> &crate::Ctx {
self.inner.ctx()
}
pub fn join<I, G>(&self, groups: I) -> Result<(), Error<usize>>
where
I: IntoIterator<Item = G>,
G: Into<GroupOwned>,
{
let mut count = 0;
let mut guard = self.groups.lock().unwrap();
for group in groups.into_iter() {
let group = group.into();
join(self.raw_socket().as_mut_ptr(), &group)
.map_err(|err| Error::with_content(err.kind(), count))?;
guard.push(group);
count += 1;
}
Ok(())
}
pub fn joined(&self) -> Vec<GroupOwned> {
self.groups.lock().unwrap().to_owned()
}
pub fn leave<I, G>(&self, groups: I) -> Result<(), Error<usize>>
where
I: IntoIterator<Item = G>,
G: Into<GroupOwned>,
{
let mut count = 0;
let mut guard = self.groups.lock().unwrap();
for group in groups.into_iter() {
let group = group.into();
leave(self.raw_socket().as_mut_ptr(), &group)
.map_err(|err| Error::with_content(err.kind(), count))?;
let position = guard.iter().position(|g| g == &group).unwrap();
guard.remove(position);
count += 1;
}
Ok(())
}
}
impl PartialEq for Dish {
fn eq(&self, other: &Dish) -> bool {
self.inner == other.inner
}
}
impl Eq for Dish {}
impl GetRawSocket for Dish {
fn raw_socket(&self) -> &RawSocket {
&self.inner
}
}
impl Socket for Dish {}
impl RecvMsg for Dish {}
unsafe impl Send for Dish {}
unsafe impl Sync for Dish {}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(from = "FlatDishConfig")]
#[serde(into = "FlatDishConfig")]
pub struct DishConfig {
socket_config: SocketConfig,
recv_config: RecvConfig,
groups: Option<Vec<GroupOwned>>,
}
impl DishConfig {
pub fn new() -> Self {
Self::default()
}
pub fn build(&self) -> Result<Dish, Error<usize>> {
self.with_ctx(Ctx::global())
}
pub fn with_ctx<C>(&self, ctx: C) -> Result<Dish, Error<usize>>
where
C: Into<Ctx>,
{
let ctx: Ctx = ctx.into();
let dish = Dish::with_ctx(ctx).map_err(Error::cast)?;
self.apply(&dish)?;
Ok(dish)
}
pub fn groups(&self) -> Option<&[GroupOwned]> {
self.groups.as_ref().map(Vec::as_slice)
}
pub fn set_groups<I>(&mut self, maybe_groups: Option<I>)
where
I: IntoIterator<Item = GroupOwned>,
{
let groups = maybe_groups.map(|g| g.into_iter().collect());
self.groups = groups;
}
pub fn apply(&self, dish: &Dish) -> Result<(), Error<usize>> {
if let Some(ref groups) = self.groups {
dish.join(groups)?;
}
self.recv_config.apply(dish).map_err(Error::cast)?;
self.socket_config.apply(dish)?;
Ok(())
}
}
#[derive(Clone, Serialize, Deserialize)]
struct FlatDishConfig {
connect: Option<Vec<Endpoint>>,
bind: Option<Vec<Endpoint>>,
backlog: Option<i32>,
#[serde(default)]
#[serde(with = "humantime_serde")]
heartbeat_interval: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
heartbeat_timeout: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
heartbeat_ttl: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
linger: Option<Duration>,
recv_high_water_mark: Option<i32>,
#[serde(default)]
#[serde(with = "humantime_serde")]
recv_timeout: Option<Duration>,
groups: Option<Vec<GroupOwned>>,
mechanism: Option<Mechanism>,
}
impl From<DishConfig> for FlatDishConfig {
fn from(config: DishConfig) -> Self {
let socket_config = config.socket_config;
let recv_config = config.recv_config;
Self {
connect: socket_config.connect,
bind: socket_config.bind,
backlog: socket_config.backlog,
heartbeat_interval: socket_config.heartbeat_interval,
heartbeat_timeout: socket_config.heartbeat_timeout,
heartbeat_ttl: socket_config.heartbeat_ttl,
linger: socket_config.linger,
mechanism: socket_config.mechanism,
recv_high_water_mark: recv_config.recv_high_water_mark,
recv_timeout: recv_config.recv_timeout,
groups: config.groups,
}
}
}
impl From<FlatDishConfig> for DishConfig {
fn from(flat: FlatDishConfig) -> Self {
let socket_config = SocketConfig {
connect: flat.connect,
bind: flat.bind,
backlog: flat.backlog,
heartbeat_interval: flat.heartbeat_interval,
heartbeat_timeout: flat.heartbeat_timeout,
heartbeat_ttl: flat.heartbeat_ttl,
linger: flat.linger,
mechanism: flat.mechanism,
};
let recv_config = RecvConfig {
recv_high_water_mark: flat.recv_high_water_mark,
recv_timeout: flat.recv_timeout,
};
Self {
socket_config,
recv_config,
groups: flat.groups,
}
}
}
impl GetSocketConfig for DishConfig {
fn socket_config(&self) -> &SocketConfig {
&self.socket_config
}
fn socket_config_mut(&mut self) -> &mut SocketConfig {
&mut self.socket_config
}
}
impl ConfigureSocket for DishConfig {}
impl GetRecvConfig for DishConfig {
fn recv_config(&self) -> &RecvConfig {
&self.recv_config
}
fn recv_config_mut(&mut self) -> &mut RecvConfig {
&mut self.recv_config
}
}
impl ConfigureRecv for DishConfig {}
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DishBuilder {
inner: DishConfig,
}
impl DishBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn build(&self) -> Result<Dish, Error<usize>> {
self.inner.build()
}
pub fn with_ctx<C>(&self, ctx: C) -> Result<Dish, Error<usize>>
where
C: Into<Ctx>,
{
self.inner.with_ctx(ctx)
}
pub fn join<I, G>(&mut self, groups: I) -> &mut Self
where
I: IntoIterator<Item = G>,
G: Into<GroupOwned>,
{
let groups: Vec<GroupOwned> = groups.into_iter().map(G::into).collect();
self.inner.set_groups(Some(groups));
self
}
}
impl GetSocketConfig for DishBuilder {
fn socket_config(&self) -> &SocketConfig {
self.inner.socket_config()
}
fn socket_config_mut(&mut self) -> &mut SocketConfig {
self.inner.socket_config_mut()
}
}
impl BuildSocket for DishBuilder {}
impl GetRecvConfig for DishBuilder {
fn recv_config(&self) -> &RecvConfig {
self.inner.recv_config()
}
fn recv_config_mut(&mut self) -> &mut RecvConfig {
self.inner.recv_config_mut()
}
}
impl BuildRecv for DishBuilder {}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_ser_de() {
let config = DishConfig::new();
let ron = ron::ser::to_string(&config).unwrap();
let de: DishConfig = ron::de::from_str(&ron).unwrap();
assert_eq!(config, de);
}
#[test]
fn test_dish() {
use crate::{prelude::*, TcpAddr, *};
use std::{convert::TryInto, thread};
let addr: TcpAddr = "127.0.0.1:*".try_into().unwrap();
let radio = RadioBuilder::new().bind(addr).build().unwrap();
let bound = radio.last_endpoint().unwrap();
let a: &Group = "group a".try_into().unwrap();
let dish = DishBuilder::new().connect(bound).join(a).build().unwrap();
thread::spawn(move || {
let a: &Group = "group a".try_into().unwrap();
let b: &Group = "group b".try_into().unwrap();
let mut count = 0;
loop {
let mut msg = Msg::new();
let group = {
if count % 2 == 0 {
a
} else {
b
}
};
msg.set_group(group);
radio.send(msg).unwrap();
std::thread::sleep(Duration::from_millis(1));
count += 1;
}
});
let msg = dish.recv_msg().unwrap();
assert_eq!(msg.group().unwrap(), a);
let msg = dish.recv_msg().unwrap();
assert_eq!(msg.group().unwrap(), a);
}
}