#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::codec::mechanism::ZmqMechanism;
#[cfg(feature = "curve")]
use crate::codec::CurveFrame;
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::codec::Message;
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::codec::{CodecError, FramedIo, PlainFrame};
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::error::ZmqError;
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use crate::{SocketOptions, ZmqResult};
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use bytes::{BufMut, Bytes};
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
use futures::{Sink, SinkExt, Stream, StreamExt};
#[cfg(feature = "curve")]
use rand::Rng;
/// State kept for a connection once a CURVE handshake has completed.
///
/// Both `curve_client` and `curve_server` in this file build one of these as
/// the last step of a successful handshake.
#[cfg(feature = "curve")]
pub(crate) struct CurveSession {
    /// Box built from the peer's ephemeral public key and our ephemeral
    /// secret key at the end of the handshake.
    pub(crate) session_box: crypto_box::SalsaBox,
    /// Nonce counter for the transmit direction. The client handshake seeds
    /// it with 3, the server handshake with 2.
    pub(crate) tx_nonce: u64,
    /// Nonce counter for the receive direction. The client handshake seeds
    /// it with 1, the server handshake with 2.
    pub(crate) rx_nonce: u64,
    /// `true` when this end ran the server side of the handshake.
    pub(crate) is_server: bool,
}
// Hand-written `Debug`: only the nonce counters and the role flag are
// printed; `session_box` is deliberately left out of the output.
#[cfg(feature = "curve")]
impl std::fmt::Debug for CurveSession {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            session_box: _,
            tx_nonce,
            rx_nonce,
            is_server,
        } = self;
        let mut out = f.debug_struct("CurveSession");
        out.field("tx_nonce", tx_nonce);
        out.field("rx_nonce", rx_nonce);
        out.field("is_server", is_server);
        out.finish()
    }
}
/// Result of a completed security handshake.
///
/// `Default` yields a state with no credentials and no CURVE session; that is
/// what the NULL mechanism and the PLAIN client side return.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
#[derive(Default)]
pub(crate) struct SessionState {
    /// Username received in a PLAIN HELLO (server side). `None` when no HELLO
    /// was processed or the bytes were not valid UTF-8.
    pub username: Option<String>,
    /// Password received in a PLAIN HELLO (server side). `None` when no HELLO
    /// was processed or the bytes were not valid UTF-8.
    pub password: Option<String>,
    /// Session state produced by a successful CURVE handshake.
    #[cfg(feature = "curve")]
    pub curve: Option<CurveSession>,
}

// Manual `Debug` instead of `#[derive(Debug)]`: the derived impl would print
// the plaintext password whenever the state is logged with `{:?}`. Only the
// presence of a password is shown here.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
impl std::fmt::Debug for SessionState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SessionState");
        out.field("username", &self.username);
        out.field(
            "password",
            &self.password.as_ref().map(|_| "<redacted>"),
        );
        #[cfg(feature = "curve")]
        out.field("curve", &self.curve);
        out.finish()
    }
}
/// Entry point for the security handshake: dispatches on the mechanism
/// configured in `options`.
///
/// * NULL: succeeds with a default [`SessionState`] only if the peer also
///   announced NULL; otherwise returns `ZmqError::MechanismMismatch`.
/// * PLAIN: delegates to `plain_handshake`.
/// * CURVE (feature `curve`): delegates to `curve_handshake`.
///
/// `peer_greeting` and `peer_addr` are only forwarded to the PLAIN path.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
pub(crate) async fn mech_handshake<R, W>(
    io: &mut FramedIo<R, W>,
    options: &SocketOptions,
    peer_mechanism: ZmqMechanism,
    peer_greeting: &crate::codec::ZmqGreeting,
    peer_addr: &str,
    our_socket_type: crate::SocketType,
) -> ZmqResult<SessionState>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    match options.mechanism {
        ZmqMechanism::PLAIN => {
            plain_handshake(
                io,
                options,
                peer_mechanism,
                peer_greeting,
                peer_addr,
                our_socket_type,
            )
            .await
        }
        #[cfg(feature = "curve")]
        ZmqMechanism::CURVE => curve_handshake(io, options, peer_mechanism, our_socket_type).await,
        ZmqMechanism::NULL => {
            // NULL has no handshake frames; both sides merely have to agree.
            if matches!(peer_mechanism, ZmqMechanism::NULL) {
                return Ok(SessionState::default());
            }
            Err(ZmqError::MechanismMismatch {
                ours: "NULL",
                peer: peer_mechanism.as_str(),
            })
        }
    }
}
/// PLAIN handshake dispatcher.
///
/// Fails with `ZmqError::MechanismMismatch` unless the peer also announced
/// PLAIN, then runs the server or client side depending on
/// `options.plain_server`.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
async fn plain_handshake<R, W>(
    io: &mut FramedIo<R, W>,
    options: &SocketOptions,
    peer_mechanism: ZmqMechanism,
    peer_greeting: &crate::codec::ZmqGreeting,
    peer_addr: &str,
    our_socket_type: crate::SocketType,
) -> ZmqResult<SessionState>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    match peer_mechanism {
        ZmqMechanism::PLAIN if options.plain_server => {
            plain_server(io, options, peer_greeting, peer_addr, our_socket_type).await
        }
        ZmqMechanism::PLAIN => plain_client(io, options, our_socket_type).await,
        _ => Err(ZmqError::MechanismMismatch {
            ours: "PLAIN",
            peer: peer_mechanism.as_str(),
        }),
    }
}
/// Server side of the PLAIN handshake: HELLO → WELCOME → INITIATE → READY.
///
/// Steps, in order:
/// 1. Receive HELLO. If both `plain_username` and `plain_password` are set in
///    `options`, the received credentials must match them; on mismatch an
///    ERROR frame is sent (best effort) and the handshake fails.
/// 2. If `options.zap_domain` is set, run `crate::zap::zap_check`; on failure
///    an ERROR frame is sent (best effort) and the ZAP error is returned.
/// 3. Send WELCOME, expect INITIATE, send READY carrying our metadata.
///
/// Returns a [`SessionState`] holding the HELLO credentials (each `None` if
/// its bytes are not valid UTF-8).
///
/// # Errors
/// `ZmqError::PlainAuthFailed` for parse failures, unexpected commands, peer
/// ERROR frames and wrong credentials; transport errors and ZAP errors are
/// propagated unchanged.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
async fn plain_server<R, W>(
    io: &mut FramedIo<R, W>,
    options: &SocketOptions,
    peer_greeting: &crate::codec::ZmqGreeting,
    peer_addr: &str,
    our_socket_type: crate::SocketType,
) -> ZmqResult<SessionState>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    // --- HELLO -----------------------------------------------------------
    let hello_bytes = recv_security_raw(io).await?;
    let hello = PlainFrame::try_from(hello_bytes).map_err(|e| ZmqError::PlainAuthFailed {
        reason: e.to_string(),
    })?;
    let (username, password) = match hello {
        PlainFrame::Hello { username, password } => (username, password),
        PlainFrame::Error { reason } => {
            return Err(ZmqError::PlainAuthFailed {
                reason: format!("client sent ERROR: {reason}"),
            });
        }
        _ => {
            return Err(ZmqError::PlainAuthFailed {
                reason: "expected HELLO".into(),
            });
        }
    };

    // Credentials are only compared when BOTH expected values are configured.
    let rejected = match (&options.plain_username, &options.plain_password) {
        (Some(exp_u), Some(exp_p)) => username != *exp_u || password != *exp_p,
        _ => false,
    };
    if rejected {
        send_plain_error(io, "Invalid username or password").await;
        return Err(ZmqError::PlainAuthFailed {
            reason: "wrong credentials".into(),
        });
    }

    let state = SessionState {
        username: String::from_utf8(username.to_vec()).ok(),
        password: String::from_utf8(password.to_vec()).ok(),
        #[cfg(feature = "curve")]
        curve: None,
    };

    // --- optional ZAP check ----------------------------------------------
    if let Some(ref domain) = options.zap_domain {
        if let Err(denied) =
            crate::zap::zap_check(domain, peer_greeting, &state, peer_addr, None).await
        {
            send_plain_error(io, "Access denied").await;
            return Err(denied);
        }
    }

    // --- WELCOME / INITIATE ----------------------------------------------
    send_security_frame(io, PlainFrame::Welcome.into()).await?;
    let initiate_bytes = recv_security_raw(io).await?;
    let initiate = PlainFrame::try_from(initiate_bytes).map_err(|e| ZmqError::PlainAuthFailed {
        reason: e.to_string(),
    })?;
    let initiate_failure = match initiate {
        PlainFrame::Initiate { .. } => None,
        PlainFrame::Error { reason } => Some(reason),
        _ => Some("expected INITIATE".into()),
    };
    if let Some(reason) = initiate_failure {
        return Err(ZmqError::PlainAuthFailed { reason });
    }

    // --- READY -----------------------------------------------------------
    let ready = PlainFrame::Ready {
        metadata: encode_metadata(our_socket_type, options.peer_id.as_ref()),
    };
    send_security_frame(io, ready.into()).await?;
    Ok(state)
}
/// Client side of the PLAIN handshake: HELLO → WELCOME → INITIATE → READY.
///
/// Sends the configured credentials in HELLO, waits for WELCOME, sends
/// INITIATE with our metadata and waits for READY. On success a default
/// [`SessionState`] is returned (the client records nothing about the peer).
///
/// # Errors
/// * `ZmqError::Other` when `plain_username` or `plain_password` is not set.
/// * `ZmqError::PlainAuthFailed` when a reply cannot be parsed, the server
///   answers with ERROR, or a different command than expected arrives.
/// * Transport errors from sending/receiving are propagated unchanged.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
async fn plain_client<R, W>(
    io: &mut FramedIo<R, W>,
    options: &SocketOptions,
    our_socket_type: crate::SocketType,
) -> ZmqResult<SessionState>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    // `ok_or_else` so the error value is only built when a credential is
    // actually missing, not on every successful handshake.
    let username = options
        .plain_username
        .clone()
        .ok_or_else(|| ZmqError::Other("PLAIN client: username not set".into()))?;
    let password = options
        .plain_password
        .clone()
        .ok_or_else(|| ZmqError::Other("PLAIN client: password not set".into()))?;

    send_security_frame(io, PlainFrame::Hello { username, password }.into()).await?;

    // Expect WELCOME (or ERROR).
    let raw = recv_security_raw(io).await?;
    match PlainFrame::try_from(raw).map_err(|e| ZmqError::PlainAuthFailed {
        reason: e.to_string(),
    })? {
        PlainFrame::Welcome => {}
        PlainFrame::Error { reason } => return Err(ZmqError::PlainAuthFailed { reason }),
        _ => {
            return Err(ZmqError::PlainAuthFailed {
                reason: "expected WELCOME or ERROR".into(),
            })
        }
    }

    let our_metadata = encode_metadata(our_socket_type, options.peer_id.as_ref());
    send_security_frame(
        io,
        PlainFrame::Initiate {
            metadata: our_metadata,
        }
        .into(),
    )
    .await?;

    // Expect READY (or ERROR); the server's metadata is not inspected.
    let raw = recv_security_raw(io).await?;
    match PlainFrame::try_from(raw).map_err(|e| ZmqError::PlainAuthFailed {
        reason: e.to_string(),
    })? {
        PlainFrame::Ready { .. } => {}
        PlainFrame::Error { reason } => return Err(ZmqError::PlainAuthFailed { reason }),
        _ => {
            return Err(ZmqError::PlainAuthFailed {
                reason: "expected READY or ERROR".into(),
            })
        }
    };
    Ok(SessionState::default())
}
/// Sends a PLAIN ERROR frame carrying `reason` to the peer.
///
/// Best effort: a failure to send is ignored, because every caller is
/// already on its way to returning the real handshake error.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
async fn send_plain_error<R, W>(io: &mut FramedIo<R, W>, reason: &str)
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    let error_frame = PlainFrame::Error {
        reason: reason.to_owned(),
    };
    // Result intentionally discarded (see doc comment).
    let _ = send_security_frame(io, error_frame.into()).await;
}
/// CURVE handshake dispatcher.
///
/// Fails with `ZmqError::MechanismMismatch` unless the peer also announced
/// CURVE, then runs the server or client side depending on
/// `options.curve_server`.
#[cfg(all(
    feature = "curve",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
async fn curve_handshake<R, W>(
    io: &mut FramedIo<R, W>,
    options: &SocketOptions,
    peer_mechanism: ZmqMechanism,
    our_socket_type: crate::SocketType,
) -> ZmqResult<SessionState>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    match peer_mechanism {
        ZmqMechanism::CURVE if options.curve_server => {
            curve_server(io, options, our_socket_type).await
        }
        ZmqMechanism::CURVE => curve_client(io, options, our_socket_type).await,
        _ => Err(ZmqError::MechanismMismatch {
            ours: "CURVE",
            peer: peer_mechanism.as_str(),
        }),
    }
}
/// Client side of the CURVE handshake: HELLO → WELCOME → INITIATE → READY.
///
/// Key naming used in the comments below: `S`/`s` server permanent
/// public/secret, `C`/`c` client permanent, `C'`/`c'` client ephemeral,
/// `S'` server ephemeral public.
///
/// 1. HELLO: box of 64 zero bytes under (S, c').
/// 2. WELCOME: opened under (S, c'); yields `S'` (32 bytes), the cookie
///    nonce (16 bytes) and the cookie box (80 bytes).
/// 3. INITIATE: echoes the cookie and carries, under (S', c'), our permanent
///    public key, a vouch box `[C' ‖ S]` under (S', c), and our metadata.
/// 4. READY: opened under (S', c'); its plaintext is not inspected further.
///
/// On success returns a [`SessionState`] whose `curve` session uses the
/// (S', c') box with `tx_nonce = 3`, `rx_nonce = 1`.
///
/// # Errors
/// * `ZmqError::Other` when `curve_server_key`, `curve_public_key` or
///   `curve_secret_key` is not set in `options`.
/// * `ZmqError::CurveHandshakeFailed` for parse/crypto failures, peer ERROR
///   frames and unexpected commands.
/// * Transport errors are propagated unchanged.
#[cfg(all(
    feature = "curve",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
async fn curve_client<R, W>(
    io: &mut FramedIo<R, W>,
    options: &SocketOptions,
    our_socket_type: crate::SocketType,
) -> ZmqResult<SessionState>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    use crypto_box::{
        aead::{Aead, OsRng},
        PublicKey, SalsaBox, SecretKey,
    };

    // The client's short nonce is one strictly increasing sequence across
    // commands and messages: HELLO = 1, INITIATE = 2, first MESSAGE = 3.
    // (Previously INITIATE reused 1 even though the session was seeded with
    // 3, which left the sequence non-monotonic.)
    const HELLO_SHORT_NONCE: u64 = 1;
    const INITIATE_SHORT_NONCE: u64 = 2;
    const FIRST_MESSAGE_SHORT_NONCE: u64 = 3;

    // `ok_or_else`: only build the error value when a key is missing.
    let server_pub_bytes = options
        .curve_server_key
        .ok_or_else(|| ZmqError::Other("CURVE client: server_key not set".into()))?;
    let our_pub_bytes = options
        .curve_public_key
        .ok_or_else(|| ZmqError::Other("CURVE client: public_key not set".into()))?;
    let our_sec_bytes = options
        .curve_secret_key
        .ok_or_else(|| ZmqError::Other("CURVE client: secret_key not set".into()))?;
    let server_pub = PublicKey::from(server_pub_bytes);
    let our_permanent_sec = SecretKey::from(our_sec_bytes);
    let our_permanent_pub = PublicKey::from(our_pub_bytes);
    let client_eph_sec = SecretKey::generate(&mut OsRng);
    let client_eph_pub = client_eph_sec.public_key();

    // Box (S, c'): derived once and shared by HELLO and WELCOME.
    let server_box = SalsaBox::new(&server_pub, &client_eph_sec);

    // --- HELLO -----------------------------------------------------------
    let hello_nonce = build_nonce(b"CurveZMQHELLO---", HELLO_SHORT_NONCE);
    let hello_cipher = server_box
        .encrypt(&hello_nonce, [0u8; 64].as_ref())
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "HELLO encrypt failed".into(),
        })?;
    send_curve_frame(
        io,
        CurveFrame::Hello {
            version: (1, 0),
            client_ephemeral_pub: *client_eph_pub.as_bytes(),
            nonce_short: HELLO_SHORT_NONCE.to_be_bytes(),
            box_: Bytes::from(hello_cipher),
        },
    )
    .await?;

    // --- WELCOME ---------------------------------------------------------
    let raw = recv_security_raw(io).await?;
    let (welcome_nonce_random, welcome_box_bytes) =
        match CurveFrame::try_from(raw).map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "WELCOME parse failed".into(),
        })? {
            CurveFrame::Welcome { nonce_random, box_ } => (nonce_random, box_),
            CurveFrame::Error { reason } => {
                return Err(ZmqError::CurveHandshakeFailed {
                    reason: format!("server sent ERROR: {reason}").into(),
                })
            }
            _ => {
                return Err(ZmqError::CurveHandshakeFailed {
                    reason: "expected WELCOME".into(),
                })
            }
        };
    // Long nonce: 8-byte prefix followed by the 16 random bytes from the frame.
    let mut welcome_nonce = [0u8; 24];
    welcome_nonce[..8].copy_from_slice(b"WELCOME-");
    welcome_nonce[8..24].copy_from_slice(&welcome_nonce_random);
    let welcome_plain = server_box
        .decrypt(
            &crypto_box::Nonce::clone_from_slice(&welcome_nonce),
            welcome_box_bytes.as_ref(),
        )
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "WELCOME decrypt failed".into(),
        })?;
    // Layout: S' (32) ‖ cookie nonce (16) ‖ cookie box (80) = 128 bytes.
    if welcome_plain.len() < 128 {
        return Err(ZmqError::CurveHandshakeFailed {
            reason: "WELCOME plaintext too short".into(),
        });
    }
    let mut server_eph_bytes = [0u8; 32];
    server_eph_bytes.copy_from_slice(&welcome_plain[..32]);
    let server_eph_pub = PublicKey::from(server_eph_bytes);
    let mut cookie_nonce_arr = [0u8; 16];
    cookie_nonce_arr.copy_from_slice(&welcome_plain[32..48]);
    let cookie_cipher = Bytes::copy_from_slice(&welcome_plain[48..128]);

    // --- vouch: Box[C' ‖ S] under (S', c) --------------------------------
    let mut vouch_nonce_random = [0u8; 16];
    rand::rng().fill_bytes(&mut vouch_nonce_random);
    let mut vouch_nonce = [0u8; 24];
    vouch_nonce[..8].copy_from_slice(b"VOUCH---");
    vouch_nonce[8..24].copy_from_slice(&vouch_nonce_random);
    let vouch_box = SalsaBox::new(&server_eph_pub, &our_permanent_sec);
    let mut vouch_plain = Vec::with_capacity(64);
    vouch_plain.extend_from_slice(client_eph_pub.as_bytes());
    vouch_plain.extend_from_slice(&server_pub_bytes);
    let vouch_cipher = vouch_box
        .encrypt(
            &crypto_box::Nonce::clone_from_slice(&vouch_nonce),
            vouch_plain.as_ref(),
        )
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "vouch encrypt failed".into(),
        })?;

    // Box (S', c'): derived once; used for INITIATE, READY and the session.
    let session_box = SalsaBox::new(&server_eph_pub, &client_eph_sec);

    // --- INITIATE --------------------------------------------------------
    let metadata = encode_metadata(our_socket_type, options.peer_id.as_ref());
    let init_nonce = build_nonce(b"CurveZMQINITIATE", INITIATE_SHORT_NONCE);
    // Plaintext: C (32) ‖ vouch nonce (16) ‖ vouch box ‖ metadata.
    let mut init_plain = Vec::with_capacity(32 + 16 + vouch_cipher.len() + metadata.len());
    init_plain.extend_from_slice(our_permanent_pub.as_bytes());
    init_plain.extend_from_slice(&vouch_nonce_random);
    init_plain.extend_from_slice(&vouch_cipher);
    init_plain.extend_from_slice(&metadata);
    let init_cipher = session_box
        .encrypt(&init_nonce, init_plain.as_ref())
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "INITIATE encrypt failed".into(),
        })?;
    send_curve_frame(
        io,
        CurveFrame::Initiate {
            cookie_nonce: cookie_nonce_arr,
            cookie_cipher,
            nonce_short: INITIATE_SHORT_NONCE.to_be_bytes(),
            box_: Bytes::from(init_cipher),
        },
    )
    .await?;

    // --- READY -----------------------------------------------------------
    let raw = recv_security_raw(io).await?;
    match CurveFrame::try_from(raw).map_err(|_e| ZmqError::CurveHandshakeFailed {
        reason: "READY parse failed".into(),
    })? {
        CurveFrame::Ready {
            nonce_short,
            box_: ready_box_bytes,
        } => {
            let ready_nonce_ctr = u64::from_be_bytes(nonce_short);
            let ready_nonce = build_nonce(b"CurveZMQREADY---", ready_nonce_ctr);
            // Opening the box authenticates the server; the decrypted
            // metadata itself is not used here.
            let _ready_plain = session_box
                .decrypt(&ready_nonce, ready_box_bytes.as_ref())
                .map_err(|_e| ZmqError::CurveHandshakeFailed {
                    reason: "READY decrypt failed".into(),
                })?;
            Ok(SessionState {
                curve: Some(CurveSession {
                    session_box,
                    tx_nonce: FIRST_MESSAGE_SHORT_NONCE,
                    rx_nonce: 1,
                    is_server: false,
                }),
                ..Default::default()
            })
        }
        CurveFrame::Error { reason } => Err(ZmqError::CurveHandshakeFailed {
            reason: format!("server denied: {reason}").into(),
        }),
        _ => Err(ZmqError::CurveHandshakeFailed {
            reason: "expected READY".into(),
        }),
    }
}
/// Server side of the CURVE handshake: HELLO → WELCOME → INITIATE → READY.
///
/// Key naming used in the comments below: `S`/`s` server permanent
/// public/secret, `C` client permanent public, `C'` client ephemeral public,
/// `S'`/`s'` server ephemeral public/secret.
///
/// 1. HELLO: the signature box is opened under (C', s). A client that does
///    not know our public key cannot produce it, so such a HELLO is rejected
///    before any WELCOME is sent.
/// 2. WELCOME: carries `S'` and a cookie `[C' ‖ s']`, both boxed under (C', s).
/// 3. INITIATE: the echoed cookie must open and name the same `C'`; the
///    INITIATE box is opened under (C', s'); the vouch inside it is opened
///    under (C, s') and must contain `C'` followed by our public key.
/// 4. READY: our metadata boxed under (C', s').
///
/// On success returns a [`SessionState`] whose `curve` session uses the
/// (C', s') box with `tx_nonce = 2`, `rx_nonce = 2`.
///
/// NOTE(review): the client's permanent key and INITIATE metadata are verified
/// cryptographically but neither recorded nor passed to any authorisation
/// hook in this function — confirm that is intended.
///
/// # Errors
/// * `ZmqError::Other` when `curve_public_key` or `curve_secret_key` is not
///   set in `options`.
/// * `ZmqError::CurveHandshakeFailed` for parse/crypto/validation failures
///   and unexpected commands.
/// * Transport errors are propagated unchanged.
#[cfg(all(
    feature = "curve",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
async fn curve_server<R, W>(
    io: &mut FramedIo<R, W>,
    options: &SocketOptions,
    our_socket_type: crate::SocketType,
) -> ZmqResult<SessionState>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    use crypto_box::{
        aead::{Aead, OsRng},
        PublicKey, SalsaBox, SecretKey,
    };

    // `ok_or_else`: only build the error value when a key is missing.
    let our_pub_bytes = options
        .curve_public_key
        .ok_or_else(|| ZmqError::Other("CURVE server: public_key not set".into()))?;
    let our_sec_bytes = options
        .curve_secret_key
        .ok_or_else(|| ZmqError::Other("CURVE server: secret_key not set".into()))?;
    let our_permanent_sec = SecretKey::from(our_sec_bytes);

    // --- HELLO -----------------------------------------------------------
    let raw = recv_security_raw(io).await?;
    let (client_eph_bytes, hello_nonce_short, hello_box_bytes) =
        match CurveFrame::try_from(raw).map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "HELLO parse failed".into(),
        })? {
            CurveFrame::Hello {
                client_ephemeral_pub,
                nonce_short,
                box_,
                ..
            } => (client_ephemeral_pub, nonce_short, box_),
            _ => {
                return Err(ZmqError::CurveHandshakeFailed {
                    reason: "expected HELLO".into(),
                })
            }
        };
    let client_eph_pub = PublicKey::from(client_eph_bytes);

    // Box (C', s): derived once; used for HELLO, the cookie and WELCOME.
    let permanent_box = SalsaBox::new(&client_eph_pub, &our_permanent_sec);

    // Validate the HELLO signature box. Only successful authentication is
    // required; the plaintext content is not inspected.
    let hello_nonce = build_nonce(b"CurveZMQHELLO---", u64::from_be_bytes(hello_nonce_short));
    let _hello_plain = permanent_box
        .decrypt(&hello_nonce, hello_box_bytes.as_ref())
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "HELLO box decrypt failed".into(),
        })?;

    let server_eph_sec = SecretKey::generate(&mut OsRng);
    let server_eph_pub = server_eph_sec.public_key();

    // --- cookie: Box[C' ‖ s'] --------------------------------------------
    let mut cookie_nonce_random = [0u8; 16];
    rand::rng().fill_bytes(&mut cookie_nonce_random);
    let mut cookie_nonce = [0u8; 24];
    cookie_nonce[..8].copy_from_slice(b"COOKIE--");
    cookie_nonce[8..24].copy_from_slice(&cookie_nonce_random);
    let mut cookie_plain = Vec::with_capacity(64);
    cookie_plain.extend_from_slice(&client_eph_bytes);
    cookie_plain.extend_from_slice(&server_eph_sec.to_bytes());
    let cookie_cipher_bytes = permanent_box
        .encrypt(
            &crypto_box::Nonce::clone_from_slice(&cookie_nonce),
            cookie_plain.as_ref(),
        )
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "cookie encrypt failed".into(),
        })?;

    // --- WELCOME: Box[S' ‖ cookie nonce ‖ cookie box] --------------------
    let mut welcome_nonce_random = [0u8; 16];
    rand::rng().fill_bytes(&mut welcome_nonce_random);
    let mut welcome_nonce = [0u8; 24];
    welcome_nonce[..8].copy_from_slice(b"WELCOME-");
    welcome_nonce[8..24].copy_from_slice(&welcome_nonce_random);
    let mut welcome_plain = Vec::with_capacity(128);
    welcome_plain.extend_from_slice(server_eph_pub.as_bytes());
    welcome_plain.extend_from_slice(&cookie_nonce_random);
    welcome_plain.extend_from_slice(&cookie_cipher_bytes);
    let welcome_cipher = permanent_box
        .encrypt(
            &crypto_box::Nonce::clone_from_slice(&welcome_nonce),
            welcome_plain.as_ref(),
        )
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "WELCOME encrypt failed".into(),
        })?;
    send_curve_frame(
        io,
        CurveFrame::Welcome {
            nonce_random: welcome_nonce_random,
            box_: Bytes::from(welcome_cipher),
        },
    )
    .await?;

    // --- INITIATE --------------------------------------------------------
    let raw = recv_security_raw(io).await?;
    let (recv_cookie_nonce, recv_cookie_cipher, init_nonce_short, init_box_bytes) =
        match CurveFrame::try_from(raw).map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "INITIATE parse failed".into(),
        })? {
            CurveFrame::Initiate {
                cookie_nonce,
                cookie_cipher,
                nonce_short,
                box_,
            } => (cookie_nonce, cookie_cipher, nonce_short, box_),
            _ => {
                return Err(ZmqError::CurveHandshakeFailed {
                    reason: "expected INITIATE".into(),
                })
            }
        };

    // The echoed cookie must open under (C', s) and name the same C'.
    let mut verify_cookie_nonce = [0u8; 24];
    verify_cookie_nonce[..8].copy_from_slice(b"COOKIE--");
    verify_cookie_nonce[8..24].copy_from_slice(&recv_cookie_nonce);
    let cookie_plain_recv = permanent_box
        .decrypt(
            &crypto_box::Nonce::clone_from_slice(&verify_cookie_nonce),
            recv_cookie_cipher.as_ref(),
        )
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "cookie decrypt failed".into(),
        })?;
    if cookie_plain_recv.len() < 64 {
        return Err(ZmqError::CurveHandshakeFailed {
            reason: "cookie plaintext too short".into(),
        });
    }
    if cookie_plain_recv[..32] != client_eph_bytes {
        return Err(ZmqError::CurveHandshakeFailed {
            reason: "cookie C' mismatch".into(),
        });
    }

    // Box (C', s'): derived once; used for INITIATE, READY and the session.
    let session_box = SalsaBox::new(&client_eph_pub, &server_eph_sec);

    let init_nonce_ctr = u64::from_be_bytes(init_nonce_short);
    let init_nonce = build_nonce(b"CurveZMQINITIATE", init_nonce_ctr);
    let init_plain = session_box
        .decrypt(&init_nonce, init_box_bytes.as_ref())
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "INITIATE box decrypt failed".into(),
        })?;
    // Plaintext: C (32) ‖ vouch nonce (16) ‖ vouch box (80) ‖ metadata.
    const VOUCH_CIPHER_LEN: usize = 80;
    if init_plain.len() < 32 + 16 + VOUCH_CIPHER_LEN {
        return Err(ZmqError::CurveHandshakeFailed {
            reason: "INITIATE plaintext too short".into(),
        });
    }
    let mut client_perm_bytes = [0u8; 32];
    client_perm_bytes.copy_from_slice(&init_plain[..32]);
    let client_perm_pub = PublicKey::from(client_perm_bytes);
    let vouch_nonce_random = &init_plain[32..48];
    let vouch_cipher = &init_plain[48..48 + VOUCH_CIPHER_LEN];

    // --- vouch: Box[C' ‖ S] under (C, s') --------------------------------
    let mut vouch_nonce = [0u8; 24];
    vouch_nonce[..8].copy_from_slice(b"VOUCH---");
    vouch_nonce[8..24].copy_from_slice(vouch_nonce_random);
    let vouch_dec = SalsaBox::new(&client_perm_pub, &server_eph_sec);
    let vouch_plain = vouch_dec
        .decrypt(
            &crypto_box::Nonce::clone_from_slice(&vouch_nonce),
            vouch_cipher,
        )
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "vouch decrypt failed".into(),
        })?;
    // Require the full C' ‖ S payload up front so the server-key check below
    // can never be skipped by a short vouch.
    if vouch_plain.len() < 64 {
        return Err(ZmqError::CurveHandshakeFailed {
            reason: "vouch plaintext too short".into(),
        });
    }
    if vouch_plain[..32] != client_eph_bytes {
        return Err(ZmqError::CurveHandshakeFailed {
            reason: "vouch C' mismatch".into(),
        });
    }
    if vouch_plain[32..64] != our_pub_bytes {
        return Err(ZmqError::CurveHandshakeFailed {
            reason: "vouch S mismatch".into(),
        });
    }

    // --- READY -----------------------------------------------------------
    let server_metadata = encode_metadata(our_socket_type, options.peer_id.as_ref());
    let ready_nonce_ctr: u64 = 1;
    let ready_nonce = build_nonce(b"CurveZMQREADY---", ready_nonce_ctr);
    let ready_cipher = session_box
        .encrypt(&ready_nonce, server_metadata.as_ref())
        .map_err(|_e| ZmqError::CurveHandshakeFailed {
            reason: "READY encrypt failed".into(),
        })?;
    send_curve_frame(
        io,
        CurveFrame::Ready {
            nonce_short: ready_nonce_ctr.to_be_bytes(),
            box_: Bytes::from(ready_cipher),
        },
    )
    .await?;

    Ok(SessionState {
        curve: Some(CurveSession {
            session_box,
            tx_nonce: 2,
            rx_nonce: 2,
            is_server: true,
        }),
        ..Default::default()
    })
}
/// Reads the next message from `io` and returns its payload if it is a
/// `Message::SecurityRaw` frame.
///
/// # Errors
/// * `ZmqError::Other` when the stream has ended or a different message type
///   arrives.
/// * Codec errors are converted into `ZmqError`.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
async fn recv_security_raw<R, W>(io: &mut FramedIo<R, W>) -> ZmqResult<Bytes>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    let message = match io.read_half.next().await {
        Some(item) => item.map_err(ZmqError::from)?,
        None => {
            return Err(ZmqError::Other(
                "security handshake: peer closed connection".into(),
            ))
        }
    };
    if let Message::SecurityRaw(payload) = message {
        Ok(payload)
    } else {
        Err(ZmqError::Other(
            "security handshake: unexpected message type".into(),
        ))
    }
}
/// Freezes `buf` and sends it to the peer as a `Message::SecurityRaw` frame.
///
/// # Errors
/// Any codec error from the sink, converted into `ZmqError`.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
async fn send_security_frame<R, W>(io: &mut FramedIo<R, W>, buf: bytes::BytesMut) -> ZmqResult<()>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    let outgoing = Message::SecurityRaw(buf.freeze());
    match io.write_half.send(outgoing).await {
        Ok(()) => Ok(()),
        Err(codec_err) => Err(ZmqError::from(codec_err)),
    }
}
/// Encodes a [`CurveFrame`] into bytes and sends it via
/// `send_security_frame`.
#[cfg(all(
    feature = "curve",
    any(feature = "tcp", all(feature = "ipc", target_family = "unix"))
))]
async fn send_curve_frame<R, W>(io: &mut FramedIo<R, W>, frame: CurveFrame) -> ZmqResult<()>
where
    R: Stream<Item = Result<Message, CodecError>> + Unpin,
    W: Sink<Message, Error = CodecError> + Unpin,
{
    // The target type of `into()` is fixed by `send_security_frame`'s
    // `bytes::BytesMut` parameter.
    send_security_frame(io, frame.into()).await
}
/// Serialises the handshake metadata properties.
///
/// Each property is written as: 1-byte name length, name bytes, 4-byte
/// big-endian value length, value bytes. `Socket-Type` is always written;
/// `Identity` is appended only when `identity` is present and non-empty.
#[cfg(any(feature = "tcp", all(feature = "ipc", target_family = "unix")))]
fn encode_metadata(
    socket_type: crate::SocketType,
    identity: Option<&crate::PeerIdentity>,
) -> Bytes {
    const SOCKET_TYPE_KEY: &[u8] = b"Socket-Type";
    const IDENTITY_KEY: &[u8] = b"Identity";

    let type_name = socket_type.as_str().as_bytes();
    let mut out = bytes::BytesMut::new();

    out.put_u8(SOCKET_TYPE_KEY.len() as u8);
    out.put_slice(SOCKET_TYPE_KEY);
    out.put_u32(type_name.len() as u32);
    out.put_slice(type_name);

    match identity {
        Some(id) if !id.is_empty() => {
            out.put_u8(IDENTITY_KEY.len() as u8);
            out.put_slice(IDENTITY_KEY);
            out.put_u32(id.len() as u32);
            out.put_slice(id.as_ref());
        }
        _ => {}
    }

    out.freeze()
}
/// Builds a 24-byte nonce: the 16-byte `prefix` followed by `counter`
/// encoded as 8 big-endian bytes.
#[cfg(feature = "curve")]
pub(crate) fn build_nonce(prefix: &[u8; 16], counter: u64) -> crypto_box::Nonce {
    let mut bytes = [0u8; 24];
    let (head, tail) = bytes.split_at_mut(16);
    head.copy_from_slice(prefix);
    tail.copy_from_slice(&counter.to_be_bytes());
    crypto_box::Nonce::clone_from_slice(&bytes)
}