#[cfg(feature = "draft-api")]
use bitflags::bitflags;
use crate::{
ZmqResult, sealed,
socket::{MultipartReceiver, MultipartSender, Socket, SocketOption, SocketType},
};
pub type RouterSocket = Socket<Router>;
pub struct Router {}
impl sealed::SenderFlag for Router {}
impl sealed::ReceiverFlag for Router {}
impl sealed::SocketType for Router {
fn raw_socket_type() -> SocketType {
SocketType::Router
}
}
unsafe impl Sync for Socket<Router> {}
unsafe impl Send for Socket<Router> {}
impl MultipartSender for Socket<Router> {}
impl MultipartReceiver for Socket<Router> {}
#[cfg(feature = "draft-api")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[cfg_attr(feature = "builder", derive(serde::Serialize, serde::Deserialize))]
pub struct RouterNotify(i32);
#[cfg(feature = "draft-api")]
bitflags! {
impl RouterNotify: i32 {
const NotifyConnect = 0b0000_0000_0000_0001;
const NotifyDisconnect = 0b0000_0000_0000_0010;
}
}
impl Socket<Router> {
pub fn set_routing_id<V>(&self, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.set_sockopt_string(SocketOption::RoutingId, value)
}
pub fn routing_id(&self) -> ZmqResult<String> {
self.get_sockopt_string(SocketOption::RoutingId)
}
pub fn set_connect_routing_id<V>(&self, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.set_sockopt_string(SocketOption::ConnectRoutingId, value)
}
pub fn set_probe_router(&self, value: bool) -> ZmqResult<()> {
self.set_sockopt_bool(SocketOption::ProbeRouter, value)
}
pub fn set_router_handover(&self, value: bool) -> ZmqResult<()> {
self.set_sockopt_bool(SocketOption::RouterHandover, value)
}
pub fn set_router_mandatory(&self, value: bool) -> ZmqResult<()> {
self.set_sockopt_bool(SocketOption::RouterMandatory, value)
}
#[cfg(feature = "draft-api")]
pub fn set_disconnect_message<V>(&self, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.set_sockopt_string(SocketOption::DisconnectMessage, value)
}
#[cfg(feature = "draft-api")]
pub fn set_hello_message<V>(&self, value: V) -> ZmqResult<()>
where
V: AsRef<str>,
{
self.set_sockopt_string(SocketOption::HelloMessage, value)
}
#[cfg(feature = "draft-api")]
pub fn set_router_notify(&self, value: RouterNotify) -> ZmqResult<()> {
self.set_sockopt_int(SocketOption::RouterNotify, value.bits())
}
#[cfg(feature = "draft-api")]
pub fn router_notify(&self) -> ZmqResult<RouterNotify> {
self.get_sockopt_int(SocketOption::RouterNotify)
.map(RouterNotify::from_bits_truncate)
}
}
#[cfg(test)]
mod router_tests {
#[cfg(feature = "draft-api")]
use super::RouterNotify;
use super::RouterSocket;
use crate::prelude::{
Context, DealerSocket, Message, MultipartReceiver, MultipartSender, RecvFlags, SendFlags,
ZmqResult,
};
#[test]
fn set_routing_id_sets_routing_id() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_routing_id("asdf")?;
assert_eq!(socket.routing_id()?, "asdf");
Ok(())
}
#[test]
fn set_connect_routing_id_sets_connect_routing_id() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_connect_routing_id("asdf")?;
Ok(())
}
#[test]
fn set_probe_router_sets_probe_router() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_probe_router(true)?;
Ok(())
}
#[test]
fn set_router_handover_sets_router_handover() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_router_handover(true)?;
Ok(())
}
#[test]
fn set_router_mandatory_sets_router_mandatory() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_router_mandatory(true)?;
Ok(())
}
#[cfg(feature = "draft-api")]
#[test]
fn set_disconnect_message_sets_disconnect_message() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_disconnect_message("asdf")?;
Ok(())
}
#[cfg(feature = "draft-api")]
#[test]
fn set_hello_message_sets_hello_message() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_hello_message("asdf")?;
Ok(())
}
#[cfg(feature = "draft-api")]
#[test]
fn set_router_notify_sets_router_notify() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterSocket::from_context(&context)?;
socket.set_router_notify(RouterNotify::NotifyConnect | RouterNotify::NotifyDisconnect)?;
assert_eq!(
socket.router_notify()?,
RouterNotify::NotifyConnect | RouterNotify::NotifyDisconnect
);
Ok(())
}
#[test]
fn dealer_router() -> ZmqResult<()> {
let context = Context::new()?;
let router = RouterSocket::from_context(&context)?;
router.bind("tcp://127.0.0.1:*")?;
let dealer_endpoint = router.last_endpoint()?;
std::thread::spawn(move || {
let mut multipart = router.recv_multipart(RecvFlags::empty()).unwrap();
let msg = multipart.pop_back().unwrap();
assert_eq!(msg.to_string(), "Hello");
multipart.push_back("World".into());
router
.send_multipart(multipart, SendFlags::empty())
.unwrap();
});
let dealer = DealerSocket::from_context(&context)?;
dealer.connect(dealer_endpoint)?;
let multipart: Vec<Message> = vec![vec![].into(), "Hello".into()];
dealer.send_multipart(multipart, SendFlags::empty())?;
let mut msg = dealer.recv_multipart(RecvFlags::empty()).unwrap();
assert_eq!(msg.pop_back().unwrap().to_string(), "World");
Ok(())
}
#[cfg(feature = "futures")]
#[test]
fn dealer_router_async() -> ZmqResult<()> {
let context = Context::new()?;
let router = RouterSocket::from_context(&context)?;
router.bind("tcp://127.0.0.1:*")?;
let dealer_endpoint = router.last_endpoint()?;
std::thread::spawn(move || {
futures::executor::block_on(async {
let mut multipart = router.recv_multipart_async().await;
let msg = multipart.pop_back().unwrap();
assert_eq!(msg.to_string(), "Hello");
multipart.push_back("World".into());
router
.send_multipart_async(multipart, SendFlags::empty())
.await;
})
});
let dealer = DealerSocket::from_context(&context)?;
dealer.connect(dealer_endpoint)?;
futures::executor::block_on(async {
let multipart: Vec<Message> = vec![vec![].into(), "Hello".into()];
dealer
.send_multipart_async(multipart, SendFlags::empty())
.await;
let mut msg = dealer.recv_multipart_async().await;
assert_eq!(msg.pop_back().unwrap().to_string(), "World");
Ok(())
})
}
}
#[cfg(feature = "builder")]
pub(crate) mod builder {
use core::default::Default;
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
#[cfg(feature = "draft-api")]
use super::RouterNotify;
use super::RouterSocket;
use crate::{ZmqResult, context::Context, socket::SocketBuilder};
#[derive(Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Builder)]
#[builder(
pattern = "owned",
name = "RouterBuilder",
public,
build_fn(skip, error = "ZmqError"),
derive(PartialEq, Eq, Hash, Clone, serde::Serialize, serde::Deserialize)
)]
#[builder_struct_attr(doc = "Builder for [`RouterSocket`].\n\n")]
#[allow(dead_code)]
struct RouterConfig {
socket_builder: SocketBuilder,
#[cfg(feature = "draft-api")]
#[builder(setter(into), default = "Default::default()")]
hello_message: String,
#[cfg(feature = "draft-api")]
#[builder(setter(into), default = "Default::default()")]
disconnect_message: String,
#[cfg(feature = "draft-api")]
#[builder(setter(into), default = "RouterNotify::empty()")]
router_notify: RouterNotify,
#[builder(setter(into), default = "Default::default()")]
routing_id: String,
#[builder(default = false)]
router_mandatory: bool,
#[builder(default = false)]
router_handover: bool,
#[builder(setter(into), default = "Default::default()")]
connect_routing_id: String,
}
impl RouterBuilder {
pub fn apply(self, socket: &RouterSocket) -> ZmqResult<()> {
if let Some(socket_builder) = self.socket_builder {
socket_builder.apply(socket)?;
}
#[cfg(feature = "draft-api")]
self.hello_message
.iter()
.try_for_each(|message| socket.set_hello_message(message))?;
#[cfg(feature = "draft-api")]
self.disconnect_message
.iter()
.try_for_each(|message| socket.set_disconnect_message(message))?;
#[cfg(feature = "draft-api")]
self.router_notify
.iter()
.try_for_each(|notify| socket.set_router_notify(*notify))?;
self.routing_id
.iter()
.try_for_each(|routing_id| socket.set_routing_id(routing_id))?;
self.router_mandatory
.iter()
.try_for_each(|router_mandatory| socket.set_router_mandatory(*router_mandatory))?;
self.router_handover
.iter()
.try_for_each(|router_handover| socket.set_router_handover(*router_handover))?;
self.connect_routing_id
.iter()
.try_for_each(|connect_routing_id| {
socket.set_connect_routing_id(connect_routing_id)
})?;
Ok(())
}
pub fn build_from_context(self, context: &Context) -> ZmqResult<RouterSocket> {
let socket = RouterSocket::from_context(context)?;
self.apply(&socket)?;
Ok(socket)
}
}
#[cfg(test)]
mod router_builder_tests {
use super::RouterBuilder;
#[cfg(feature = "draft-api")]
use super::RouterNotify;
use crate::prelude::{Context, SocketBuilder, ZmqResult};
#[test]
fn default_router_builder() -> ZmqResult<()> {
let context = Context::new()?;
let socket = RouterBuilder::default().build_from_context(&context)?;
assert_eq!(socket.routing_id()?, "");
#[cfg(feature = "draft-api")]
assert_eq!(socket.router_notify()?, RouterNotify::empty());
Ok(())
}
#[test]
fn router_builder_with_custom_values() -> ZmqResult<()> {
let context = Context::new()?;
let builder = RouterBuilder::default()
.socket_builder(SocketBuilder::default())
.routing_id("asdf")
.router_handover(true)
.router_mandatory(true)
.connect_routing_id("1234");
#[cfg(feature = "draft-api")]
let builder = builder
.router_notify(RouterNotify::NotifyConnect | RouterNotify::NotifyDisconnect)
.disconnect_message("byebye")
.hello_message("hello");
let socket = builder.build_from_context(&context)?;
assert_eq!(socket.routing_id()?, "asdf");
#[cfg(feature = "draft-api")]
assert_eq!(
socket.router_notify()?,
RouterNotify::NotifyConnect | RouterNotify::NotifyDisconnect
);
Ok(())
}
}
}