use crate::{
entity::DdsEntity,
error::{DdsError, DdsResult},
DataReader, DataWriter, DomainParticipant, Publisher, Qos, Subscriber, DdsType,
};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// A DDS sample type that can take part in a request/reply exchange.
///
/// Requests and replies are paired through a numeric correlation id: the
/// requester stamps an id on each outgoing request and accepts only a reply
/// whose [`correlation_id`](RequestReply::correlation_id) equals that id.
pub trait RequestReply: DdsType + Clone + Send + 'static {
    /// Returns the correlation id currently stored in this sample.
    fn correlation_id(&self) -> u64;

    /// Stores `id` as this sample's correlation id.
    fn set_correlation_id(&mut self, id: u64);
}
/// Client side of a request/reply exchange.
///
/// Writes requests of type `TReq` and reads replies of type `TRep`, pairing
/// them through the correlation id exposed by [`RequestReply`].
pub struct Requester<TReq: RequestReply, TRep: RequestReply> {
    // The three underscore-prefixed entities are stored but never read by the
    // methods of this type; presumably they are held only to keep the DDS
    // entities alive for the lifetime of the requester (TODO confirm).
    // Field order is significant: fields are dropped in declaration order.
    _participant: DomainParticipant,
    _publisher: Publisher,
    _subscriber: Subscriber,
    /// Writer used to publish outgoing requests.
    writer: DataWriter<TReq>,
    /// Reader polled for incoming replies.
    reader: DataReader<TRep>,
    /// Source of correlation ids; incremented once per request.
    sequence: AtomicU64,
    _marker: std::marker::PhantomData<(TReq, TRep)>,
}
impl<TReq, TRep> Requester<TReq, TRep>
where
    TReq: RequestReply,
    TRep: RequestReply,
{
    /// Builds a requester for the service called `service_name`.
    ///
    /// A publisher and a subscriber are created on `participant`, together
    /// with two topics named `"{service_name}Request"` and
    /// `"{service_name}Reply"`. Requests are written to the former and
    /// replies are read from the latter.
    ///
    /// When `request_qos` is `Some`, it is applied to both the request topic
    /// and the request writer; when `reply_qos` is `Some`, it is applied to
    /// both the reply topic and the reply reader. `None` selects the
    /// non-QoS constructors.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned while creating any of the
    /// underlying entities.
    pub fn new(
        participant: &DomainParticipant,
        service_name: &str,
        request_qos: Option<&Qos>,
        reply_qos: Option<&Qos>,
    ) -> DdsResult<Self> {
        let publisher = participant.create_publisher()?;
        let subscriber = participant.create_subscriber()?;

        let request_name = format!("{}Request", service_name);
        let reply_name = format!("{}Reply", service_name);

        let request_topic = match request_qos {
            Some(qos) => participant.create_topic_with_qos::<TReq>(&request_name, qos)?,
            None => participant.create_topic::<TReq>(&request_name)?,
        };
        let reply_topic = match reply_qos {
            Some(qos) => participant.create_topic_with_qos::<TRep>(&reply_name, qos)?,
            None => participant.create_topic::<TRep>(&reply_name)?,
        };

        let writer = match request_qos {
            Some(qos) => publisher.create_writer_with_qos(&request_topic, qos)?,
            None => publisher.create_writer(&request_topic)?,
        };
        let reader = match reply_qos {
            Some(qos) => subscriber.create_reader_with_qos(&reply_topic, qos)?,
            None => subscriber.create_reader(&reply_topic)?,
        };

        Ok(Requester {
            // NOTE(review): this passes the caller's entity handle, cast to
            // `u32`, to `DomainParticipant::new`. If `new` expects a domain id
            // this creates an unrelated participant on the wrong domain —
            // confirm against the `DomainParticipant` API.
            _participant: DomainParticipant::new(participant.entity() as u32)?,
            _publisher: publisher,
            _subscriber: subscriber,
            writer,
            reader,
            // Correlation ids start at 1.
            sequence: AtomicU64::new(1),
            _marker: std::marker::PhantomData,
        })
    }

    /// Sends `data` as a request and waits up to `timeout` for its reply.
    ///
    /// A fresh correlation id is stamped on `data` before it is written. The
    /// reply reader is then polled roughly every 10 ms until a sample with the
    /// same correlation id shows up. Samples with a different correlation id
    /// that are taken while waiting are discarded.
    ///
    /// The reader is always polled at least once, even when `timeout` is
    /// zero, and once more after the deadline has passed so that a reply
    /// arriving during the final sleep is not missed. A `timeout` too large
    /// to be represented as a deadline (e.g. `Duration::MAX`) is treated as
    /// "wait indefinitely" instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns [`DdsError::Timeout`] when no matching reply is seen in time,
    /// or propagates any error from writing the request or taking samples.
    pub fn request(&self, mut data: TReq, timeout: Duration) -> DdsResult<TRep> {
        let poll_interval = Duration::from_millis(10);

        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
        data.set_correlation_id(seq);
        self.writer.write(&data)?;

        // `Instant + Duration` panics on overflow; `checked_add` yields `None`
        // instead, which is interpreted below as "no deadline".
        let deadline = Instant::now().checked_add(timeout);

        loop {
            // NOTE(review): assuming `take` removes samples from the reader,
            // concurrent `request` calls on one `Requester` can consume each
            // other's replies — confirm whether that use is supported.
            let matching = self
                .reader
                .take()?
                .into_iter()
                .find(|sample| sample.correlation_id() == seq);
            if let Some(reply) = matching {
                return Ok(reply);
            }

            let remaining = match deadline {
                Some(at) => at.saturating_duration_since(Instant::now()),
                None => poll_interval,
            };
            if remaining.is_zero() {
                return Err(DdsError::Timeout);
            }
            // Never sleep past the deadline.
            std::thread::sleep(remaining.min(poll_interval));
        }
    }
}
/// Server side of a request/reply exchange.
///
/// Reads requests of type `TReq` and writes replies of type `TRep`.
pub struct Replier<TReq, TRep>
where
    TReq: RequestReply,
    TRep: RequestReply,
{
    // The three underscore-prefixed entities are stored but never read by the
    // methods of this type; presumably they are held only to keep the DDS
    // entities alive for the lifetime of the replier (TODO confirm).
    _participant: DomainParticipant,
    _publisher: Publisher,
    _subscriber: Subscriber,
    /// Writer used to publish replies.
    writer: DataWriter<TRep>,
    /// Reader polled for incoming requests.
    reader: DataReader<TReq>,
    /// Requests already taken from `reader` but not yet handed to a caller.
    ///
    /// One `take` can yield several samples while `receive_request` returns
    /// only one, so the surplus is parked here instead of being dropped.
    /// A `Mutex` (rather than a `RefCell`) is used because the queue is
    /// mutated through `&self` and must not make the type `!Sync`.
    pending: std::sync::Mutex<std::collections::VecDeque<TReq>>,
    _marker: std::marker::PhantomData<(TReq, TRep)>,
}

impl<TReq, TRep> Replier<TReq, TRep>
where
    TReq: RequestReply,
    TRep: RequestReply,
{
    /// Builds a replier for the service called `service_name`.
    ///
    /// A publisher and a subscriber are created on `participant`, together
    /// with two topics named `"{service_name}Request"` and
    /// `"{service_name}Reply"`. Requests are read from the former and replies
    /// are written to the latter.
    ///
    /// When `request_qos` is `Some`, it is applied to both the request topic
    /// and the request reader; when `reply_qos` is `Some`, it is applied to
    /// both the reply topic and the reply writer. `None` selects the
    /// non-QoS constructors.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned while creating any of the
    /// underlying entities.
    pub fn new(
        participant: &DomainParticipant,
        service_name: &str,
        request_qos: Option<&Qos>,
        reply_qos: Option<&Qos>,
    ) -> DdsResult<Self> {
        let publisher = participant.create_publisher()?;
        let subscriber = participant.create_subscriber()?;

        let request_name = format!("{}Request", service_name);
        let reply_name = format!("{}Reply", service_name);

        let request_topic = match request_qos {
            Some(qos) => participant.create_topic_with_qos::<TReq>(&request_name, qos)?,
            None => participant.create_topic::<TReq>(&request_name)?,
        };
        let reply_topic = match reply_qos {
            Some(qos) => participant.create_topic_with_qos::<TRep>(&reply_name, qos)?,
            None => participant.create_topic::<TRep>(&reply_name)?,
        };

        let reader = match request_qos {
            Some(qos) => subscriber.create_reader_with_qos(&request_topic, qos)?,
            None => subscriber.create_reader(&request_topic)?,
        };
        let writer = match reply_qos {
            Some(qos) => publisher.create_writer_with_qos(&reply_topic, qos)?,
            None => publisher.create_writer(&reply_topic)?,
        };

        Ok(Replier {
            // NOTE(review): this passes the caller's entity handle, cast to
            // `u32`, to `DomainParticipant::new`. If `new` expects a domain id
            // this creates an unrelated participant on the wrong domain —
            // confirm against the `DomainParticipant` API.
            _participant: DomainParticipant::new(participant.entity() as u32)?,
            _publisher: publisher,
            _subscriber: subscriber,
            writer,
            reader,
            pending: std::sync::Mutex::new(std::collections::VecDeque::new()),
            _marker: std::marker::PhantomData,
        })
    }

    /// Waits up to `timeout` for the next request.
    ///
    /// Returns `Ok(Some(request))` as soon as one is available and
    /// `Ok(None)` when the timeout elapses first. Requests are handed out in
    /// the order the reader yielded them; when a single `take` returns more
    /// than one sample, the extra samples are queued and returned by
    /// subsequent calls rather than being lost.
    ///
    /// The reader is polled roughly every 10 ms. It is always polled at least
    /// once, even when `timeout` is zero, and once more after the deadline has
    /// passed. A `timeout` too large to be represented as a deadline (e.g.
    /// `Duration::MAX`) is treated as "wait indefinitely" instead of
    /// panicking.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while taking samples from the reader.
    pub fn receive_request(&self, timeout: Duration) -> DdsResult<Option<TReq>> {
        let poll_interval = Duration::from_millis(10);

        // `Instant + Duration` panics on overflow; `checked_add` yields `None`
        // instead, which is interpreted below as "no deadline".
        let deadline = Instant::now().checked_add(timeout);

        loop {
            {
                // The queue holds plain data, so a poisoned lock is still
                // safe to keep using.
                let mut pending = self
                    .pending
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                if pending.is_empty() {
                    pending.extend(self.reader.take()?);
                }
                if let Some(request) = pending.pop_front() {
                    return Ok(Some(request));
                }
                // Guard dropped here so the lock is not held while sleeping.
            }

            let remaining = match deadline {
                Some(at) => at.saturating_duration_since(Instant::now()),
                None => poll_interval,
            };
            if remaining.is_zero() {
                return Ok(None);
            }
            // Never sleep past the deadline.
            std::thread::sleep(remaining.min(poll_interval));
        }
    }

    /// Publishes `data` on the reply topic.
    ///
    /// The correlation id is not touched here: the caller is responsible for
    /// copying it from the request into `data`, since the requester matches
    /// replies by that id.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying writer.
    pub fn send_reply(&self, data: TRep) -> DdsResult<()> {
        self.writer.write(&data)
    }
}