rustdds/dds/with_key/datawriter.rs
1use std::{
2 marker::PhantomData,
3 pin::Pin,
4 sync::{
5 atomic::{AtomicI64, Ordering},
6 Arc, Mutex,
7 },
8 task::{Context, Poll, Waker},
9 time::{Duration, Instant},
10};
11
12use futures::{Future, Stream};
13use mio_06::{Events, PollOpt, Ready, Token};
14use mio_extras::channel::{self as mio_channel, SendError, TrySendError};
15#[allow(unused_imports)]
16use log::{debug, error, info, trace, warn};
17
18use crate::{
19 dds::{
20 adapters::with_key::SerializerAdapter,
21 ddsdata::DDSData,
22 helpers::*,
23 pubsub::Publisher,
24 qos::{
25 policy::{Liveliness, Reliability},
26 HasQoSPolicy, QosPolicies,
27 },
28 result::{CreateResult, WriteError, WriteResult},
29 statusevents::*,
30 topic::Topic,
31 },
32 discovery::{discovery::DiscoveryCommand, sedp_messages::SubscriptionBuiltinTopicData},
33 messages::submessages::elements::serialized_payload::SerializedPayload,
34 rtps::writer::WriterCommand,
35 serialization::CDRSerializerAdapter,
36 structure::{
37 cache_change::ChangeKind, duration, entity::RTPSEntity, guid::GUID, rpc::SampleIdentity,
38 sequence_number::SequenceNumber, time::Timestamp,
39 },
40 Keyed, TopicDescription,
41};
42
43// TODO: Move the write options and the builder type to some lower-level module
44// to avoid circular dependencies.
45#[derive(Debug, Default)]
46pub struct WriteOptionsBuilder {
47 related_sample_identity: Option<SampleIdentity>,
48 source_timestamp: Option<Timestamp>,
49 to_single_reader: Option<GUID>,
50}
51
52impl WriteOptionsBuilder {
53 pub fn new() -> Self {
54 Self::default()
55 }
56
57 pub fn build(self) -> WriteOptions {
58 WriteOptions {
59 related_sample_identity: self.related_sample_identity,
60 source_timestamp: self.source_timestamp,
61 to_single_reader: self.to_single_reader,
62 }
63 }
64
65 #[must_use]
66 pub fn related_sample_identity(mut self, related_sample_identity: SampleIdentity) -> Self {
67 self.related_sample_identity = Some(related_sample_identity);
68 self
69 }
70
71 #[must_use]
72 pub fn related_sample_identity_opt(
73 mut self,
74 related_sample_identity_opt: Option<SampleIdentity>,
75 ) -> Self {
76 self.related_sample_identity = related_sample_identity_opt;
77 self
78 }
79
80 #[must_use]
81 pub fn source_timestamp(mut self, source_timestamp: Timestamp) -> Self {
82 self.source_timestamp = Some(source_timestamp);
83 self
84 }
85
86 #[must_use]
87 pub fn to_single_reader(mut self, reader: GUID) -> Self {
88 self.to_single_reader = Some(reader);
89 self
90 }
91}
92
93/// Type to be used with write_with_options.
94/// Use WriteOptionsBuilder to construct this.
95#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
96pub struct WriteOptions {
97 related_sample_identity: Option<SampleIdentity>, // for DDS-RPC
98 source_timestamp: Option<Timestamp>, // from DDS spec
99 to_single_reader: Option<GUID>, /* try to send to one Reader only
100 * future extension room fo other fields. */
101}
102
103impl WriteOptions {
104 pub fn related_sample_identity(&self) -> Option<SampleIdentity> {
105 self.related_sample_identity
106 }
107
108 pub fn source_timestamp(&self) -> Option<Timestamp> {
109 self.source_timestamp
110 }
111
112 pub fn to_single_reader(&self) -> Option<GUID> {
113 self.to_single_reader
114 }
115}
116
117impl From<Option<Timestamp>> for WriteOptions {
118 fn from(source_timestamp: Option<Timestamp>) -> Self {
119 Self {
120 related_sample_identity: None,
121 source_timestamp,
122 to_single_reader: None,
123 }
124 }
125}
126
127/// Simplified type for CDR encoding
128pub type DataWriterCdr<D> = DataWriter<D, CDRSerializerAdapter<D>>;
129
130/// DDS DataWriter for keyed topics
131///
132/// # Examples
133///
134/// ```
135/// use serde::{Serialize, Deserialize};
136/// use rustdds::*;
137/// use rustdds::with_key::DataWriter;
138/// use rustdds::serialization::CDRSerializerAdapter;
139///
140/// let domain_participant = DomainParticipant::new(0).unwrap();
141/// let qos = QosPolicyBuilder::new().build();
142/// let publisher = domain_participant.create_publisher(&qos).unwrap();
143///
144/// #[derive(Serialize, Deserialize, Debug)]
145/// struct SomeType { a: i32 }
146/// impl Keyed for SomeType {
147/// type K = i32;
148///
149/// fn key(&self) -> Self::K {
150/// self.a
151/// }
152/// }
153///
154/// // WithKey is important
155/// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
156/// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None);
157/// ```
158pub struct DataWriter<D: Keyed, SA: SerializerAdapter<D> = CDRSerializerAdapter<D>> {
159 data_phantom: PhantomData<D>,
160 ser_phantom: PhantomData<SA>,
161 my_publisher: Publisher,
162 my_topic: Topic,
163 qos_policy: QosPolicies,
164 my_guid: GUID,
165 cc_upload: mio_channel::SyncSender<WriterCommand>,
166 cc_upload_waker: Arc<Mutex<Option<Waker>>>,
167 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
168 status_receiver: StatusChannelReceiver<DataWriterStatus>,
169 available_sequence_number: AtomicI64,
170}
171
172impl<D, SA> Drop for DataWriter<D, SA>
173where
174 D: Keyed,
175 SA: SerializerAdapter<D>,
176{
177 fn drop(&mut self) {
178 // Tell Publisher to drop the corresponding RTPS Writer
179 self.my_publisher.remove_writer(self.my_guid);
180
181 // Notify Discovery that we are no longer
182 match self
183 .discovery_command
184 .send(DiscoveryCommand::RemoveLocalWriter { guid: self.guid() })
185 {
186 Ok(_) => {}
187
188 // This is fairly normal at shutdown, as the other end is down already.
189 Err(SendError::Disconnected(_cmd)) => {
190 debug!("Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand: Disconnected.");
191 }
192 // other errors must be taken more seriously
193 Err(e) => error!("Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand. {e:?}"),
194 }
195 }
196}
197
198impl<D, SA> DataWriter<D, SA>
199where
200 D: Keyed,
201 SA: SerializerAdapter<D>,
202{
203 #[allow(clippy::too_many_arguments)]
204 pub(crate) fn new(
205 publisher: Publisher,
206 topic: Topic,
207 qos: QosPolicies,
208 guid: GUID,
209 cc_upload: mio_channel::SyncSender<WriterCommand>,
210 cc_upload_waker: Arc<Mutex<Option<Waker>>>,
211 discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
212 status_receiver: StatusChannelReceiver<DataWriterStatus>,
213 ) -> CreateResult<Self> {
214 if let Some(lv) = qos.liveliness {
215 match lv {
216 Liveliness::Automatic { .. } | Liveliness::ManualByTopic { .. } => (),
217 Liveliness::ManualByParticipant { .. } => {
218 if let Err(e) = discovery_command.send(DiscoveryCommand::ManualAssertLiveliness) {
219 error!("Failed to send DiscoveryCommand - Refresh. {e:?}");
220 }
221 }
222 }
223 };
224 Ok(Self {
225 data_phantom: PhantomData,
226 ser_phantom: PhantomData,
227 my_publisher: publisher,
228 my_topic: topic,
229 qos_policy: qos,
230 my_guid: guid,
231 cc_upload,
232 cc_upload_waker,
233 discovery_command,
234 status_receiver,
235 available_sequence_number: AtomicI64::new(1), // valid numbering starts from 1
236 })
237 }
238
239 fn next_sequence_number(&self) -> SequenceNumber {
240 SequenceNumber::from(
241 self
242 .available_sequence_number
243 .fetch_add(1, Ordering::Relaxed),
244 )
245 }
246
247 fn undo_sequence_number(&self) {
248 self
249 .available_sequence_number
250 .fetch_sub(1, Ordering::Relaxed);
251 }
252
253 /// Manually refreshes liveliness
254 ///
255 /// Corresponds to DDS Spec 1.4 Section 2.2.2.4.2.22 assert_liveliness.
256 ///
257 /// # Examples
258 ///
259 /// ```
260 /// # use serde::{Serialize, Deserialize};
261 /// # use rustdds::*;
262 /// # use rustdds::with_key::DataWriter;
263 /// # use rustdds::serialization::CDRSerializerAdapter;
264 /// #
265 /// let domain_participant = DomainParticipant::new(0).unwrap();
266 /// let qos = QosPolicyBuilder::new().build();
267 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
268 ///
269 /// #[derive(Serialize, Deserialize, Debug)]
270 /// struct SomeType { a: i32 }
271 /// impl Keyed for SomeType {
272 /// type K = i32;
273 ///
274 /// fn key(&self) -> Self::K {
275 /// self.a
276 /// }
277 /// }
278 ///
279 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
280 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
281 ///
282 /// data_writer.refresh_manual_liveliness();
283 /// ```
284 pub fn refresh_manual_liveliness(&self) {
285 if let Some(lv) = self.qos().liveliness {
286 match lv {
287 Liveliness::Automatic { .. } | Liveliness::ManualByTopic { .. } => (),
288 Liveliness::ManualByParticipant { .. } => {
289 if let Err(e) = self
290 .discovery_command
291 .send(DiscoveryCommand::ManualAssertLiveliness)
292 {
293 error!("Failed to send DiscoveryCommand - Refresh. {e:?}");
294 }
295 }
296 }
297 };
298 }
299
300 /// Writes single data instance to a topic.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// # use serde::{Serialize, Deserialize};
306 /// # use rustdds::*;
307 /// # use rustdds::with_key::DataWriter;
308 /// # use rustdds::serialization::CDRSerializerAdapter;
309 /// #
310 /// let domain_participant = DomainParticipant::new(0).unwrap();
311 /// let qos = QosPolicyBuilder::new().build();
312 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
313 ///
314 /// #[derive(Serialize, Deserialize, Debug)]
315 /// struct SomeType { a: i32 }
316 /// impl Keyed for SomeType {
317 /// type K = i32;
318 ///
319 /// fn key(&self) -> Self::K {
320 /// self.a
321 /// }
322 /// }
323 ///
324 /// // WithKey is important
325 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
326 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
327 ///
328 /// let some_data = SomeType { a: 1 };
329 /// data_writer.write(some_data, None).unwrap();
330 /// ```
331 pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> WriteResult<(), D> {
332 self.write_with_options(data, WriteOptions::from(source_timestamp))?;
333 Ok(())
334 }
335
336 pub fn write_with_options(
337 &self,
338 data: D,
339 write_options: WriteOptions,
340 ) -> WriteResult<SampleIdentity, D> {
341 // serialize
342 let send_buffer = match SA::to_bytes(&data) {
343 Ok(b) => b,
344 Err(e) => {
345 return Err(WriteError::Serialization {
346 reason: format!("{e}"),
347 data,
348 })
349 }
350 };
351
352 let ddsdata = DDSData::new(SerializedPayload::new_from_bytes(
353 SA::output_encoding(),
354 send_buffer,
355 ));
356 let sequence_number = self.next_sequence_number();
357 let writer_command = WriterCommand::DDSData {
358 ddsdata,
359 write_options,
360 sequence_number,
361 };
362
363 let timeout = self.qos().reliable_max_blocking_time();
364
365 match try_send_timeout(&self.cc_upload, writer_command, timeout) {
366 Ok(_) => {
367 self.refresh_manual_liveliness();
368 Ok(SampleIdentity {
369 writer_guid: self.my_guid,
370 sequence_number,
371 })
372 }
373 Err(TrySendError::Full(_writer_command)) => {
374 warn!(
375 "Write timed out: topic={:?} timeout={:?}",
376 self.my_topic.name(),
377 timeout,
378 );
379 self.undo_sequence_number();
380 Err(WriteError::WouldBlock { data })
381 }
382 Err(TrySendError::Disconnected(_)) => {
383 self.undo_sequence_number();
384 Err(WriteError::Poisoned {
385 reason: "Cannot send to Writer".to_string(),
386 data,
387 })
388 }
389 Err(TrySendError::Io(e)) => {
390 self.undo_sequence_number();
391 Err(e.into())
392 }
393 }
394 }
395
396 /// This operation blocks the calling thread until either all data written by
397 /// the reliable DataWriter entities is acknowledged by all
398 /// matched reliable DataReader entities, or else the duration specified by
399 /// the `max_wait` parameter elapses, whichever happens first.
400 ///
401 /// See DDS Spec 1.4 Section 2.2.2.4.1.12 wait_for_acknowledgments.
402 ///
403 /// If this DataWriter is not set to Reliable, or there are no matched
404 /// DataReaders with Reliable QoS, the call succeeds immediately.
405 ///
406 /// Return values
407 /// * `Ok(true)` - all acknowledged
408 /// * `Ok(false)`- timed out waiting for acknowledgments
409 /// * `Err(_)` - something went wrong
410 ///
411 /// # Examples
412 ///
413 /// ```
414 /// # use serde::{Serialize, Deserialize};
415 /// # use rustdds::*;
416 /// #
417 /// let domain_participant = DomainParticipant::new(0).unwrap();
418 /// let qos = QosPolicyBuilder::new().build();
419 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
420 ///
421 /// #[derive(Serialize, Deserialize, Debug)]
422 /// struct SomeType { a: i32 }
423 /// impl Keyed for SomeType {
424 /// type K = i32;
425 ///
426 /// fn key(&self) -> Self::K {
427 /// self.a
428 /// }
429 /// }
430 ///
431 /// // WithKey is important
432 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
433 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
434 ///
435 /// let some_data = SomeType { a: 1 };
436 /// data_writer.write(some_data, None).unwrap();
437 /// data_writer.wait_for_acknowledgments(std::time::Duration::from_millis(100));
438 /// ```
439 pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> WriteResult<bool, ()> {
440 match &self.qos_policy.reliability {
441 None | Some(Reliability::BestEffort) => Ok(true),
442 Some(Reliability::Reliable { .. }) => {
443 let (acked_sender, mut acked_receiver) = sync_status_channel::<()>(1)?;
444 let poll = mio_06::Poll::new()?;
445 poll.register(
446 acked_receiver.as_status_evented(),
447 Token(0),
448 Ready::readable(),
449 PollOpt::edge(),
450 )?;
451 self
452 .cc_upload
453 .try_send(WriterCommand::WaitForAcknowledgments {
454 all_acked: acked_sender,
455 })
456 .unwrap_or_else(|e| {
457 warn!("wait_for_acknowledgments: cannot initiate waiting. This will timeout. {e}");
458 });
459
460 let mut events = Events::with_capacity(1);
461 poll.poll(&mut events, Some(max_wait))?;
462 if let Some(_event) = events.iter().next() {
463 match acked_receiver.try_recv() {
464 Ok(_) => Ok(true), // got token
465 Err(e) => {
466 warn!("wait_for_acknowledgments - Spurious poll event? - {e}");
467 Ok(false) // TODO: We could also loop here
468 }
469 }
470 } else {
471 // no token, so presumably timed out
472 Ok(false)
473 }
474 }
475 } // match
476 }
477
478 /*
479
480 /// Unimplemented. <b>Do not use</b>.
481 ///
482 /// # Examples
483 ///
484 /// ```no_run
485 // TODO: enable when functional
486 /// # use serde::{Serialize, Deserialize};
487 /// # use rustdds::*;
488 /// # use rustdds::with_key::DataWriter;
489 /// # use rustdds::serialization::CDRSerializerAdapter;
490 /// #
491 /// let domain_participant = DomainParticipant::new(0).unwrap();
492 /// let qos = QosPolicyBuilder::new().build();
493 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
494 ///
495 /// #[derive(Serialize, Deserialize, Debug)]
496 /// struct SomeType { a: i32 }
497 /// impl Keyed for SomeType {
498 /// type K = i32;
499 ///
500 /// fn key(&self) -> Self::K {
501 /// self.a
502 /// }
503 /// }
504 ///
505 /// // WithKey is important
506 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
507 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
508 ///
509 /// // Liveliness lost status has changed
510 ///
511 /// if let Ok(lls) = data_writer.get_liveliness_lost_status() {
512 /// // do something
513 /// }
514 /// ```
515 pub fn get_liveliness_lost_status(&self) -> Result<LivelinessLostStatus> {
516 todo!()
517 }
518
519 /// Should get latest offered deadline missed status. <b>Do not use yet</b> use `get_status_lister` instead for the moment.
520 ///
521 /// # Examples
522 ///
523 /// ```
524 /// # use serde::{Serialize, Deserialize};
525 /// # use rustdds::*;
526 /// # use rustdds::with_key::DataWriter;
527 /// # use rustdds::serialization::CDRSerializerAdapter;
528 /// #
529 /// let domain_participant = DomainParticipant::new(0).unwrap();
530 /// let qos = QosPolicyBuilder::new().build();
531 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
532 ///
533 /// #[derive(Serialize, Deserialize, Debug)]
534 /// struct SomeType { a: i32 }
535 /// impl Keyed for SomeType {
536 /// type K = i32;
537 ///
538 /// fn key(&self) -> Self::K {
539 /// self.a
540 /// }
541 /// }
542 ///
543 /// // WithKey is important
544 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
545 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
546 ///
547 /// // Deadline missed status has changed
548 ///
549 /// if let Ok(odms) = data_writer.get_offered_deadline_missed_status() {
550 /// // do something
551 /// }
552 /// ```
553 pub fn get_offered_deadline_missed_status(&self) -> Result<OfferedDeadlineMissedStatus> {
554 let mut fstatus = OfferedDeadlineMissedStatus::new();
555 while let Ok(status) = self.status_receiver.try_recv() {
556 match status {
557 StatusChange::OfferedDeadlineMissedStatus(status) => fstatus = status,
558 // TODO: possibly save old statuses
559 _ => (),
560 }
561 }
562
563 match self
564 .cc_upload
565 .try_send(WriterCommand::ResetOfferedDeadlineMissedStatus {
566 writer_guid: self.guid(),
567 }) {
568 Ok(_) => (),
569 Err(e) => error!("Unable to send ResetOfferedDeadlineMissedStatus. {e:?}"),
570 };
571
572 Ok(fstatus)
573 }
574
575 /// Unimplemented. <b>Do not use</b>.
576 ///
577 /// # Examples
578 ///
579 /// ```no_run
580 // TODO: enable when functional
581 /// # use serde::{Serialize, Deserialize};
582 /// # use rustdds::*;
583 /// # use rustdds::with_key::DataWriter;
584 /// # use rustdds::serialization::CDRSerializerAdapter;
585 /// #
586 /// let domain_participant = DomainParticipant::new(0).unwrap();
587 /// let qos = QosPolicyBuilder::new().build();
588 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
589 ///
590 /// #[derive(Serialize, Deserialize, Debug)]
591 /// struct SomeType { a: i32 }
592 /// impl Keyed for SomeType {
593 /// type K = i32;
594 ///
595 /// fn key(&self) -> Self::K {
596 /// self.a
597 /// }
598 /// }
599 ///
600 /// // WithKey is important
601 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
602 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
603 ///
604 /// // Liveliness lost status has changed
605 ///
606 /// if let Ok(oiqs) = data_writer.get_offered_incompatible_qos_status() {
607 /// // do something
608 /// }
609 /// ```
610 pub fn get_offered_incompatible_qos_status(&self) -> Result<OfferedIncompatibleQosStatus> {
611 todo!()
612 }
613
614 /// Unimplemented. <b>Do not use</b>.
615 ///
616 /// # Examples
617 ///
618 /// ```no_run
619 // TODO: enable when functional
620 /// # use serde::{Serialize, Deserialize};
621 /// # use rustdds::*;
622 /// # use rustdds::with_key::DataWriter;
623 /// # use rustdds::serialization::CDRSerializerAdapter;
624 /// #
625 /// let domain_participant = DomainParticipant::new(0).unwrap();
626 /// let qos = QosPolicyBuilder::new().build();
627 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
628 ///
629 /// #[derive(Serialize, Deserialize, Debug)]
630 /// struct SomeType { a: i32 }
631 /// impl Keyed for SomeType {
632 /// type K = i32;
633 ///
634 /// fn key(&self) -> Self::K {
635 /// self.a
636 /// }
637 /// }
638 ///
639 /// // WithKey is important
640 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
641 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
642 ///
643 /// // Liveliness lost status has changed
644 ///
645 /// if let Ok(pms) = data_writer.get_publication_matched_status() {
646 /// // do something
647 /// }
648 /// ```
649 pub fn get_publication_matched_status(&self) -> Result<PublicationMatchedStatus> {
650 todo!()
651 }
652
653 */
654
655 /// Topic assigned to this DataWriter
656 ///
657 /// # Examples
658 ///
659 /// ```
660 /// # use serde::{Serialize, Deserialize};
661 /// # use rustdds::*;
662 /// # use rustdds::with_key::DataWriter;
663 /// # use rustdds::serialization::CDRSerializerAdapter;
664 /// #
665 /// let domain_participant = DomainParticipant::new(0).unwrap();
666 /// let qos = QosPolicyBuilder::new().build();
667 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
668 ///
669 /// #[derive(Serialize, Deserialize, Debug)]
670 /// struct SomeType { a: i32 }
671 /// impl Keyed for SomeType {
672 /// type K = i32;
673 ///
674 /// fn key(&self) -> Self::K {
675 /// self.a
676 /// }
677 /// }
678 ///
679 /// // WithKey is important
680 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
681 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
682 ///
683 /// assert_eq!(data_writer.topic(), &topic);
684 /// ```
685 pub fn topic(&self) -> &Topic {
686 &self.my_topic
687 }
688
689 /// Publisher assigned to this DataWriter
690 ///
691 /// # Examples
692 ///
693 /// ```
694 /// # use serde::{Serialize, Deserialize};
695 /// # use rustdds::*;
696 /// # use rustdds::with_key::DataWriter;
697 /// # use rustdds::serialization::CDRSerializerAdapter;
698 /// #
699 /// let domain_participant = DomainParticipant::new(0).unwrap();
700 /// let qos = QosPolicyBuilder::new().build();
701 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
702 ///
703 /// #[derive(Serialize, Deserialize, Debug)]
704 /// struct SomeType { a: i32 }
705 /// impl Keyed for SomeType {
706 /// type K = i32;
707 ///
708 /// fn key(&self) -> Self::K {
709 /// self.a
710 /// }
711 /// }
712 ///
713 /// // WithKey is important
714 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
715 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
716 ///
717 /// assert_eq!(data_writer.publisher(), &publisher);
718 pub fn publisher(&self) -> &Publisher {
719 &self.my_publisher
720 }
721
722 /// Manually asserts liveliness (use this instead of refresh) according to QoS
723 ///
724 /// # Examples
725 ///
726 /// ```
727 /// # use serde::{Serialize, Deserialize};
728 /// # use rustdds::*;
729 /// # use rustdds::with_key::DataWriter;
730 /// # use rustdds::serialization::CDRSerializerAdapter;
731 /// #
732 /// let domain_participant = DomainParticipant::new(0).unwrap();
733 /// let qos = QosPolicyBuilder::new().build();
734 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
735 ///
736 /// #[derive(Serialize, Deserialize, Debug)]
737 /// struct SomeType { a: i32 }
738 /// impl Keyed for SomeType {
739 /// type K = i32;
740 ///
741 /// fn key(&self) -> Self::K {
742 /// self.a
743 /// }
744 /// }
745 ///
746 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
747 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
748 ///
749 /// data_writer.assert_liveliness().unwrap();
750 /// ```
751 ///
752 /// An `Err` result means that livelines assertion message could not be sent,
753 /// likely because Discovery has too much work to do.
754 pub fn assert_liveliness(&self) -> WriteResult<(), ()> {
755 self.refresh_manual_liveliness();
756
757 match self.qos().liveliness {
758 Some(Liveliness::ManualByTopic { lease_duration: _ }) => {
759 self
760 .discovery_command
761 .send(DiscoveryCommand::AssertTopicLiveliness {
762 writer_guid: self.guid(),
763 manual_assertion: true, // by definition of this function
764 })
765 .map_err(|e| {
766 error!("assert_liveness - Failed to send DiscoveryCommand. {e:?}");
767 WriteError::WouldBlock { data: () }
768 })
769 }
770 _other => Ok(()),
771 }
772 }
773
774 /// Unimplemented. <b>Do not use</b>.
775 ///
776 /// # Examples
777 ///
778 /// ```no_run
779 // TODO: enable when available
780 /// # use serde::{Serialize, Deserialize};
781 /// # use rustdds::*;
782 /// # use rustdds::with_key::DataWriter;
783 /// # use rustdds::serialization::CDRSerializerAdapter;
784 /// #
785 /// let domain_participant = DomainParticipant::new(0).unwrap();
786 /// let qos = QosPolicyBuilder::new().build();
787 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
788 ///
789 /// #[derive(Serialize, Deserialize, Debug)]
790 /// struct SomeType { a: i32 }
791 /// impl Keyed for SomeType {
792 /// type K = i32;
793 ///
794 /// fn key(&self) -> Self::K {
795 /// self.a
796 /// }
797 /// }
798 ///
799 /// // WithKey is important
800 /// let topic = domain_participant.create_topic("some_topic".to_string(),
801 /// "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
802 /// let data_writer = publisher.create_datawriter::<SomeType,
803 /// CDRSerializerAdapter<_>>(&topic, None).unwrap();
804 ///
805 /// for sub in data_writer.get_matched_subscriptions().iter() {
806 /// // do something
807 /// }
808 pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
809 todo!()
810 }
811
812 /// Disposes data instance with specified key
813 ///
814 /// # Arguments
815 ///
816 /// * `key` - Key of the instance
817 /// * `source_timestamp` - DDS source timestamp (None uses now as time as
818 /// specified in DDS spec)
819 ///
820 /// # Examples
821 ///
822 /// ```
823 /// # use serde::{Serialize, Deserialize};
824 /// # use rustdds::*;
825 /// # use rustdds::with_key::DataWriter;
826 /// # use rustdds::serialization::CDRSerializerAdapter;
827 /// #
828 /// let domain_participant = DomainParticipant::new(0).unwrap();
829 /// let qos = QosPolicyBuilder::new().build();
830 /// let publisher = domain_participant.create_publisher(&qos).unwrap();
831 ///
832 /// #[derive(Serialize, Deserialize, Debug)]
833 /// struct SomeType { a: i32, val: usize }
834 /// impl Keyed for SomeType {
835 /// type K = i32;
836 ///
837 /// fn key(&self) -> Self::K {
838 /// self.a
839 /// }
840 /// }
841 ///
842 /// // WithKey is important
843 /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
844 /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
845 ///
846 /// let some_data_1_1 = SomeType { a: 1, val: 3};
847 /// let some_data_1_2 = SomeType { a: 1, val: 4};
848 /// // different key
849 /// let some_data_2_1 = SomeType { a: 2, val: 5};
850 /// let some_data_2_2 = SomeType { a: 2, val: 6};
851 ///
852 /// data_writer.write(some_data_1_1, None).unwrap();
853 /// data_writer.write(some_data_1_2, None).unwrap();
854 /// data_writer.write(some_data_2_1, None).unwrap();
855 /// data_writer.write(some_data_2_2, None).unwrap();
856 ///
857 /// // disposes both some_data_1_1 and some_data_1_2. They are no longer offered by this writer to this topic.
858 /// data_writer.dispose(&1, None).unwrap();
859 /// ```
860 pub fn dispose(
861 &self,
862 key: &<D as Keyed>::K,
863 source_timestamp: Option<Timestamp>,
864 ) -> WriteResult<(), ()> {
865 let send_buffer = SA::key_to_bytes(key).map_err(|e| WriteError::Serialization {
866 reason: format!("{e}"),
867 data: (),
868 })?; // serialize key
869
870 let ddsdata = DDSData::new_disposed_by_key(
871 ChangeKind::NotAliveDisposed,
872 SerializedPayload::new_from_bytes(SA::output_encoding(), send_buffer),
873 );
874 self
875 .cc_upload
876 .send(WriterCommand::DDSData {
877 ddsdata,
878 write_options: WriteOptions::from(source_timestamp),
879 sequence_number: self.next_sequence_number(),
880 })
881 .map_err(|e| {
882 self.undo_sequence_number();
883 WriteError::Serialization {
884 reason: format!("{e}"),
885 data: (),
886 }
887 })?;
888
889 self.refresh_manual_liveliness();
890 Ok(())
891 }
892}
893
894impl<'a, D, SA> StatusEvented<'a, DataWriterStatus, StatusReceiverStream<'a, DataWriterStatus>>
895 for DataWriter<D, SA>
896where
897 D: Keyed,
898 SA: SerializerAdapter<D>,
899{
900 fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
901 self.status_receiver.as_status_evented()
902 }
903
904 fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
905 self.status_receiver.as_status_source()
906 }
907
908 fn as_async_status_stream(&'a self) -> StatusReceiverStream<'a, DataWriterStatus> {
909 self.status_receiver.as_async_status_stream()
910 }
911
912 fn try_recv_status(&self) -> Option<DataWriterStatus> {
913 self.status_receiver.try_recv_status()
914 }
915}
916
917impl<D, SA> RTPSEntity for DataWriter<D, SA>
918where
919 D: Keyed,
920 SA: SerializerAdapter<D>,
921{
922 fn guid(&self) -> GUID {
923 self.my_guid
924 }
925}
926
927impl<D, SA> HasQoSPolicy for DataWriter<D, SA>
928where
929 D: Keyed,
930 SA: SerializerAdapter<D>,
931{
932 fn qos(&self) -> QosPolicies {
933 self.qos_policy.clone()
934 }
935}
936
937//-------------------------------------------------------------------------------
938// async writing implementation
939//
940
941// A future for an asynchronous write operation
942#[must_use = "futures do nothing unless you `.await` or poll them"]
943pub struct AsyncWrite<'a, D, SA>
944where
945 D: Keyed,
946 SA: SerializerAdapter<D>,
947{
948 writer: &'a DataWriter<D, SA>,
949 writer_command: Option<WriterCommand>,
950 sequence_number: SequenceNumber,
951 timeout: Option<duration::Duration>,
952 timeout_instant: Instant,
953 sample: Option<D>,
954}
955
956// This is required, because AsyncWrite contains "D".
957// TODO: Is it ok to promise Unpin here?
958impl<D, SA> Unpin for AsyncWrite<'_, D, SA>
959where
960 D: Keyed,
961 SA: SerializerAdapter<D>,
962{
963}
964
965impl<D, SA> Future for AsyncWrite<'_, D, SA>
966where
967 D: Keyed,
968 SA: SerializerAdapter<D>,
969{
970 type Output = WriteResult<SampleIdentity, D>;
971
972 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
973 match self.writer_command.take() {
974 Some(wc) => {
975 match self.writer.cc_upload.try_send(wc) {
976 Ok(()) => {
977 self.writer.refresh_manual_liveliness();
978 Poll::Ready(Ok(SampleIdentity {
979 writer_guid: self.writer.my_guid,
980 sequence_number: self.sequence_number,
981 }))
982 }
983 Err(TrySendError::Full(wc)) => {
984 *self.writer.cc_upload_waker.lock().unwrap() = Some(cx.waker().clone());
985 if Instant::now() < self.timeout_instant {
986 // Put our command back
987 self.writer_command = Some(wc);
988 Poll::Pending
989 } else {
990 // TODO: unwrap
991 Poll::Ready(Err(WriteError::WouldBlock {
992 data: self.sample.take().unwrap(),
993 }))
994 }
995 }
996 Err(other_err) => {
997 warn!(
998 "Failed to write new data: topic={:?} reason={:?} timeout={:?}",
999 self.writer.my_topic.name(),
1000 other_err,
1001 self.timeout
1002 );
1003 // TODO: Is this (undo) the right thing to do, if there are
1004 // several futures in progress? (Can this result in confused numbering?)
1005 self.writer.undo_sequence_number();
1006 Poll::Ready(Err(WriteError::Poisoned {
1007 reason: format!("{other_err}"),
1008 data: self.sample.take().unwrap(),
1009 }))
1010 }
1011 }
1012 }
1013 None => {
1014 // the dog ate my homework
1015 // this should not happen
1016 Poll::Ready(Err(WriteError::Internal {
1017 reason: "someone stole my WriterCommand".to_owned(),
1018 }))
1019 }
1020 }
1021 }
1022}
1023
1024// A future for an asynchronous operation of waiting for acknowledgements.
1025// Handles both the sending of the WaitForAcknowledgments command and
1026// the waiting for the acknowledgements.
1027#[must_use = "futures do nothing unless you `.await` or poll them"]
1028pub enum AsyncWaitForAcknowledgments<'a, D, SA>
1029where
1030 D: Keyed,
1031 SA: SerializerAdapter<D>,
1032{
1033 Waiting {
1034 ack_wait_receiver: StatusChannelReceiver<()>,
1035 },
1036 Done,
1037 WaitingSendCommand {
1038 writer: &'a DataWriter<D, SA>,
1039 ack_wait_receiver: StatusChannelReceiver<()>,
1040 ack_wait_sender: StatusChannelSender<()>,
1041 },
1042 Fail(WriteError<()>),
1043}
1044
1045impl<D, SA> Future for AsyncWaitForAcknowledgments<'_, D, SA>
1046where
1047 D: Keyed,
1048 SA: SerializerAdapter<D>,
1049{
1050 type Output = WriteResult<bool, ()>;
1051
1052 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1053 match *self {
1054 AsyncWaitForAcknowledgments::Done => Poll::Ready(Ok(true)),
1055 AsyncWaitForAcknowledgments::Fail(_) => {
1056 let mut dummy = AsyncWaitForAcknowledgments::Done;
1057 core::mem::swap(&mut dummy, &mut self);
1058 match dummy {
1059 AsyncWaitForAcknowledgments::Fail(e) => Poll::Ready(Err(e)),
1060 _ => unreachable!(),
1061 }
1062 }
1063 AsyncWaitForAcknowledgments::Waiting {
1064 ref ack_wait_receiver,
1065 } => {
1066 match Pin::new(&mut ack_wait_receiver.as_async_status_stream()).poll_next(cx) {
1067 Poll::Pending => Poll::Pending,
1068
1069 // this should not really happen, but let's judge that as a "no"
1070 Poll::Ready(None) => Poll::Ready(Ok(false)),
1071
1072 // Poll::Ready(Some(Err(_read_error)))
1073 // // RecvError means the sending side has disconnected.
1074 // // We assume this would only be because the event loop thread is dead.
1075 // => Poll::Ready(Err(WriteError::Poisoned{ reason: "RecvError".to_string(), data:()})),
1076 Poll::Ready(Some(())) => Poll::Ready(Ok(true)),
1077 // There is no timeout support here, so we never really
1078 // return Ok(false)
1079 }
1080 }
1081 AsyncWaitForAcknowledgments::WaitingSendCommand { .. } => {
1082 let mut dummy = AsyncWaitForAcknowledgments::Done;
1083 core::mem::swap(&mut dummy, &mut self);
1084 let (writer, ack_wait_receiver, ack_wait_sender) = match dummy {
1085 AsyncWaitForAcknowledgments::WaitingSendCommand {
1086 writer,
1087 ack_wait_receiver,
1088 ack_wait_sender,
1089 } => (writer, ack_wait_receiver, ack_wait_sender),
1090 _ => unreachable!(),
1091 };
1092
1093 match writer
1094 .cc_upload
1095 .try_send(WriterCommand::WaitForAcknowledgments {
1096 all_acked: ack_wait_sender,
1097 }) {
1098 Ok(()) => {
1099 *self = AsyncWaitForAcknowledgments::Waiting { ack_wait_receiver };
1100 Poll::Pending
1101 }
1102
1103 Err(TrySendError::Full(WriterCommand::WaitForAcknowledgments {
1104 all_acked: ack_wait_sender,
1105 })) => {
1106 *self = AsyncWaitForAcknowledgments::WaitingSendCommand {
1107 writer,
1108 ack_wait_receiver,
1109 ack_wait_sender,
1110 };
1111 Poll::Pending
1112 }
1113 Err(TrySendError::Full(_other_writer_command)) =>
1114 // We are sending WaitForAcknowledgments, so the channel
1115 // should return only that, if any.
1116 {
1117 unreachable!()
1118 }
1119 Err(e) => Poll::Ready(Err(WriteError::Poisoned {
1120 reason: format!("{e}"),
1121 data: (),
1122 })),
1123 }
1124 }
1125 }
1126 }
1127}
1128
1129impl<D, SA> DataWriter<D, SA>
1130where
1131 D: Keyed,
1132 SA: SerializerAdapter<D>,
1133{
1134 pub async fn async_write(
1135 &self,
1136 data: D,
1137 source_timestamp: Option<Timestamp>,
1138 ) -> WriteResult<(), D> {
1139 match self
1140 .async_write_with_options(data, WriteOptions::from(source_timestamp))
1141 .await
1142 {
1143 Ok(_sample_identity) => Ok(()),
1144 Err(e) => Err(e),
1145 }
1146 }
1147
1148 pub async fn async_write_with_options(
1149 &self,
1150 data: D,
1151 write_options: WriteOptions,
1152 ) -> WriteResult<SampleIdentity, D> {
1153 // Construct a future for an async write operation and await for its completion
1154
1155 let send_buffer = match SA::to_bytes(&data) {
1156 Ok(s) => s,
1157 Err(e) => {
1158 return Err(WriteError::Serialization {
1159 reason: format!("{e}"),
1160 data,
1161 })
1162 }
1163 };
1164
1165 let dds_data = DDSData::new(SerializedPayload::new_from_bytes(
1166 SA::output_encoding(),
1167 send_buffer,
1168 ));
1169 let sequence_number = self.next_sequence_number();
1170 let writer_command = WriterCommand::DDSData {
1171 ddsdata: dds_data,
1172 write_options,
1173 sequence_number,
1174 };
1175
1176 let timeout = self.qos().reliable_max_blocking_time();
1177
1178 let write_future = AsyncWrite {
1179 writer: self,
1180 writer_command: Some(writer_command),
1181 sequence_number,
1182 timeout,
1183 timeout_instant: std::time::Instant::now()
1184 + timeout
1185 .map(|t| t.to_std())
1186 .unwrap_or(crate::dds::helpers::TIMEOUT_FALLBACK.to_std()),
1187 sample: Some(data),
1188 };
1189 write_future.await
1190 }
1191
1192 /// Like the synchronous version.
1193 /// But there is no timeout. Use asyncs to bring your own timeout.
1194 pub async fn async_wait_for_acknowledgments(&self) -> WriteResult<bool, ()> {
1195 match &self.qos_policy.reliability {
1196 None | Some(Reliability::BestEffort) => Ok(true),
1197 Some(Reliability::Reliable { .. }) => {
1198 // Construct a future for an async operation to first send the
1199 // WaitForAcknowledgments command and then wait for the
1200 // acknowledgements. Await for this future to complete.
1201
1202 let (ack_wait_sender, ack_wait_receiver) = sync_status_channel::<()>(1).unwrap(); // TODO: remove unwrap
1203
1204 let async_ack_wait = AsyncWaitForAcknowledgments::WaitingSendCommand {
1205 writer: self,
1206 ack_wait_receiver,
1207 ack_wait_sender,
1208 };
1209 async_ack_wait.await
1210 }
1211 }
1212 }
1213} // impl
1214
1215#[cfg(test)]
1216mod tests {
1217 use std::thread;
1218
1219 use byteorder::LittleEndian;
1220 use log::info;
1221
1222 use super::*;
1223 use crate::{
1224 dds::{key::Key, participant::DomainParticipant},
1225 structure::topic_kind::TopicKind,
1226 test::random_data::*,
1227 };
1228
1229 #[test]
1230 fn dw_write_test() {
1231 let domain_participant = DomainParticipant::new(0).expect("Publisher creation failed!");
1232 let qos = QosPolicies::qos_none();
1233 let _default_dw_qos = QosPolicies::qos_none();
1234 let publisher = domain_participant
1235 .create_publisher(&qos)
1236 .expect("Failed to create publisher");
1237 let topic = domain_participant
1238 .create_topic(
1239 "Aasii".to_string(),
1240 "Huh?".to_string(),
1241 &qos,
1242 TopicKind::WithKey,
1243 )
1244 .expect("Failed to create topic");
1245
1246 let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
1247 publisher
1248 .create_datawriter(&topic, None)
1249 .expect("Failed to create datawriter");
1250
1251 let mut data = RandomData {
1252 a: 4,
1253 b: "Fobar".to_string(),
1254 };
1255
1256 data_writer
1257 .write(data.clone(), None)
1258 .expect("Unable to write data");
1259
1260 data.a = 5;
1261 let timestamp = Timestamp::now();
1262 data_writer
1263 .write(data, Some(timestamp))
1264 .expect("Unable to write data with timestamp");
1265
1266 // TODO: verify that data is sent/written correctly
1267 // TODO: write also with timestamp
1268 }
1269
1270 #[test]
1271 fn dw_dispose_test() {
1272 let domain_participant = DomainParticipant::new(0).expect("Publisher creation failed!");
1273 let qos = QosPolicies::qos_none();
1274 let publisher = domain_participant
1275 .create_publisher(&qos)
1276 .expect("Failed to create publisher");
1277 let topic = domain_participant
1278 .create_topic(
1279 "Aasii".to_string(),
1280 "Huh?".to_string(),
1281 &qos,
1282 TopicKind::WithKey,
1283 )
1284 .expect("Failed to create topic");
1285
1286 let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
1287 publisher
1288 .create_datawriter(&topic, None)
1289 .expect("Failed to create datawriter");
1290
1291 let data = RandomData {
1292 a: 4,
1293 b: "Fobar".to_string(),
1294 };
1295
1296 let key = &data.key().hash_key(false);
1297 info!("key: {key:?}");
1298
1299 data_writer
1300 .write(data.clone(), None)
1301 .expect("Unable to write data");
1302
1303 thread::sleep(Duration::from_millis(100));
1304 data_writer
1305 .dispose(&data.key(), None)
1306 .expect("Unable to dispose data");
1307
1308 // TODO: verify that dispose is sent correctly
1309 }
1310
1311 #[test]
1312 fn dw_wait_for_ack_test() {
1313 let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
1314 let qos = QosPolicies::qos_none();
1315 let publisher = domain_participant
1316 .create_publisher(&qos)
1317 .expect("Failed to create publisher");
1318 let topic = domain_participant
1319 .create_topic(
1320 "Aasii".to_string(),
1321 "Huh?".to_string(),
1322 &qos,
1323 TopicKind::WithKey,
1324 )
1325 .expect("Failed to create topic");
1326
1327 let data_writer: DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>> =
1328 publisher
1329 .create_datawriter(&topic, None)
1330 .expect("Failed to create datawriter");
1331
1332 let data = RandomData {
1333 a: 4,
1334 b: "Fobar".to_string(),
1335 };
1336
1337 data_writer.write(data, None).expect("Unable to write data");
1338
1339 let res = data_writer
1340 .wait_for_acknowledgments(Duration::from_secs(2))
1341 .unwrap();
1342 assert!(res); // we should get "true" immediately, because we have
1343 // no Reliable QoS
1344 }
1345}