use std::{
marker::PhantomData,
pin::Pin,
sync::{
atomic::{AtomicI64, Ordering},
Arc, Mutex,
},
task::{Context, Poll, Waker},
time::{Duration, Instant},
};
use futures::{Future, Stream};
use mio_06::{Events, PollOpt, Ready, Token};
use mio_extras::channel::{self as mio_channel, SendError, TrySendError};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::{
adapters::with_key::SerializerAdapter,
ddsdata::DDSData,
helpers::*,
pubsub::Publisher,
qos::{
policy::{Liveliness, Reliability},
HasQoSPolicy, QosPolicies,
},
result::{CreateResult, WriteError, WriteResult},
statusevents::*,
topic::Topic,
},
discovery::{discovery::DiscoveryCommand, sedp_messages::SubscriptionBuiltinTopicData},
messages::submessages::elements::serialized_payload::SerializedPayload,
rtps::writer::WriterCommand,
serialization::CDRSerializerAdapter,
structure::{
cache_change::ChangeKind, duration, entity::RTPSEntity, guid::GUID, rpc::SampleIdentity,
sequence_number::SequenceNumber, time::Timestamp,
},
Keyed, TopicDescription,
};
/// Builder for [`WriteOptions`].
///
/// All options start out unset (`None`, through the derived `Default`).
/// Set the ones you need with the builder methods and finish with `build()`.
#[derive(Debug, Default)]
pub struct WriteOptionsBuilder {
// Optional related sample identity; copied into `WriteOptions` by `build()`.
related_sample_identity: Option<SampleIdentity>,
// Optional explicit source timestamp; copied into `WriteOptions` by `build()`.
source_timestamp: Option<Timestamp>,
// Optional reader GUID; copied into `WriteOptions` by `build()`.
to_single_reader: Option<GUID>,
}
impl WriteOptionsBuilder {
  /// Creates a builder with every option unset.
  pub fn new() -> Self {
    Self {
      related_sample_identity: None,
      source_timestamp: None,
      to_single_reader: None,
    }
  }

  /// Consumes the builder and returns the finished [`WriteOptions`].
  pub fn build(self) -> WriteOptions {
    let Self {
      related_sample_identity,
      source_timestamp,
      to_single_reader,
    } = self;
    WriteOptions {
      related_sample_identity,
      source_timestamp,
      to_single_reader,
    }
  }

  /// Sets the related sample identity.
  #[must_use]
  pub fn related_sample_identity(self, related_sample_identity: SampleIdentity) -> Self {
    self.related_sample_identity_opt(Some(related_sample_identity))
  }

  /// Sets or clears the related sample identity; `None` clears any
  /// previously set value.
  #[must_use]
  pub fn related_sample_identity_opt(
    self,
    related_sample_identity_opt: Option<SampleIdentity>,
  ) -> Self {
    Self {
      related_sample_identity: related_sample_identity_opt,
      ..self
    }
  }

  /// Sets the source timestamp to attach to the sample.
  #[must_use]
  pub fn source_timestamp(self, source_timestamp: Timestamp) -> Self {
    Self {
      source_timestamp: Some(source_timestamp),
      ..self
    }
  }

  /// Sets the GUID of the single reader recorded in the options.
  #[must_use]
  pub fn to_single_reader(self, reader: GUID) -> Self {
    Self {
      to_single_reader: Some(reader),
      ..self
    }
  }
}
/// Optional per-sample settings passed along with a write.
///
/// Build one with [`WriteOptionsBuilder`], or convert from an
/// `Option<Timestamp>` when only the source timestamp matters.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
pub struct WriteOptions {
  // NOTE: the derived `Ord`/`PartialOrd` compare fields in declaration
  // order, so do not reorder these.
  related_sample_identity: Option<SampleIdentity>,
  source_timestamp: Option<Timestamp>,
  to_single_reader: Option<GUID>,
}
impl WriteOptions {
  /// Returns the related sample identity, if one was set.
  pub fn related_sample_identity(&self) -> Option<SampleIdentity> {
    let Self {
      related_sample_identity,
      ..
    } = self;
    *related_sample_identity
  }

  /// Returns the explicit source timestamp, if one was set.
  pub fn source_timestamp(&self) -> Option<Timestamp> {
    let Self {
      source_timestamp, ..
    } = self;
    *source_timestamp
  }

  /// Returns the single-reader GUID, if one was set.
  pub fn to_single_reader(&self) -> Option<GUID> {
    let Self {
      to_single_reader, ..
    } = self;
    *to_single_reader
  }
}
/// Shorthand conversion: options that carry only an (optional) source
/// timestamp, with every other option unset.
impl From<Option<Timestamp>> for WriteOptions {
  fn from(source_timestamp: Option<Timestamp>) -> Self {
    Self {
      source_timestamp,
      // The derived `Default` leaves the remaining options as `None`.
      ..Self::default()
    }
  }
}
/// Convenience alias for a [`DataWriter`] that uses [`CDRSerializerAdapter`]
/// as its serializer adapter.
pub type DataWriterCdr<D> = DataWriter<D, CDRSerializerAdapter<D>>;
/// Writer for samples of the keyed type `D`, serialized through the adapter
/// `SA` (defaults to `CDRSerializerAdapter<D>`).
///
/// Samples are serialized by the write methods and then sent as
/// `WriterCommand`s over the `cc_upload` channel.
pub struct DataWriter<D: Keyed, SA: SerializerAdapter<D> = CDRSerializerAdapter<D>> {
// Zero-sized markers tying the writer to its data and serializer types.
data_phantom: PhantomData<D>,
ser_phantom: PhantomData<SA>,
// Publisher this writer belongs to; told to remove the writer on drop.
my_publisher: Publisher,
// Topic this writer was created for.
my_topic: Topic,
// QoS policies given at construction time.
qos_policy: QosPolicies,
// GUID of this writer; also used as `writer_guid` in returned `SampleIdentity`s.
my_guid: GUID,
// Bounded channel over which `WriterCommand`s are sent.
cc_upload: mio_channel::SyncSender<WriterCommand>,
// Waker slot filled in by `AsyncWrite::poll` when `cc_upload` is full.
// NOTE(review): presumably woken by the receiving side of `cc_upload` — confirm.
cc_upload_waker: Arc<Mutex<Option<Waker>>>,
// Channel for sending `DiscoveryCommand`s (liveliness assertions, writer removal).
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
// Source of `DataWriterStatus` events exposed through `StatusEvented`.
status_receiver: StatusChannelReceiver<DataWriterStatus>,
// Next sequence number to hand out; starts at 1.
available_sequence_number: AtomicI64,
}
impl<D, SA> Drop for DataWriter<D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
fn drop(&mut self) {
self.my_publisher.remove_writer(self.my_guid);
match self
.discovery_command
.send(DiscoveryCommand::RemoveLocalWriter { guid: self.guid() })
{
Ok(_) => {}
Err(SendError::Disconnected(_cmd)) => {
debug!("Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand: Disconnected.");
}
Err(e) => error!("Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand. {e:?}"),
}
}
}
impl<D, SA> DataWriter<D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
/// Creates a `DataWriter` from its already-constructed parts.
///
/// If the QoS requests `ManualByParticipant` liveliness, a
/// `ManualAssertLiveliness` discovery command is sent right away; a failed
/// send is logged and does not fail construction.
///
/// The sequence number counter starts at 1.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
  publisher: Publisher,
  topic: Topic,
  qos: QosPolicies,
  guid: GUID,
  cc_upload: mio_channel::SyncSender<WriterCommand>,
  cc_upload_waker: Arc<Mutex<Option<Waker>>>,
  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
  status_receiver: StatusChannelReceiver<DataWriterStatus>,
) -> CreateResult<Self> {
  // The other liveliness kinds (and no liveliness policy at all) need no
  // action at creation time.
  let manual_by_participant = matches!(
    qos.liveliness,
    Some(Liveliness::ManualByParticipant { .. })
  );
  if manual_by_participant {
    if let Err(e) = discovery_command.send(DiscoveryCommand::ManualAssertLiveliness) {
      error!("Failed to send DiscoveryCommand - Refresh. {e:?}");
    }
  }

  let writer = Self {
    data_phantom: PhantomData,
    ser_phantom: PhantomData,
    my_publisher: publisher,
    my_topic: topic,
    qos_policy: qos,
    my_guid: guid,
    cc_upload,
    cc_upload_waker,
    discovery_command,
    status_receiver,
    available_sequence_number: AtomicI64::new(1),
  };
  Ok(writer)
}
/// Reserves the next sequence number: returns the counter's current value
/// and atomically advances the counter by one.
fn next_sequence_number(&self) -> SequenceNumber {
  let reserved = self
    .available_sequence_number
    .fetch_add(1, Ordering::Relaxed);
  SequenceNumber::from(reserved)
}
/// Gives back one sequence number by atomically decrementing the counter.
///
/// Called on failed sends so that the number reserved by
/// `next_sequence_number` is not lost.
fn undo_sequence_number(&self) {
  let _previous = self
    .available_sequence_number
    .fetch_sub(1, Ordering::Relaxed);
}
/// Sends a `ManualAssertLiveliness` discovery command when this writer's
/// liveliness QoS is `ManualByParticipant`; does nothing for any other
/// setting.
///
/// A failed send is logged and otherwise ignored.
pub fn refresh_manual_liveliness(&self) {
  let manual_by_participant = matches!(
    self.qos_policy.liveliness,
    Some(Liveliness::ManualByParticipant { .. })
  );
  if !manual_by_participant {
    return;
  }

  let outcome = self
    .discovery_command
    .send(DiscoveryCommand::ManualAssertLiveliness);
  if let Err(e) = outcome {
    error!("Failed to send DiscoveryCommand - Refresh. {e:?}");
  }
}
/// Writes one sample, optionally with an explicit source timestamp.
///
/// Thin wrapper around `write_with_options` that discards the returned
/// `SampleIdentity`.
///
/// # Errors
///
/// Returns whatever error `write_with_options` returns.
pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> WriteResult<(), D> {
  let options = WriteOptions::from(source_timestamp);
  self
    .write_with_options(data, options)
    .map(|_sample_identity| ())
}
/// Serializes `data`, reserves a sequence number and sends the sample as a
/// `WriterCommand::DDSData` over the command channel.
///
/// On success the manual liveliness is refreshed and the identity
/// (writer GUID + sequence number) of the written sample is returned.
///
/// # Errors
///
/// * `WriteError::Serialization` — `SA::to_bytes` failed; `data` is handed back.
/// * `WriteError::WouldBlock` — the channel was still full after the
///   QoS `reliable_max_blocking_time`; `data` is handed back.
/// * `WriteError::Poisoned` — the channel is disconnected; `data` is handed back.
/// * An I/O error from the channel, converted with `into()`.
///
/// Every send failure gives the reserved sequence number back.
pub fn write_with_options(
  &self,
  data: D,
  write_options: WriteOptions,
) -> WriteResult<SampleIdentity, D> {
  // A `match` (not `map_err`) so that `data` can be moved into the error
  // and is still available afterwards on the success path.
  let payload_bytes = match SA::to_bytes(&data) {
    Err(e) => {
      return Err(WriteError::Serialization {
        reason: format!("{e}"),
        data,
      })
    }
    Ok(bytes) => bytes,
  };
  let payload = SerializedPayload::new_from_bytes(SA::output_encoding(), payload_bytes);
  let ddsdata = DDSData::new(payload);

  let sequence_number = self.next_sequence_number();
  let writer_command = WriterCommand::DDSData {
    ddsdata,
    write_options,
    sequence_number,
  };

  let timeout = self.qos().reliable_max_blocking_time();
  let send_failure = match try_send_timeout(&self.cc_upload, writer_command, timeout) {
    Ok(_) => {
      self.refresh_manual_liveliness();
      return Ok(SampleIdentity {
        writer_guid: self.my_guid,
        sequence_number,
      });
    }
    Err(failure) => failure,
  };

  match send_failure {
    TrySendError::Full(_writer_command) => {
      warn!(
        "Write timed out: topic={:?} timeout={:?}",
        self.my_topic.name(),
        timeout,
      );
      self.undo_sequence_number();
      Err(WriteError::WouldBlock { data })
    }
    TrySendError::Disconnected(_) => {
      self.undo_sequence_number();
      Err(WriteError::Poisoned {
        reason: "Cannot send to Writer".to_string(),
        data,
      })
    }
    TrySendError::Io(e) => {
      self.undo_sequence_number();
      Err(e.into())
    }
  }
}
/// Blocks for at most `max_wait` until the writer side reports that
/// everything has been acknowledged.
///
/// Returns `Ok(true)` immediately when the reliability QoS is unset or
/// `BestEffort`. For `Reliable` writers it returns `Ok(true)` when the
/// acknowledgment notification arrives in time and `Ok(false)` otherwise.
///
/// If the wait request cannot be queued, a warning is logged and the call
/// still waits for `max_wait`.
///
/// # Errors
///
/// Fails if the notification channel or the `mio` poll cannot be set up,
/// or if polling itself fails.
pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> WriteResult<bool, ()> {
  let is_reliable = matches!(
    self.qos_policy.reliability,
    Some(Reliability::Reliable { .. })
  );
  if !is_reliable {
    return Ok(true);
  }

  // One-shot channel on which the writer side signals "all acked".
  let (acked_sender, mut acked_receiver) = sync_status_channel::<()>(1)?;
  let poll = mio_06::Poll::new()?;
  poll.register(
    acked_receiver.as_status_evented(),
    Token(0),
    Ready::readable(),
    PollOpt::edge(),
  )?;

  let wait_request = WriterCommand::WaitForAcknowledgments {
    all_acked: acked_sender,
  };
  if let Err(e) = self.cc_upload.try_send(wait_request) {
    warn!("wait_for_acknowledgments: cannot initiate waiting. This will timeout. {e}");
  }

  let mut events = Events::with_capacity(1);
  poll.poll(&mut events, Some(max_wait))?;
  if events.iter().next().is_none() {
    // Timed out without any event.
    return Ok(false);
  }

  match acked_receiver.try_recv() {
    Ok(_) => Ok(true),
    Err(e) => {
      warn!("wait_for_acknowledgments - Spurious poll event? - {e}");
      Ok(false)
    }
  }
}
/// Returns a reference to the `Topic` this writer was created with.
pub fn topic(&self) -> &Topic {
&self.my_topic
}
/// Returns a reference to the `Publisher` this writer was created with.
pub fn publisher(&self) -> &Publisher {
&self.my_publisher
}
/// Manually asserts the liveliness of this writer.
///
/// Always calls `refresh_manual_liveliness` first. If the liveliness QoS is
/// `ManualByTopic`, an `AssertTopicLiveliness` discovery command is sent as
/// well; for every other setting nothing more is done.
///
/// # Errors
///
/// Returns `WriteError::WouldBlock` (after logging) when the discovery
/// command cannot be sent.
pub fn assert_liveliness(&self) -> WriteResult<(), ()> {
  self.refresh_manual_liveliness();

  let manual_by_topic = matches!(
    self.qos().liveliness,
    Some(Liveliness::ManualByTopic { .. })
  );
  if !manual_by_topic {
    return Ok(());
  }

  let assertion = DiscoveryCommand::AssertTopicLiveliness {
    writer_guid: self.guid(),
    manual_assertion: true,
  };
  match self.discovery_command.send(assertion) {
    Ok(()) => Ok(()),
    Err(e) => {
      error!("assert_liveness - Failed to send DiscoveryCommand. {e:?}");
      Err(WriteError::WouldBlock { data: () })
    }
  }
}
/// Intended to return the subscriptions matched with this writer.
///
/// # Panics
///
/// Not implemented yet: the body is `todo!()`, so every call panics.
pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
todo!()
}
/// Disposes the instance identified by `key`: the key is serialized and
/// sent as a `NotAliveDisposed` change, optionally with a source timestamp.
///
/// On success the manual liveliness is refreshed.
///
/// # Errors
///
/// Returns `WriteError::Serialization` when the key cannot be serialized.
/// A failure to send the command is reported with the same variant, after
/// giving the reserved sequence number back.
pub fn dispose(
  &self,
  key: &<D as Keyed>::K,
  source_timestamp: Option<Timestamp>,
) -> WriteResult<(), ()> {
  let key_bytes = match SA::key_to_bytes(key) {
    Ok(bytes) => bytes,
    Err(e) => {
      return Err(WriteError::Serialization {
        reason: format!("{e}"),
        data: (),
      })
    }
  };
  let key_payload = SerializedPayload::new_from_bytes(SA::output_encoding(), key_bytes);
  let ddsdata = DDSData::new_disposed_by_key(ChangeKind::NotAliveDisposed, key_payload);

  let dispose_command = WriterCommand::DDSData {
    ddsdata,
    write_options: WriteOptions::from(source_timestamp),
    sequence_number: self.next_sequence_number(),
  };
  if let Err(e) = self.cc_upload.send(dispose_command) {
    self.undo_sequence_number();
    // NOTE(review): a channel failure is reported as `Serialization` here,
    // while `write_with_options` uses `Poisoned` for the same situation.
    // Kept as is because callers may match on the variant.
    return Err(WriteError::Serialization {
      reason: format!("{e}"),
      data: (),
    });
  }

  self.refresh_manual_liveliness();
  Ok(())
}
}
/// Exposes the writer's `DataWriterStatus` events.
///
/// Every method simply forwards to the same-named method of the internal
/// `status_receiver`.
impl<'a, D, SA> StatusEvented<'a, DataWriterStatus, StatusReceiverStream<'a, DataWriterStatus>>
for DataWriter<D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
// Event source for registration with a mio 0.6 poll.
fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
self.status_receiver.as_status_evented()
}
// Event source for registration with a mio 0.8 poll.
fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
self.status_receiver.as_status_source()
}
// Status events as an async stream borrowing from this writer.
fn as_async_status_stream(&'a self) -> StatusReceiverStream<'a, DataWriterStatus> {
self.status_receiver.as_async_status_stream()
}
// Non-blocking fetch of one status event, if the receiver has one.
fn try_recv_status(&self) -> Option<DataWriterStatus> {
self.status_receiver.try_recv_status()
}
}
/// Identifies the writer as an RTPS entity by its GUID.
impl<D, SA> RTPSEntity for DataWriter<D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
// Returns the GUID given to the writer at construction.
fn guid(&self) -> GUID {
self.my_guid
}
}
/// Gives access to the QoS policies the writer was created with.
impl<D, SA> HasQoSPolicy for DataWriter<D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
// Returns a clone of the stored policies (a fresh copy on every call).
fn qos(&self) -> QosPolicies {
self.qos_policy.clone()
}
}
/// Future that sends one already-serialized sample to the writer's command
/// channel. Created by `DataWriter::async_write_with_options`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AsyncWrite<'a, D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
// Writer whose command channel and waker slot are used.
writer: &'a DataWriter<D, SA>,
// Command still to be sent. Taken at the start of each poll and put back
// when the channel is full and the deadline has not passed.
writer_command: Option<WriterCommand>,
// Sequence number reserved for this sample; reported in the `SampleIdentity`.
sequence_number: SequenceNumber,
// Configured blocking time; only used in the failure log message.
timeout: Option<duration::Duration>,
// Deadline after which a full channel yields `WriteError::WouldBlock`.
timeout_instant: Instant,
// The original sample, kept so it can be handed back inside an error.
sample: Option<D>,
}
// `AsyncWrite::poll` mutates fields directly through `Pin<&mut Self>`
// (e.g. `self.writer_command.take()`), which requires `Self: Unpin`.
// This explicit impl provides that for every `D`, including a `D` that is
// not `Unpin` itself (the sample is stored in the `sample` field).
impl<D, SA> Unpin for AsyncWrite<'_, D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
}
impl<D, SA> Future for AsyncWrite<'_, D, SA>
where
  D: Keyed,
  SA: SerializerAdapter<D>,
{
  type Output = WriteResult<SampleIdentity, D>;

  /// Tries to push the pending `WriterCommand` into the writer's channel.
  ///
  /// * Sent: refreshes manual liveliness and resolves to the sample identity.
  /// * Channel full, deadline not reached: stores the task's waker in the
  ///   writer's `cc_upload_waker` slot, keeps the command and returns
  ///   `Pending`.
  /// * Channel full, deadline passed: gives the sequence number back and
  ///   resolves to `WriteError::WouldBlock` carrying the sample.
  /// * Any other send error: gives the sequence number back and resolves to
  ///   `WriteError::Poisoned` carrying the sample.
  ///
  /// Polling again after completion resolves to `WriteError::Internal`.
  ///
  /// NOTE(review): the deadline is only checked when the future is polled;
  /// there is no timer here that wakes the task when the deadline passes.
  fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    // Reported instead of panicking if the sample has already been handed out.
    const SAMPLE_GONE: &str = "AsyncWrite has no sample left to hand back";

    let writer = self.writer;
    let wc = match self.writer_command.take() {
      Some(wc) => wc,
      None => {
        return Poll::Ready(Err(WriteError::Internal {
          reason: "someone stole my WriterCommand".to_owned(),
        }))
      }
    };

    match writer.cc_upload.try_send(wc) {
      Ok(()) => {
        writer.refresh_manual_liveliness();
        Poll::Ready(Ok(SampleIdentity {
          writer_guid: writer.my_guid,
          sequence_number: self.sequence_number,
        }))
      }
      Err(TrySendError::Full(wc)) => {
        if Instant::now() < self.timeout_instant {
          // Register for wake-up only when we are really going to wait, so
          // a finished future never overwrites another task's waker.
          // The slot holds a plain `Option<Waker>`, so a poisoned lock is
          // safe to recover from.
          *writer
            .cc_upload_waker
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(cx.waker().clone());
          // Put our command back for the next poll.
          self.writer_command = Some(wc);
          Poll::Pending
        } else {
          // Timed out: return the reserved sequence number, like the
          // blocking `write_with_options` does on timeout.
          writer.undo_sequence_number();
          match self.sample.take() {
            Some(data) => Poll::Ready(Err(WriteError::WouldBlock { data })),
            None => Poll::Ready(Err(WriteError::Internal {
              reason: SAMPLE_GONE.to_owned(),
            })),
          }
        }
      }
      Err(other_err) => {
        warn!(
          "Failed to write new data: topic={:?} reason={:?} timeout={:?}",
          writer.my_topic.name(),
          other_err,
          self.timeout
        );
        writer.undo_sequence_number();
        match self.sample.take() {
          Some(data) => Poll::Ready(Err(WriteError::Poisoned {
            reason: format!("{other_err}"),
            data,
          })),
          None => Poll::Ready(Err(WriteError::Internal {
            reason: SAMPLE_GONE.to_owned(),
          })),
        }
      }
    }
  }
}
/// Future (state machine) behind `DataWriter::async_wait_for_acknowledgments`.
///
/// It starts in `WaitingSendCommand`, moves to `Waiting` once the wait
/// request has been queued, and resolves when the acknowledgment channel
/// yields a value or ends.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub enum AsyncWaitForAcknowledgments<'a, D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
// The wait request has been sent; waiting for a notification on the receiver.
Waiting {
ack_wait_receiver: StatusChannelReceiver<()>,
},
// Polling this state resolves to `Ok(true)`. Also used as a temporary
// placeholder while `poll` moves the real state out.
Done,
// The wait request still has to be sent to the writer's command channel.
WaitingSendCommand {
writer: &'a DataWriter<D, SA>,
ack_wait_receiver: StatusChannelReceiver<()>,
ack_wait_sender: StatusChannelSender<()>,
},
// Polling this state resolves to the contained error.
Fail(WriteError<()>),
}
impl<D, SA> Future for AsyncWaitForAcknowledgments<'_, D, SA>
where
  D: Keyed,
  SA: SerializerAdapter<D>,
{
  type Output = WriteResult<bool, ()>;

  /// Drives the wait-for-acknowledgments state machine.
  ///
  /// * `Done` resolves to `Ok(true)`; `Fail(e)` resolves to `Err(e)`.
  /// * `Waiting` polls the acknowledgment channel: a value gives `Ok(true)`,
  ///   a closed channel gives `Ok(false)`.
  /// * `WaitingSendCommand` tries to queue the wait request. On success the
  ///   acknowledgment channel is polled immediately; when the command
  ///   channel is full the task's waker is stored in the writer's
  ///   `cc_upload_waker` slot and the send is retried on the next poll. Any
  ///   other send error resolves to `WriteError::Poisoned`.
  ///
  /// `Pending` is only returned after a wake-up has been arranged, as the
  /// `Future` contract requires.
  fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    match *self {
      AsyncWaitForAcknowledgments::Done => Poll::Ready(Ok(true)),

      AsyncWaitForAcknowledgments::Fail(_) => {
        // Move the error out by swapping in a placeholder state.
        let mut dummy = AsyncWaitForAcknowledgments::Done;
        core::mem::swap(&mut dummy, &mut self);
        match dummy {
          AsyncWaitForAcknowledgments::Fail(e) => Poll::Ready(Err(e)),
          _ => unreachable!(),
        }
      }

      AsyncWaitForAcknowledgments::Waiting {
        ref ack_wait_receiver,
      } => {
        match Pin::new(&mut ack_wait_receiver.as_async_status_stream()).poll_next(cx) {
          Poll::Pending => Poll::Pending,
          Poll::Ready(None) => Poll::Ready(Ok(false)),
          Poll::Ready(Some(())) => Poll::Ready(Ok(true)),
        }
      }

      AsyncWaitForAcknowledgments::WaitingSendCommand { .. } => {
        // Take ownership of the state's contents; `self` holds `Done` meanwhile.
        let mut dummy = AsyncWaitForAcknowledgments::Done;
        core::mem::swap(&mut dummy, &mut self);
        let (writer, ack_wait_receiver, ack_wait_sender) = match dummy {
          AsyncWaitForAcknowledgments::WaitingSendCommand {
            writer,
            ack_wait_receiver,
            ack_wait_sender,
          } => (writer, ack_wait_receiver, ack_wait_sender),
          _ => unreachable!(),
        };

        match writer
          .cc_upload
          .try_send(WriterCommand::WaitForAcknowledgments {
            all_acked: ack_wait_sender,
          }) {
          Ok(()) => {
            // Poll the receiver right away instead of returning a bare
            // `Pending`: this picks up an acknowledgment that has already
            // arrived and lets the stream arrange the wake-up for this task.
            let first_poll =
              Pin::new(&mut ack_wait_receiver.as_async_status_stream()).poll_next(cx);
            match first_poll {
              Poll::Pending => {
                *self = AsyncWaitForAcknowledgments::Waiting { ack_wait_receiver };
                Poll::Pending
              }
              Poll::Ready(None) => Poll::Ready(Ok(false)),
              Poll::Ready(Some(())) => Poll::Ready(Ok(true)),
            }
          }
          Err(TrySendError::Full(WriterCommand::WaitForAcknowledgments {
            all_acked: ack_wait_sender,
          })) => {
            // Command channel is full: register for wake-up through the same
            // slot `AsyncWrite::poll` uses, then retry on the next poll.
            // NOTE(review): relies on the receiving side waking this slot
            // when the channel drains — confirm.
            *writer
              .cc_upload_waker
              .lock()
              .unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(cx.waker().clone());
            *self = AsyncWaitForAcknowledgments::WaitingSendCommand {
              writer,
              ack_wait_receiver,
              ack_wait_sender,
            };
            Poll::Pending
          }
          Err(TrySendError::Full(_other_writer_command)) => {
            // We only ever send `WaitForAcknowledgments`, so nothing else
            // can be handed back to us.
            unreachable!()
          }
          Err(e) => Poll::Ready(Err(WriteError::Poisoned {
            reason: format!("{e}"),
            data: (),
          })),
        }
      }
    }
  }
}
impl<D, SA> DataWriter<D, SA>
where
D: Keyed,
SA: SerializerAdapter<D>,
{
/// Async counterpart of `write`: writes one sample, optionally with an
/// explicit source timestamp, and discards the returned `SampleIdentity`.
///
/// # Errors
///
/// Returns whatever error `async_write_with_options` returns.
pub async fn async_write(
  &self,
  data: D,
  source_timestamp: Option<Timestamp>,
) -> WriteResult<(), D> {
  let options = WriteOptions::from(source_timestamp);
  self
    .async_write_with_options(data, options)
    .await
    .map(|_sample_identity| ())
}
/// Async counterpart of `write_with_options`.
///
/// Serializes `data`, reserves a sequence number and awaits an `AsyncWrite`
/// future that pushes the command into the writer's channel. The deadline
/// is "now" plus the QoS `reliable_max_blocking_time`, or plus
/// `TIMEOUT_FALLBACK` when no blocking time is configured.
///
/// # Errors
///
/// Returns `WriteError::Serialization` (carrying `data`) when serialization
/// fails; otherwise whatever the `AsyncWrite` future resolves to.
pub async fn async_write_with_options(
  &self,
  data: D,
  write_options: WriteOptions,
) -> WriteResult<SampleIdentity, D> {
  // A `match` (not `map_err`) so that `data` can be moved into the error
  // and is still available afterwards on the success path.
  let payload_bytes = match SA::to_bytes(&data) {
    Err(e) => {
      return Err(WriteError::Serialization {
        reason: format!("{e}"),
        data,
      })
    }
    Ok(bytes) => bytes,
  };
  let payload = SerializedPayload::new_from_bytes(SA::output_encoding(), payload_bytes);
  let ddsdata = DDSData::new(payload);

  let sequence_number = self.next_sequence_number();
  let writer_command = WriterCommand::DDSData {
    ddsdata,
    write_options,
    sequence_number,
  };

  let timeout = self.qos().reliable_max_blocking_time();
  let started = std::time::Instant::now();
  let blocking_budget = match timeout {
    Some(t) => t.to_std(),
    None => crate::dds::helpers::TIMEOUT_FALLBACK.to_std(),
  };

  AsyncWrite {
    writer: self,
    writer_command: Some(writer_command),
    sequence_number,
    timeout,
    timeout_instant: started + blocking_budget,
    sample: Some(data),
  }
  .await
}
/// Async counterpart of `wait_for_acknowledgments`, without a timeout.
///
/// Resolves to `Ok(true)` immediately when the reliability QoS is unset or
/// `BestEffort`. For `Reliable` writers it awaits an
/// `AsyncWaitForAcknowledgments` future.
///
/// # Errors
///
/// Fails if the notification channel cannot be created (this used to panic),
/// or with whatever error the awaited future resolves to.
pub async fn async_wait_for_acknowledgments(&self) -> WriteResult<bool, ()> {
  match &self.qos_policy.reliability {
    None | Some(Reliability::BestEffort) => Ok(true),
    Some(Reliability::Reliable { .. }) => {
      // Propagate a channel creation failure with `?`, exactly like the
      // blocking `wait_for_acknowledgments` does, instead of unwrapping.
      let (ack_wait_sender, ack_wait_receiver) = sync_status_channel::<()>(1)?;
      let async_ack_wait = AsyncWaitForAcknowledgments::WaitingSendCommand {
        writer: self,
        ack_wait_receiver,
        ack_wait_sender,
      };
      async_ack_wait.await
    }
  }
}
}
#[cfg(test)]
mod tests {
use std::thread;
use byteorder::LittleEndian;
use log::info;
use super::*;
use crate::{
dds::{key::Key, participant::DomainParticipant},
structure::topic_kind::TopicKind,
test::random_data::*,
};
/// Smoke test: two writes (without and with a source timestamp) succeed on
/// a freshly created writer.
#[test]
fn dw_write_test() {
  // The message used to say "Publisher creation failed!", which pointed at
  // the wrong step when participant creation failed.
  let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
  let qos = QosPolicies::qos_none();

  let publisher = domain_participant
    .create_publisher(&qos)
    .expect("Failed to create publisher");
  let topic = domain_participant
    .create_topic(
      "Aasii".to_string(),
      "Huh?".to_string(),
      &qos,
      TopicKind::WithKey,
    )
    .expect("Failed to create topic");

  let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
    publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

  let mut data = RandomData {
    a: 4,
    b: "Fobar".to_string(),
  };
  data_writer
    .write(data.clone(), None)
    .expect("Unable to write data");

  data.a = 5;
  let timestamp = Timestamp::now();
  data_writer
    .write(data, Some(timestamp))
    .expect("Unable to write data with timestamp");
}
/// Smoke test: a written instance can be disposed by its key.
#[test]
fn dw_dispose_test() {
  // The message used to say "Publisher creation failed!", which pointed at
  // the wrong step when participant creation failed.
  let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
  let qos = QosPolicies::qos_none();

  let publisher = domain_participant
    .create_publisher(&qos)
    .expect("Failed to create publisher");
  let topic = domain_participant
    .create_topic(
      "Aasii".to_string(),
      "Huh?".to_string(),
      &qos,
      TopicKind::WithKey,
    )
    .expect("Failed to create topic");

  let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
    publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

  let data = RandomData {
    a: 4,
    b: "Fobar".to_string(),
  };

  // The key hash is only logged, for diagnostics.
  let key = &data.key().hash_key(false);
  info!("key: {key:?}");

  data_writer
    .write(data.clone(), None)
    .expect("Unable to write data");

  // Pause between the write and the dispose.
  thread::sleep(Duration::from_millis(100));

  data_writer
    .dispose(&data.key(), None)
    .expect("Unable to dispose data");
}
/// Smoke test: after one write, `wait_for_acknowledgments` reports `true`
/// on a writer created with `qos_none()`.
#[test]
fn dw_wait_for_ack_test() {
  let participant = DomainParticipant::new(0).expect("Participant creation failed!");
  let qos = QosPolicies::qos_none();

  let publisher = participant
    .create_publisher(&qos)
    .expect("Failed to create publisher");
  let topic = participant
    .create_topic(
      "Aasii".to_string(),
      "Huh?".to_string(),
      &qos,
      TopicKind::WithKey,
    )
    .expect("Failed to create topic");

  let writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> = publisher
    .create_datawriter(&topic, None)
    .expect("Failed to create datawriter");

  let sample = RandomData {
    a: 4,
    b: "Fobar".to_string(),
  };
  writer.write(sample, None).expect("Unable to write data");

  let max_wait = Duration::from_secs(2);
  let all_acked = writer.wait_for_acknowledgments(max_wait).unwrap();
  assert!(all_acked);
}
}