pub(crate) mod core;
pub mod events;
pub mod options;
pub mod patterns;
pub mod types;
pub(crate) mod connection_iface;
pub mod dealer_socket;
pub mod pub_socket;
pub mod pull_socket;
pub mod push_socket;
pub mod rep_socket;
pub mod req_socket;
pub mod router_socket;
pub mod sub_socket;
use crate::context::Context; use crate::error::{ZmqError, ZmqResult}; use crate::message::Msg; use crate::runtime::{Command, MailboxSender}; use crate::socket::options::SocketOptions;
use crate::socket::core::SocketCore; use crate::Blob;
use async_trait::async_trait; use std::sync::Arc;
/// Sends a `Command` variant through `$self.mailbox()` and waits for the reply.
///
/// A `fibre` oneshot channel is created for every invocation; its sending half
/// is placed in the command's `reply_tx` field and the receiving half is
/// awaited after the command has been sent.
///
/// Two call shapes are accepted:
///
/// * `delegate_to_core!(self, Variant, field: value, ...)` for variants that
///   carry fields in addition to `reply_tx`;
/// * `delegate_to_core!(self, Variant)` for variants whose only field is
///   `reply_tx`.
///
/// The expansion uses `.await` and `?`, so it must appear inside an `async`
/// body whose return type can absorb a `ZmqError` via `?`. A failed mailbox
/// send or a failed reply receive is reported as `ZmqError::Internal`. The
/// macro evaluates to the value received on the reply channel.
#[macro_export]
macro_rules! delegate_to_core {
    // Variant with one or more payload fields in addition to `reply_tx`.
    ($self:ident, $variant:ident, $($field:ident : $value:expr),+ $(,)?) => {
        {
            use fibre::oneshot;
            let (responder, response) = oneshot::oneshot();
            let request = $crate::runtime::Command::$variant {
                $($field: $value,)+
                reply_tx: responder,
            };
            let delivery = $self.mailbox().send(request).await;
            delivery.map_err(|_| $crate::error::ZmqError::Internal("Mailbox send error".into()))?;
            let outcome = response.recv().await;
            outcome.map_err(|_| $crate::error::ZmqError::Internal("Reply channel error".into()))?
        }
    };
    // Variant whose only field is `reply_tx`.
    ($self:ident, $variant:ident $(,)?) => {
        {
            use fibre::oneshot;
            let (responder, response) = oneshot::oneshot();
            let request = $crate::runtime::Command::$variant { reply_tx: responder };
            let delivery = $self.mailbox().send(request).await;
            delivery.map_err(|_| $crate::error::ZmqError::Internal("Mailbox send error".into()))?;
            let outcome = response.recv().await;
            outcome.map_err(|_| $crate::error::ZmqError::Internal("Reply channel error".into()))?
        }
    };
}
/// Object-safe interface for socket implementations; this file hands sockets
/// out as `Arc<dyn ISocket>` (see `create_socket_actor`).
///
/// Every implementor must be `Send + Sync + 'static`. The `#[async_trait]`
/// attribute is what allows the `async fn` declarations below to live in a
/// trait that is used as a trait object.
///
/// NOTE(review): the per-method descriptions below are derived from the
/// signatures and method names only; the actual semantics are defined by the
/// implementors and should be confirmed there.
#[async_trait]
pub trait ISocket: Send + Sync + 'static {
/// Returns a reference to the shared `SocketCore` handle for this socket.
fn core(&self) -> &Arc<SocketCore>;
/// Returns a `MailboxSender`; the `delegate_to_core!` macro calls this to
/// send a `Command`.
fn mailbox(&self) -> MailboxSender;
// --- Endpoint management (presumably the user-facing API) ---
/// Binds the socket to `endpoint`. The accepted endpoint format is not
/// visible here — confirm against the implementation.
async fn bind(&self, endpoint: &str) -> Result<(), ZmqError>;
/// Connects the socket to `endpoint`.
async fn connect(&self, endpoint: &str) -> Result<(), ZmqError>;
/// Presumably reverses an earlier `connect` to `endpoint` — TODO confirm.
async fn disconnect(&self, endpoint: &str) -> Result<(), ZmqError>;
/// Presumably reverses an earlier `bind` to `endpoint` — TODO confirm.
async fn unbind(&self, endpoint: &str) -> Result<(), ZmqError>;
// --- Message transfer ---
/// Sends a single `Msg`, taking ownership of it.
async fn send(&self, msg: Msg) -> Result<(), ZmqError>;
/// Receives a single `Msg`.
async fn recv(&self) -> Result<Msg, ZmqError>;
/// Sends a sequence of `Msg` frames, taking ownership of the vector.
async fn send_multipart(&self, frames: Vec<Msg>) -> Result<(), ZmqError>;
/// Receives a sequence of `Msg` frames.
async fn recv_multipart(&self) -> Result<Vec<Msg>, ZmqError>;
// --- Options ---
/// Sets the option identified by the integer `option` to the raw bytes in
/// `value`. The byte encoding per option is not visible here.
async fn set_option(&self, option: i32, value: &[u8]) -> Result<(), ZmqError>;
/// Returns the raw bytes stored for the option identified by `option`.
async fn get_option(&self, option: i32) -> Result<Vec<u8>, ZmqError>;
/// Closes the socket.
async fn close(&self) -> Result<(), ZmqError>;
/// Like `set_option`, but presumably for options specific to the socket
/// pattern — TODO confirm how the two are dispatched.
async fn set_pattern_option(&self, option: i32, value: &[u8]) -> ZmqResult<()>;
/// Like `get_option`, but presumably for options specific to the socket
/// pattern — TODO confirm.
async fn get_pattern_option(&self, option: i32) -> Result<Vec<u8>, ZmqError>;
// --- Hooks (presumably invoked by the socket core, not by end users) ---
/// Processes a `Command`.
/// NOTE(review): the meaning of the returned `bool` is not visible here —
/// confirm against implementors and the caller.
async fn process_command(&self, command: Command) -> Result<bool, ZmqError>;
/// Handles `event_command` for the pipe identified by `pipe_id`.
async fn handle_pipe_event(&self, pipe_id: usize, event_command: Command) -> Result<(), ZmqError>;
/// Notification that a pipe pair (`pipe_read_id`, `pipe_write_id`) has been
/// attached, optionally carrying the peer's identity bytes.
async fn pipe_attached(&self, pipe_read_id: usize, pipe_write_id: usize, peer_identity: Option<&[u8]>);
/// Notification that the identity associated with `pipe_read_id` has
/// changed; `None` presumably clears it — TODO confirm.
async fn update_peer_identity(&self, pipe_read_id: usize, identity: Option<Blob>);
/// Notification that the pipe identified by `pipe_read_id` has been detached.
async fn pipe_detached(&self, pipe_read_id: usize);
}
pub use events::{MonitorReceiver, MonitorSender, SocketEvent, DEFAULT_MONITOR_CAPACITY};
pub use options::*; pub use types::{Socket, SocketType};
/// Creates a socket of the requested `socket_type` with default options.
///
/// This is a thin wrapper: it builds `SocketOptions::default()` and forwards
/// everything to `SocketCore::create_and_spawn`, returning its result
/// unchanged — on success, the socket as an `Arc<dyn ISocket>` together with
/// a `MailboxSender`.
///
/// # Errors
///
/// Propagates any `ZmqError` returned by `SocketCore::create_and_spawn`.
pub(crate) fn create_socket_actor(
    handle: usize,
    ctx: Context,
    socket_type: SocketType,
) -> Result<(Arc<dyn ISocket>, MailboxSender), ZmqError> {
    SocketCore::create_and_spawn(handle, ctx, socket_type, SocketOptions::default())
}
/// Newtype around a `usize` pipe read id.
///
/// Wrapping the raw integer gives it a distinct type, so it cannot be mixed
/// up with other `usize` identifiers. The derives make it cheap to copy,
/// comparable, printable for diagnostics, and usable as a hash-map/set key.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct SourcePipeReadId(usize);