use crate::engine::backend::{GenericSocketBackend, HasRegistry};
use crate::reconnect::{ReconnectConfig, ReconnectHandle};
use crate::socket::common::{HasCommon, SocketCommon};
use crate::socket::kind::sub::{SubBackendMsgType, SubSocketBackend};
use crate::{
MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions, SocketRecv, SocketSend,
SocketType, ZmqMessage, ZmqResult,
};
use std::sync::Arc;
pub struct XSubSocket {
common: SocketCommon<SubSocketBackend>,
reconnect_handles: Vec<ReconnectHandle>,
send_count: u32,
}
impl crate::socket::family::sealed::Sealed for XSubSocket {}
impl crate::socket::family::Subscriber for XSubSocket {}
impl Drop for XSubSocket {
fn drop(&mut self) {
for handle in self.reconnect_handles.drain(..) {
handle.shutdown();
}
self.common.backend.shutdown();
}
}
impl HasCommon for XSubSocket {
type Backend = SubSocketBackend;
fn common(&self) -> &SocketCommon<Self::Backend> {
&self.common
}
fn common_mut(&mut self) -> &mut SocketCommon<Self::Backend> {
&mut self.common
}
}
impl Socket for XSubSocket {
type Backend = SubSocketBackend;
fn with_options(options: SocketOptions) -> Self {
let backend = Arc::new(SubSocketBackend::with_options(options, SocketType::XSUB));
Self {
common: SocketCommon::new(backend),
reconnect_handles: Vec::new(),
send_count: 0,
}
}
async fn linger_drain(&mut self) {
let opts = self.common.backend.socket_options();
crate::engine::registry::drain_registry(self.common.backend.registry(), opts).await;
}
async fn connect<E>(&mut self, endpoint: E) -> ZmqResult<()>
where
E: TryInto<crate::endpoint::Endpoint> + Send,
E::Error: Into<crate::ZmqError>,
{
let endpoint = endpoint.try_into().map_err(Into::into)?;
let connect_timeout = self.common.backend.socket_options().connect_timeout;
let (resolved, peer_id) = crate::socket::handshake::connect_peer_forever(
endpoint.clone(),
self.common.backend.clone(),
connect_timeout,
)
.await?;
if let Some(monitor) = self.common.backend.monitor().lock().as_mut() {
let _ = monitor.try_send(SocketEvent::Connected(resolved, peer_id.clone()));
}
let backend_for_closure = self.common.backend.clone();
let register_fn: crate::reconnect::RegisterDisconnectFn =
Box::new(move |peer_id, notifier| {
backend_for_closure.register_disconnect_notifier(peer_id, notifier);
});
let opts = self.common.backend.socket_options();
let reconnect_handle = crate::reconnect::spawn_reconnect_task(
endpoint,
self.common.backend.clone(),
peer_id,
register_fn,
ReconnectConfig {
initial_interval: opts.reconnect_interval,
max_interval: opts.reconnect_interval_max,
backoff_multiplier: 2.0,
},
);
self.reconnect_handles.push(reconnect_handle);
Ok(())
}
}
impl SocketSend for XSubSocket {
async fn send(&mut self, message: impl Into<ZmqMessage> + Send) -> ZmqResult<()> {
let message = message.into();
let first = message.get(0);
let (cmd, topic): (Option<SubBackendMsgType>, Option<Vec<u8>>) = match first {
Some(frame) if !frame.is_empty() => match frame[0] {
1 => (
Some(SubBackendMsgType::SUBSCRIBE),
Some(frame[1..].to_vec()),
),
0 => (
Some(SubBackendMsgType::UNSUBSCRIBE),
Some(frame[1..].to_vec()),
),
_ => (None, None),
},
_ => (None, None),
};
if let (Some(cmd_kind), Some(topic_bytes)) = (cmd, topic) {
let mut subs = self.common.backend.subs.lock();
match cmd_kind {
SubBackendMsgType::SUBSCRIBE => {
subs.insert(topic_bytes);
}
SubBackendMsgType::UNSUBSCRIBE => {
subs.remove(&topic_bytes);
}
}
}
let mut peers = Vec::new();
self.common.backend.registry().snapshot_into(&mut peers);
let mut dead: Vec<crate::engine::registry::PeerKey> = Vec::new();
for (key, engine) in &peers {
if let Err(e) = engine.send_msg(message.clone()).await {
log::debug!("XSUB: fanout to key={} failed: {:?}", key, e);
dead.push(*key);
}
}
for key in dead {
if let Some(id) = self.common.backend.registry().id_for(key) {
self.common.backend.peer_disconnected(&id);
}
}
self.send_count = self.send_count.wrapping_add(1);
if self.send_count % crate::async_rt::task::SOCKET_SEND_YIELD_EVERY == 0 {
crate::async_rt::task::yield_now().await;
}
Ok(())
}
}
impl SocketRecv for XSubSocket {
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
let receive_timeout = self.common.backend.socket_options().receive_timeout;
let (_peer_id, message) = GenericSocketBackend::recv_next_timed(
&self.common.backend.inbound_rx,
&*self.common.backend,
receive_timeout,
)
.await?;
Ok(message)
}
}