pub(crate) mod backend;
pub(crate) mod peer_loop;
pub(crate) mod registry;
pub(crate) mod writer;
use crate::async_rt::task::spawn;
use crate::codec::{CodecError, Message};
use crate::error::{Disconnected, SendError};
use crate::io_compat::AsyncVectoredWrite;
#[cfg(feature = "inproc")]
use crate::message::ZmqMessage;
use crate::PeerIdentity;
#[cfg(feature = "inproc")]
use crate::async_rt::notify::AsyncNotify;
use crate::async_rt::signaler::{AsyncSignaler, RuntimeSignaler};
#[cfg(all(test, feature = "tokio"))]
use flume::Receiver;
use flume::{Sender, TrySendError};
use futures::channel::oneshot;
use futures::FutureExt;
use parking_lot::Mutex as SyncMutex;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
/// Sending half of the crossbeam channel used for inproc inbound traffic.
///
/// Each item is a `(PeerKey, Result<Message, CodecError>)` pair: a peer key
/// together with either a message or the codec error reported in its place.
#[cfg(feature = "inproc")]
pub(crate) type InprocInboundTx = crossbeam_channel::Sender<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>;
/// Receiving half matching [`InprocInboundTx`]; yields the same
/// `(PeerKey, Result<Message, CodecError>)` items.
#[cfg(feature = "inproc")]
pub(crate) type InprocInboundRx = crossbeam_channel::Receiver<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>;
/// Heartbeat timing parameters.
///
/// NOTE(review): none of these fields is read in the visible part of this file;
/// the per-field descriptions below are inferred from the names only — confirm
/// against the code that consumes this struct.
pub(crate) struct HeartbeatConfig {
/// Presumably the period between outgoing heartbeats — TODO confirm.
pub interval: Duration,
/// Presumably how long to wait for a heartbeat reply — TODO confirm.
pub timeout: Duration,
/// Presumably the time-to-live advertised to the peer — TODO confirm.
pub ttl: Duration,
}
/// Item carried on a [`PeerEngine`]'s bounded outbound queue.
pub(crate) struct Outbound {
/// The message to be written to the peer.
pub(crate) msg: Message,
}
/// Flush-progress state shared (via `Arc`) between a [`PeerEngine`], the
/// `peer_loop` task it spawns, and any outstanding [`FlushToken`]s.
pub(crate) struct FlushState {
/// Running count of flushed messages. Flush tokens compare it against their
/// `target`, and `PeerEngine::drain_outbound` compares it against `enqueued`.
pub(crate) flushed: AtomicU64,
/// Initialised to `true`; once observed `false`, waiters stop waiting and
/// report disconnection instead.
pub(crate) writer_alive: std::sync::atomic::AtomicBool,
/// Wake-up primitive: waiters await `signaled()`, and
/// [`FlushState::notify_flush_waiters`] calls `signal()`.
pub(crate) signaler: Arc<RuntimeSignaler>,
/// Number of live `FlushWaiterGuard`s; lets `notify_flush_waiters` skip the
/// signal when nobody is waiting.
pub(crate) flush_waiters: AtomicUsize,
}
impl FlushState {
    /// Signals `signaler` if at least one waiter is currently registered in
    /// `flush_waiters`; does nothing otherwise.
    #[inline]
    pub(crate) fn notify_flush_waiters(&self) {
        let waiting = self.flush_waiters.load(Ordering::Acquire);
        if waiting == 0 {
            // Nobody registered: skip the signal entirely.
            return;
        }
        self.signaler.signal();
    }
}
/// Scope guard that keeps `FlushState::flush_waiters` incremented for as long
/// as it is alive (incremented in `new`, decremented in `Drop`).
struct FlushWaiterGuard<'a> {
/// The flush state whose waiter count this guard contributes to.
state: &'a FlushState,
}
impl<'a> FlushWaiterGuard<'a> {
    /// Registers one waiter on `state` and returns the guard that will
    /// unregister it when dropped.
    #[inline]
    fn new(state: &'a FlushState) -> Self {
        let guard = Self { state };
        guard.state.flush_waiters.fetch_add(1, Ordering::Release);
        guard
    }
}
impl Drop for FlushWaiterGuard<'_> {
    /// Unregisters the waiter that `FlushWaiterGuard::new` registered.
    #[inline]
    fn drop(&mut self) {
        let waiters = &self.state.flush_waiters;
        waiters.fetch_sub(1, Ordering::Release);
    }
}
/// Handle returned by the tracked send paths; resolves once
/// `FlushState::flushed` reaches `target`.
pub(crate) struct FlushToken {
/// Value of the engine's `enqueued` counter right after this message was
/// counted; the flush is complete when `flushed >= target`.
target: u64,
/// Shared flush state to poll and wait on.
state: Arc<FlushState>,
}
impl FlushToken {
pub(crate) async fn await_flush(self) -> Result<(), Disconnected> {
if self.state.flushed.load(Ordering::Acquire) >= self.target {
return Ok(());
}
if !self.state.writer_alive.load(Ordering::Acquire) {
return Err(Disconnected);
}
let _wg = FlushWaiterGuard::new(&self.state);
loop {
if self.state.flushed.load(Ordering::Acquire) >= self.target {
return Ok(());
}
if !self.state.writer_alive.load(Ordering::Acquire) {
return Err(Disconnected);
}
let fut = self.state.signaler.signaled();
if self.state.flushed.load(Ordering::Acquire) >= self.target {
return Ok(());
}
if !self.state.writer_alive.load(Ordering::Acquire) {
return Err(Disconnected);
}
fut.await;
}
}
}
/// Per-peer send engine: owns the outbound queue feeding the spawned
/// `peer_loop` task and the counters used to track flush progress.
pub(crate) struct PeerEngine {
/// Bounded queue to the peer loop (capacity is the `send_hwm` given to `spawn`).
outbound: Sender<Outbound>,
/// Held while enqueuing and bumping `enqueued` on the tracked and inline
/// paths, so the counter value handed out as a flush target matches queue order.
enqueue_lock: SyncMutex<()>,
/// Count of messages accepted for sending; compared against
/// `flush_state.flushed` by `drain_outbound`.
enqueued: AtomicU64,
/// Flush progress shared with the peer loop and with flush tokens.
flush_state: Arc<FlushState>,
/// Inline-write handle; `Some` only when `PeerConfig::inline_write_max` was `Some`.
inline_write: Option<Arc<dyn crate::engine::writer::InlineWriteTarget>>,
/// Cap forwarded to the inline-write calls (`None` is forwarded as-is).
inline_write_max: Option<usize>,
/// Fired from `Drop` to tell the peer loop to shut down.
shutdown_tx: Option<oneshot::Sender<()>>,
}
/// Flume sender for inbound items tagged with a peer key:
/// `(PeerKey, Result<Message, CodecError>)`. Passed to `PeerEngine::spawn` as
/// `shared_inbound` and handed on to the peer loop.
pub(crate) type TaggedInboundTx = Sender<(
crate::engine::registry::PeerKey,
Result<Message, CodecError>,
)>;
#[inline]
fn unwrap_outbound_err(e: TrySendError<Outbound>) -> TrySendError<Message> {
match e {
TrySendError::Full(o) => TrySendError::Full(o.msg),
TrySendError::Disconnected(o) => TrySendError::Disconnected(o.msg),
}
}
impl PeerEngine {
/// Builds the engine for one peer and spawns its `peer_loop` task.
///
/// * `peer_key` – key forwarded to the peer loop.
/// * `_peer_id` – currently unused.
/// * `read_half` / `write_half` – the two directions of the connection; the
///   write half is wrapped in a `VectoredWriter`.
/// * `send_hwm` – capacity of the bounded outbound queue.
/// * `shared_inbound` – channel the peer loop receives for inbound items.
/// * `config` – peer-loop configuration; `inline_write_max` being `Some(_)`
///   enables the inline write path, and its inner value is the inline cap.
///
/// # Panics
/// Panics with `"signaler init"` if the runtime signaler cannot be created.
pub(crate) fn spawn<R, W>(
    peer_key: crate::engine::registry::PeerKey,
    _peer_id: PeerIdentity,
    read_half: R,
    write_half: W,
    send_hwm: usize,
    shared_inbound: TaggedInboundTx,
    config: crate::engine::peer_loop::PeerConfig,
) -> Self
where
    R: futures::Stream<Item = Result<Message, CodecError>> + Unpin + Send + 'static,
    W: AsyncVectoredWrite + Send + Sync + 'static,
{
    use crate::engine::peer_loop::{peer_loop, PeerChannels};
    use crate::engine::writer::VectoredWriter;

    let (outbound, outbound_rx) = flume::bounded::<Outbound>(send_hwm);
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let shutdown = shutdown_rx.shared();

    let flush_state = Arc::new(FlushState {
        flushed: AtomicU64::new(0),
        writer_alive: std::sync::atomic::AtomicBool::new(true),
        signaler: Arc::new(RuntimeSignaler::new().expect("signaler init")),
        flush_waiters: AtomicUsize::new(0),
    });

    // Read the inline settings before `config` is moved into the peer loop.
    // Outer `Some` switches the inline path on; the inner option is the cap.
    let inline_write_max: Option<usize> = config.inline_write_max.flatten();
    let inline_enabled = config.inline_write_max.is_some();

    let writer = VectoredWriter::new(write_half);
    // The inline handle has to be taken before the writer is moved into the task.
    let inline_write: Option<Arc<dyn crate::engine::writer::InlineWriteTarget>> =
        if inline_enabled {
            Some(writer.inline_write_target())
        } else {
            None
        };

    let channels = PeerChannels {
        outbound_rx,
        shared_inbound,
    };
    spawn(peer_loop(
        read_half,
        writer,
        channels,
        peer_key,
        flush_state.clone(),
        shutdown.clone(),
        config,
    ));

    Self {
        outbound,
        enqueue_lock: SyncMutex::new(()),
        enqueued: AtomicU64::new(0),
        flush_state,
        inline_write,
        inline_write_max,
        shutdown_tx: Some(shutdown_tx),
    }
}
/// Test-only non-blocking enqueue. On failure the message is handed back
/// inside the error. Note that this path does not touch the `enqueued` counter.
#[cfg(all(test, feature = "tokio"))]
pub(crate) fn try_send(&self, msg: Message) -> Result<(), TrySendError<Message>> {
    let queued = self.outbound.try_send(Outbound { msg });
    queued.map_err(unwrap_outbound_err)
}
/// Attempts the inline write path for `msg`.
///
/// Only `Message::Message` and `Message::Shared` are eligible; every other
/// variant yields `None`. For eligible variants the result is whatever
/// `try_inline_zmsg` returns.
#[inline]
fn try_inline_write(&self, msg: &Message) -> Option<Result<(), SendError>> {
    match msg {
        Message::Message(owned) => self.try_inline_zmsg(owned),
        Message::Shared(shared) => self.try_inline_zmsg(shared.as_ref()),
        _ => None,
    }
}
/// Tries to write `msg` directly through the inline write handle, bypassing
/// the outbound queue.
///
/// Returns `None` when the inline path does not apply and the caller should
/// fall back to the queue: inline writes disabled, queue not empty, message
/// with zero or more than four frames, or the handle itself declining.
/// Returns `Some(Ok(()))` after a successful inline write and
/// `Some(Err(SendError::Flush))` when the inline write failed.
#[inline]
fn try_inline_zmsg(&self, msg: &crate::message::ZmqMessage) -> Option<Result<(), SendError>> {
    // Largest frame count handled on the stack for the multi-frame path.
    const MAX_INLINE_FRAMES: usize = 4;

    let handle = self.inline_write.as_ref()?;
    // Anything already queued must go out first, so do not jump the queue.
    if !self.outbound.is_empty() {
        return None;
    }
    let cap = self.inline_write_max;

    let frame_count = msg.len();
    if frame_count == 0 {
        return None;
    }
    let io_result = if frame_count == 1 {
        let payload = msg.get(0).expect("len==1").as_ref();
        handle.try_inline_single_frame(payload, cap)?
    } else {
        if frame_count > MAX_INLINE_FRAMES {
            return None;
        }
        let mut slices: [&[u8]; MAX_INLINE_FRAMES] = [&[]; MAX_INLINE_FRAMES];
        for (slot, frame) in slices.iter_mut().zip(msg.iter()) {
            *slot = frame.as_ref();
        }
        handle.try_inline_multi_frame(&slices[..frame_count], cap)?
    };

    if io_result.is_err() {
        return Some(Err(SendError::Flush));
    }

    // The message was written in place: count it as both enqueued and
    // flushed, under the enqueue lock like the other counted paths.
    let _g = self.enqueue_lock.lock();
    self.enqueued.fetch_add(1, Ordering::Relaxed);
    self.flush_state.flushed.fetch_add(1, Ordering::Release);
    self.flush_state.notify_flush_waiters();
    crate::wake_counter::bump(&crate::wake_counter::INLINE_WRITES);
    Some(Ok(()))
}
/// Inline-write attempt for a message shared behind an `Arc`; same return
/// contract as `try_inline_zmsg`, to which it delegates.
#[inline]
pub(crate) fn try_inline_fanout(
    &self,
    shared: &Arc<crate::message::ZmqMessage>,
) -> Option<Result<(), SendError>> {
    let zmsg: &crate::message::ZmqMessage = shared.as_ref();
    self.try_inline_zmsg(zmsg)
}
pub(crate) fn try_send_tracked(
&self,
msg: Message,
) -> Result<FlushToken, TrySendError<Message>> {
let _g = self.enqueue_lock.lock();
self.outbound
.try_send(Outbound { msg })
.map_err(unwrap_outbound_err)?;
let target = self.enqueued.fetch_add(1, Ordering::Relaxed) + 1;
Ok(FlushToken {
target,
state: self.flush_state.clone(),
})
}
pub(crate) fn try_send_fire_and_forget(
&self,
msg: Message,
) -> Result<(), TrySendError<Message>> {
self.outbound
.try_send(Outbound { msg })
.map_err(unwrap_outbound_err)
}
/// Sends `msg`, waiting for queue space if necessary but not for the flush.
///
/// The inline write path is tried first; if it applies, its result is
/// returned directly. Otherwise the message is queued (awaiting capacity),
/// counted in `enqueued`, and the task yields once before returning.
///
/// # Errors
/// `SendError::Enqueue(msg)` if the queue is closed; an inline-path failure
/// is returned as produced by `try_inline_write`.
pub(crate) async fn send(&self, msg: Message) -> Result<(), SendError> {
    if let Some(inline_outcome) = self.try_inline_write(&msg) {
        return inline_outcome;
    }
    if let Err(rejected) = self.outbound.send_async(Outbound { msg }).await {
        return Err(SendError::Enqueue(rejected.0.msg));
    }
    self.enqueued.fetch_add(1, Ordering::Relaxed);
    crate::async_rt::task::yield_now().await;
    Ok(())
}
pub(crate) async fn send_flushed(&self, mut msg: Message) -> Result<(), SendError> {
if let Some(result) = self.try_inline_write(&msg) {
return result;
}
let _wg = FlushWaiterGuard::new(&self.flush_state);
let token = loop {
let slot_wake = self.flush_state.signaler.signaled();
let attempt = {
let _g = self.enqueue_lock.lock();
match self.outbound.try_send(Outbound { msg }) {
Ok(()) => {
let target = self.enqueued.fetch_add(1, Ordering::Relaxed) + 1;
Ok(FlushToken {
target,
state: self.flush_state.clone(),
})
}
Err(TrySendError::Full(o)) => Err(Err(o.msg)),
Err(TrySendError::Disconnected(o)) => Err(Ok(o.msg)),
}
};
match attempt {
Ok(token) => break token,
Err(Ok(returned)) => return Err(SendError::Enqueue(returned)),
Err(Err(returned)) => {
msg = returned;
if !self.flush_state.writer_alive.load(Ordering::Acquire) {
return Err(SendError::Enqueue(msg));
}
slot_wake.await;
}
}
};
token.await_flush().await.map_err(|_e| SendError::Flush)
}
/// Reports the current value of the shared `writer_alive` flag.
pub(crate) fn writer_alive(&self) -> bool {
    let state = &self.flush_state;
    state.writer_alive.load(Ordering::Acquire)
}
/// Waits until every message counted in `enqueued` has been flushed.
///
/// Returns as soon as `flushed >= enqueued`, or the writer is observed dead,
/// or `timeout` (when `Some`) elapses — whichever comes first. With
/// `timeout == None` there is no upper bound on the wait.
pub(crate) async fn drain_outbound(&self, timeout: Option<Duration>) {
    let fut = async {
        // Fast path: nothing pending, or nobody left to flush it. Avoids
        // registering as a waiter.
        let enqueued = self.enqueued.load(Ordering::Acquire);
        let flushed = self.flush_state.flushed.load(Ordering::Acquire);
        if flushed >= enqueued {
            return;
        }
        if !self.flush_state.writer_alive.load(Ordering::Acquire) {
            return;
        }

        let _wg = FlushWaiterGuard::new(&self.flush_state);
        loop {
            // Obtain the wake-up future BEFORE re-checking the counters, the
            // same way `FlushToken::await_flush` and `send_flushed` do, so a
            // signal raised between the check and the await is not missed.
            // NOTE(review): this relies on `signaled()` capturing pending
            // signals from the moment it is called — `RuntimeSignaler` is not
            // visible here; confirm.
            let wake = self.flush_state.signaler.signaled();

            let enqueued = self.enqueued.load(Ordering::Acquire);
            let flushed = self.flush_state.flushed.load(Ordering::Acquire);
            if flushed >= enqueued {
                return;
            }
            if !self.flush_state.writer_alive.load(Ordering::Acquire) {
                return;
            }
            wake.await;
        }
    };

    match timeout {
        None => fut.await,
        Some(d) => {
            // Best effort: an elapsed timeout is deliberately ignored.
            let _ = crate::async_rt::task::timeout(d, fut).await;
        }
    }
}
}
impl Drop for PeerEngine {
    /// Fires the shutdown channel (at most once) when the engine goes away.
    fn drop(&mut self) {
        let Some(shutdown) = self.shutdown_tx.take() else {
            return;
        };
        // The receiving side may already be gone; that is fine.
        let _ = shutdown.send(());
    }
}
/// Engine for an inproc peer: messages are pushed straight onto the remote
/// side's inbound channel instead of going through a writer task.
#[cfg(feature = "inproc")]
pub(crate) struct InprocEngine {
/// Channel into which items for the remote side are pushed.
remote_inproc_tx: InprocInboundTx,
/// Notified (`notify_one`) after each item pushed to `remote_inproc_tx`.
remote_notify: Arc<crate::async_rt::notify::RuntimeNotify>,
/// Peer key attached to every item pushed to `remote_inproc_tx`.
remote_peer_key: crate::engine::registry::PeerKey,
}
#[cfg(feature = "inproc")]
impl InprocEngine {
    /// Assembles an engine from the remote side's channel, notifier and key.
    pub(crate) fn new(
        remote_inproc_tx: InprocInboundTx,
        remote_notify: Arc<crate::async_rt::notify::RuntimeNotify>,
        remote_peer_key: crate::engine::registry::PeerKey,
    ) -> Self {
        Self {
            remote_inproc_tx,
            remote_notify,
            remote_peer_key,
        }
    }

    /// Non-blocking delivery of `zm` to the remote side.
    ///
    /// Returns `Ok(())` after the message was queued and the remote notified.
    /// The error is `true` when the channel was full and `false` when it was
    /// disconnected.
    pub(crate) fn try_send_direct(&self, zm: ZmqMessage) -> Result<(), bool> {
        let envelope = (self.remote_peer_key, Ok(crate::codec::Message::Message(zm)));
        if let Err(rejected) = self.remote_inproc_tx.try_send(envelope) {
            let was_full = matches!(rejected, crossbeam_channel::TrySendError::Full(_));
            return Err(was_full);
        }
        self.remote_notify.notify_one();
        Ok(())
    }

    /// Delivers `zm` to the remote side, falling back to a blocking channel
    /// `send` when the channel is currently full.
    ///
    /// # Errors
    /// `SendError::Flush` when the channel is disconnected, whether detected
    /// on the first attempt or during the blocking fallback.
    pub(crate) fn send_direct(&self, zm: ZmqMessage) -> Result<(), SendError> {
        let envelope = (self.remote_peer_key, Ok(crate::codec::Message::Message(zm)));
        let overflow = match self.remote_inproc_tx.try_send(envelope) {
            Ok(()) => None,
            Err(crossbeam_channel::TrySendError::Disconnected(_)) => {
                return Err(SendError::Flush);
            }
            Err(crossbeam_channel::TrySendError::Full(returned)) => Some(returned),
        };
        if let Some(returned) = overflow {
            // Channel was full: this call blocks the current thread until
            // there is room (or the receiver disappears).
            self.remote_inproc_tx
                .send(returned)
                .map_err(|_e| SendError::Flush)?;
        }
        self.remote_notify.notify_one();
        Ok(())
    }

    /// Always `true`: this engine has no writer whose liveness could be tracked.
    #[allow(clippy::unused_self)]
    pub(crate) fn writer_alive(&self) -> bool {
        true
    }
}
#[cfg(feature = "inproc")]
impl Drop for InprocEngine {
    /// Best-effort goodbye: tries to queue a `PeerDisconnected` error for the
    /// remote side (ignoring a full or closed channel) and notifies it.
    fn drop(&mut self) {
        let farewell = (self.remote_peer_key, Err(CodecError::PeerDisconnected));
        let _ = self.remote_inproc_tx.try_send(farewell);
        self.remote_notify.notify_one();
    }
}
/// Performs the inproc handshake with `peer` and builds the engine for it.
///
/// Steps, in order: send the local channel info, receive the remote info,
/// reject incompatible socket types, send the local key, receive the remote
/// key. On success returns the engine (wired to the remote channel, notifier
/// and key) together with the remote routing id.
///
/// # Errors
/// `ZmqError::Socket` when any handshake channel is dropped by the other side
/// or when the two socket types are not compatible.
#[cfg(feature = "inproc")]
pub(crate) async fn connect_inproc_engine(
    local_key: crate::engine::registry::PeerKey,
    local_socket_type: crate::SocketType,
    local_routing_id: Option<PeerIdentity>,
    local_inproc_tx: InprocInboundTx,
    local_notify: Arc<crate::async_rt::notify::RuntimeNotify>,
    peer: crate::transport::inproc::InprocPeer,
) -> crate::ZmqResult<(InprocEngine, Option<PeerIdentity>)> {
    const HANDSHAKE_DROPPED: &str = "inproc peer dropped during handshake";
    // Every "other side vanished" failure maps to the same error.
    let dropped = || crate::error::ZmqError::Socket(HANDSHAKE_DROPPED.into());

    // Phase 1: exchange channel info.
    let local_info = crate::transport::inproc::InprocChannelInfo {
        tx: local_inproc_tx,
        notify: local_notify,
        socket_type: local_socket_type,
        routing_id: local_routing_id,
    };
    peer.send_inbound
        .send(local_info)
        .map_err(|_info| dropped())?;
    let remote_info = peer.recv_inbound.await.map_err(|_canceled| dropped())?;

    if !local_socket_type.compatible(remote_info.socket_type) {
        log::warn!(
            "inproc: incompatible socket types ({:?} ↔ {:?}); refusing connection",
            local_socket_type,
            remote_info.socket_type
        );
        return Err(crate::error::ZmqError::Socket(
            "inproc peer socket type not compatible".into(),
        ));
    }

    // Phase 2: exchange peer keys.
    peer.send_key.send(local_key).map_err(|_key| dropped())?;
    let remote_key = peer.recv_key.await.map_err(|_canceled| dropped())?;

    let engine = InprocEngine::new(remote_info.tx, remote_info.notify, remote_key);
    Ok((engine, remote_info.routing_id))
}
/// Builds an `InprocEngine` that is not connected to anything: its channel's
/// receiver is dropped when this function returns, its notifier is fresh, and
/// its peer key is `0`.
#[cfg(feature = "inproc")]
pub(crate) fn inproc_placeholder_engine() -> InprocEngine {
    let (placeholder_tx, _placeholder_rx) = crossbeam_channel::bounded(1);
    InprocEngine {
        remote_inproc_tx: placeholder_tx,
        remote_notify: Arc::new(crate::async_rt::notify::RuntimeNotify::new()),
        remote_peer_key: 0,
    }
}
#[cfg(all(test, feature = "tokio"))]
#[allow(clippy::type_complexity)]
mod tests {
use super::*;
use crate::async_rt;
use crate::codec::DefaultFramedIo as FramedIo;
use crate::message::ZmqMessage;
use bytes::Bytes;
use tokio::net::{TcpListener, TcpStream};
/// Opens a loopback TCP connection, wraps both ends in `FramedIo`, and runs
/// the greeting exchange on both concurrently. Returns `(server, client)`.
async fn tcp_pair() -> (FramedIo, FramedIo) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();

    let (accepted, connected) = futures::join!(listener.accept(), TcpStream::connect(addr));
    let server_stream = accepted.unwrap().0;
    let client_stream = connected.unwrap();

    let mut server_io = FramedIo::from_tcp(server_stream);
    let mut client_io = FramedIo::from_tcp(client_stream);
    let (server_greet, client_greet) = futures::join!(
        crate::codec::handshake::greet_exchange(&mut server_io),
        crate::codec::handshake::greet_exchange(&mut client_io),
    );
    server_greet.unwrap();
    client_greet.unwrap();

    (server_io, client_io)
}
/// Spawns an engine on each end of the connection with inline writes
/// disabled; see `spawn_pair_with_inline` for the returned tuple.
fn spawn_pair(
    io_a: FramedIo,
    io_b: FramedIo,
    send_hwm: usize,
) -> (
    PeerEngine,
    PeerEngine,
    Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
    Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
) {
    let inline_disabled = None;
    spawn_pair_with_inline(io_a, io_b, send_hwm, inline_disabled)
}
/// Spawns one `PeerEngine` per connection end (peer keys `0` and `1`), both
/// using `send_hwm` and the given `inline_write_max` setting.
///
/// Returns `(engine_a, engine_b, a_inbound_rx, b_inbound_rx)`, where each
/// receiver is the inbound channel of the corresponding engine.
#[allow(clippy::option_option)]
fn spawn_pair_with_inline(
    io_a: FramedIo,
    io_b: FramedIo,
    send_hwm: usize,
    inline_write_max: Option<Option<usize>>,
) -> (
    PeerEngine,
    PeerEngine,
    Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
    Receiver<(
        crate::engine::registry::PeerKey,
        Result<Message, CodecError>,
    )>,
) {
    // `into_parts` yields a third element only when the `curve` feature is on.
    #[cfg(feature = "curve")]
    let (read_a, write_a, _) = io_a.into_parts();
    #[cfg(not(feature = "curve"))]
    let (read_a, write_a) = io_a.into_parts();
    #[cfg(feature = "curve")]
    let (read_b, write_b, _) = io_b.into_parts();
    #[cfg(not(feature = "curve"))]
    let (read_b, write_b) = io_b.into_parts();

    let (inbound_a_tx, inbound_a_rx) = flume::bounded(1024);
    let (inbound_b_tx, inbound_b_rx) = flume::bounded(1024);

    let make_config = || crate::engine::peer_loop::PeerConfig {
        inline_write_max,
        ..Default::default()
    };
    let config_a = make_config();
    let config_b = make_config();

    let engine_a = PeerEngine::spawn(
        0,
        PeerIdentity::new(),
        read_a,
        write_a.into_engine_writer(),
        send_hwm,
        inbound_a_tx,
        config_a,
    );
    let engine_b = PeerEngine::spawn(
        1,
        PeerIdentity::new(),
        read_b,
        write_b.into_engine_writer(),
        send_hwm,
        inbound_b_tx,
        config_b,
    );

    (engine_a, engine_b, inbound_a_rx, inbound_b_rx)
}
/// 100 four-byte messages sent from A arrive at B intact and in order.
#[async_rt::test]
async fn engine_roundtrip_tcp() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, engine_b, _a_rx, b_rx) = spawn_pair(io_a, io_b, 64);

    for seq in 0..100u32 {
        let wire = Bytes::from(seq.to_be_bytes().to_vec());
        engine_a
            .send(Message::Message(ZmqMessage::from(wire)))
            .await
            .unwrap();
    }

    for seq in 0..100u32 {
        let (_peer, got) = b_rx.recv_async().await.expect("closed");
        let delivered = match got.expect("codec error") {
            Message::Message(m) => m,
            other => panic!("unexpected message variant: {:?}", other),
        };
        let frame = delivered.get(0).expect("frame").clone();
        assert_eq!(&frame[..], &seq.to_be_bytes()[..]);
    }

    drop(engine_a);
    drop(engine_b);
}
/// Alternating 4-byte and 256-byte messages keep their order end to end.
/// Every message starts with its big-endian sequence number.
#[async_rt::test]
async fn engine_mixed_size_preserves_order() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, _engine_b, _a_rx, b_rx) = spawn_pair(io_a, io_b, 1024);

    for seq in 0..50u32 {
        let body = if seq.is_multiple_of(2) {
            seq.to_be_bytes().to_vec()
        } else {
            // Large variant: 256 filler bytes with the sequence number up front.
            let mut padded = vec![seq as u8; 256];
            padded[..4].copy_from_slice(&seq.to_be_bytes());
            padded
        };
        let msg = ZmqMessage::from(Bytes::from(body));
        engine_a.send(Message::Message(msg)).await.unwrap();
    }

    for seq in 0..50u32 {
        let (_peer, got) = b_rx.recv_async().await.expect("closed");
        let delivered = match got.expect("codec error") {
            Message::Message(m) => m,
            other => panic!("unexpected variant: {:?}", other),
        };
        let frame = delivered.get(0).expect("frame");
        if seq.is_multiple_of(2) {
            assert_eq!(frame.len(), 4, "msg {} was the small variant", seq);
            assert_eq!(&frame[..], &seq.to_be_bytes()[..]);
        } else {
            assert_eq!(frame.len(), 256, "msg {} was the large variant", seq);
            assert_eq!(&frame[..4], &seq.to_be_bytes()[..]);
        }
    }
}
/// With inline writes disabled, the inline path always declines.
#[async_rt::test]
async fn inline_disabled_returns_none() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, _engine_b, _a_rx, _b_rx) = spawn_pair(io_a, io_b, 64);

    let probe = Message::Message(ZmqMessage::from(Bytes::from_static(b"x")));
    let outcome = engine_a.try_inline_write(&probe);
    assert!(outcome.is_none());
}
/// With uncapped inline writes, ten single-frame sends leave `enqueued` and
/// `flushed` equal and all ten payloads arrive at the peer.
#[async_rt::test]
async fn inline_enabled_single_frame_sent() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, _engine_b, _a_rx, b_rx) = spawn_pair_with_inline(io_a, io_b, 64, Some(None));

    let payload = Bytes::from_static(b"hello-inline");
    for _ in 0..10 {
        let outgoing = Message::Message(ZmqMessage::from(payload.clone()));
        engine_a.send(outgoing).await.unwrap();
    }

    let enqueued = engine_a.enqueued.load(Ordering::Acquire);
    let flushed = engine_a.flush_state.flushed.load(Ordering::Acquire);
    assert_eq!(enqueued, flushed);

    for _ in 0..10 {
        let (_peer, got) = b_rx.recv_async().await.expect("closed");
        let delivered = match got.expect("codec error") {
            Message::Message(m) => m,
            other => panic!("unexpected: {:?}", other),
        };
        assert_eq!(&delivered.get(0).unwrap()[..], &payload[..]);
    }
}
/// With an inline cap of 16 bytes, alternating 8-byte and 32-byte payloads
/// still arrive in send order.
#[async_rt::test]
async fn inline_payload_cap_partitions_paths() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, _engine_b, _a_rx, b_rx) =
        spawn_pair_with_inline(io_a, io_b, 64, Some(Some(16)));

    let small = Bytes::from_static(b"sml-08-B");
    let large = Bytes::from(vec![0xAB; 32]);

    for seq in 0..20u32 {
        let source = if seq.is_multiple_of(2) {
            &small
        } else {
            &large
        };
        let outgoing = Message::Message(ZmqMessage::from(source.clone()));
        engine_a.send(outgoing).await.unwrap();
    }

    for seq in 0..20u32 {
        let (_peer, got) = b_rx.recv_async().await.expect("closed");
        let delivered = match got.expect("codec error") {
            Message::Message(m) => m,
            other => panic!("unexpected: {:?}", other),
        };
        let frame = delivered.get(0).unwrap();
        let expected_len = if seq.is_multiple_of(2) { 8 } else { 32 };
        assert_eq!(
            frame.len(),
            expected_len,
            "message {} wrong size — FIFO broken?",
            seq
        );
    }
}
/// Two-frame messages (empty first frame + 4-byte sequence number) are
/// delivered intact and in order with inline writes enabled.
#[async_rt::test]
async fn inline_multi_frame_delivers() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, _engine_b, _a_rx, b_rx) = spawn_pair_with_inline(io_a, io_b, 64, Some(None));

    for seq in 0..5u32 {
        let mut outgoing = ZmqMessage::from(Bytes::new());
        outgoing.push_back(Bytes::from(seq.to_be_bytes().to_vec()));
        assert_eq!(outgoing.len(), 2);
        engine_a.send(Message::Message(outgoing)).await.unwrap();
    }

    for seq in 0..5u32 {
        let (_peer, got) = b_rx.recv_async().await.expect("closed");
        let delivered = match got.expect("codec error") {
            Message::Message(m) => m,
            other => panic!("unexpected: {:?}", other),
        };
        assert_eq!(delivered.len(), 2, "frame count");
        assert!(delivered.get(0).unwrap().is_empty());
        assert_eq!(&delivered.get(1).unwrap()[..], &seq.to_be_bytes()[..]);
    }
}
/// `try_inline_fanout` writes shared messages inline when enabled, and the
/// peer receives every one of them.
#[async_rt::test]
async fn inline_fanout_enabled_sends() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, _engine_b, _a_rx, b_rx) = spawn_pair_with_inline(io_a, io_b, 64, Some(None));

    let payload = Bytes::from_static(b"fanout-msg");
    for _ in 0..5 {
        let shared = Arc::new(ZmqMessage::from(payload.clone()));
        let outcome = engine_a.try_inline_fanout(&shared);
        if !matches!(outcome, Some(Ok(()))) {
            panic!(
                "expected inline Sent, got {:?}",
                outcome.map(|r| r.map_err(|e| format!("{:?}", e)))
            );
        }
    }

    for _ in 0..5 {
        let (_peer, got) = b_rx.recv_async().await.expect("closed");
        let delivered = match got.expect("codec error") {
            Message::Message(m) => m,
            other => panic!("unexpected: {:?}", other),
        };
        assert_eq!(&delivered.get(0).unwrap()[..], &payload[..]);
    }
}
/// 1024 messages queued back to back (queue capacity 2048) all arrive in order.
#[async_rt::test]
async fn engine_batch_flush() {
    let (io_a, io_b) = tcp_pair().await;
    let (engine_a, _engine_b, _a_rx, b_rx) = spawn_pair(io_a, io_b, 2048);

    let count = 1024u32;
    for seq in 0..count {
        let wire = Bytes::from(seq.to_be_bytes().to_vec());
        engine_a
            .send(Message::Message(ZmqMessage::from(wire)))
            .await
            .unwrap();
    }

    for seq in 0..count {
        let (_peer, got) = b_rx.recv_async().await.expect("closed");
        let delivered = match got.expect("codec error") {
            Message::Message(m) => m,
            other => panic!("unexpected variant: {:?}", other),
        };
        let frame = delivered.get(0).expect("frame").clone();
        assert_eq!(&frame[..], &seq.to_be_bytes()[..]);
    }
}
}