pub mod common;
#[allow(unused_imports)]
use async_std::{future, prelude::*};
use common::msgs::example_msg::msg::Num;
use safe_drive::{
context::Context,
topic::{publisher::Publisher, subscriber::Subscriber},
};
use std::{error::Error, time::Duration};
const TOPIC_NAME: &str = "test_async_pubsub";
#[test]
fn test_async_pubsub() -> Result<(), Box<dyn Error + Sync + Send + 'static>> {
let ctx = Context::new()?;
let node_pub = ctx.create_node("test_async_pub_node", None, Default::default())?;
let node_sub = ctx.create_node("test_async_sub_node", None, Default::default())?;
let p = common::create_publisher(node_pub, TOPIC_NAME, true).unwrap();
let s = common::create_subscriber(node_sub, TOPIC_NAME, true).unwrap();
async_std::task::block_on(async {
let p = async_std::task::spawn(run_publisher(p));
let s = async_std::task::spawn(run_subscriber(s));
p.await;
s.await;
});
println!("finished");
Ok(())
}
async fn run_publisher(p: Publisher<Num>) {
let dur = Duration::from_millis(100);
for n in 0..3 {
let msg = Num { num: n };
if let Err(e) = p.send(&msg) {
println!("error: {e}");
return;
}
async_std::task::sleep(dur).await;
println!("async publish: msg = {n}");
}
}
async fn run_subscriber(mut s: Subscriber<Num>) {
let dur = Duration::from_millis(500);
for n in 0.. {
match future::timeout(dur, s.recv()).await {
Ok(Ok(msg)) => {
println!("async subscribe: msg = {}", msg.num);
assert_eq!(msg.num, n);
}
Ok(Err(e)) => panic!("{}", e), Err(_) => {
println!("async subscribe: timeout");
break;
}
}
}
}