use crate::error::ZmqError;
use crate::message::Msg;
use crate::socket::connection_iface::ISocketConnection;
use crate::socket::core::CoreState;
use std::collections::HashSet;
use std::sync::Arc;
use parking_lot::RwLock;
/// Keeps the set of peer endpoint URIs that outgoing messages are fanned out to.
///
/// Only the URIs are stored here; the connection object for each URI is looked
/// up in `CoreState::endpoints` at send time.
#[derive(Debug, Default)]
pub(crate) struct Distributor {
/// Registered peer endpoint URIs, behind a lock so the set can be modified
/// through `&self`.
peer_uris: RwLock<HashSet<String>>,
}
impl Distributor {
pub fn new() -> Self {
Self::default()
}
pub fn add_peer_uri(&self, endpoint_uri: String) {
let mut peers_guard = self.peer_uris.write();
if peers_guard.insert(endpoint_uri.clone()) {
tracing::trace!(uri = %endpoint_uri, "Distributor added peer URI");
}
}
pub fn remove_peer_uri(&self, endpoint_uri: &str) {
let mut peers_guard = self.peer_uris.write();
if peers_guard.remove(endpoint_uri) {
tracing::trace!(uri = %endpoint_uri, "Distributor removed peer URI");
}
}
pub fn get_peer_uris(&self) -> Vec<String> {
let peers_guard = self.peer_uris.read();
peers_guard.iter().cloned().collect() }
pub async fn send_to_all(
&self,
msg: &Msg,
core_handle: usize,
core_state_accessor: &parking_lot::RwLock<CoreState>, ) -> Result<(), Vec<(String, ZmqError)>> {
let uris_to_send_to = self.get_peer_uris(); tracing::debug!(
handle = core_handle,
num_peers = uris_to_send_to.len(),
"Distributor::send_to_all: Distributing to peer URIs"
);
if uris_to_send_to.is_empty() {
return Ok(());
}
let mut failed_uris = Vec::new();
for uri_to_send in uris_to_send_to {
let conn_iface_opt: Option<Arc<dyn ISocketConnection>> = {
let core_s_read = core_state_accessor.read();
core_s_read
.endpoints
.get(&uri_to_send)
.map(|ep_info| ep_info.connection_iface.clone())
};
if let Some(conn_iface) = conn_iface_opt {
let msg_clone = msg.clone(); match conn_iface.send_message(msg_clone).await {
Ok(()) => {
tracing::trace!(handle = core_handle, uri = %uri_to_send, "Distributor: send_message successful for URI.");
}
Err(ZmqError::ResourceLimitReached) | Err(ZmqError::Timeout) => {
tracing::trace!(handle = core_handle, uri = %uri_to_send, "PUB (Distributor) dropping message due to HWM/Timeout for URI");
}
Err(e @ ZmqError::ConnectionClosed) => {
tracing::debug!(handle = core_handle, uri = %uri_to_send, "PUB (Distributor) peer disconnected during send to URI");
failed_uris.push((uri_to_send.clone(), e)); }
Err(e) => {
tracing::error!(handle = core_handle, uri = %uri_to_send, error = %e, "PUB (Distributor) send to URI encountered unexpected error");
failed_uris.push((uri_to_send.clone(), e)); }
}
} else {
tracing::warn!(
handle = core_handle,
uri = %uri_to_send,
"Distributor: ISocketConnection not found for URI. Stale URI?"
);
failed_uris.push((
uri_to_send.clone(),
ZmqError::Internal("Distributor found stale URI".into()),
));
}
}
if failed_uris.is_empty() {
Ok(())
} else {
Err(failed_uris)
}
}
pub async fn send_to_all_multipart(
&self,
zmtp_frames: Vec<Msg>, core_handle: usize,
core_state_accessor: &parking_lot::RwLock<CoreState>,
) -> Result<(), Vec<(String, ZmqError)>> {
let uris_to_send_to = self.get_peer_uris();
tracing::debug!(
handle = core_handle,
num_peers = uris_to_send_to.len(),
num_zmtp_frames = zmtp_frames.len(),
"Distributor::send_to_all_multipart: Distributing to peer URIs"
);
if uris_to_send_to.is_empty() || zmtp_frames.is_empty() {
return Ok(());
}
let mut failed_uris = Vec::new();
for uri_to_send in uris_to_send_to {
let conn_iface_opt: Option<Arc<dyn ISocketConnection>> = {
let core_s_read = core_state_accessor.read();
core_s_read
.endpoints
.get(&uri_to_send)
.map(|ep_info| ep_info.connection_iface.clone())
};
if let Some(conn_iface) = conn_iface_opt {
let frames_for_this_peer = zmtp_frames.clone();
match conn_iface.send_multipart(frames_for_this_peer).await {
Ok(()) => {
tracing::trace!(handle = core_handle, uri = %uri_to_send, "Distributor: send_multipart successful for URI.");
}
Err(ZmqError::ResourceLimitReached) | Err(ZmqError::Timeout) => {
tracing::trace!(handle = core_handle, uri = %uri_to_send, "PUB (Distributor) send_multipart dropping message due to HWM/Timeout for URI");
}
Err(e @ ZmqError::ConnectionClosed) => {
tracing::debug!(handle = core_handle, uri = %uri_to_send, "PUB (Distributor) send_multipart: peer disconnected during send to URI");
failed_uris.push((uri_to_send.clone(), e));
}
Err(e) => {
tracing::error!(handle = core_handle, uri = %uri_to_send, error = %e, "PUB (Distributor) send_multipart to URI encountered unexpected error");
failed_uris.push((uri_to_send.clone(), e));
}
}
} else {
tracing::warn!(
handle = core_handle,
uri = %uri_to_send,
"Distributor: ISocketConnection not found for URI during send_multipart. Stale URI?"
);
failed_uris.push((
uri_to_send.clone(),
ZmqError::Internal("Distributor found stale URI for multipart send".into()),
));
}
}
if failed_uris.is_empty() {
Ok(())
} else {
Err(failed_uris)
}
}
}