use crate::delegate_to_core;
use crate::error::ZmqError;
use crate::message::{Blob, Msg, MsgFlags};
use crate::runtime::{Command, MailboxSender};
use crate::socket::ISocket;
use crate::socket::connection_iface::ISocketConnection;
use crate::socket::core::SocketCore;
use crate::socket::options::{AUTO_DELIMITER, ROUTER_MANDATORY};
use crate::socket::patterns::incoming_orchestrator::IncomingMessageOrchestrator;
use crate::socket::patterns::{FramingLatch, RouterMap, WritePipeCoordinator, router_auto_decode, router_auto_encode};
use dashmap::DashMap;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::{Mutex as TokioMutex, OwnedSemaphorePermit};
use super::core::command_processor::update_core_option;
use super::parse_bool_option;
#[derive(Debug)]
struct ActiveFragmentedSend {
target_endpoint_uri: String,
_permit: OwnedSemaphorePermit,
}
#[derive(Debug)]
pub(crate) struct RouterSocket {
core: Arc<SocketCore>,
router_map_for_send: RouterMap,
incoming_orchestrator: IncomingMessageOrchestrator<(Blob, Vec<Msg>)>,
pipe_to_identity_shared_map: Arc<DashMap<usize, Blob>>,
current_send_target: TokioMutex<Option<ActiveFragmentedSend>>,
pipe_send_coordinator: Arc<WritePipeCoordinator>,
framing: FramingLatch,
}
impl RouterSocket {
pub fn new(core: Arc<SocketCore>) -> Self {
let rcvhwm = { core.core_state.read().options.rcvhwm };
let orchestrator = IncomingMessageOrchestrator::new(core.handle, rcvhwm);
Self {
core,
router_map_for_send: RouterMap::new(),
incoming_orchestrator: orchestrator,
pipe_to_identity_shared_map: Arc::new(DashMap::new()),
current_send_target: TokioMutex::new(None),
pipe_send_coordinator: Arc::new(WritePipeCoordinator::new()),
framing: FramingLatch::new(router_auto_encode, router_auto_decode),
}
}
fn pipe_id_to_placeholder_identity(pipe_read_id: usize) -> Blob {
Blob::from_bytes(Bytes::from(format!("pipe:{}", pipe_read_id)))
}
fn process_incoming_zmtp_message(
&self,
pipe_read_id: usize,
mut raw_zmtp_message: Vec<Msg>,
) -> Result<(Blob, Vec<Msg>), ZmqError> {
let identity_blob = self
.pipe_to_identity_shared_map
.get(&pipe_read_id)
.map(|entry| entry.value().clone())
.unwrap_or_else(|| {
tracing::warn!(
handle = self.core.handle,
pipe_id = pipe_read_id,
"Router: Identity for pipe not found, using placeholder for incoming message."
);
Self::pipe_id_to_placeholder_identity(pipe_read_id)
});
let peer_socket_type = {
let core_s = self.core.core_state.read();
core_s
.pipe_read_id_to_endpoint_uri
.get(&pipe_read_id)
.and_then(|uri| core_s.endpoints.get(uri))
.and_then(|ep_info| ep_info.peer_socket_type.clone())
};
if self.framing.is_manual() {
return Ok((identity_blob, raw_zmtp_message));
}
match peer_socket_type.as_deref() {
Some("REQ") | Some("DEALER") => {
if !raw_zmtp_message.is_empty() && raw_zmtp_message[0].size() == 0 {
raw_zmtp_message.remove(0);
} else {
tracing::warn!(
handle = self.core.handle,
pipe_id = pipe_read_id,
"ROUTER: Expected empty delimiter from REQ/DEALER peer, but not found."
);
}
}
Some("ROUTER") => {
}
_ => {
if !raw_zmtp_message.is_empty() && raw_zmtp_message[0].size() == 0 {
raw_zmtp_message.remove(0);
}
}
}
Ok((identity_blob, raw_zmtp_message))
}
fn transform_qitem_to_app_frames(identity_blob: Blob, payload_frames_vec: Vec<Msg>) -> Vec<Msg> {
let mut result_frames = Vec::with_capacity(1 + payload_frames_vec.len());
let id_bytes = Bytes::copy_from_slice(identity_blob.as_ref());
let mut id_msg = Msg::from_bytes(id_bytes);
if !payload_frames_vec.is_empty() {
id_msg.set_flags(MsgFlags::MORE);
} else {
id_msg.set_flags(id_msg.flags() & !MsgFlags::MORE);
}
result_frames.push(id_msg);
result_frames.extend(payload_frames_vec);
if let Some(last_frame) = result_frames.last_mut() {
last_frame.set_flags(last_frame.flags() & !MsgFlags::MORE);
} else if result_frames.is_empty() && !identity_blob.is_empty() {
}
result_frames
}
}
#[async_trait]
impl ISocket for RouterSocket {
fn core(&self) -> &Arc<SocketCore> {
&self.core
}
fn mailbox(&self) -> MailboxSender {
self.core.command_sender()
}
async fn bind(&self, endpoint: &str) -> Result<(), ZmqError> {
delegate_to_core!(self, UserBind, endpoint: endpoint.to_string())
}
async fn connect(&self, endpoint: &str) -> Result<(), ZmqError> {
delegate_to_core!(self, UserConnect, endpoint: endpoint.to_string())
}
async fn disconnect(&self, endpoint: &str) -> Result<(), ZmqError> {
delegate_to_core!(self, UserDisconnect, endpoint: endpoint.to_string())
}
async fn unbind(&self, endpoint: &str) -> Result<(), ZmqError> {
delegate_to_core!(self, UserUnbind, endpoint: endpoint.to_string())
}
async fn set_option(&self, option: i32, value: &[u8]) -> Result<(), ZmqError> {
delegate_to_core!(self, UserSetOpt, option: option, value: value.to_vec())
}
async fn get_option(&self, option: i32) -> Result<Vec<u8>, ZmqError> {
delegate_to_core!(self, UserGetOpt, option: option)
}
async fn close(&self) -> Result<(), ZmqError> {
delegate_to_core!(self, UserClose,)
}
async fn send(&self, msg: Msg) -> Result<(), ZmqError> {
if !self.core.is_running().await {
return Err(ZmqError::InvalidState("Socket is closing".into()));
}
let (timeout_opt, router_mandatory_opt) = {
let core_s_read = self.core.core_state.read();
(
core_s_read.options.sndtimeo,
core_s_read.options.router_mandatory,
)
};
let current_send_target_guard = self.current_send_target.lock().await;
if let Some(active_info) = &*current_send_target_guard {
let target_uri_for_payload = active_info.target_endpoint_uri.clone();
let permit_exists = true;
drop(current_send_target_guard);
let conn_iface_for_payload: Option<Arc<dyn ISocketConnection>> = {
let core_s_read = self.core.core_state.read();
core_s_read
.endpoints
.get(&target_uri_for_payload)
.map(|ep_info| ep_info.connection_iface.clone())
};
let conn_iface = match conn_iface_for_payload {
Some(iface) => iface,
None => {
if permit_exists {
let mut clear_target_guard_on_err = self.current_send_target.lock().await;
*clear_target_guard_on_err = None;
}
return if router_mandatory_opt {
Err(ZmqError::HostUnreachable(
"Peer for fragmented send disappeared".into(),
))
} else {
Ok(())
};
}
};
let is_last_user_part = !msg.is_more();
let send_result = conn_iface.send_message(msg).await;
if is_last_user_part && permit_exists {
let mut clear_target_guard_on_done = self.current_send_target.lock().await;
*clear_target_guard_on_done = None;
}
return match send_result {
Ok(()) => Ok(()),
Err(e) => {
if !is_last_user_part && permit_exists {
let mut clear_target_guard_on_err_payload = self.current_send_target.lock().await;
*clear_target_guard_on_err_payload = None;
}
if router_mandatory_opt {
Err(if matches!(e, ZmqError::ConnectionClosed) {
ZmqError::HostUnreachable("Peer disconnected during payload send".into())
} else {
e
})
} else {
Ok(())
}
}
};
} else {
if !msg.is_more() {
drop(current_send_target_guard);
return Err(ZmqError::InvalidMessage(
"ROUTER send: First frame (identity) must have MORE flag set by application".into(),
));
}
let destination_id = Blob::from_bytes(msg.data_bytes().unwrap_or_default());
if destination_id.is_empty() {
drop(current_send_target_guard);
return Err(ZmqError::InvalidMessage(
"ROUTER send: Identity frame cannot be empty".into(),
));
}
drop(current_send_target_guard);
let target_endpoint_uri = match self
.router_map_for_send
.get_peer_info_for_identity(&destination_id)
.await
{
Some(info) => info.uri,
None => {
return if router_mandatory_opt {
Err(ZmqError::HostUnreachable(format!(
"Peer {:?} not found (ROUTER_MANDATORY)",
destination_id
)))
} else {
Ok(())
};
}
};
let (conn_iface_opt, pipe_read_id_opt) = {
let core_s_read_guard = self.core.core_state.read();
let result = core_s_read_guard
.endpoints
.get(&target_endpoint_uri)
.map_or((None, None), |ep_info| {
(
Some(ep_info.connection_iface.clone()),
ep_info.pipe_ids.map(|(_, read_id)| read_id),
)
});
result
};
let (conn_iface, pipe_read_id) = match (conn_iface_opt, pipe_read_id_opt) {
(Some(iface), Some(id)) => (iface, id),
_ => {
self
.router_map_for_send
.remove_peer_by_identity(&destination_id)
.await;
return if router_mandatory_opt {
Err(ZmqError::HostUnreachable(
"Peer connection for identity disappeared".into(),
))
} else {
Ok(())
};
}
};
let permit = self
.pipe_send_coordinator
.acquire_send_permit(pipe_read_id, timeout_opt)
.await?;
let mut set_target_guard = self.current_send_target.lock().await;
match conn_iface.send_message(msg).await {
Ok(()) => {
let delimiter_result = if !self.framing.is_manual() {
let mut delimiter_frame = Msg::new();
delimiter_frame.set_flags(MsgFlags::MORE);
conn_iface.send_message(delimiter_frame).await
} else {
Ok(())
};
match delimiter_result {
Ok(()) => {
*set_target_guard = Some(ActiveFragmentedSend {
target_endpoint_uri,
_permit: permit,
});
Ok(())
}
Err(e) => {
drop(set_target_guard);
drop(permit);
if router_mandatory_opt {
Err(if matches!(e, ZmqError::ConnectionClosed) {
ZmqError::HostUnreachable("Peer disconnected during delimiter send".into())
} else {
e
})
} else {
Ok(())
}
}
}
}
Err(e) => {
drop(set_target_guard);
drop(permit);
if router_mandatory_opt {
Err(if matches!(e, ZmqError::ConnectionClosed) {
ZmqError::HostUnreachable("Peer disconnected during identity send".into())
} else {
e
})
} else {
Ok(())
}
}
}
}
}
async fn recv(&self) -> Result<Msg, ZmqError> {
if !self.core.is_running().await {
return Err(ZmqError::InvalidState("Socket is closing".into()));
}
let rcvtimeo_opt = { self.core.core_state.read().options.rcvtimeo };
self
.incoming_orchestrator
.recv_message(rcvtimeo_opt, |(identity_blob, payload_frames_vec)| {
Self::transform_qitem_to_app_frames(identity_blob, payload_frames_vec)
})
.await
}
async fn send_multipart(&self, mut frames: Vec<Msg>) -> Result<(), ZmqError> {
if !self.core.is_running().await {
return Err(ZmqError::InvalidState("Socket is closing".into()));
}
if frames.is_empty() {
return Err(ZmqError::InvalidMessage(
"ROUTER send_multipart requires at least an identity frame.".into(),
));
}
let destination_identity_msg = frames.remove(0);
let destination_id_blob =
Blob::from_bytes(destination_identity_msg.data_bytes().unwrap_or_default());
if destination_id_blob.is_empty() {
return Err(ZmqError::InvalidMessage(
"ROUTER send_multipart: Identity frame cannot be empty".into(),
));
}
let (timeout_opt, router_mandatory_opt) = {
let core_s_read = self.core.core_state.read();
(
core_s_read.options.sndtimeo,
core_s_read.options.router_mandatory,
)
};
let peer_info = match self
.router_map_for_send
.get_peer_info_for_identity(&destination_id_blob)
.await
{
Some(info) => info,
None => {
return if router_mandatory_opt {
Err(ZmqError::HostUnreachable(format!(
"Peer with identity {:?} not found (ROUTER_MANDATORY)",
destination_id_blob
)))
} else {
Ok(())
};
}
};
let (conn_iface_opt, pipe_read_id_opt) = {
let core_s_read = self.core.core_state.read();
core_s_read
.endpoints
.get(&peer_info.uri)
.map_or((None, None), |ep_info| {
(
Some(ep_info.connection_iface.clone()),
ep_info.pipe_ids.map(|(_, read_id)| read_id),
)
})
};
let (conn_iface, pipe_read_id) = match (conn_iface_opt, pipe_read_id_opt) {
(Some(iface), Some(id)) => (iface, id),
_ => {
self
.router_map_for_send
.remove_peer_by_identity(&destination_id_blob)
.await;
return if router_mandatory_opt {
Err(ZmqError::HostUnreachable(
"Peer connection for identity disappeared before send".into(),
))
} else {
Ok(())
};
}
};
let _permit = self
.pipe_send_coordinator
.acquire_send_permit(pipe_read_id, timeout_opt)
.await
.map_err(|e| {
if router_mandatory_opt {
e
} else {
ZmqError::Internal("Send dropped due to permit error (non-mandatory)".into())
}
})?;
let mut zmtp_wire_frames =
peer_info
.strategy
.prepare_wire_frames(destination_identity_msg, frames, &self.framing);
if let Some(last_frame) = zmtp_wire_frames.last_mut() {
last_frame.set_flags(last_frame.flags() & !MsgFlags::MORE);
}
match conn_iface.send_multipart(zmtp_wire_frames).await {
Ok(()) => Ok(()),
Err(e) => {
if router_mandatory_opt {
Err(if matches!(e, ZmqError::ConnectionClosed) {
ZmqError::HostUnreachable("Peer disconnected during multipart send".into())
} else {
e
})
} else {
Ok(())
}
}
}
}
async fn recv_multipart(&self) -> Result<Vec<Msg>, ZmqError> {
if !self.core.is_running().await {
return Err(ZmqError::InvalidState("Socket is closing".into()));
}
let rcvtimeo_opt = { self.core.core_state.read().options.rcvtimeo };
self
.incoming_orchestrator
.recv_logical_message(rcvtimeo_opt, |(identity_blob, payload_frames_vec)| {
Self::transform_qitem_to_app_frames(identity_blob, payload_frames_vec)
})
.await
}
async fn set_pattern_option(&self, option: i32, value: &[u8]) -> Result<(), ZmqError> {
if option == ROUTER_MANDATORY {
update_core_option(&self.core, |options| {
options.router_mandatory = parse_bool_option(value)?;
Ok(())
})?;
Ok(())
} else if option == AUTO_DELIMITER {
if !parse_bool_option(value)? {
self.framing.set_manual();
}
Ok(())
} else {
Err(ZmqError::UnsupportedOption(option))
}
}
async fn get_pattern_option(&self, option: i32) -> Result<Vec<u8>, ZmqError> {
if option == ROUTER_MANDATORY {
let val = { self.core.core_state.read().options.router_mandatory };
Ok((val as i32).to_ne_bytes().to_vec())
} else if option == AUTO_DELIMITER {
Ok((!self.framing.is_manual() as i32).to_ne_bytes().to_vec())
} else {
Err(ZmqError::UnsupportedOption(option))
}
}
async fn process_command(&self, command: Command) -> Result<bool, ZmqError> {
match command {
Command::Stop => {
self.incoming_orchestrator.close().await;
}
_ => return Ok(false),
}
Ok(true)
}
async fn handle_pipe_event(&self, pipe_read_id: usize, event: Command) -> Result<(), ZmqError> {
match event {
Command::PipeMessageReceived { msg, .. } => {
if let Some(raw_zmtp_message_vec) = self
.incoming_orchestrator
.accumulate_pipe_frame(pipe_read_id, msg)?
{
match self.process_incoming_zmtp_message(pipe_read_id, raw_zmtp_message_vec) {
Ok((identity_blob, payload_only_vec)) => {
self
.incoming_orchestrator
.queue_item(pipe_read_id, (identity_blob, payload_only_vec))
.await?;
}
Err(e) => {
tracing::error!(
handle = self.core.handle,
pipe_id = pipe_read_id,
"Router: Error processing incoming ZMTP message: {}. Dropped.",
e
);
}
}
}
}
_ => {}
}
Ok(())
}
async fn pipe_attached(
&self,
pipe_read_id: usize,
_pipe_write_id: usize,
peer_identity_opt: Option<&[u8]>,
) {
let (endpoint_uri_opt, connection_id_opt) = {
let core_s_read = self.core.core_state.read();
let uri_opt = core_s_read
.pipe_read_id_to_endpoint_uri
.get(&pipe_read_id)
.cloned();
let conn_id_opt = uri_opt.as_ref().and_then(|uri| {
core_s_read
.endpoints
.get(uri)
.map(|ep_info| ep_info.handle_id)
});
(uri_opt, conn_id_opt)
};
if let (Some(endpoint_uri), Some(connection_id)) = (endpoint_uri_opt, connection_id_opt) {
let identity_to_use = match peer_identity_opt {
Some(id_bytes) if !id_bytes.is_empty() => {
Blob::from_bytes(Bytes::copy_from_slice(id_bytes))
}
_ => Self::pipe_id_to_placeholder_identity(pipe_read_id),
};
tracing::debug!(handle = self.core.handle, pipe_read_id, uri = %endpoint_uri, ?identity_to_use, conn_id = connection_id, "ROUTER attaching connection");
self
.router_map_for_send
.add_peer(identity_to_use.clone(), pipe_read_id, endpoint_uri)
.await;
self
.pipe_to_identity_shared_map
.insert(pipe_read_id, identity_to_use);
self.pipe_send_coordinator.add_pipe(pipe_read_id).await;
} else {
tracing::warn!(
handle = self.core.handle,
pipe_read_id,
"ROUTER pipe_attached: Endpoint URI or Connection ID not found. Maps not fully updated."
);
}
}
async fn update_peer_identity(&self, pipe_read_id: usize, new_identity_opt: Option<Blob>) {
let (endpoint_uri_opt, peer_socket_type_opt) = {
let core_s_read = self.core.core_state.read();
let uri_opt = core_s_read
.pipe_read_id_to_endpoint_uri
.get(&pipe_read_id)
.cloned();
let type_opt = if let Some(ref uri) = uri_opt {
core_s_read
.endpoints
.get(uri)
.and_then(|ep_info| ep_info.peer_socket_type.clone())
} else {
None
};
(uri_opt, type_opt)
};
if let Some(endpoint_uri) = endpoint_uri_opt {
let new_identity = match new_identity_opt {
Some(id) if !id.is_empty() => id,
_ => {
tracing::warn!(handle = self.core.handle, pipe_read_id, uri = %endpoint_uri, "ROUTER update_peer_identity: No valid new ZMTP identity provided, using placeholder.");
Self::pipe_id_to_placeholder_identity(pipe_read_id)
}
};
tracing::debug!(handle = self.core.handle, pipe_read_id, new_identity = ?new_identity, "ROUTER updating peer identity");
self
.router_map_for_send
.update_peer_identity(
pipe_read_id,
new_identity.clone(),
&endpoint_uri,
peer_socket_type_opt.as_deref(),
)
.await;
self
.pipe_to_identity_shared_map
.insert(pipe_read_id, new_identity);
} else {
tracing::warn!(
handle = self.core.handle,
pipe_read_id,
"ROUTER update_peer_identity: Endpoint URI not found for pipe. Cannot update identity maps."
);
}
}
async fn pipe_detached(&self, pipe_read_id: usize) {
tracing::debug!(
handle = self.core.handle,
pipe_read_id,
"ROUTER detaching pipe"
);
let (endpoint_uri_opt, connection_id_opt) = {
let core_s_read = self.core.core_state.read();
let uri_opt = core_s_read
.pipe_read_id_to_endpoint_uri
.get(&pipe_read_id)
.cloned();
let conn_id_opt = uri_opt
.as_ref()
.and_then(|uri| core_s_read.endpoints.get(uri))
.map(|ep_info| ep_info.handle_id);
(uri_opt, conn_id_opt)
};
self
.router_map_for_send
.remove_peer_by_read_pipe(pipe_read_id)
.await;
self.pipe_to_identity_shared_map.remove(&pipe_read_id);
self.pipe_send_coordinator.remove_pipe(pipe_read_id).await;
if connection_id_opt.is_some() {
let mut active_frag_guard = self.current_send_target.lock().await;
if let Some(active_info) = &*active_frag_guard {
if endpoint_uri_opt.as_deref() == Some(&active_info.target_endpoint_uri) {
*active_frag_guard = None;
}
}
}
self
.incoming_orchestrator
.clear_pipe_state(pipe_read_id)
.await;
}
}