#![recursion_limit = "1024"]
mod async_rt;
mod backend;
mod codec;
mod dealer;
mod endpoint;
mod error;
mod fair_queue;
mod message;
mod r#pub;
mod pull;
mod push;
mod rep;
mod req;
mod router;
mod sub;
mod task_handle;
mod transport;
pub mod util;
// Public but doc-hidden module that re-exports every item of the private
// `async_rt` module under the name `__async_rt`.
// NOTE(review): presumably exposed only for this crate's own tests/examples/macros
// rather than as supported API — confirm before relying on it.
#[doc(hidden)]
pub mod __async_rt {
pub use super::async_rt::*;
}
pub use crate::dealer::*;
pub use crate::endpoint::{Endpoint, Host, Transport, TryIntoEndpoint};
pub use crate::error::{ZmqError, ZmqResult};
pub use crate::message::*;
pub use crate::pull::*;
pub use crate::push::*;
pub use crate::r#pub::*;
pub use crate::rep::*;
pub use crate::req::*;
pub use crate::router::*;
pub use crate::sub::*;
use crate::codec::*;
use crate::transport::AcceptStopHandle;
use util::PeerIdentity;
use async_trait::async_trait;
use asynchronous_codec::FramedWrite;
use futures_channel::mpsc;
use futures_util::{select, FutureExt};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::Arc;
/// Row-major 11x11 table of socket type compatibility.
///
/// The row is the discriminant of the local socket type and the column is the
/// discriminant of the peer socket type; a non-zero cell means the two types
/// may talk to each other. Only the discriminants `0..=10` (`PAIR` through
/// `XSUB`) have a row and a column; `STREAM` is not part of the table.
const COMPATIBILITY_MATRIX: [u8; 121] = [
    // PAIR, PUB, SUB, REQ, REP, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB
    1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // PAIR
    0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // PUB
    0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // SUB
    0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, // REQ
    0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, // REP
    0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, // DEALER
    0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, // ROUTER
    0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, // PULL
    0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, // PUSH
    0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // XPUB
    0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // XSUB
];

/// The kinds of socket this crate knows by name.
///
/// The discriminants `0..=10` double as row/column indices into
/// `COMPATIBILITY_MATRIX`; `STREAM` (11) has no entry there.
// The variant names are spelled exactly like the strings accepted and produced
// by the string conversions of this type, hence the acronym lint is silenced.
#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(usize)]
pub enum SocketType {
    PAIR = 0,
    PUB = 1,
    SUB = 2,
    REQ = 3,
    REP = 4,
    DEALER = 5,
    ROUTER = 6,
    PULL = 7,
    PUSH = 8,
    XPUB = 9,
    XSUB = 10,
    STREAM = 11,
}

impl SocketType {
    /// Returns the upper-case name of the socket type, e.g. `"DEALER"`.
    pub const fn as_str(&self) -> &'static str {
        match self {
            SocketType::PAIR => "PAIR",
            SocketType::PUB => "PUB",
            SocketType::SUB => "SUB",
            SocketType::REQ => "REQ",
            SocketType::REP => "REP",
            SocketType::DEALER => "DEALER",
            SocketType::ROUTER => "ROUTER",
            SocketType::PULL => "PULL",
            SocketType::PUSH => "PUSH",
            SocketType::XPUB => "XPUB",
            SocketType::XSUB => "XSUB",
            SocketType::STREAM => "STREAM",
        }
    }

    /// Checks whether a socket of this type may be paired with a peer of type
    /// `other`, according to `COMPATIBILITY_MATRIX`.
    ///
    /// Socket types without an entry in the matrix (currently only `STREAM`)
    /// are reported as incompatible with everything instead of indexing past
    /// the table or into a neighbouring row.
    pub fn compatible(&self, other: SocketType) -> bool {
        /// Number of rows and columns of `COMPATIBILITY_MATRIX`.
        const DIM: usize = 11;

        let row_index = *self as usize;
        let col_index = other as usize;
        // Both indices must be checked separately: an out-of-range column
        // would otherwise alias column 0 of the following row.
        if row_index >= DIM || col_index >= DIM {
            return false;
        }
        COMPATIBILITY_MATRIX[row_index * DIM + col_index] != 0
    }
}
impl FromStr for SocketType {
type Err = ZmqError;
#[inline]
fn from_str(s: &str) -> Result<Self, ZmqError> {
Self::try_from(s.as_bytes())
}
}
impl TryFrom<&[u8]> for SocketType {
    type Error = ZmqError;

    /// Maps the exact, case-sensitive byte spelling of a socket type name
    /// (e.g. `b"ROUTER"`) to its variant.
    ///
    /// Any other input yields `ZmqError::Other("Unknown socket type")`.
    fn try_from(s: &[u8]) -> Result<Self, ZmqError> {
        match s {
            b"PAIR" => Ok(Self::PAIR),
            b"PUB" => Ok(Self::PUB),
            b"SUB" => Ok(Self::SUB),
            b"REQ" => Ok(Self::REQ),
            b"REP" => Ok(Self::REP),
            b"DEALER" => Ok(Self::DEALER),
            b"ROUTER" => Ok(Self::ROUTER),
            b"PULL" => Ok(Self::PULL),
            b"PUSH" => Ok(Self::PUSH),
            b"XPUB" => Ok(Self::XPUB),
            b"XSUB" => Ok(Self::XSUB),
            b"STREAM" => Ok(Self::STREAM),
            _ => Err(ZmqError::Other("Unknown socket type")),
        }
    }
}
impl Display for SocketType {
    /// Writes the name returned by [`SocketType::as_str`] to the formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name: &'static str = self.as_str();
        f.write_str(name)
    }
}
/// Events reported through a socket's monitor channel.
///
/// The default [`Socket`] methods push these into the `mpsc::Sender` stored
/// behind [`SocketBackend::monitor`] using `try_send`, ignoring send failures.
#[derive(Debug)]
pub enum SocketEvent {
/// Sent by [`Socket::connect`] once the outgoing connection has been
/// registered with the backend; carries the connected endpoint and the
/// identity of the peer.
Connected(Endpoint, PeerIdentity),
/// Not emitted by the code visible here.
/// NOTE(review): presumably signals a connection attempt that is still pending — confirm.
ConnectDelayed,
/// Not emitted by the code visible here.
/// NOTE(review): presumably signals a retried connection attempt — confirm.
ConnectRetried,
/// Sent by [`Socket::bind`] after the transport started accepting; carries
/// the endpoint returned by the transport.
Listening(Endpoint),
/// Sent from the accept callback installed by [`Socket::bind`] when an
/// incoming connection was registered with the backend; carries the
/// endpoint and the identity of the peer.
Accepted(Endpoint, PeerIdentity),
/// Sent from the accept callback installed by [`Socket::bind`] when
/// accepting or registering an incoming connection failed.
AcceptFailed(ZmqError),
/// Not emitted by the code visible here.
/// NOTE(review): presumably reports a successfully closed socket — confirm.
Closed,
/// Not emitted by the code visible here.
/// NOTE(review): presumably reports a failed close — confirm.
CloseFailed,
/// Not emitted by the code visible here.
/// NOTE(review): presumably reports that the identified peer went away — confirm.
Disconnected(PeerIdentity),
}
/// Options passed to [`Socket::with_options`] when constructing a socket.
///
/// `SocketOptions::default()` leaves every option unset.
#[derive(Default)]
pub struct SocketOptions {
/// Identity set through [`SocketOptions::peer_identity`]; `None` until set.
/// NOTE(review): presumably the identity this socket presents to its peers — confirm against the handshake code.
pub(crate) peer_id: Option<PeerIdentity>,
}
impl SocketOptions {
    /// Stores `peer_id` in these options, replacing any identity set earlier,
    /// and returns `self` so that further setters can be chained.
    pub fn peer_identity(&mut self, peer_id: PeerIdentity) -> &mut Self {
        // The previously stored identity, if any, is simply discarded.
        let _ = self.peer_id.replace(peer_id);
        self
    }
}
/// Peer bookkeeping hooks of a socket backend, on top of [`SocketBackend`].
///
/// [`Socket::backend`] hands out the backend as `Arc<dyn MultiPeerBackend>`.
#[async_trait]
pub trait MultiPeerBackend: SocketBackend {
/// Called with the identity and the framed I/O of a newly connected peer.
/// NOTE(review): the `Arc<Self>` receiver presumably lets implementations
/// keep a handle to the backend alongside the peer — confirm.
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo);
/// Called with the identity of a peer that is no longer connected.
/// NOTE(review): call sites are not visible here — confirm when exactly this fires.
fn peer_disconnected(&self, peer_id: &PeerIdentity);
}
/// Basic accessors every socket backend provides; implementors must be
/// `Send + Sync`.
pub trait SocketBackend: Send + Sync {
/// Returns the [`SocketType`] of this backend.
fn socket_type(&self) -> SocketType;
/// Returns the [`SocketOptions`] held by this backend.
fn socket_options(&self) -> &SocketOptions;
/// Shuts the backend down.
/// NOTE(review): the exact effect is implementation-defined and not visible here.
fn shutdown(&self);
/// Returns the mutex-protected slot for the monitor sender. While the slot
/// holds `Some(sender)`, the default [`Socket`] methods send
/// [`SocketEvent`]s through it; while it holds `None` they send nothing.
fn monitor(&self) -> &Mutex<Option<mpsc::Sender<SocketEvent>>>;
}
/// Implemented by sockets that can receive messages.
#[async_trait]
pub trait SocketRecv {
/// Asynchronously receives one [`ZmqMessage`], or returns the error that
/// prevented it.
async fn recv(&mut self) -> ZmqResult<ZmqMessage>;
}
/// Implemented by sockets that can send messages.
#[async_trait]
pub trait SocketSend {
/// Asynchronously sends `message`, taking ownership of it, or returns the
/// error that prevented it.
async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()>;
}
/// Marker trait (no methods of its own) for sending sockets that may be passed
/// to [`proxy`] as its `capture` argument.
pub trait CaptureSocket: SocketSend {}
/// Common interface of the socket types: construction, binding, connecting,
/// monitoring and closing.
///
/// Implementors provide [`Socket::with_options`], [`Socket::backend`],
/// [`Socket::binds`] and [`Socket::monitor`]; every other method has a default
/// implementation built on top of those.
#[async_trait]
pub trait Socket: Sized + Send {
    /// Creates a socket with [`SocketOptions::default`].
    fn new() -> Self {
        Self::with_options(SocketOptions::default())
    }

    /// Creates a socket configured with `options`.
    fn with_options(options: SocketOptions) -> Self;

    /// Returns a shared handle to this socket's backend.
    fn backend(&self) -> Arc<dyn MultiPeerBackend>;

    /// Starts accepting connections on `endpoint`.
    ///
    /// The callback handed to `transport::begin_accept` registers each
    /// accepted connection with the backend via `util::peer_connected` and
    /// reports the outcome to the monitor as [`SocketEvent::Accepted`] or
    /// [`SocketEvent::AcceptFailed`]. Once accepting has started,
    /// [`SocketEvent::Listening`] is reported and the stop handle is stored in
    /// [`Socket::binds`] under the endpoint returned by the transport, which
    /// is also the return value.
    ///
    /// # Errors
    ///
    /// Fails if `endpoint` cannot be converted into an [`Endpoint`] or if
    /// `transport::begin_accept` fails.
    async fn bind(&mut self, endpoint: &str) -> ZmqResult<Endpoint> {
        let endpoint = TryIntoEndpoint::try_into(endpoint)?;

        let accept_backend = self.backend();
        let cback = move |result| {
            // Every invocation gets its own handle so the closure stays reusable.
            let backend = accept_backend.clone();
            async move {
                // `result` has no type annotation; its shape is fixed by this
                // pattern, so it is matched rather than used with combinators.
                let registered = match result {
                    Ok((socket, endpoint)) => util::peer_connected(socket, backend.clone())
                        .await
                        .map(|peer_id| (endpoint, peer_id)),
                    Err(e) => Err(e),
                };
                let event = match registered {
                    Ok((endpoint, peer_id)) => SocketEvent::Accepted(endpoint, peer_id),
                    Err(e) => SocketEvent::AcceptFailed(e),
                };
                notify_monitor(&*backend, event);
            }
        };

        let (endpoint, stop_handle) = transport::begin_accept(endpoint, cback).await?;
        notify_monitor(&*self.backend(), SocketEvent::Listening(endpoint.clone()));
        self.binds().insert(endpoint.clone(), stop_handle);
        Ok(endpoint)
    }

    /// Returns the map from bound endpoints to the handles that stop accepting
    /// on them.
    fn binds(&mut self) -> &mut HashMap<Endpoint, AcceptStopHandle>;

    /// Stops accepting on `endpoint` and forgets its stop handle.
    ///
    /// # Errors
    ///
    /// Returns `ZmqError::NoSuchBind` if `endpoint` is not in
    /// [`Socket::binds`], or the error produced while shutting the accept
    /// task down.
    async fn unbind(&mut self, endpoint: Endpoint) -> ZmqResult<()> {
        let stop_handle = self.binds().remove(&endpoint);
        let stop_handle = stop_handle.ok_or_else(|| ZmqError::NoSuchBind(endpoint))?;
        stop_handle.0.shutdown().await
    }

    /// Unbinds every endpoint currently in [`Socket::binds`] and returns the
    /// errors encountered; an empty vector means everything was unbound
    /// cleanly.
    async fn unbind_all(&mut self) -> Vec<ZmqError> {
        // Snapshot the keys first: `unbind` needs `&mut self` while iterating.
        let endpoints: Vec<Endpoint> = self.binds().keys().cloned().collect();

        let mut errs = Vec::new();
        for endpoint in endpoints {
            if let Err(err) = self.unbind(endpoint).await {
                errs.push(err);
            }
        }
        errs
    }

    /// Connects to `endpoint`, registers the new peer with the backend and
    /// reports [`SocketEvent::Connected`] to the monitor.
    ///
    /// # Errors
    ///
    /// Fails if `endpoint` cannot be converted into an [`Endpoint`], or if
    /// `util::connect_forever` or `util::peer_connected` fails. No event is
    /// reported on failure.
    async fn connect(&mut self, endpoint: &str) -> ZmqResult<()> {
        let backend = self.backend();
        let endpoint = TryIntoEndpoint::try_into(endpoint)?;

        let (socket, endpoint) = util::connect_forever(endpoint).await?;
        let peer_id = util::peer_connected(socket, backend).await?;

        notify_monitor(
            &*self.backend(),
            SocketEvent::Connected(endpoint, peer_id),
        );
        Ok(())
    }

    /// Returns a receiver of [`SocketEvent`]s for this socket.
    /// NOTE(review): implementations presumably install the matching sender in
    /// [`SocketBackend::monitor`] — confirm per socket type.
    fn monitor(&mut self) -> mpsc::Receiver<SocketEvent>;

    /// Consumes the socket, unbinding all of its endpoints, and returns the
    /// errors from [`Socket::unbind_all`].
    async fn close(mut self) -> Vec<ZmqError> {
        self.unbind_all().await
    }
}

/// Sends `event` to the monitor of `backend` if one is installed.
///
/// Delivery is best-effort: a failed `try_send` is ignored. The monitor lock
/// is only held for the duration of this synchronous call.
fn notify_monitor(backend: &dyn MultiPeerBackend, event: SocketEvent) {
    if let Some(monitor) = backend.monitor().lock().as_mut() {
        let _ = monitor.try_send(event);
    }
}
/// Shuttles messages between `frontend` and `backend` until an error occurs.
///
/// Every message received on one side is forwarded to the other side. When
/// `capture` is `Some`, a clone of each message is sent to it before the
/// message is forwarded.
///
/// # Errors
///
/// Returns the first error produced by receiving from either side, by sending
/// to `capture`, or by forwarding a message. The function never returns `Ok`;
/// it only stops on error.
pub async fn proxy<Frontend: SocketSend + SocketRecv, Backend: SocketSend + SocketRecv>(
    mut frontend: Frontend,
    mut backend: Backend,
    mut capture: Option<Box<dyn CaptureSocket>>,
) -> ZmqResult<()> {
    loop {
        select! {
            frontend_mess = frontend.recv().fuse() => {
                // A receive error ends the proxy instead of panicking.
                let message = frontend_mess?;
                if let Some(capture) = &mut capture {
                    capture.send(message.clone()).await?;
                }
                backend.send(message).await?;
            },
            backend_mess = backend.recv().fuse() => {
                // A receive error ends the proxy instead of panicking.
                let message = backend_mess?;
                if let Some(capture) = &mut capture {
                    capture.send(message.clone()).await?;
                }
                frontend.send(message).await?;
            }
        };
    }
}
/// Convenience module re-exporting the traits needed to call socket methods:
/// [`Socket`], [`SocketRecv`], [`SocketSend`] and [`TryIntoEndpoint`].
pub mod prelude {
pub use crate::{Socket, SocketRecv, SocketSend, TryIntoEndpoint};
}