use crate::codec::{CodecError, FramedIo, IntoEngineWriter, Message};
use crate::engine::backend::{GenericSocketBackend, HasRegistry};
#[cfg(feature = "inproc")]
use crate::engine::registry::AnyEngine;
use crate::engine::registry::{make_framed_engine, PeerRegistry};
use crate::engine::PeerEngine;
use crate::CaptureSocket;
use crate::PeerIdentity;
use crate::{
MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions, SocketRecv, SocketSend,
SocketType, ZmqMessage, ZmqResult,
};
use flume::{Receiver, Sender};
use futures::channel::mpsc;
use parking_lot::Mutex;
use std::sync::Arc;
/// Item type of the backend's inbound channel: a `PeerKey` paired with either
/// a `Message` or the `CodecError` produced instead of one.
///
/// The sending half is handed to each framed peer engine in `peer_connected`;
/// `recv` uses the key to attribute subscription changes to a peer.
type TaggedInbound = (
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
);
/// Backend state of an XPUB socket: the peer registry, the topic router, the
/// inbound channel senders handed to peer engines, and the socket options.
#[doc(hidden)]
pub struct XPubSocketBackend {
/// Peers known to this socket; filled by `peer_connected` /
/// `peer_connected_inproc` and emptied by `peer_disconnected` / `shutdown`.
registry: PeerRegistry,
/// Per-peer subscription state consulted by `send` and updated by
/// `apply_sub_message`.
router: crate::socket::topic_router::TopicRouter,
/// Sending half of the bounded inbound channel; a clone is given to every
/// framed peer engine.
inbound_tx: Sender<TaggedInbound>,
/// Sending half of the inproc inbound channel; a clone is passed to
/// `connect_inproc_engine` for every inproc peer.
#[cfg(feature = "inproc")]
inproc_inbound_tx: crate::engine::InprocInboundTx,
/// Receiving half of the inproc inbound channel (exposed via `HasInproc`).
#[cfg(feature = "inproc")]
pub(crate) inproc_inbound_rx: crate::engine::InprocInboundRx,
/// Notifier passed to inproc engines and exposed via `HasInproc`.
#[cfg(feature = "inproc")]
pub(crate) inproc_notify: Arc<crate::async_rt::notify::RuntimeNotify>,
/// Options this backend was constructed with.
socket_options: SocketOptions,
/// Slot for an optional `SocketEvent` sender; starts out as `None`.
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
}
impl XPubSocketBackend {
    /// Builds a backend from `options` and returns it together with the
    /// receiving half of its inbound channel.
    ///
    /// The inbound channel and, with the `inproc` feature, the inproc inbound
    /// channel are both bounded by `options.receive_hwm`.
    fn with_options(options: SocketOptions) -> (Self, Receiver<TaggedInbound>) {
        // NOTE(review): a `receive_hwm` of 0 would create zero-capacity
        // channels here — confirm that the options layer rules this out.
        let capacity = options.receive_hwm;
        let (tx, rx) = flume::bounded(capacity);
        #[cfg(feature = "inproc")]
        let (inproc_tx, inproc_rx) = crossbeam_channel::bounded(capacity);
        #[cfg(feature = "inproc")]
        let notify = Arc::new(crate::async_rt::notify::RuntimeNotify::new());

        let backend = Self {
            registry: PeerRegistry::new(),
            router: crate::socket::topic_router::TopicRouter::new(),
            inbound_tx: tx,
            #[cfg(feature = "inproc")]
            inproc_inbound_tx: inproc_tx,
            #[cfg(feature = "inproc")]
            inproc_inbound_rx: inproc_rx,
            #[cfg(feature = "inproc")]
            inproc_notify: notify,
            socket_options: options,
            socket_monitor: Mutex::new(None),
        };
        (backend, rx)
    }

    /// Feeds `data` to the topic router on behalf of `peer_key`.
    ///
    /// Returns `true` only when the router classifies the change as a
    /// duplicate subscribe or a duplicate unsubscribe.
    fn apply_sub_message(&self, peer_key: crate::engine::registry::PeerKey, data: &[u8]) -> bool {
        use crate::socket::topic_router::SubChange;

        let change = self.router.apply_sub_message(peer_key, data);
        matches!(
            change,
            Some(SubChange::DuplicateSub | SubChange::DuplicateUnsub)
        )
    }
}
// `SocketBackend` accessors and shutdown hook for the XPUB backend.
impl SocketBackend for XPubSocketBackend {
/// Always reports `SocketType::XPUB`.
fn socket_type(&self) -> SocketType {
SocketType::XPUB
}
/// Borrows the options this backend was constructed with.
fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}
/// Clears the peer registry and then the topic router.
fn shutdown(&self) {
self.registry.clear();
self.router.clear();
}
/// Borrows the slot holding the optional `SocketEvent` sender.
fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}
// Exposes the backend's peer registry through the `HasRegistry` trait.
impl HasRegistry for XPubSocketBackend {
/// Borrows the peer registry.
fn registry(&self) -> &PeerRegistry {
&self.registry
}
}
// Exposes the inproc receive channel and notifier (only built with the
// `inproc` feature).
#[cfg(feature = "inproc")]
impl crate::engine::backend::HasInproc for XPubSocketBackend {
/// Borrows the receiving half of the inproc inbound channel.
#[inline]
fn inproc_inbound_rx(&self) -> &crossbeam_channel::Receiver<crate::engine::backend::TaggedMsg> {
&self.inproc_inbound_rx
}
/// Borrows the notifier that is also handed to inproc engines.
#[inline]
fn inproc_notify(&self) -> &Arc<crate::async_rt::notify::RuntimeNotify> {
&self.inproc_notify
}
}
impl MultiPeerBackend for XPubSocketBackend {
    /// Registers a peer that connected over a framed transport.
    ///
    /// `io` is split into its parts, a `PeerEngine` is spawned for the peer
    /// while its registry entry is created, and the resulting key is
    /// registered with the topic router. When `hello_msg` is configured, one
    /// attempt is made to hand it to the peer's engine; the outcome of that
    /// attempt is ignored.
    async fn peer_connected<R, W>(
        self: Arc<Self>,
        peer_id: &PeerIdentity,
        io: FramedIo<R, W>,
        _endpoint: Option<crate::endpoint::Endpoint>,
    ) where
        R: futures::Stream<Item = Result<Message, CodecError>> + Unpin + Send + 'static,
        W: futures::Sink<Message, Error = CodecError> + Unpin + Send + IntoEngineWriter + 'static,
        W::Writer: Send + 'static,
    {
        // With the `curve` feature `into_parts` yields a third element; it is
        // bound to a name (not `_`) so it stays alive until this call returns.
        #[cfg(feature = "curve")]
        let (reader, sink, _curve) = io.into_parts();
        #[cfg(not(feature = "curve"))]
        let (reader, sink) = io.into_parts();

        let engine_writer = sink.into_engine_writer();
        let identity = peer_id.clone();
        let to_socket = self.inbound_tx.clone();
        let hwm = self.socket_options.send_hwm;

        // NOTE(review): the previous entry returned by `insert_with` is not
        // inspected — confirm a replaced peer cannot leave a stale router key.
        let (key, _prev) = self.registry.insert_with(peer_id.clone(), |key| {
            make_framed_engine(Arc::new(PeerEngine::spawn(
                key,
                identity,
                reader,
                engine_writer,
                hwm,
                to_socket,
                crate::engine::peer_loop::PeerConfig::default(),
            )))
        });
        self.router.register_peer(key);

        // Nothing more to do unless a hello message is configured.
        let Some(hello) = &self.socket_options.hello_msg else {
            return;
        };
        if let Some((_, engine)) = self.registry.get_by_id(peer_id) {
            // Best effort: the send outcome is deliberately discarded.
            let _ = engine.try_send_oneshot(hello.clone());
        }
    }

    /// Registers a peer that connected over the inproc transport.
    ///
    /// A placeholder engine is inserted first so that the peer has a key
    /// (registered with the topic router) before `connect_inproc_engine` is
    /// awaited. On success the placeholder is replaced by the real engine and
    /// the configured `hello_msg`, if any, is offered to it. On failure the
    /// peer is removed again via `peer_disconnected` and the error is returned.
    #[cfg(feature = "inproc")]
    // NOTE(review): presumably needed because `InprocPeer` is less visible
    // than this trait method — confirm.
    #[allow(private_interfaces)]
    async fn peer_connected_inproc(
        self: Arc<Self>,
        peer_id: &PeerIdentity,
        peer: crate::transport::inproc::InprocPeer,
        _endpoint: Option<crate::endpoint::Endpoint>,
    ) -> crate::ZmqResult<()> {
        let (local_key, _) = self.registry.insert_with(peer_id.clone(), |_| {
            AnyEngine::Inproc(Arc::new(crate::engine::inproc_placeholder_engine()))
        });
        self.router.register_peer(local_key);

        let handshake = crate::engine::connect_inproc_engine(
            local_key,
            self.socket_type(),
            self.socket_options.peer_id.clone(),
            self.inproc_inbound_tx.clone(),
            self.inproc_notify.clone(),
            peer,
        )
        .await;

        let (engine, _remote_routing_id) = match handshake {
            Ok(connected) => connected,
            Err(err) => {
                // Undo the placeholder registration made above.
                self.peer_disconnected(peer_id);
                return Err(err);
            }
        };

        let engine = Arc::new(engine);
        self.registry
            .replace_engine(local_key, AnyEngine::Inproc(Arc::clone(&engine)));
        if let Some(hello) = &self.socket_options.hello_msg {
            // Best effort: the send outcome is deliberately discarded.
            let _ = engine.try_send_direct(hello.clone());
        }
        Ok(())
    }

    /// Removes `peer_id` from the registry and forgets its key in the topic
    /// router.
    ///
    /// When `disconnect_msg` is configured, one attempt is made to hand it to
    /// the peer's engine before the peer is removed; the outcome is ignored.
    fn peer_disconnected(&self, peer_id: &PeerIdentity) {
        if let Some(farewell) = &self.socket_options.disconnect_msg {
            if let Some((_, engine)) = self.registry.get_by_id(peer_id) {
                let _ = engine.try_send_oneshot(farewell.clone());
            }
        }
        if let Some((removed_key, _)) = self.registry.remove_by_id(peer_id) {
            self.router.forget_peer(removed_key);
        }
    }
}
pub struct XPubSocket {
pub(crate) common: crate::socket::common::SocketCommon<XPubSocketBackend>,
inbound: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
peer_buf: Vec<(
crate::engine::registry::PeerKey,
crate::engine::registry::AnyEngine,
)>,
dead_buf: Vec<crate::engine::registry::PeerKey>,
}
// Empty-bodied `socket::family` trait impls for `XPubSocket`: `Sealed`,
// `Publisher` and `ExtendedPublisher`.
impl crate::socket::family::sealed::Sealed for XPubSocket {}
impl crate::socket::family::Publisher for XPubSocket {}
impl crate::socket::family::ExtendedPublisher for XPubSocket {}
// Dropping the socket shuts its backend down.
impl Drop for XPubSocket {
/// Calls the backend's `shutdown`, which clears the peer registry and the
/// topic router.
fn drop(&mut self) {
self.common.backend.shutdown();
}
}
// Gives code written against `HasCommon` access to this socket's
// `SocketCommon`.
impl crate::socket::common::HasCommon for XPubSocket {
type Backend = XPubSocketBackend;
/// Shared borrow of the common socket state.
fn common(&self) -> &crate::socket::common::SocketCommon<Self::Backend> {
&self.common
}
/// Exclusive borrow of the common socket state.
fn common_mut(&mut self) -> &mut crate::socket::common::SocketCommon<Self::Backend> {
&mut self.common
}
}
impl Socket for XPubSocket {
    type Backend = XPubSocketBackend;

    /// Creates an XPUB socket configured by `options`.
    ///
    /// The backend is built first, wrapped in an `Arc` and placed in a new
    /// `SocketCommon`; both scratch buffers start out empty.
    fn with_options(options: SocketOptions) -> Self {
        let (raw_backend, inbound) = XPubSocketBackend::with_options(options);
        let common = crate::socket::common::SocketCommon::new(Arc::new(raw_backend));
        Self {
            common,
            inbound,
            peer_buf: Vec::new(),
            dead_buf: Vec::new(),
        }
    }

    /// Hands this socket's registry and options to `drain_registry` and waits
    /// for it to finish.
    async fn linger_drain(&mut self) {
        let backend = &self.common.backend;
        crate::engine::registry::drain_registry(&backend.registry, backend.socket_options())
            .await;
    }
}
impl SocketSend for XPubSocket {
    /// Publishes `message` to every registered peer whose subscriptions match
    /// its first frame (with matching inverted when `invert_matching` is set).
    ///
    /// * A message without any frame is dropped and `Ok(())` is returned.
    /// * Peers whose engine reports `Full` simply do not receive the message.
    /// * Peers whose engine reports `Closed` are disconnected once the fan-out
    ///   is complete.
    ///
    /// The task yields to the runtime once before returning.
    async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
        use crate::engine::registry::TrySendOutcome;

        let message = message.into();
        // Matching is done on the first frame; without one there is nothing
        // to publish.
        let Some(topic_frame) = message.get(0).cloned() else {
            return Ok(());
        };
        // One shared allocation is handed to every matching peer.
        let payload = Arc::new(message);

        self.common
            .backend
            .registry
            .snapshot_into(&mut self.peer_buf);
        self.dead_buf.clear();

        let invert = self.common.backend.socket_options.invert_matching;
        {
            // Borrow the two scratch buffers separately so the closure does
            // not need to capture `self`.
            let peers = &self.peer_buf;
            let closed = &mut self.dead_buf;
            self.common.backend.router.with_match_guard(|matcher| {
                for (key, engine) in peers {
                    if matcher.matches(*key, &topic_frame, invert) {
                        match engine.try_send_fanout(Arc::clone(&payload)) {
                            TrySendOutcome::Closed => closed.push(*key),
                            TrySendOutcome::Sent | TrySendOutcome::Full => {}
                        }
                    }
                }
            });
        }

        // Disconnect peers found closed above; draining an empty buffer is a
        // no-op, so no emptiness check is needed.
        for key in self.dead_buf.drain(..) {
            if let Some(id) = self.common.backend.registry.id_for(key) {
                self.common.backend.peer_disconnected(&id);
            }
        }

        crate::async_rt::task::yield_now().await;
        Ok(())
    }
}
/// Classification of a received message, as produced by
/// `XPubSocket::recv_event`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum XPubEvent {
/// Single-frame message whose first byte is `1`; `topic` is the rest of
/// that frame.
Subscribe { topic: bytes::Bytes },
/// Single-frame message whose first byte is `0`; `topic` is the rest of
/// that frame.
Unsubscribe { topic: bytes::Bytes },
/// Any other message, passed through unchanged.
Other(ZmqMessage),
}
impl SocketRecv for XPubSocket {
    /// Waits for the next inbound message and returns it.
    ///
    /// Every single-frame message is first fed to the topic router. If the
    /// router reports it as a duplicate subscribe/unsubscribe and
    /// `xpub_verbose` is off, the message is swallowed and the wait starts
    /// over. Errors from `recv_next_timed` are returned unchanged.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let receive_timeout = self.common.backend.socket_options().receive_timeout;
        let drop_duplicates = !self.common.backend.socket_options().xpub_verbose;

        loop {
            // NOTE(review): the same `receive_timeout` is passed again after a
            // swallowed duplicate — confirm whether the total wait may exceed it.
            let (key, msg) = GenericSocketBackend::recv_next_timed(
                &self.inbound,
                &*self.common.backend,
                receive_timeout,
            )
            .await?;

            // The router must see every single-frame message, so it is called
            // before `drop_duplicates` is consulted.
            let swallowed = match msg.get(0) {
                Some(frame) if msg.len() == 1 => {
                    self.common.backend.apply_sub_message(key, frame) && drop_duplicates
                }
                _ => false,
            };

            if !swallowed {
                return Ok(msg);
            }
        }
    }
}
impl XPubSocket {
    /// Receives the next message via `recv` and classifies it.
    ///
    /// A single-frame message whose first byte is `1` becomes
    /// `XPubEvent::Subscribe`, and one whose first byte is `0` becomes
    /// `XPubEvent::Unsubscribe`; in both cases `topic` is the remainder of the
    /// frame. Everything else is returned as `XPubEvent::Other`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `recv` returns.
    pub async fn recv_event(&mut self) -> ZmqResult<XPubEvent> {
        let msg = self.recv().await?;

        let subscription_event = if msg.len() == 1 {
            msg.first().and_then(|frame| match frame.first().copied() {
                Some(1) => Some(XPubEvent::Subscribe {
                    topic: frame.slice(1..),
                }),
                Some(0) => Some(XPubEvent::Unsubscribe {
                    topic: frame.slice(1..),
                }),
                // Empty frame or any other leading byte: not a subscription.
                _ => None,
            })
        } else {
            None
        };

        Ok(subscription_event.unwrap_or_else(|| XPubEvent::Other(msg)))
    }
}
// Empty-bodied impl: `XPubSocket` implements `CaptureSocket` without
// overriding any item of the trait.
impl CaptureSocket for XPubSocket {}
#[cfg(all(test, feature = "tokio", feature = "tcp"))]
mod tests {
    use super::*;
    use crate::async_rt;
    use crate::socket::handshake::tests::{
        test_bind_to_any_port_helper, test_bind_to_unspecified_interface_helper,
    };
    use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

    /// Runs the shared "bind to any port" helper against a fresh XPUB socket.
    #[async_rt::test]
    async fn test_bind_to_any_port() -> ZmqResult<()> {
        test_bind_to_any_port_helper(XPubSocket::new()).await
    }

    /// Runs the shared unspecified-interface helper with `0.0.0.0` and port 4020.
    #[async_rt::test]
    async fn test_bind_to_any_ipv4_interface() -> ZmqResult<()> {
        let unspecified = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        test_bind_to_unspecified_interface_helper(unspecified, XPubSocket::new(), 4020).await
    }

    /// Runs the shared unspecified-interface helper with `::` and port 4030.
    #[async_rt::test]
    async fn test_bind_to_any_ipv6_interface() -> ZmqResult<()> {
        let unspecified = IpAddr::V6(Ipv6Addr::UNSPECIFIED);
        test_bind_to_unspecified_interface_helper(unspecified, XPubSocket::new(), 4030).await
    }
}