use crate::error::ZmqError;
#[cfg(feature = "io-uring")]
use crate::runtime::ActorType;
use crate::runtime::{Command, MailboxSender, SystemEvent};
use crate::socket::ISocket;
use crate::socket::connection_iface::ISocketConnection;
#[cfg(feature = "io-uring")]
use crate::socket::core::pipe_manager;
use crate::socket::core::state::{EndpointInfo, EndpointType, ShutdownPhase};
use crate::socket::core::{SocketCore, shutdown};
use crate::socket::events::{MonitorSender, SocketEvent};
use crate::socket::options::{self, *};
use crate::transport::endpoint::{Endpoint, parse_endpoint};
#[cfg(feature = "inproc")]
use crate::transport::inproc;
#[cfg(feature = "ipc")]
use crate::transport::ipc::{IpcConnecter, IpcListener};
use crate::transport::tcp::{TcpConnecter, TcpListener};
use fibre::oneshot;
use std::sync::Arc;
use std::time::Duration;
/// Dispatches one `Command` taken from the socket core's mailbox.
///
/// The current shutdown phase is read from `shutdown_coordinator` first:
///
/// * If the phase is anything other than `ShutdownPhase::Running`, only a
///   reduced set of commands is acted on. `UserClose` and `Stop` call
///   `shutdown::initiate_core_shutdown` again, user requests that carry a
///   `reply_tx` are answered with `ZmqError::InvalidState`, `UserSend` is
///   logged and dropped, and the function returns early.
/// * Otherwise each command is forwarded to its handler in this module or to
///   the `ISocket` implementation.
///
/// Errors are reported to callers through the command's `reply_tx` (or only
/// logged); this function itself returns `Ok(())` on every visible path.
pub(crate) async fn process_socket_command(
core_arc: Arc<SocketCore>,
socket_logic_strong: &Arc<dyn ISocket>,
command: Command,
) -> Result<(), ZmqError> {
let core_handle = core_arc.handle;
// Captured up front because `command` is moved by the `match` statements below.
let command_name_str = command.variant_name();
let current_shutdown_phase = core_arc.shutdown_coordinator.lock().await.state;
// ---- Shutdown path: the core is no longer in the Running phase. ----
if current_shutdown_phase != ShutdownPhase::Running {
match command {
Command::UserClose { reply_tx } => {
tracing::info!(
handle = core_handle,
"Processing UserClose command during shutdown."
);
shutdown::initiate_core_shutdown(core_arc.clone(), socket_logic_strong, false).await;
// The requester may already be gone; a failed send is ignored on purpose.
let _ = reply_tx.send(Ok(()));
}
Command::Stop => {
tracing::info!(
handle = core_handle,
"Processing Stop command during shutdown."
);
shutdown::initiate_core_shutdown(core_arc.clone(), socket_logic_strong, false).await;
}
#[cfg(feature = "io-uring")]
Command::UringFdError { fd, error } => {
tracing::debug!(handle=core_handle, %fd, %error, "UringFdError received. Treating as ActorStopping event.");
// Map the raw FD back to the endpoint URI it was registered under (if any).
let endpoint_uri_opt = core_arc
.core_state
.read()
.uring_fd_to_endpoint_uri
.get(&fd)
.cloned();
// The FD is passed as the child identifier and reported as a Session actor.
shutdown::handle_actor_stopping_event(
core_arc.clone(),
socket_logic_strong,
fd as usize,
ActorType::Session,
endpoint_uri_opt.as_deref(),
Some(&error),
)
.await;
}
#[cfg(feature = "io-uring")]
Command::UringFdHandshakeComplete { fd, .. } => {
tracing::debug!(handle = core_handle, %fd, "Ignoring UringFdHandshakeComplete during shutdown.");
}
// User requests with a unit reply: rejected with a warning.
Command::UserBind { reply_tx, .. }
| Command::UserConnect { reply_tx, .. }
| Command::UserDisconnect { reply_tx, .. }
| Command::UserUnbind { reply_tx, .. }
| Command::UserSetOpt { reply_tx, .. }
| Command::UserMonitor { reply_tx, .. } => {
tracing::warn!(handle = core_handle, cmd_name = %command_name_str, "Command ignored: SocketCore is shutting down.");
let _ = reply_tx.send(Err(ZmqError::InvalidState(
"Socket is shutting down".into(),
)));
}
// UserRecv / UserGetOpt are rejected the same way but without a log line;
// they are separate arms, presumably because their reply payload types differ.
Command::UserRecv { reply_tx } => {
let _ = reply_tx.send(Err(ZmqError::InvalidState(
"Socket is shutting down".into(),
)));
}
Command::UserGetOpt { reply_tx, .. } => {
let _ = reply_tx.send(Err(ZmqError::InvalidState(
"Socket is shutting down".into(),
)));
}
// UserSend carries no reply channel, so the message is dropped with a warning.
Command::UserSend { .. } => {
tracing::warn!(handle = core_handle, cmd_name = %command_name_str, "UserSend ignored: SocketCore is shutting down.");
}
_ => {
tracing::warn!(handle = core_handle, cmd_name = %command_name_str, "Unhandled or unexpected command during shutdown.");
}
}
return Ok(()); }
// ---- Normal path: the core is in the Running phase. ----
match command {
Command::UserBind { endpoint, reply_tx } => {
handle_user_bind(core_arc, socket_logic_strong.clone(), endpoint, reply_tx).await;
}
Command::UserConnect { endpoint, reply_tx } => {
handle_user_connect(core_arc, socket_logic_strong.clone(), endpoint, reply_tx).await;
}
Command::UserDisconnect { endpoint, reply_tx } => {
handle_user_disconnect(core_arc, endpoint, reply_tx).await;
}
Command::UserUnbind { endpoint, reply_tx } => {
handle_user_unbind(core_arc, socket_logic_strong, endpoint, reply_tx).await;
}
// UserSend has no reply channel: a send failure is only logged.
Command::UserSend { msg } => {
if let Err(e) = socket_logic_strong.send(msg).await {
tracing::debug!(handle = core_handle, "UserSend ISocket::send error: {}", e);
}
}
Command::UserRecv { reply_tx } => {
let result = socket_logic_strong.recv().await;
if let Err(ref e) = result {
tracing::debug!(handle = core_handle, "UserRecv ISocket::recv error: {}", e);
}
let _ = reply_tx.send(result);
}
Command::UserSetOpt {
option,
value,
reply_tx,
} => {
let _ = reply_tx
.send(handle_set_option(core_arc.clone(), socket_logic_strong, option, &value).await);
}
Command::UserGetOpt { option, reply_tx } => {
let _ = reply_tx.send(handle_get_option(core_arc.clone(), socket_logic_strong, option).await);
}
Command::UserMonitor {
monitor_tx,
reply_tx,
} => {
handle_user_monitor(core_arc.clone(), monitor_tx, reply_tx).await;
}
// Close: publish the closing event, start shutdown, then acknowledge the caller.
Command::UserClose { reply_tx } => {
tracing::info!(
handle = core_handle,
"SocketCore received UserClose command."
);
shutdown::publish_socket_closing_event(&core_arc.context, core_handle).await;
shutdown::initiate_core_shutdown(core_arc.clone(), socket_logic_strong, false).await;
let _ = reply_tx.send(Ok(())); }
// Stop: same sequence as UserClose, but there is nobody to acknowledge.
Command::Stop => {
tracing::info!(
handle = core_handle,
"SocketCore received direct Stop command."
);
shutdown::publish_socket_closing_event(&core_arc.context, core_handle).await;
shutdown::initiate_core_shutdown(core_arc.clone(), socket_logic_strong, false).await;
}
// A message arrived on an io-uring managed FD: translate FD -> endpoint URI ->
// second element of that endpoint's `pipe_ids`, then hand it to the ISocket as
// a `PipeMessageReceived` event.
#[cfg(feature = "io-uring")]
Command::UringFdMessage { fd, msg } => {
let endpoint_uri_opt = core_arc
.core_state
.read()
.uring_fd_to_endpoint_uri
.get(&fd)
.cloned();
if let Some(uri) = endpoint_uri_opt {
let synthetic_read_id_opt = core_arc
.core_state
.read()
.endpoints
.get(&uri)
.and_then(|ep_info| ep_info.pipe_ids.map(|pids| pids.1));
if let Some(s_read_id) = synthetic_read_id_opt {
let cmd_for_isocket = Command::PipeMessageReceived {
pipe_id: s_read_id,
msg,
};
if let Err(e) = socket_logic_strong
.handle_pipe_event(s_read_id, cmd_for_isocket)
.await
{
tracing::error!(handle=core_handle, %fd, "Error from ISocket::handle_pipe_event for UringFdMessage: {}", e);
}
} else {
tracing::warn!(handle=core_handle, %fd, %uri, "No synthetic_read_id found for UringFdMessage. Inconsistent state?");
}
} else {
tracing::warn!(handle=core_handle, %fd, "Received UringFdMessage for unknown FD. Message dropped.");
}
}
// An io-uring managed FD failed: close its connection, detach its pipe from the
// ISocket, then release the child's resources.
#[cfg(feature = "io-uring")]
Command::UringFdError { fd, error } => {
let endpoint_uri_opt = core_arc
.core_state
.read()
.uring_fd_to_endpoint_uri
.get(&fd)
.cloned();
if let Some(uri) = endpoint_uri_opt {
let conn_iface_opt;
let synthetic_read_id_opt;
// Scoped so the read guard is released before any `.await` below.
{
let cs = core_arc.core_state.read();
let ep_info_opt = cs.endpoints.get(&uri);
conn_iface_opt = ep_info_opt.map(|ep| ep.connection_iface.clone());
synthetic_read_id_opt = ep_info_opt.and_then(|ep| ep.pipe_ids.map(|pids| pids.1));
}
tracing::warn!(handle=core_handle, %fd, %uri, %error, "Processing UringFdError. Initiating close and cleanup.");
if let Some(iface) = conn_iface_opt {
if let Err(close_err) = iface.close_connection().await {
tracing::warn!(handle=core_handle, %fd, "Error calling close_connection() for UringFdError: {}", close_err);
}
} else {
tracing::warn!(handle=core_handle, %fd, "No ISocketConnection found to close for UringFdError on URI {}.", uri);
}
if let Some(s_read_id) = synthetic_read_id_opt {
socket_logic_strong.pipe_detached(s_read_id).await;
}
// NOTE(review): this arm is only reachable when `current_shutdown_phase` equals
// `ShutdownPhase::Running` (every other phase returned early above), so the last
// argument always evaluates to `false` here.
pipe_manager::cleanup_stopped_child_resources(
core_arc.clone(),
socket_logic_strong,
fd as usize, ActorType::Session, Some(&uri),
Some(&error),
current_shutdown_phase != ShutdownPhase::Running,
)
.await;
} else {
tracing::warn!(handle=core_handle, %fd, %error, "Received UringFdError for unknown FD (URI not found in uring_fd_to_endpoint_uri map).");
}
}
// Handshake finished on an io-uring managed FD: pass the peer identity to the
// ISocket and emit a `HandshakeSucceeded` monitor event.
#[cfg(feature = "io-uring")]
Command::UringFdHandshakeComplete { fd, peer_identity } => {
let endpoint_uri_opt = core_arc
.core_state
.read()
.uring_fd_to_endpoint_uri
.get(&fd)
.cloned();
if let Some(uri) = endpoint_uri_opt {
let synthetic_read_id_opt = core_arc
.core_state
.read()
.endpoints
.get(&uri) .and_then(|ep_info| ep_info.pipe_ids.map(|pids| pids.1));
if let Some(s_read_id) = synthetic_read_id_opt {
tracing::debug!(
handle = core_handle, %fd, %uri, synth_pipe_id = s_read_id, ?peer_identity,
"SocketCore: Processing UringFdHandshakeComplete. Notifying ISocket."
);
socket_logic_strong
.update_peer_identity(s_read_id, peer_identity)
.await;
core_arc
.core_state
.read()
.send_monitor_event(SocketEvent::HandshakeSucceeded {
endpoint: uri.clone(),
});
} else {
tracing::warn!(
handle = core_handle, %fd, %uri,
"SocketCore: No synthetic_read_id found for UringFdHandshakeComplete. ISocket not notified."
);
}
} else {
tracing::warn!(
handle = core_handle, %fd,
"SocketCore: Received UringFdHandshakeComplete for an FD not mapped to a URI. Ignoring."
);
}
}
// Any other command variant is not expected on this mailbox.
_ => {
tracing::error!(handle = core_handle, cmd_name = %command_name_str, "SocketCore received UNEXPECTED command type on its mailbox!");
}
}
Ok(())
}
/// Handles `Command::UserBind`: binds the socket to `endpoint` and reports the
/// outcome through `reply_tx`.
///
/// * `tcp://` (and `ipc://` with the `ipc` feature): spawns a listener actor via
///   `create_and_spawn` and records it in `core_state.endpoints` under the URI
///   the listener returned.
/// * `inproc://` (with the `inproc` feature): registers the name through
///   `inproc::bind_inproc` unless it is already bound by this socket or is
///   found by the context's `lookup_inproc`.
/// * Anything else yields `ZmqError::UnsupportedTransport`; parse errors are
///   forwarded unchanged.
///
/// On success `last_bound_endpoint` is updated and a `Listening` monitor event
/// is emitted; on failure a `BindFailed` event is emitted instead.
///
/// The outcome is tracked as a single `Result<String, ZmqError>` holding the
/// bound URI, so "success without a URI" cannot be represented.
async fn handle_user_bind(
    core_arc: Arc<SocketCore>,
    socket_logic: Arc<dyn ISocket>,
    endpoint: String,
    reply_tx: oneshot::Sender<Result<(), ZmqError>>,
) {
    let core_handle = core_arc.handle;
    tracing::debug!(handle = core_handle, %endpoint, "Processing UserBind command");

    let parse_result = parse_endpoint(&endpoint);
    let context_clone = core_arc.context.clone();
    // The listener actors are told which socket core owns them.
    let parent_socket_id = core_handle;

    // `Ok(uri)` carries the URI that was actually bound.
    let bind_outcome: Result<String, ZmqError> = match parse_result {
        Ok(Endpoint::Tcp(_addr, ref uri_from_parse)) => {
            let core_s_read = core_arc.core_state.read();
            if core_s_read.endpoints.contains_key(uri_from_parse) {
                Err(ZmqError::AddrInUse(uri_from_parse.clone()))
            } else {
                let monitor_tx_clone = core_s_read.get_monitor_sender_clone();
                let options_clone = core_s_read.options.clone();
                // Release the read lock before spawning the listener.
                drop(core_s_read);

                let child_actor_handle = context_clone.inner().next_handle();
                match TcpListener::create_and_spawn(
                    child_actor_handle,
                    endpoint.clone(),
                    options_clone,
                    socket_logic,
                    context_clone.inner().next_handle.clone(),
                    monitor_tx_clone,
                    context_clone.clone(),
                    parent_socket_id,
                ) {
                    Ok((listener_mailbox, listener_task_handle, resolved_uri)) => {
                        let mut core_s_write = core_arc.core_state.write();
                        // Re-check under the write lock, this time against the URI
                        // returned by the listener.
                        if core_s_write.endpoints.contains_key(&resolved_uri) {
                            listener_task_handle.abort();
                            Err(ZmqError::AddrInUse(resolved_uri))
                        } else {
                            core_s_write.endpoints.insert(
                                resolved_uri.clone(),
                                EndpointInfo {
                                    mailbox: listener_mailbox,
                                    task_handle: Some(listener_task_handle),
                                    endpoint_type: EndpointType::Listener,
                                    endpoint_uri: resolved_uri.clone(),
                                    pipe_ids: None,
                                    handle_id: child_actor_handle,
                                    target_endpoint_uri: None,
                                    is_outbound_connection: false,
                                    peer_socket_type: None,
                                    connection_iface: Arc::new(
                                        crate::socket::connection_iface::DummyConnection,
                                    ),
                                },
                            );
                            Ok(resolved_uri)
                        }
                    }
                    Err(e) => Err(e),
                }
            }
        }
        #[cfg(feature = "ipc")]
        Ok(Endpoint::Ipc(ref path_buf, ref uri_from_parse)) => {
            let core_s_read = core_arc.core_state.read();
            if core_s_read.endpoints.contains_key(uri_from_parse) {
                Err(ZmqError::AddrInUse(uri_from_parse.clone()))
            } else {
                let monitor_tx_clone = core_s_read.get_monitor_sender_clone();
                let options_clone = core_s_read.options.clone();
                // Release the read lock before spawning the listener.
                drop(core_s_read);

                let child_actor_handle = context_clone.inner().next_handle();
                match IpcListener::create_and_spawn(
                    child_actor_handle,
                    endpoint.clone(),
                    path_buf.clone(),
                    options_clone,
                    socket_logic,
                    context_clone.inner().next_handle.clone(),
                    monitor_tx_clone,
                    context_clone.clone(),
                    parent_socket_id,
                ) {
                    Ok((listener_mailbox, listener_task_handle, resolved_uri)) => {
                        let mut core_s_write = core_arc.core_state.write();
                        if core_s_write.endpoints.contains_key(&resolved_uri) {
                            listener_task_handle.abort();
                            Err(ZmqError::AddrInUse(resolved_uri))
                        } else {
                            core_s_write.endpoints.insert(
                                resolved_uri.clone(),
                                EndpointInfo {
                                    mailbox: listener_mailbox,
                                    task_handle: Some(listener_task_handle),
                                    endpoint_type: EndpointType::Listener,
                                    endpoint_uri: resolved_uri.clone(),
                                    pipe_ids: None,
                                    handle_id: child_actor_handle,
                                    target_endpoint_uri: None,
                                    is_outbound_connection: false,
                                    peer_socket_type: None,
                                    connection_iface: Arc::new(
                                        crate::socket::connection_iface::DummyConnection,
                                    ),
                                },
                            );
                            Ok(resolved_uri)
                        }
                    }
                    Err(e) => Err(e),
                }
            }
        }
        #[cfg(feature = "inproc")]
        Ok(Endpoint::Inproc(ref name)) => {
            // Separate statement so the read guard is gone before the `.await` below.
            let is_already_bound_by_this_socket =
                core_arc.core_state.read().bound_inproc_names.contains(name);
            if is_already_bound_by_this_socket {
                Err(ZmqError::AddrInUse(format!("inproc://{}", name)))
            } else if core_arc.context.inner().lookup_inproc(name).is_some() {
                Err(ZmqError::AddrInUse(format!("inproc://{} (globally)", name)))
            } else {
                inproc::bind_inproc(name.clone(), core_arc.clone())
                    .await
                    .map(|()| format!("inproc://{}", name))
            }
        }
        Err(e) => Err(e),
        _ => Err(ZmqError::UnsupportedTransport(endpoint.clone())),
    };

    match &bind_outcome {
        Ok(actual_uri) => {
            let mut core_s_write = core_arc.core_state.write();
            core_s_write.last_bound_endpoint = Some(actual_uri.clone());
            core_s_write.send_monitor_event(SocketEvent::Listening {
                endpoint: actual_uri.clone(),
            });
        }
        Err(e) => {
            // The failure event reports the endpoint exactly as the user requested it.
            core_arc
                .core_state
                .read()
                .send_monitor_event(SocketEvent::BindFailed {
                    endpoint: endpoint.clone(),
                    error_msg: e.to_string(),
                });
        }
    }

    // The requester may have given up waiting; a failed send is ignored on purpose.
    let _ = reply_tx.send(bind_outcome.map(|_| ()));
}
/// Handles `Command::UserConnect`: starts a connection attempt to
/// `endpoint_uri` and answers through `reply_tx`.
///
/// * `tcp://` (and `ipc://` with the `ipc` feature): a connecter actor is
///   started via `respawn_connecter_actor` and `Ok(())` is sent immediately,
///   without waiting for the connection attempt to finish.
/// * `inproc://` (with the `inproc` feature): the connect runs in a spawned
///   task, and `reply_tx` is handed to `inproc::connect_inproc`.
/// * Parse errors are forwarded; every other transport yields
///   `ZmqError::UnsupportedTransport`.
///
/// The IPC arm is gated on the `ipc` feature, matching `handle_user_bind` and
/// `respawn_connecter_actor`. Without the gate, a build lacking `ipc` would
/// answer `Ok(())` for an `ipc://` endpoint although no connecter can be
/// started for it.
async fn handle_user_connect(
    core_arc: Arc<SocketCore>,
    socket_logic: Arc<dyn ISocket>,
    endpoint_uri: String,
    reply_tx: oneshot::Sender<Result<(), ZmqError>>,
) {
    let core_handle = core_arc.handle;
    tracing::debug!(handle = core_handle, uri = %endpoint_uri, "Processing UserConnect command");

    let parse_result = parse_endpoint(&endpoint_uri);
    match parse_result {
        Ok(Endpoint::Tcp(_, ref parsed_uri_for_connecter)) => {
            respawn_connecter_actor(
                core_arc.clone(),
                socket_logic,
                parsed_uri_for_connecter.clone(),
            )
            .await;
            // Acknowledge right away; the reply does not wait for the connection.
            let _ = reply_tx.send(Ok(()));
        }
        #[cfg(feature = "ipc")]
        Ok(Endpoint::Ipc(_, ref parsed_uri_for_connecter)) => {
            respawn_connecter_actor(
                core_arc.clone(),
                socket_logic,
                parsed_uri_for_connecter.clone(),
            )
            .await;
            let _ = reply_tx.send(Ok(()));
        }
        #[cfg(feature = "inproc")]
        Ok(Endpoint::Inproc(ref name)) => {
            let core_arc_clone_for_task = core_arc.clone();
            let name_clone_for_task = name.clone();
            // Runs in its own task; `connect_inproc` owns `reply_tx` from here on.
            tokio::spawn(async move {
                inproc::connect_inproc(name_clone_for_task, core_arc_clone_for_task, reply_tx).await;
            });
        }
        Err(e) => {
            let _ = reply_tx.send(Err(e));
        }
        // Transports whose feature is disabled (or that are unknown) end up here.
        _ => {
            let _ = reply_tx.send(Err(ZmqError::UnsupportedTransport(endpoint_uri)));
        }
    }
}
/// Starts (or restarts) a connecter actor for `target_uri`.
///
/// TCP targets get a `TcpConnecter`; IPC targets get an `IpcConnecter` when the
/// `ipc` feature is enabled. Each is created with a snapshot of the socket
/// options and the monitor sender read from `core_state`. Inproc targets are
/// only logged. If `target_uri` cannot be parsed, a
/// `SystemEvent::ConnectionAttemptFailed` is published on the context's event
/// bus.
pub(crate) async fn respawn_connecter_actor(
    core_arc: Arc<SocketCore>,
    socket_logic: Arc<dyn ISocket>,
    target_uri: String,
) {
    let core_handle = core_arc.handle;
    let context = core_arc.context.clone();
    tracing::debug!(handle = core_handle, target_uri = %target_uri, "Spawning/Respawning connecter task");

    let parsed = parse_endpoint(&target_uri);
    match parsed {
        Ok(Endpoint::Tcp(_, _)) => {
            // Snapshot what the connecter needs, then release the read lock.
            let (options, monitor_tx, handle_source) = {
                let state = core_arc.core_state.read();
                (
                    state.options.clone(),
                    state.get_monitor_sender_clone(),
                    context.inner().next_handle.clone(),
                )
            };
            let connecter_handle = context.inner().next_handle();
            let _connecter_task = TcpConnecter::create_and_spawn(
                connecter_handle,
                target_uri.clone(),
                options,
                socket_logic,
                handle_source,
                monitor_tx,
                context,
                core_handle,
            );
        }
        #[cfg(feature = "ipc")]
        Ok(Endpoint::Ipc(path_buf, _)) => {
            // Snapshot what the connecter needs, then release the read lock.
            let (options, monitor_tx, handle_source) = {
                let state = core_arc.core_state.read();
                (
                    state.options.clone(),
                    state.get_monitor_sender_clone(),
                    context.inner().next_handle.clone(),
                )
            };
            let connecter_handle = context.inner().next_handle();
            let _connecter_task = IpcConnecter::create_and_spawn(
                connecter_handle,
                target_uri.clone(),
                path_buf,
                options,
                socket_logic,
                handle_source,
                monitor_tx,
                context,
                core_handle,
            );
        }
        Ok(Endpoint::Inproc(_)) => {
            tracing::warn!(handle = core_handle, %target_uri, "Inproc connections are not respawned via Connecter actor mechanism.");
        }
        Err(err) => {
            tracing::error!(handle = core_handle, %target_uri, error = %err, "Failed to parse endpoint for respawning connecter.");
            // Publishing is best-effort; its result is discarded.
            let failure = SystemEvent::ConnectionAttemptFailed {
                parent_core_id: core_handle,
                target_endpoint_uri: target_uri,
                error: err,
            };
            let _ = core_arc.context.event_bus().publish(failure);
        }
        _ => {
            tracing::warn!(handle = core_handle, %target_uri, "Unsupported transport for respawning connecter.");
        }
    }
}
pub(crate) async fn handle_connect_failed_event(
core_arc: Arc<SocketCore>,
socket_logic: Arc<dyn ISocket + 'static>,
target_uri: String,
error: ZmqError,
) {
let core_handle = core_arc.handle;
tracing::warn!(handle = core_handle, uri = %target_uri, error = %error, "ConnectionAttemptFailed event received.");
let monitor_event = SocketEvent::ConnectFailed {
endpoint: target_uri.clone(),
error_msg: format!("{}", error),
};
core_arc.core_state.read().send_monitor_event(monitor_event);
let should_reconnect = {
let core_s_read = core_arc.core_state.read();
core_s_read.options.reconnect_ivl.map_or(false, |d| d != Duration::ZERO) && !crate::transport::tcp::is_fatal_connect_error(&error) };
if should_reconnect {
tracing::info!(handle = core_handle, uri = %target_uri, "Connection failed, will attempt to respawn connecter (reconnect).");
respawn_connecter_actor(core_arc, socket_logic, target_uri).await;
} else {
tracing::info!(handle = core_handle, uri = %target_uri, "Connection failed, reconnect not enabled or error is fatal.");
}
}
/// Handles `Command::UserDisconnect`: closes the session associated with
/// `endpoint` and reports the outcome through `reply_tx`.
///
/// Lookup order:
/// 1. an entry in `core_state.endpoints` keyed by `endpoint` itself (used only
///    if it is a `Session`);
/// 2. if there is no such key, the first `Session` whose `target_endpoint_uri`
///    equals `endpoint`;
/// 3. with the `inproc` feature and an `inproc://` endpoint,
///    `inproc::disconnect_inproc`.
///
/// If nothing matches, the reply is `ZmqError::InvalidArgument`.
///
/// Both map lookups run under one read guard that is released before any
/// `.await`. This avoids taking the read lock a second time while an earlier
/// guard may still be alive (an `if let` scrutinee temporary lives through the
/// `else` branch on editions before 2024).
async fn handle_user_disconnect(
    core_arc: Arc<SocketCore>,
    endpoint: String,
    reply_tx: oneshot::Sender<Result<(), ZmqError>>,
) {
    let core_handle = core_arc.handle;
    tracing::debug!(handle = core_handle, %endpoint, "Processing UserDisconnect command");

    let mut disconnect_result = Err(ZmqError::InvalidArgument(format!(
        "Endpoint not found for disconnect: {}",
        endpoint
    )));

    let endpoint_info_to_close: Option<(String, Arc<dyn ISocketConnection>)> = {
        let core_s_read = core_arc.core_state.read();
        let found = match core_s_read.endpoints.get(&endpoint) {
            // Exact key match: only sessions can be disconnected.
            Some(ep_info) => {
                if ep_info.endpoint_type == EndpointType::Session {
                    Some((endpoint.clone(), ep_info.connection_iface.clone()))
                } else {
                    None
                }
            }
            // No such key: look for a session that was created for this target URI.
            None => core_s_read
                .endpoints
                .iter()
                .find(|(_, ep_info)| {
                    ep_info.endpoint_type == EndpointType::Session
                        && ep_info.target_endpoint_uri.as_deref() == Some(&endpoint)
                })
                .map(|(resolved_uri, ep_info)| {
                    (resolved_uri.clone(), ep_info.connection_iface.clone())
                }),
        };
        found
    };

    if let Some((uri_to_close, conn_iface)) = endpoint_info_to_close {
        tracing::info!(handle = core_handle, uri = %uri_to_close, "UserDisconnect: Initiating close for connection.");
        match conn_iface.close_connection().await {
            Ok(()) => {
                disconnect_result = Ok(());
            }
            Err(e) => {
                tracing::warn!(handle = core_handle, uri = %uri_to_close, "Error initiating close on disconnect: {}", e);
                disconnect_result = Err(e);
            }
        }
    } else {
        #[cfg(feature = "inproc")]
        if endpoint.starts_with("inproc://") {
            disconnect_result = inproc::disconnect_inproc(&endpoint, core_arc.clone()).await;
        }
    }

    // The requester may have given up waiting; a failed send is ignored on purpose.
    let _ = reply_tx.send(disconnect_result);
}
/// Handles `Command::UserUnbind`: stops the listener bound to `endpoint` and
/// reports the outcome through `reply_tx`.
///
/// * If `endpoint` names a `Listener` entry in `core_state.endpoints`,
///   `Command::Stop` is sent to its mailbox. The result is `Ok(())` even when
///   that send fails.
/// * If `endpoint` names a non-listener entry, the result is
///   `ZmqError::InvalidArgument`.
/// * Otherwise, with the `inproc` feature and an `inproc://` endpoint, the name
///   is removed from `bound_inproc_names`, unregistered through
///   `inproc::unbind_inproc`, and a `Closed` monitor event is emitted.
/// * If nothing matches, the result is the default "not found"
///   `ZmqError::InvalidArgument`.
async fn handle_user_unbind(
    core_arc: Arc<SocketCore>,
    _socket_logic_strong: &Arc<dyn ISocket>,
    endpoint: String,
    reply_tx: oneshot::Sender<Result<(), ZmqError>>,
) {
    let core_handle = core_arc.handle;
    tracing::debug!(handle = core_handle, %endpoint, "Processing UserUnbind command");

    // Default answer when no branch below finds something to unbind.
    let mut unbind_result = Err(ZmqError::InvalidArgument(format!(
        "Listener endpoint not found for unbind: {}",
        endpoint
    )));

    // Look the listener up in a short scope so the read guard is released before `.await`.
    let listener_mailbox: Option<MailboxSender> = {
        let state = core_arc.core_state.read();
        let mut found = None;
        match state.endpoints.get(&endpoint) {
            Some(ep_info) if ep_info.endpoint_type == EndpointType::Listener => {
                found = Some(ep_info.mailbox.clone());
            }
            Some(_) => {
                unbind_result = Err(ZmqError::InvalidArgument(
                    "Cannot unbind a non-listener endpoint.".into(),
                ));
            }
            None => {}
        }
        found
    };

    if let Some(mailbox) = listener_mailbox {
        tracing::info!(handle = core_handle, uri = %endpoint, "UserUnbind: Sending Stop to Listener actor.");
        if mailbox.send(Command::Stop).await.is_err() {
            tracing::warn!(handle = core_handle, uri = %endpoint, "Failed to send Stop to Listener on unbind (already stopped?).");
        }
        // The unbind counts as successful whether or not the Stop was delivered.
        unbind_result = Ok(());
    } else {
        #[cfg(feature = "inproc")]
        if endpoint.starts_with("inproc://") {
            let name_part = endpoint.strip_prefix("inproc://").unwrap_or("");
            if name_part.is_empty() {
                unbind_result = Err(ZmqError::InvalidEndpoint(endpoint));
            } else {
                // Separate statement so the write guard is gone before the `.await` below.
                let removed_here = core_arc
                    .core_state
                    .write()
                    .bound_inproc_names
                    .remove(name_part);
                if removed_here {
                    inproc::unbind_inproc(name_part, &core_arc.context).await;
                    core_arc
                        .core_state
                        .read()
                        .send_monitor_event(SocketEvent::Closed {
                            endpoint: endpoint.clone(),
                        });
                    unbind_result = Ok(());
                } else {
                    unbind_result = Err(ZmqError::InvalidArgument(format!(
                        "Inproc name '{}' not bound by this socket",
                        name_part
                    )));
                }
            }
        }
    }

    let _ = reply_tx.send(unbind_result);
}
/// Applies a socket option.
///
/// The option is offered to the socket pattern first via
/// `ISocket::set_pattern_option`. Only when the pattern answers
/// `ZmqError::UnsupportedOption` is it applied to the core options through
/// `update_core_option`; any other pattern result (success or error) is
/// returned as is.
async fn handle_set_option(
    core_arc: Arc<SocketCore>,
    socket_logic: &Arc<dyn ISocket>,
    option: i32,
    value: &[u8],
) -> Result<(), ZmqError> {
    tracing::debug!(
        handle = core_arc.handle,
        option = option,
        value_len = value.len(),
        "Setting option"
    );

    match socket_logic.set_pattern_option(option, value).await {
        // Not a pattern-level option: fall back to the core option table.
        Err(ZmqError::UnsupportedOption(_)) => update_core_option(&core_arc, |opts| {
            options::apply_core_option_value(opts, option, value)
        }),
        pattern_outcome => pattern_outcome,
    }
}
pub(crate) fn update_core_option<F>(core_arc: &SocketCore, applier: F) -> Result<(), ZmqError>
where
F: FnOnce(&mut SocketOptions) -> Result<(), ZmqError>,
{
let mut core_s_write = core_arc.core_state.write();
let mut new_options_instance = (*core_s_write.options).clone();
let apply_result = applier(&mut new_options_instance);
if apply_result.is_ok() {
core_s_write.options = Arc::new(new_options_instance);
}
apply_result
}
/// Reads a socket option.
///
/// The socket pattern is asked first via `ISocket::get_pattern_option`. Only
/// when it answers `ZmqError::UnsupportedOption` is the value looked up in the
/// core state through `options::retrieve_core_option_value`; any other pattern
/// result (value or error) is returned as is.
async fn handle_get_option(
    core_arc: Arc<SocketCore>,
    socket_logic: &Arc<dyn ISocket>,
    option: i32,
) -> Result<Vec<u8>, ZmqError> {
    tracing::debug!(handle = core_arc.handle, option = option, "Getting option");

    match socket_logic.get_pattern_option(option).await {
        // Not a pattern-level option: consult the core options under a read lock.
        Err(ZmqError::UnsupportedOption(_)) => {
            let state = core_arc.core_state.read();
            options::retrieve_core_option_value(&state.options, &state, option)
        }
        pattern_outcome => pattern_outcome,
    }
}
/// Handles `Command::UserMonitor`: stores `monitor_tx` as the socket's monitor
/// sender (overwriting any sender stored earlier) and acknowledges with
/// `Ok(())`.
async fn handle_user_monitor(
    core_arc: Arc<SocketCore>,
    monitor_tx: MonitorSender,
    reply_tx: oneshot::Sender<Result<(), ZmqError>>,
) {
    {
        let mut state = core_arc.core_state.write();
        state.monitor_tx = Some(monitor_tx);
    }
    // The requester may have given up waiting; a failed send is ignored on purpose.
    let _ = reply_tx.send(Ok(()));
}