use crate::{
ZmqResult, sealed,
socket::{Socket, SocketType},
};
pub type DishSocket = Socket<Dish>;
pub struct Dish {}
impl sealed::ReceiverFlag for Dish {}
impl sealed::SocketType for Dish {
fn raw_socket_type() -> SocketType {
SocketType::Dish
}
}
unsafe impl Sync for Socket<Dish> {}
unsafe impl Send for Socket<Dish> {}
impl Socket<Dish> {
pub fn join<G>(&self, group: G) -> ZmqResult<()>
where
G: AsRef<str>,
{
self.socket.join(group.as_ref())
}
pub fn leave<G>(&self, group: G) -> ZmqResult<()>
where
G: AsRef<str>,
{
self.socket.leave(group.as_ref())
}
}
#[cfg(test)]
mod dish_tests {
use super::DishSocket;
use crate::prelude::{
Context, Message, RadioSocket, Receiver, RecvFlags, SendFlags, Sender, ZmqError, ZmqResult,
};
#[test]
fn join_joins_group() -> ZmqResult<()> {
let context = Context::new()?;
let socket = DishSocket::from_context(&context)?;
socket.join("asdf")?;
Ok(())
}
#[test]
fn join_when_already_joined() -> ZmqResult<()> {
let context = Context::new()?;
let socket = DishSocket::from_context(&context)?;
socket.join("asdf")?;
let result = socket.join("asdf");
assert!(result.is_err_and(|err| err == ZmqError::InvalidArgument));
Ok(())
}
#[test]
fn leave_leaves_group() -> ZmqResult<()> {
let context = Context::new()?;
let socket = DishSocket::from_context(&context)?;
socket.join("asdf")?;
socket.leave("asdf")?;
Ok(())
}
#[test]
fn leave_when_no_group_joined() -> ZmqResult<()> {
let context = Context::new()?;
let socket = DishSocket::from_context(&context)?;
let result = socket.leave("asdf");
assert!(result.is_err_and(|err| err == ZmqError::InvalidArgument));
Ok(())
}
#[test]
fn radio_dish() -> ZmqResult<()> {
let context = Context::new()?;
let radio = RadioSocket::from_context(&context)?;
radio.bind("tcp://127.0.0.1:*")?;
let dish_endpoint = radio.last_endpoint()?;
std::thread::spawn(move || {
loop {
let message: Message = "radio-msg".into();
message.set_group("asdf").unwrap();
radio.send_msg(message, SendFlags::DONT_WAIT).unwrap();
}
});
let dish = DishSocket::from_context(&context)?;
dish.connect(dish_endpoint)?;
dish.join("asdf")?;
let msg = dish.recv_msg(RecvFlags::empty())?;
assert_eq!(msg.group().unwrap(), "asdf");
assert_eq!(msg.to_string(), "radio-msg");
Ok(())
}
#[cfg(feature = "futures")]
#[test]
fn radio_dish_async() -> ZmqResult<()> {
let context = Context::new()?;
let radio = RadioSocket::from_context(&context)?;
radio.bind("tcp://127.0.0.1:*")?;
let dish_endpoint = radio.last_endpoint()?;
std::thread::spawn(move || {
futures::executor::block_on(async {
loop {
let message: Message = "radio-msg".into();
message.set_group("asdf").unwrap();
radio.send_msg_async(message, SendFlags::DONT_WAIT).await;
}
})
});
let dish = DishSocket::from_context(&context)?;
dish.connect(dish_endpoint)?;
dish.join("asdf")?;
futures::executor::block_on(async {
loop {
if let Some(msg) = dish.recv_msg_async().await {
assert_eq!(msg.group().unwrap(), "asdf");
assert_eq!(msg.to_string(), "radio-msg");
break;
}
}
});
Ok(())
}
}
#[cfg(feature = "builder")]
pub(crate) mod builder {
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use super::DishSocket;
use crate::{ZmqResult, context::Context, socket::SocketBuilder};
#[derive(Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Builder)]
#[builder(
pattern = "owned",
name = "DishBuilder",
public,
build_fn(skip, error = "ZmqError"),
derive(PartialEq, Eq, Hash, Clone, serde::Serialize, serde::Deserialize)
)]
#[builder_struct_attr(doc = "Builder for [`DishSocket`].\n\n")]
#[allow(dead_code)]
struct DishConfig {
socket_builder: SocketBuilder,
#[builder(setter(into), default = "Default::default()")]
join: String,
}
impl DishBuilder {
pub fn apply(self, socket: &DishSocket) -> ZmqResult<()> {
if let Some(socket_builder) = self.socket_builder {
socket_builder.apply(socket)?;
}
self.join.iter().try_for_each(|join| socket.join(join))?;
Ok(())
}
pub fn build_from_context(self, context: &Context) -> ZmqResult<DishSocket> {
let socket = DishSocket::from_context(context)?;
self.apply(&socket)?;
Ok(socket)
}
}
#[cfg(test)]
mod dish_builder_tests {
use super::DishBuilder;
use crate::prelude::{Context, ZmqResult};
#[test]
fn default_dish_builder() -> ZmqResult<()> {
let context = Context::new()?;
DishBuilder::default().build_from_context(&context)?;
Ok(())
}
#[test]
fn dish_builder_with_custom_settings() -> ZmqResult<()> {
let context = Context::new()?;
DishBuilder::default()
.join("asdf")
.build_from_context(&context)?;
Ok(())
}
}
}