use std::{io, time};
use log4rs::{
append::console::ConsoleAppender,
config::{Appender, Root},
Config,
};
use log::LevelFilter;
use futures::StreamExt;
use rustdds::*;
use serde::{Deserialize, Serialize};
use smol::Timer;
/// One second; the publisher derives its tick period from this value.
const SECOND: time::Duration = time::Duration::from_secs(1);
/// Example entry point: sets up logging and the DDS entities, asks on stdin
/// which role to play, then runs either a subscriber or a publisher loop on
/// the "counter" topic.
///
/// Typing `true` starts a subscriber that prints every received counter value
/// and every DataReader event; it stops when the sample stream reports an
/// error. Typing `false` starts a publisher that writes an incrementing
/// counter on each timer tick and prints DataWriter events; it loops until the
/// process is terminated. Any other input is reported on stderr and the
/// program returns without starting either role.
///
/// # Panics
///
/// Panics if any DDS entity cannot be created or if stdin cannot be read.
fn main() {
    configure_logging();

    let domain_participant = DomainParticipant::new(0).unwrap();

    // The same reliable QoS is used for the topic, the publisher and the subscriber.
    let qos = QosPolicyBuilder::new()
        .reliability(policy::Reliability::Reliable {
            max_blocking_time: rustdds::Duration::ZERO,
        })
        .build();

    let subscriber = domain_participant.create_subscriber(&qos).unwrap();
    let publisher = domain_participant.create_publisher(&qos).unwrap();
    let some_topic = domain_participant
        .create_topic(
            "counter".to_string(),
            "SomeType".to_string(),
            &qos,
            TopicKind::NoKey,
        )
        .unwrap();

    /// Payload exchanged on the topic: a single counter value.
    #[derive(Serialize, Deserialize, Debug)]
    struct SomeType {
        a: i32,
    }

    println!("Input 'false' to create a publisher or 'true' to create a subscriber");
    let mut role = String::new();
    io::stdin()
        .read_line(&mut role)
        .expect("Failed to read line");

    // Inside the `Err` arm `role` is still the raw input string, so the
    // rejected text can be echoed back instead of exiting without a word.
    let role: bool = match role.trim().parse() {
        Ok(role) => role,
        Err(e) => {
            eprintln!(
                "Unrecognized role {:?} ({e}); expected 'true' or 'false'. Exiting.",
                role.trim()
            );
            return;
        }
    };

    if role {
        println!("Creating a subscriber");
        let reader = subscriber
            .create_datareader_no_key::<SomeType, CDRDeserializerAdapter<SomeType>>(
                &some_topic,
                None,
            )
            .unwrap();

        smol::block_on(async {
            let mut datareader_stream = reader.async_sample_stream();
            let mut datareader_event_stream = datareader_stream.async_event_stream();

            loop {
                // Handle whichever stream is ready first: a sample or a reader event.
                futures::select! {
                    r = datareader_stream.select_next_some() => {
                        match r {
                            Ok(d) => println!("{}", d.value().a),
                            // A failed read ends the subscriber loop.
                            Err(e) => {
                                println!("{e:?}");
                                break;
                            }
                        }
                    }
                    e = datareader_event_stream.select_next_some() => {
                        println!("DataReader event: {e:?}");
                    }
                }
            }
        })
    } else {
        println!("Creating a publisher");
        let writer = publisher
            .create_datawriter_no_key::<SomeType, CDRSerializerAdapter<SomeType>>(
                &some_topic,
                None,
            )
            .unwrap();

        smol::block_on(async {
            // One tick every SECOND / 10; fused so it can be polled from `select!`.
            let mut tick_stream = futures::StreamExt::fuse(Timer::interval(SECOND / 10));
            let mut datawriter_event_stream = writer.as_async_status_stream();
            let mut i = 0;

            loop {
                futures::select! {
                    _ = tick_stream.select_next_some() => {
                        let some_data = SomeType { a: i };
                        i += 1;
                        // A failed write is logged and the loop keeps publishing.
                        writer
                            .async_write(some_data, None)
                            .await
                            .unwrap_or_else(|e| println!("DataWriter write failed: {e:?}"));
                    }
                    e = datawriter_event_stream.select_next_some() => {
                        println!("DataWriter event: {e:?}");
                    }
                }
            }
        })
    }
}
/// Initializes log4rs from `logging-config.yaml` in the current working
/// directory, falling back to a console logger when that file is missing.
///
/// The fallback installs a single "stdout" console appender on the root
/// logger with the level filter set to `Error`.
///
/// # Panics
///
/// Panics with the underlying error if the config file exists but cannot be
/// loaded (any failure other than an I/O "not found"), or if the fallback
/// configuration cannot be built or installed.
fn configure_logging() {
    log4rs::init_file(
        "logging-config.yaml",
        log4rs::config::Deserializers::default(),
    )
    .unwrap_or_else(|e| {
        match e.downcast_ref::<io::Error>() {
            Some(os_err) if os_err.kind() == io::ErrorKind::NotFound => {
                println!("No config file found in current working directory.");
                let stdout = ConsoleAppender::builder().build();
                let conf = Config::builder()
                    .appender(Appender::builder().build("stdout", Box::new(stdout)))
                    .build(Root::builder().appender("stdout").build(LevelFilter::Error))
                    .unwrap();
                log4rs::init_config(conf).unwrap();
            }
            // Format the original error, not the downcast result: for any
            // non-I/O failure (e.g. a malformed config file) the downcast is
            // `None`, which would hide the actual cause.
            _ => panic!("Config problem: {e:?}"),
        }
    });
}