use crate::codec::{CodecError, FramedIo, IntoEngineWriter, Message};
use crate::endpoint::Endpoint;
use crate::engine::backend::{GenericSocketBackend, HasRegistry};
use crate::engine::registry::{make_framed_engine, PeerRegistry};
use crate::engine::PeerEngine;
use crate::reconnect::{ReconnectConfig, ReconnectHandle};
use crate::PeerIdentity;
use crate::{MultiPeerBackend, SocketBackend};
use crate::{Socket, SocketEvent, SocketOptions, SocketRecv, SocketType, ZmqMessage, ZmqResult};
use bytes::{BufMut, BytesMut};
use flume::{Receiver, Sender};
use futures::channel::mpsc;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
/// Command byte that prefixes a subscription frame.
///
/// The discriminant is the on-wire value: `create_subs_message` writes
/// `msg_type as u8` as the first byte of the frame, followed by the raw
/// subscription bytes. `#[repr(u8)]` pins the representation to that single
/// byte so the cast can never truncate.
// The SCREAMING variant names are kept because other code in the crate refers
// to them; the lint is silenced instead of renaming.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub(crate) enum SubBackendMsgType {
    /// Cancels a subscription (first frame byte `0`).
    UNSUBSCRIBE = 0,
    /// Establishes a subscription (first frame byte `1`).
    SUBSCRIBE = 1,
}
/// Backend state for a subscriber socket: the peer registry, the inbound
/// message queues, the set of active subscriptions and the per-peer
/// disconnect notifiers.
#[doc(hidden)]
pub struct SubSocketBackend {
/// Connected peers. Entries are inserted by `peer_connected` /
/// `peer_connected_inproc` and removed by `peer_disconnected`.
registry: PeerRegistry,
/// Sending half of the bounded inbound queue. A clone is handed to every
/// framed peer engine, and `on_reconnect` uses it to inject the hiccup message.
inbound_tx: Sender<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
/// Receiving half of the inbound queue; read by `SubSocket::recv`.
pub(crate) inbound_rx: Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>,
/// Sending half of the inproc inbound channel; cloned into each inproc engine.
#[cfg(feature = "inproc")]
inproc_inbound_tx: crate::engine::InprocInboundTx,
/// Receiving half of the inproc inbound channel (exposed through `HasInproc`).
#[cfg(feature = "inproc")]
pub(crate) inproc_inbound_rx: crate::engine::InprocInboundRx,
/// Notifier shared with inproc engines (exposed through `HasInproc`).
#[cfg(feature = "inproc")]
pub(crate) inproc_notify: Arc<crate::async_rt::notify::RuntimeNotify>,
/// Options the socket was created with.
socket_options: SocketOptions,
/// Optional channel on which `SocketEvent`s (connect / disconnect) are published.
pub(crate) socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
/// Active subscriptions as raw byte strings; replayed to every newly connected peer.
pub(crate) subs: Mutex<HashSet<Vec<u8>>>,
/// Per-peer notifiers, removed and fired once when that peer is disconnected.
disconnect_notifiers: Mutex<HashMap<PeerIdentity, crate::engine::backend::DisconnectNotifier>>,
/// Socket type reported by `SocketBackend::socket_type`.
socket_type: SocketType,
}
/// Construction and small helpers for the subscriber backend.
impl SubSocketBackend {
    /// Creates a backend with an empty registry, no subscriptions, no monitor
    /// and no disconnect notifiers.
    ///
    /// Every inbound queue is bounded by `options.receive_hwm`.
    pub(crate) fn with_options(options: SocketOptions, socket_type: SocketType) -> Self {
        let receive_hwm = options.receive_hwm;
        let (inbound_tx, inbound_rx) = flume::bounded(receive_hwm);

        #[cfg(feature = "inproc")]
        let (inproc_inbound_tx, inproc_inbound_rx) = crossbeam_channel::bounded(receive_hwm);
        #[cfg(feature = "inproc")]
        let inproc_notify = Arc::new(crate::async_rt::notify::RuntimeNotify::new());

        Self {
            registry: PeerRegistry::new(),
            inbound_tx,
            inbound_rx,
            #[cfg(feature = "inproc")]
            inproc_inbound_tx,
            #[cfg(feature = "inproc")]
            inproc_inbound_rx,
            #[cfg(feature = "inproc")]
            inproc_notify,
            socket_options: options,
            socket_monitor: Mutex::new(None),
            subs: Mutex::new(HashSet::new()),
            disconnect_notifiers: Mutex::new(HashMap::new()),
            socket_type,
        }
    }

    /// Builds a subscription control message: one command byte (`msg_type`)
    /// followed by the subscription bytes unchanged.
    pub(crate) fn create_subs_message(
        subscription: &[u8],
        msg_type: SubBackendMsgType,
    ) -> ZmqMessage {
        // Exactly one extra byte is needed for the command prefix.
        let mut frame = BytesMut::with_capacity(1 + subscription.len());
        frame.put_u8(msg_type as u8);
        frame.put_slice(subscription);
        let frozen = frame.freeze();
        frozen.into()
    }

    /// Stores `notifier` for `peer_id`, replacing any notifier previously
    /// registered for the same peer. It is fired by `peer_disconnected`.
    pub(crate) fn register_disconnect_notifier(
        &self,
        peer_id: PeerIdentity,
        notifier: crate::engine::backend::DisconnectNotifier,
    ) {
        let mut notifiers = self.disconnect_notifiers.lock();
        notifiers.insert(peer_id, notifier);
    }
}
/// Basic backend accessors plus shutdown for the subscriber backend.
impl SocketBackend for SubSocketBackend {
/// Returns the socket type this backend was constructed with.
fn socket_type(&self) -> SocketType {
self.socket_type
}
/// Borrows the options this backend was constructed with.
fn socket_options(&self) -> &SocketOptions {
&self.socket_options
}
/// Clears the peer registry.
// NOTE(review): presumably dropping the registered engines is what tears the
// connections down — confirm in `PeerRegistry::clear`.
fn shutdown(&self) {
self.registry.clear();
}
/// Borrows the slot holding the optional socket-event monitor channel.
fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>> {
&self.socket_monitor
}
}
/// Exposes the backend's peer registry through the `HasRegistry` trait.
impl HasRegistry for SubSocketBackend {
/// Borrows the registry that tracks this backend's peers.
fn registry(&self) -> &PeerRegistry {
&self.registry
}
}
/// Accessors for the in-process transport state; compiled only when the
/// `inproc` feature is enabled.
#[cfg(feature = "inproc")]
impl crate::engine::backend::HasInproc for SubSocketBackend {
/// Borrows the receiving end of the inproc inbound channel.
#[inline]
fn inproc_inbound_rx(&self) -> &crossbeam_channel::Receiver<crate::engine::backend::TaggedMsg> {
&self.inproc_inbound_rx
}
/// Borrows the notifier shared with inproc engines.
#[inline]
fn inproc_notify(&self) -> &Arc<crate::async_rt::notify::RuntimeNotify> {
&self.inproc_notify
}
}
/// Peer lifecycle hooks for the subscriber backend: registering framed and
/// inproc peers, replaying subscriptions to them, and tearing them down.
impl MultiPeerBackend for SubSocketBackend {
    /// Registers a framed peer and brings it up to date.
    ///
    /// Steps, in order:
    /// 1. spawn a `PeerEngine` for the connection and insert it into the
    ///    registry under `peer_id` (any engine previously stored for that
    ///    identity is dropped);
    /// 2. send one SUBSCRIBE message per stored subscription — the first
    ///    failure is logged and ends the call;
    /// 3. send the configured hello message, if any, on a best-effort basis.
    async fn peer_connected<R, W>(
        self: Arc<Self>,
        peer_id: &PeerIdentity,
        io: FramedIo<R, W>,
        _endpoint: Option<Endpoint>,
    ) where
        R: futures::Stream<Item = Result<Message, CodecError>> + Unpin + Send + 'static,
        W: futures::Sink<Message, Error = CodecError> + Unpin + Send + IntoEngineWriter + 'static,
        W::Writer: Send + 'static,
    {
        #[cfg(feature = "curve")]
        let (reader, sink, _curve) = io.into_parts();
        #[cfg(not(feature = "curve"))]
        let (reader, sink) = io.into_parts();

        // Everything the engine needs is captured by value so the registry
        // closure can be `move`.
        let engine_writer = sink.into_engine_writer();
        let queue_tx = self.inbound_tx.clone();
        let spawn_identity = peer_id.clone();
        let hwm = self.socket_options.send_hwm;

        let (_slot, displaced) = self.registry.insert_with(peer_id.clone(), move |key| {
            let spawned = PeerEngine::spawn(
                key,
                spawn_identity,
                reader,
                engine_writer,
                hwm,
                queue_tx,
                crate::engine::peer_loop::PeerConfig::default(),
            );
            make_framed_engine(Arc::new(spawned))
        });
        // Release whatever was registered under this identity before.
        drop(displaced);

        // The entry may already be gone again; in that case there is nothing to replay to.
        let Some((_, engine)) = self.registry.get_by_id(peer_id) else {
            return;
        };

        // Snapshot the subscriptions; the lock guard is a temporary of this
        // statement and is released before the first `.await` below.
        let replay: Vec<ZmqMessage> = self
            .subs
            .lock()
            .iter()
            .map(|topic| Self::create_subs_message(topic, SubBackendMsgType::SUBSCRIBE))
            .collect();

        for frame in replay {
            if let Err(e) = engine.send_msg(frame).await {
                log::error!("SUB replay failed for {:?}: {:?}", peer_id, e);
                return;
            }
        }

        if let Some(greeting) = &self.socket_options.hello_msg {
            let _ = engine.try_send_oneshot(greeting.clone());
        }
    }

    /// Registers an in-process peer and brings it up to date.
    ///
    /// A placeholder engine is inserted first to reserve the registry key,
    /// then swapped for the real engine once `connect_inproc_engine`
    /// succeeds. Stored subscriptions are replayed and the optional hello
    /// message is sent afterwards.
    ///
    /// # Errors
    ///
    /// Returns the error from `connect_inproc_engine`; in that case the peer
    /// is passed to `peer_disconnected` before returning. A failed
    /// subscription replay is only logged and still yields `Ok(())`.
    #[cfg(feature = "inproc")]
    #[allow(private_interfaces)]
    async fn peer_connected_inproc(
        self: Arc<Self>,
        peer_id: &PeerIdentity,
        peer: crate::transport::inproc::InprocPeer,
        _endpoint: Option<crate::endpoint::Endpoint>,
    ) -> crate::ZmqResult<()> {
        let (local_key, displaced) = self.registry.insert_with(peer_id.clone(), |_| {
            let placeholder = crate::engine::inproc_placeholder_engine();
            crate::engine::registry::AnyEngine::Inproc(Arc::new(placeholder))
        });
        drop(displaced);

        let handshake = crate::engine::connect_inproc_engine(
            local_key,
            self.socket_type(),
            self.socket_options.peer_id.clone(),
            self.inproc_inbound_tx.clone(),
            self.inproc_notify.clone(),
            peer,
        )
        .await;

        let (engine, _remote_routing_id) = match handshake {
            Ok(pair) => pair,
            Err(e) => {
                // Undo the placeholder registration before reporting the failure.
                self.peer_disconnected(peer_id);
                return Err(e);
            }
        };

        let engine = Arc::new(engine);
        self.registry.replace_engine(
            local_key,
            crate::engine::registry::AnyEngine::Inproc(Arc::clone(&engine)),
        );

        let replay: Vec<ZmqMessage> = self
            .subs
            .lock()
            .iter()
            .map(|topic| Self::create_subs_message(topic, SubBackendMsgType::SUBSCRIBE))
            .collect();

        for frame in replay {
            if let Err(e) = engine.send_direct(frame) {
                log::error!("SUB inproc replay failed for {:?}: {:?}", peer_id, e);
                return Ok(());
            }
        }

        if let Some(greeting) = &self.socket_options.hello_msg {
            let _ = engine.try_send_direct(greeting.clone());
        }
        Ok(())
    }

    /// Pushes the configured hiccup message, if any, onto the inbound queue.
    ///
    /// The item is tagged with any key currently in the registry, or `0` when
    /// the registry is empty. A full or closed queue is silently ignored.
    fn on_reconnect(&self, _peer_id: &PeerIdentity) {
        let Some(hiccup) = &self.socket_options.hiccup_msg else {
            return;
        };
        let key = self.registry.any_key().unwrap_or(0);
        let _ = self
            .inbound_tx
            .try_send((key, Ok(Message::Message(hiccup.clone()))));
    }

    /// Tears down the bookkeeping for `peer_id`.
    ///
    /// In order: best-effort send of the configured disconnect message to the
    /// peer's engine, a `Disconnected` monitor event, removal from the
    /// registry, and finally firing (and removing) the peer's disconnect
    /// notifier.
    fn peer_disconnected(&self, peer_id: &PeerIdentity) {
        if let Some(farewell) = &self.socket_options.disconnect_msg {
            if let Some((_, engine)) = self.registry.get_by_id(peer_id) {
                let _ = engine.try_send_oneshot(farewell.clone());
            }
        }

        {
            // Scoped so the monitor lock is released before the registry is touched.
            let mut monitor_slot = self.monitor().lock();
            if let Some(events) = monitor_slot.as_mut() {
                let _ = events.try_send(SocketEvent::Disconnected(peer_id.clone()));
            }
        }

        self.registry.remove_by_id(peer_id);

        if let Some(mut notifier) = self.disconnect_notifiers.lock().remove(peer_id) {
            let _ = notifier.try_send(peer_id.clone());
        }
    }
}
/// Subscriber socket handle: wraps the shared `SubSocketBackend` and owns the
/// reconnect tasks started by `connect`.
pub struct SubSocket {
/// Common socket state holding the shared backend.
common: crate::socket::common::SocketCommon<SubSocketBackend>,
/// One handle per successful `connect`; each is shut down when the socket is dropped.
reconnect_handles: Vec<ReconnectHandle>,
}
// Marker impl with no methods.
// NOTE(review): `Sealed` presumably restricts which types may implement the
// socket-family traits — confirm in `socket::family`.
impl crate::socket::family::sealed::Sealed for SubSocket {}
// Marker impl with no methods: tags `SubSocket` as a `Subscriber`.
impl crate::socket::family::Subscriber for SubSocket {}
/// Stops the socket's reconnect tasks and shuts the backend down on drop.
impl Drop for SubSocket {
    fn drop(&mut self) {
        // Reconnect handles are shut down first, in the order they were created.
        self.reconnect_handles.drain(..).for_each(|handle| {
            handle.shutdown();
        });
        // Then the backend's peer registry is cleared.
        self.common.backend.shutdown();
    }
}
/// Gives generic socket code access to the shared `SocketCommon` state.
impl crate::socket::common::HasCommon for SubSocket {
type Backend = SubSocketBackend;
/// Borrows the common socket state.
fn common(&self) -> &crate::socket::common::SocketCommon<Self::Backend> {
&self.common
}
/// Mutably borrows the common socket state.
fn common_mut(&mut self) -> &mut crate::socket::common::SocketCommon<Self::Backend> {
&mut self.common
}
}
/// Subscription management for [`SubSocket`].
impl SubSocket {
    /// Records `subscription` and sends a SUBSCRIBE message to every peer
    /// currently in the registry.
    ///
    /// The subscription is stored before the fan-out, so peers that connect
    /// later receive it through the replay in `peer_connected`.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`: a peer whose send fails is
    /// disconnected rather than reported to the caller.
    pub async fn subscribe(&mut self, subscription: impl AsRef<[u8]>) -> ZmqResult<()> {
        let topic = subscription.as_ref();
        {
            // Scoped so the lock is released before awaiting the fan-out.
            let mut active = self.common.backend.subs.lock();
            active.insert(topic.to_vec());
        }
        self.process_subs(topic, SubBackendMsgType::SUBSCRIBE).await
    }

    /// Forgets `subscription` and sends an UNSUBSCRIBE message to every peer
    /// currently in the registry.
    ///
    /// The message is sent whether or not the subscription was stored.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`; see [`SubSocket::subscribe`].
    pub async fn unsubscribe(&mut self, subscription: impl AsRef<[u8]>) -> ZmqResult<()> {
        let topic = subscription.as_ref();
        {
            let mut active = self.common.backend.subs.lock();
            active.remove(topic);
        }
        self.process_subs(topic, SubBackendMsgType::UNSUBSCRIBE)
            .await
    }

    /// Sends one subscription control message to a snapshot of the registered peers.
    ///
    /// Peers whose flushed send fails are collected and, once the whole
    /// fan-out is finished, passed to `peer_disconnected`.
    async fn process_subs(
        &mut self,
        subscription: &[u8],
        msg_type: SubBackendMsgType,
    ) -> ZmqResult<()> {
        let frame = SubSocketBackend::create_subs_message(subscription, msg_type);

        let mut targets = Vec::new();
        self.common.backend.registry.snapshot_into(&mut targets);

        let mut failed: Vec<crate::engine::registry::PeerKey> = Vec::new();
        for (key, engine) in targets.iter() {
            if let Err(e) = engine.send_msg_flushed(frame.clone()).await {
                log::debug!("SUB: sub fanout to key={} failed: {:?}", key, e);
                failed.push(*key);
            }
        }

        // Keys with no identity left in the registry are skipped.
        let backend = &self.common.backend;
        failed
            .into_iter()
            .filter_map(|key| backend.registry.id_for(key))
            .for_each(|id| backend.peer_disconnected(&id));

        Ok(())
    }
}
/// Core socket operations for [`SubSocket`]: construction, linger drain and connect.
impl Socket for SubSocket {
    type Backend = SubSocketBackend;

    /// Creates a SUB socket with the given options and no reconnect tasks.
    fn with_options(options: SocketOptions) -> Self {
        let backend = SubSocketBackend::with_options(options, SocketType::SUB);
        Self {
            common: crate::socket::common::SocketCommon::new(Arc::new(backend)),
            reconnect_handles: Vec::new(),
        }
    }

    /// Drains the peer registry according to the socket options.
    async fn linger_drain(&mut self) {
        crate::engine::registry::drain_registry(
            &self.common.backend.registry,
            self.common.backend.socket_options(),
        )
        .await;
    }

    /// Connects to `endpoint` and starts a background reconnect task for it.
    ///
    /// After the connection is established a `Connected` event is published
    /// to the monitor (if one is installed). A reconnect task is then
    /// spawned with the socket's reconnect intervals and a backoff
    /// multiplier of `2.0`, and its handle is kept until the socket is dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if `endpoint` cannot be converted into an `Endpoint`
    /// or if `connect_peer_forever` fails.
    async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
    where
        E: TryInto<crate::endpoint::Endpoint> + Send,
        E::Error: Into<crate::ZmqError>,
    {
        let target = endpoint.try_into().map_err(Into::into)?;
        let timeout = self.common.backend.socket_options().connect_timeout;

        let (resolved, peer_id) = crate::socket::handshake::connect_peer_forever(
            target.clone(),
            self.common.backend.clone(),
            timeout,
        )
        .await?;

        {
            // Scoped so the monitor lock is held only while publishing the event.
            let mut monitor_slot = self.common.backend.monitor().lock();
            if let Some(events) = monitor_slot.as_mut() {
                let _ = events.try_send(SocketEvent::Connected(resolved, peer_id.clone()));
            }
        }

        // The reconnect task registers its disconnect notifier through this closure.
        let notifier_sink = self.common.backend.clone();
        let register_fn: crate::reconnect::RegisterDisconnectFn =
            Box::new(move |peer_id, notifier| {
                notifier_sink.register_disconnect_notifier(peer_id, notifier);
            });

        let opts = self.common.backend.socket_options();
        let backoff = ReconnectConfig {
            initial_interval: opts.reconnect_interval,
            max_interval: opts.reconnect_interval_max,
            backoff_multiplier: 2.0,
        };

        let handle = crate::reconnect::spawn_reconnect_task(
            target,
            self.common.backend.clone(),
            peer_id,
            register_fn,
            backoff,
        );
        self.reconnect_handles.push(handle);
        Ok(())
    }
}
/// Receive side of [`SubSocket`].
impl SocketRecv for SubSocket {
    /// Waits for the next inbound message, honouring the socket's `receive_timeout` option.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `recv_next_timed` reports.
    async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
        let timeout = self.common.backend.socket_options().receive_timeout;
        let delivery = GenericSocketBackend::recv_next_timed(
            &self.common.backend.inbound_rx,
            &*self.common.backend,
            timeout,
        )
        .await?;
        // The originating peer is not exposed to callers of `recv`.
        let (_origin, message) = delivery;
        Ok(message)
    }
}