mod common;
#[cfg(test)]
mod tests {
mod broker_tests;
mod bus_tests;
mod future_tests;
mod mem_tests;
mod msg_tests;
mod options_tests;
mod pair_tests;
mod pipe_tests;
mod pubsub_tests;
mod pushpull_tests;
mod reqrep_tests;
mod stats_tests;
mod stream_tests;
use crate::common::*;
use futures::{executor::block_on, future};
use runng::{asyncio::*, factory::latest::ProtocolFactory, protocol::*, *};
use std::{thread, time::Duration};
#[test]
fn listenerdialer() -> runng::Result<()> {
let url = get_url();
let factory = ProtocolFactory::default();
let replier = factory.replier_open()?;
{
{
let listener = replier.listener_create(&url)?;
listener.start()?;
let requester = factory.requester_open()?;
{
let req_dialer = requester.dialer_create(&url)?;
assert_eq!(
url,
req_dialer
.get_string(NngOption::URL)
.unwrap()
.to_str()
.unwrap()
);
req_dialer.start()?;
requester.sendmsg(msg::NngMsg::new()?)?;
let _request = replier.recvmsg()?;
}
requester.sendmsg(msg::NngMsg::new()?)?;
let _request = replier.recvmsg()?;
}
let mut requester = factory.requester_open()?;
requester.dial(&url)?;
requester.sendmsg(msg::NngMsg::new()?)?;
let _request = replier.recvmsg()?;
}
Ok(())
}
#[test]
fn results() -> std::result::Result<(), runng_sys::EnumFromIntError> {
use std::convert::TryFrom;
assert_eq!(
NngErrno::EINTR,
NngErrno::try_from(runng_sys::NNG_EINTR as i32).unwrap()
);
Ok(())
}
#[test]
fn pubsub() -> runng::Result<()> {
let url = get_url();
let factory = ProtocolFactory::default();
let mut publisher = factory.publisher_open()?;
publisher.listen(&url)?;
let mut subscriber = factory.subscriber_open()?;
subscriber.dial(&url)?;
let num_msg_per_subscriber = 4;
let sub_thread = thread::spawn(move || -> runng::Result<()> {
let mut sub_ctx = subscriber.create_async_stream(1)?;
let topic: Vec<u8> = vec![0; 4];
sub_ctx.subscribe(topic.as_slice())?;
let fut = sub_ctx
.receive()
.unwrap()
.take_while(|res| {
const SIZE_OF_TOPIC: usize = std::mem::size_of::<u32>();
let is_taking = match res {
Ok(msg) => msg.len() - SIZE_OF_TOPIC > 0,
Err(_) => false,
};
future::ready(is_taking)
})
.for_each(|_| {
future::ready(())
});
block_on(fut);
Ok(())
});
let pub_thread = thread::spawn(move || -> runng::Result<()> {
let mut pub_ctx = publisher.create_async()?;
let mut msg = msg::NngMsg::new()?;
msg.append_u32(0)?; msg.append_u32(1)?;
for _ in 0..num_msg_per_subscriber {
let msg = msg.dup()?;
block_on(pub_ctx.send(msg))?;
thread::sleep(Duration::from_millis(25));
}
let mut msg = msg::NngMsg::new()?;
msg.append_u32(0)?; block_on(pub_ctx.send(msg))?;
Ok(())
});
sub_thread.join().unwrap()?;
pub_thread.join().unwrap()?;
Ok(())
}
#[test]
fn broker() -> runng::Result<()> {
let url_broker_in = get_url();
let url_broker_out = get_url();
let factory = ProtocolFactory::default();
let mut broker_pull = factory.puller_open()?;
broker_pull.listen(&url_broker_in)?;
let mut broker_push = factory.publisher_open()?;
broker_push.listen(&url_broker_out)?;
thread::sleep(Duration::from_millis(50));
thread::spawn(move || -> runng::Result<()> {
let mut broker_pull_ctx = broker_pull.create_async_stream(1)?;
let mut broker_push_ctx = broker_push.create_async()?;
let fut = broker_pull_ctx.receive().unwrap().for_each(|msg| {
if let Ok(msg) = msg {
futures::future::Either::Left(broker_push_ctx.send(msg).then(|res| {
res.unwrap();
future::ready(())
}))
} else {
futures::future::Either::Right(future::ready(()))
}
});
block_on(fut);
Ok(())
});
let mut publisher = factory.pusher_open()?;
publisher.dial(&url_broker_in)?;
let mut subscriber = factory.subscriber_open()?;
subscriber.dial(&url_broker_out)?;
thread::spawn(move || -> runng::Result<()> {
let mut sub_ctx = subscriber.create_async_stream(1)?;
let topic: Vec<u8> = vec![0; 4];
sub_ctx.subscribe(topic.as_slice())?;
let fut = sub_ctx
.receive()
.unwrap()
.take_while(|res| {
const SIZE_OF_TOPIC: usize = std::mem::size_of::<u32>();
let is_taking = match res {
Ok(msg) => msg.len() - SIZE_OF_TOPIC > 0,
Err(_) => false,
};
future::ready(is_taking)
})
.for_each(|_| {
future::ready(())
});
block_on(fut);
Ok(())
});
thread::sleep(Duration::from_millis(50));
thread::spawn(move || -> runng::Result<()> {
let mut pub_ctx = publisher.create_async()?;
for _ in 0..10 {
let mut msg = msg::NngMsg::new()?;
msg.append_u32(0)?; msg.append_u32(1)?;
block_on(pub_ctx.send(msg))?;
thread::sleep(Duration::from_millis(200));
}
let mut msg = msg::NngMsg::new()?;
msg.append_u32(0)?; block_on(pub_ctx.send(msg))?;
Ok(())
});
thread::sleep(Duration::from_secs(3));
Ok(())
}
}