rustdds/dds/with_key/
datawriter.rs

1use std::{
2  marker::PhantomData,
3  pin::Pin,
4  sync::{
5    atomic::{AtomicI64, Ordering},
6    Arc, Mutex,
7  },
8  task::{Context, Poll, Waker},
9  time::{Duration, Instant},
10};
11
12use futures::{Future, Stream};
13use mio_06::{Events, PollOpt, Ready, Token};
14use mio_extras::channel::{self as mio_channel, SendError, TrySendError};
15#[allow(unused_imports)]
16use log::{debug, error, info, trace, warn};
17
18use crate::{
19  dds::{
20    adapters::with_key::SerializerAdapter,
21    ddsdata::DDSData,
22    helpers::*,
23    pubsub::Publisher,
24    qos::{
25      policy::{Liveliness, Reliability},
26      HasQoSPolicy, QosPolicies,
27    },
28    result::{CreateResult, WriteError, WriteResult},
29    statusevents::*,
30    topic::Topic,
31  },
32  discovery::{discovery::DiscoveryCommand, sedp_messages::SubscriptionBuiltinTopicData},
33  messages::submessages::elements::serialized_payload::SerializedPayload,
34  rtps::writer::WriterCommand,
35  serialization::CDRSerializerAdapter,
36  structure::{
37    cache_change::ChangeKind, duration, entity::RTPSEntity, guid::GUID, rpc::SampleIdentity,
38    sequence_number::SequenceNumber, time::Timestamp,
39  },
40  Keyed, TopicDescription,
41};
42
43// TODO: Move the write options and the builder type to some lower-level module
44// to avoid circular dependencies.
45#[derive(Debug, Default)]
46pub struct WriteOptionsBuilder {
47  related_sample_identity: Option<SampleIdentity>,
48  source_timestamp: Option<Timestamp>,
49  to_single_reader: Option<GUID>,
50}
51
52impl WriteOptionsBuilder {
53  pub fn new() -> Self {
54    Self::default()
55  }
56
57  pub fn build(self) -> WriteOptions {
58    WriteOptions {
59      related_sample_identity: self.related_sample_identity,
60      source_timestamp: self.source_timestamp,
61      to_single_reader: self.to_single_reader,
62    }
63  }
64
65  #[must_use]
66  pub fn related_sample_identity(mut self, related_sample_identity: SampleIdentity) -> Self {
67    self.related_sample_identity = Some(related_sample_identity);
68    self
69  }
70
71  #[must_use]
72  pub fn related_sample_identity_opt(
73    mut self,
74    related_sample_identity_opt: Option<SampleIdentity>,
75  ) -> Self {
76    self.related_sample_identity = related_sample_identity_opt;
77    self
78  }
79
80  #[must_use]
81  pub fn source_timestamp(mut self, source_timestamp: Timestamp) -> Self {
82    self.source_timestamp = Some(source_timestamp);
83    self
84  }
85
86  #[must_use]
87  pub fn to_single_reader(mut self, reader: GUID) -> Self {
88    self.to_single_reader = Some(reader);
89    self
90  }
91}
92
93/// Type to be used with write_with_options.
94/// Use WriteOptionsBuilder to construct this.
95#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Default)]
96pub struct WriteOptions {
97  related_sample_identity: Option<SampleIdentity>, // for DDS-RPC
98  source_timestamp: Option<Timestamp>,             // from DDS spec
99  to_single_reader: Option<GUID>,                  /* try to send to one Reader only
100                                                    * future extension room fo other fields. */
101}
102
103impl WriteOptions {
104  pub fn related_sample_identity(&self) -> Option<SampleIdentity> {
105    self.related_sample_identity
106  }
107
108  pub fn source_timestamp(&self) -> Option<Timestamp> {
109    self.source_timestamp
110  }
111
112  pub fn to_single_reader(&self) -> Option<GUID> {
113    self.to_single_reader
114  }
115}
116
117impl From<Option<Timestamp>> for WriteOptions {
118  fn from(source_timestamp: Option<Timestamp>) -> Self {
119    Self {
120      related_sample_identity: None,
121      source_timestamp,
122      to_single_reader: None,
123    }
124  }
125}
126
127/// Simplified type for CDR encoding
128pub type DataWriterCdr<D> = DataWriter<D, CDRSerializerAdapter<D>>;
129
130/// DDS DataWriter for keyed topics
131///
132/// # Examples
133///
134/// ```
135/// use serde::{Serialize, Deserialize};
136/// use rustdds::*;
137/// use rustdds::with_key::DataWriter;
138/// use rustdds::serialization::CDRSerializerAdapter;
139///
140/// let domain_participant = DomainParticipant::new(0).unwrap();
141/// let qos = QosPolicyBuilder::new().build();
142/// let publisher = domain_participant.create_publisher(&qos).unwrap();
143///
144/// #[derive(Serialize, Deserialize, Debug)]
145/// struct SomeType { a: i32 }
146/// impl Keyed for SomeType {
147///   type K = i32;
148///
149///   fn key(&self) -> Self::K {
150///     self.a
151///   }
152/// }
153///
154/// // WithKey is important
155/// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
156/// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None);
157/// ```
158pub struct DataWriter<D: Keyed, SA: SerializerAdapter<D> = CDRSerializerAdapter<D>> {
159  data_phantom: PhantomData<D>,
160  ser_phantom: PhantomData<SA>,
161  my_publisher: Publisher,
162  my_topic: Topic,
163  qos_policy: QosPolicies,
164  my_guid: GUID,
165  cc_upload: mio_channel::SyncSender<WriterCommand>,
166  cc_upload_waker: Arc<Mutex<Option<Waker>>>,
167  discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
168  status_receiver: StatusChannelReceiver<DataWriterStatus>,
169  available_sequence_number: AtomicI64,
170}
171
172impl<D, SA> Drop for DataWriter<D, SA>
173where
174  D: Keyed,
175  SA: SerializerAdapter<D>,
176{
177  fn drop(&mut self) {
178    // Tell Publisher to drop the corresponding RTPS Writer
179    self.my_publisher.remove_writer(self.my_guid);
180
181    // Notify Discovery that we are no longer
182    match self
183      .discovery_command
184      .send(DiscoveryCommand::RemoveLocalWriter { guid: self.guid() })
185    {
186      Ok(_) => {}
187
188      // This is fairly normal at shutdown, as the other end is down already.
189      Err(SendError::Disconnected(_cmd)) => {
190        debug!("Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand: Disconnected.");
191      }
192      // other errors must be taken more seriously
193      Err(e) => error!("Failed to send REMOVE_LOCAL_WRITER DiscoveryCommand. {e:?}"),
194    }
195  }
196}
197
198impl<D, SA> DataWriter<D, SA>
199where
200  D: Keyed,
201  SA: SerializerAdapter<D>,
202{
203  #[allow(clippy::too_many_arguments)]
204  pub(crate) fn new(
205    publisher: Publisher,
206    topic: Topic,
207    qos: QosPolicies,
208    guid: GUID,
209    cc_upload: mio_channel::SyncSender<WriterCommand>,
210    cc_upload_waker: Arc<Mutex<Option<Waker>>>,
211    discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
212    status_receiver: StatusChannelReceiver<DataWriterStatus>,
213  ) -> CreateResult<Self> {
214    if let Some(lv) = qos.liveliness {
215      match lv {
216        Liveliness::Automatic { .. } | Liveliness::ManualByTopic { .. } => (),
217        Liveliness::ManualByParticipant { .. } => {
218          if let Err(e) = discovery_command.send(DiscoveryCommand::ManualAssertLiveliness) {
219            error!("Failed to send DiscoveryCommand - Refresh. {e:?}");
220          }
221        }
222      }
223    };
224    Ok(Self {
225      data_phantom: PhantomData,
226      ser_phantom: PhantomData,
227      my_publisher: publisher,
228      my_topic: topic,
229      qos_policy: qos,
230      my_guid: guid,
231      cc_upload,
232      cc_upload_waker,
233      discovery_command,
234      status_receiver,
235      available_sequence_number: AtomicI64::new(1), // valid numbering starts from 1
236    })
237  }
238
239  fn next_sequence_number(&self) -> SequenceNumber {
240    SequenceNumber::from(
241      self
242        .available_sequence_number
243        .fetch_add(1, Ordering::Relaxed),
244    )
245  }
246
247  fn undo_sequence_number(&self) {
248    self
249      .available_sequence_number
250      .fetch_sub(1, Ordering::Relaxed);
251  }
252
253  /// Manually refreshes liveliness
254  ///
255  /// Corresponds to DDS Spec 1.4 Section 2.2.2.4.2.22 assert_liveliness.
256  ///
257  /// # Examples
258  ///
259  /// ```
260  /// # use serde::{Serialize, Deserialize};
261  /// # use rustdds::*;
262  /// # use rustdds::with_key::DataWriter;
263  /// # use rustdds::serialization::CDRSerializerAdapter;
264  /// #
265  /// let domain_participant = DomainParticipant::new(0).unwrap();
266  /// let qos = QosPolicyBuilder::new().build();
267  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
268  ///
269  /// #[derive(Serialize, Deserialize, Debug)]
270  /// struct SomeType { a: i32 }
271  /// impl Keyed for SomeType {
272  ///   type K = i32;
273  ///
274  ///   fn key(&self) -> Self::K {
275  ///     self.a
276  ///   }
277  /// }
278  ///
279  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
280  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
281  ///
282  /// data_writer.refresh_manual_liveliness();
283  /// ```
284  pub fn refresh_manual_liveliness(&self) {
285    if let Some(lv) = self.qos().liveliness {
286      match lv {
287        Liveliness::Automatic { .. } | Liveliness::ManualByTopic { .. } => (),
288        Liveliness::ManualByParticipant { .. } => {
289          if let Err(e) = self
290            .discovery_command
291            .send(DiscoveryCommand::ManualAssertLiveliness)
292          {
293            error!("Failed to send DiscoveryCommand - Refresh. {e:?}");
294          }
295        }
296      }
297    };
298  }
299
300  /// Writes single data instance to a topic.
301  ///
302  /// # Examples
303  ///
304  /// ```
305  /// # use serde::{Serialize, Deserialize};
306  /// # use rustdds::*;
307  /// # use rustdds::with_key::DataWriter;
308  /// # use rustdds::serialization::CDRSerializerAdapter;
309  /// #
310  /// let domain_participant = DomainParticipant::new(0).unwrap();
311  /// let qos = QosPolicyBuilder::new().build();
312  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
313  ///
314  /// #[derive(Serialize, Deserialize, Debug)]
315  /// struct SomeType { a: i32 }
316  /// impl Keyed for SomeType {
317  ///   type K = i32;
318  ///
319  ///   fn key(&self) -> Self::K {
320  ///     self.a
321  ///   }
322  /// }
323  ///
324  /// // WithKey is important
325  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
326  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
327  ///
328  /// let some_data = SomeType { a: 1 };
329  /// data_writer.write(some_data, None).unwrap();
330  /// ```
331  pub fn write(&self, data: D, source_timestamp: Option<Timestamp>) -> WriteResult<(), D> {
332    self.write_with_options(data, WriteOptions::from(source_timestamp))?;
333    Ok(())
334  }
335
336  pub fn write_with_options(
337    &self,
338    data: D,
339    write_options: WriteOptions,
340  ) -> WriteResult<SampleIdentity, D> {
341    // serialize
342    let send_buffer = match SA::to_bytes(&data) {
343      Ok(b) => b,
344      Err(e) => {
345        return Err(WriteError::Serialization {
346          reason: format!("{e}"),
347          data,
348        })
349      }
350    };
351
352    let ddsdata = DDSData::new(SerializedPayload::new_from_bytes(
353      SA::output_encoding(),
354      send_buffer,
355    ));
356    let sequence_number = self.next_sequence_number();
357    let writer_command = WriterCommand::DDSData {
358      ddsdata,
359      write_options,
360      sequence_number,
361    };
362
363    let timeout = self.qos().reliable_max_blocking_time();
364
365    match try_send_timeout(&self.cc_upload, writer_command, timeout) {
366      Ok(_) => {
367        self.refresh_manual_liveliness();
368        Ok(SampleIdentity {
369          writer_guid: self.my_guid,
370          sequence_number,
371        })
372      }
373      Err(TrySendError::Full(_writer_command)) => {
374        warn!(
375          "Write timed out: topic={:?}  timeout={:?}",
376          self.my_topic.name(),
377          timeout,
378        );
379        self.undo_sequence_number();
380        Err(WriteError::WouldBlock { data })
381      }
382      Err(TrySendError::Disconnected(_)) => {
383        self.undo_sequence_number();
384        Err(WriteError::Poisoned {
385          reason: "Cannot send to Writer".to_string(),
386          data,
387        })
388      }
389      Err(TrySendError::Io(e)) => {
390        self.undo_sequence_number();
391        Err(e.into())
392      }
393    }
394  }
395
396  /// This operation blocks the calling thread until either all data written by
397  /// the reliable DataWriter entities is acknowledged by all
398  /// matched reliable DataReader entities, or else the duration specified by
399  /// the `max_wait` parameter elapses, whichever happens first.
400  ///
401  /// See DDS Spec 1.4 Section 2.2.2.4.1.12 wait_for_acknowledgments.
402  ///
403  /// If this DataWriter is not set to Reliable, or there are no matched
404  /// DataReaders with Reliable QoS, the call succeeds immediately.
405  ///
406  /// Return values
407  /// * `Ok(true)` - all acknowledged
408  /// * `Ok(false)`- timed out waiting for acknowledgments
409  /// * `Err(_)` - something went wrong
410  ///
411  /// # Examples
412  ///
413  /// ```
414  /// # use serde::{Serialize, Deserialize};
415  /// # use rustdds::*;
416  /// #
417  /// let domain_participant = DomainParticipant::new(0).unwrap();
418  /// let qos = QosPolicyBuilder::new().build();
419  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
420  ///
421  /// #[derive(Serialize, Deserialize, Debug)]
422  /// struct SomeType { a: i32 }
423  /// impl Keyed for SomeType {
424  ///   type K = i32;
425  ///
426  ///   fn key(&self) -> Self::K {
427  ///     self.a
428  ///   }
429  /// }
430  ///
431  /// // WithKey is important
432  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
433  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
434  ///
435  /// let some_data = SomeType { a: 1 };
436  /// data_writer.write(some_data, None).unwrap();
437  /// data_writer.wait_for_acknowledgments(std::time::Duration::from_millis(100));
438  /// ```
439  pub fn wait_for_acknowledgments(&self, max_wait: Duration) -> WriteResult<bool, ()> {
440    match &self.qos_policy.reliability {
441      None | Some(Reliability::BestEffort) => Ok(true),
442      Some(Reliability::Reliable { .. }) => {
443        let (acked_sender, mut acked_receiver) = sync_status_channel::<()>(1)?;
444        let poll = mio_06::Poll::new()?;
445        poll.register(
446          acked_receiver.as_status_evented(),
447          Token(0),
448          Ready::readable(),
449          PollOpt::edge(),
450        )?;
451        self
452          .cc_upload
453          .try_send(WriterCommand::WaitForAcknowledgments {
454            all_acked: acked_sender,
455          })
456          .unwrap_or_else(|e| {
457            warn!("wait_for_acknowledgments: cannot initiate waiting. This will timeout. {e}");
458          });
459
460        let mut events = Events::with_capacity(1);
461        poll.poll(&mut events, Some(max_wait))?;
462        if let Some(_event) = events.iter().next() {
463          match acked_receiver.try_recv() {
464            Ok(_) => Ok(true), // got token
465            Err(e) => {
466              warn!("wait_for_acknowledgments - Spurious poll event? - {e}");
467              Ok(false) // TODO: We could also loop here
468            }
469          }
470        } else {
471          // no token, so presumably timed out
472          Ok(false)
473        }
474      }
475    } // match
476  }
477
478  /*
479
480  /// Unimplemented. <b>Do not use</b>.
481  ///
482  /// # Examples
483  ///
484  /// ```no_run
485  // TODO: enable when functional
486  /// # use serde::{Serialize, Deserialize};
487  /// # use rustdds::*;
488  /// # use rustdds::with_key::DataWriter;
489  /// # use rustdds::serialization::CDRSerializerAdapter;
490  /// #
491  /// let domain_participant = DomainParticipant::new(0).unwrap();
492  /// let qos = QosPolicyBuilder::new().build();
493  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
494  ///
495  /// #[derive(Serialize, Deserialize, Debug)]
496  /// struct SomeType { a: i32 }
497  /// impl Keyed for SomeType {
498  ///   type K = i32;
499  ///
500  ///   fn key(&self) -> Self::K {
501  ///     self.a
502  ///   }
503  /// }
504  ///
505  /// // WithKey is important
506  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
507  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
508  ///
509  /// // Liveliness lost status has changed
510  ///
511  /// if let Ok(lls) = data_writer.get_liveliness_lost_status() {
512  ///   // do something
513  /// }
514  /// ```
515  pub fn get_liveliness_lost_status(&self) -> Result<LivelinessLostStatus> {
516    todo!()
517  }
518
519  /// Should get latest offered deadline missed status. <b>Do not use yet</b> use `get_status_lister` instead for the moment.
520  ///
521  /// # Examples
522  ///
523  /// ```
524  /// # use serde::{Serialize, Deserialize};
525  /// # use rustdds::*;
526  /// # use rustdds::with_key::DataWriter;
527  /// # use rustdds::serialization::CDRSerializerAdapter;
528  /// #
529  /// let domain_participant = DomainParticipant::new(0).unwrap();
530  /// let qos = QosPolicyBuilder::new().build();
531  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
532  ///
533  /// #[derive(Serialize, Deserialize, Debug)]
534  /// struct SomeType { a: i32 }
535  /// impl Keyed for SomeType {
536  ///   type K = i32;
537  ///
538  ///   fn key(&self) -> Self::K {
539  ///     self.a
540  ///   }
541  /// }
542  ///
543  /// // WithKey is important
544  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
545  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
546  ///
547  /// // Deadline missed status has changed
548  ///
549  /// if let Ok(odms) = data_writer.get_offered_deadline_missed_status() {
550  ///   // do something
551  /// }
552  /// ```
553  pub fn get_offered_deadline_missed_status(&self) -> Result<OfferedDeadlineMissedStatus> {
554    let mut fstatus = OfferedDeadlineMissedStatus::new();
555    while let Ok(status) = self.status_receiver.try_recv() {
556      match status {
557        StatusChange::OfferedDeadlineMissedStatus(status) => fstatus = status,
558  // TODO: possibly save old statuses
559        _ => (),
560      }
561    }
562
563    match self
564      .cc_upload
565      .try_send(WriterCommand::ResetOfferedDeadlineMissedStatus {
566        writer_guid: self.guid(),
567      }) {
568      Ok(_) => (),
569      Err(e) => error!("Unable to send ResetOfferedDeadlineMissedStatus. {e:?}"),
570    };
571
572    Ok(fstatus)
573  }
574
575  /// Unimplemented. <b>Do not use</b>.
576  ///
577  /// # Examples
578  ///
579  /// ```no_run
580  // TODO: enable when functional
581  /// # use serde::{Serialize, Deserialize};
582  /// # use rustdds::*;
583  /// # use rustdds::with_key::DataWriter;
584  /// # use rustdds::serialization::CDRSerializerAdapter;
585  /// #
586  /// let domain_participant = DomainParticipant::new(0).unwrap();
587  /// let qos = QosPolicyBuilder::new().build();
588  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
589  ///
590  /// #[derive(Serialize, Deserialize, Debug)]
591  /// struct SomeType { a: i32 }
592  /// impl Keyed for SomeType {
593  ///   type K = i32;
594  ///
595  ///   fn key(&self) -> Self::K {
596  ///     self.a
597  ///   }
598  /// }
599  ///
600  /// // WithKey is important
601  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
602  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
603  ///
604  /// // Liveliness lost status has changed
605  ///
606  /// if let Ok(oiqs) = data_writer.get_offered_incompatible_qos_status() {
607  ///   // do something
608  /// }
609  /// ```
610  pub fn get_offered_incompatible_qos_status(&self) -> Result<OfferedIncompatibleQosStatus> {
611    todo!()
612  }
613
614  /// Unimplemented. <b>Do not use</b>.
615  ///
616  /// # Examples
617  ///
618  /// ```no_run
619  // TODO: enable when functional
620  /// # use serde::{Serialize, Deserialize};
621  /// # use rustdds::*;
622  /// # use rustdds::with_key::DataWriter;
623  /// # use rustdds::serialization::CDRSerializerAdapter;
624  /// #
625  /// let domain_participant = DomainParticipant::new(0).unwrap();
626  /// let qos = QosPolicyBuilder::new().build();
627  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
628  ///
629  /// #[derive(Serialize, Deserialize, Debug)]
630  /// struct SomeType { a: i32 }
631  /// impl Keyed for SomeType {
632  ///   type K = i32;
633  ///
634  ///   fn key(&self) -> Self::K {
635  ///     self.a
636  ///   }
637  /// }
638  ///
639  /// // WithKey is important
640  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
641  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(topic, None).unwrap();
642  ///
643  /// // Liveliness lost status has changed
644  ///
645  /// if let Ok(pms) = data_writer.get_publication_matched_status() {
646  ///   // do something
647  /// }
648  /// ```
649  pub fn get_publication_matched_status(&self) -> Result<PublicationMatchedStatus> {
650    todo!()
651  }
652
653  */
654
655  /// Topic assigned to this DataWriter
656  ///
657  /// # Examples
658  ///
659  /// ```
660  /// # use serde::{Serialize, Deserialize};
661  /// # use rustdds::*;
662  /// # use rustdds::with_key::DataWriter;
663  /// # use rustdds::serialization::CDRSerializerAdapter;
664  /// #
665  /// let domain_participant = DomainParticipant::new(0).unwrap();
666  /// let qos = QosPolicyBuilder::new().build();
667  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
668  ///
669  /// #[derive(Serialize, Deserialize, Debug)]
670  /// struct SomeType { a: i32 }
671  /// impl Keyed for SomeType {
672  ///   type K = i32;
673  ///
674  ///   fn key(&self) -> Self::K {
675  ///     self.a
676  ///   }
677  /// }
678  ///
679  /// // WithKey is important
680  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
681  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
682  ///
683  /// assert_eq!(data_writer.topic(), &topic);
684  /// ```
685  pub fn topic(&self) -> &Topic {
686    &self.my_topic
687  }
688
689  /// Publisher assigned to this DataWriter
690  ///
691  /// # Examples
692  ///
693  /// ```
694  /// # use serde::{Serialize, Deserialize};
695  /// # use rustdds::*;
696  /// # use rustdds::with_key::DataWriter;
697  /// # use rustdds::serialization::CDRSerializerAdapter;
698  /// #
699  /// let domain_participant = DomainParticipant::new(0).unwrap();
700  /// let qos = QosPolicyBuilder::new().build();
701  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
702  ///
703  /// #[derive(Serialize, Deserialize, Debug)]
704  /// struct SomeType { a: i32 }
705  /// impl Keyed for SomeType {
706  ///   type K = i32;
707  ///
708  ///   fn key(&self) -> Self::K {
709  ///     self.a
710  ///   }
711  /// }
712  ///
713  /// // WithKey is important
714  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
715  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
716  ///
717  /// assert_eq!(data_writer.publisher(), &publisher);
718  pub fn publisher(&self) -> &Publisher {
719    &self.my_publisher
720  }
721
722  /// Manually asserts liveliness (use this instead of refresh) according to QoS
723  ///
724  /// # Examples
725  ///
726  /// ```
727  /// # use serde::{Serialize, Deserialize};
728  /// # use rustdds::*;
729  /// # use rustdds::with_key::DataWriter;
730  /// # use rustdds::serialization::CDRSerializerAdapter;
731  /// #
732  /// let domain_participant = DomainParticipant::new(0).unwrap();
733  /// let qos = QosPolicyBuilder::new().build();
734  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
735  ///
736  /// #[derive(Serialize, Deserialize, Debug)]
737  /// struct SomeType { a: i32 }
738  /// impl Keyed for SomeType {
739  ///   type K = i32;
740  ///
741  ///   fn key(&self) -> Self::K {
742  ///     self.a
743  ///   }
744  /// }
745  ///
746  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
747  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
748  ///
749  /// data_writer.assert_liveliness().unwrap();
750  /// ```
751  ///
752  /// An `Err` result means that livelines assertion message could not be sent,
753  /// likely because Discovery has too much work to do.
754  pub fn assert_liveliness(&self) -> WriteResult<(), ()> {
755    self.refresh_manual_liveliness();
756
757    match self.qos().liveliness {
758      Some(Liveliness::ManualByTopic { lease_duration: _ }) => {
759        self
760          .discovery_command
761          .send(DiscoveryCommand::AssertTopicLiveliness {
762            writer_guid: self.guid(),
763            manual_assertion: true, // by definition of this function
764          })
765          .map_err(|e| {
766            error!("assert_liveness - Failed to send DiscoveryCommand. {e:?}");
767            WriteError::WouldBlock { data: () }
768          })
769      }
770      _other => Ok(()),
771    }
772  }
773
774  /// Unimplemented. <b>Do not use</b>.
775  ///
776  /// # Examples
777  ///
778  /// ```no_run
779  // TODO: enable when available
780  /// # use serde::{Serialize, Deserialize};
781  /// # use rustdds::*;
782  /// # use rustdds::with_key::DataWriter;
783  /// # use rustdds::serialization::CDRSerializerAdapter;
784  /// #
785  /// let domain_participant = DomainParticipant::new(0).unwrap();
786  /// let qos = QosPolicyBuilder::new().build();
787  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
788  ///
789  /// #[derive(Serialize, Deserialize, Debug)]
790  /// struct SomeType { a: i32 }
791  /// impl Keyed for SomeType {
792  ///   type K = i32;
793  ///
794  ///   fn key(&self) -> Self::K {
795  ///     self.a
796  ///   }
797  /// }
798  ///
799  /// // WithKey is important
800  /// let topic = domain_participant.create_topic("some_topic".to_string(),
801  /// "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
802  /// let data_writer = publisher.create_datawriter::<SomeType,
803  /// CDRSerializerAdapter<_>>(&topic, None).unwrap();
804  ///
805  /// for sub in data_writer.get_matched_subscriptions().iter() {
806  ///   // do something
807  /// }
808  pub fn get_matched_subscriptions(&self) -> Vec<SubscriptionBuiltinTopicData> {
809    todo!()
810  }
811
812  /// Disposes data instance with specified key
813  ///
814  /// # Arguments
815  ///
816  /// * `key` - Key of the instance
817  /// * `source_timestamp` - DDS source timestamp (None uses now as time as
818  ///   specified in DDS spec)
819  ///
820  /// # Examples
821  ///
822  /// ```
823  /// # use serde::{Serialize, Deserialize};
824  /// # use rustdds::*;
825  /// # use rustdds::with_key::DataWriter;
826  /// # use rustdds::serialization::CDRSerializerAdapter;
827  /// #
828  /// let domain_participant = DomainParticipant::new(0).unwrap();
829  /// let qos = QosPolicyBuilder::new().build();
830  /// let publisher = domain_participant.create_publisher(&qos).unwrap();
831  ///
832  /// #[derive(Serialize, Deserialize, Debug)]
833  /// struct SomeType { a: i32, val: usize }
834  /// impl Keyed for SomeType {
835  ///   type K = i32;
836  ///
837  ///   fn key(&self) -> Self::K {
838  ///     self.a
839  ///   }
840  /// }
841  ///
842  /// // WithKey is important
843  /// let topic = domain_participant.create_topic("some_topic".to_string(), "SomeType".to_string(), &qos, TopicKind::WithKey).unwrap();
844  /// let data_writer = publisher.create_datawriter::<SomeType, CDRSerializerAdapter<_>>(&topic, None).unwrap();
845  ///
846  /// let some_data_1_1 = SomeType { a: 1, val: 3};
847  /// let some_data_1_2 = SomeType { a: 1, val: 4};
848  /// // different key
849  /// let some_data_2_1 = SomeType { a: 2, val: 5};
850  /// let some_data_2_2 = SomeType { a: 2, val: 6};
851  ///
852  /// data_writer.write(some_data_1_1, None).unwrap();
853  /// data_writer.write(some_data_1_2, None).unwrap();
854  /// data_writer.write(some_data_2_1, None).unwrap();
855  /// data_writer.write(some_data_2_2, None).unwrap();
856  ///
857  /// // disposes both some_data_1_1 and some_data_1_2. They are no longer offered by this writer to this topic.
858  /// data_writer.dispose(&1, None).unwrap();
859  /// ```
860  pub fn dispose(
861    &self,
862    key: &<D as Keyed>::K,
863    source_timestamp: Option<Timestamp>,
864  ) -> WriteResult<(), ()> {
865    let send_buffer = SA::key_to_bytes(key).map_err(|e| WriteError::Serialization {
866      reason: format!("{e}"),
867      data: (),
868    })?; // serialize key
869
870    let ddsdata = DDSData::new_disposed_by_key(
871      ChangeKind::NotAliveDisposed,
872      SerializedPayload::new_from_bytes(SA::output_encoding(), send_buffer),
873    );
874    self
875      .cc_upload
876      .send(WriterCommand::DDSData {
877        ddsdata,
878        write_options: WriteOptions::from(source_timestamp),
879        sequence_number: self.next_sequence_number(),
880      })
881      .map_err(|e| {
882        self.undo_sequence_number();
883        WriteError::Serialization {
884          reason: format!("{e}"),
885          data: (),
886        }
887      })?;
888
889    self.refresh_manual_liveliness();
890    Ok(())
891  }
892}
893
894impl<'a, D, SA> StatusEvented<'a, DataWriterStatus, StatusReceiverStream<'a, DataWriterStatus>>
895  for DataWriter<D, SA>
896where
897  D: Keyed,
898  SA: SerializerAdapter<D>,
899{
900  fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
901    self.status_receiver.as_status_evented()
902  }
903
904  fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
905    self.status_receiver.as_status_source()
906  }
907
908  fn as_async_status_stream(&'a self) -> StatusReceiverStream<'a, DataWriterStatus> {
909    self.status_receiver.as_async_status_stream()
910  }
911
912  fn try_recv_status(&self) -> Option<DataWriterStatus> {
913    self.status_receiver.try_recv_status()
914  }
915}
916
917impl<D, SA> RTPSEntity for DataWriter<D, SA>
918where
919  D: Keyed,
920  SA: SerializerAdapter<D>,
921{
922  fn guid(&self) -> GUID {
923    self.my_guid
924  }
925}
926
927impl<D, SA> HasQoSPolicy for DataWriter<D, SA>
928where
929  D: Keyed,
930  SA: SerializerAdapter<D>,
931{
932  fn qos(&self) -> QosPolicies {
933    self.qos_policy.clone()
934  }
935}
936
937//-------------------------------------------------------------------------------
938// async writing implementation
939//
940
941// A future for an asynchronous write operation
942#[must_use = "futures do nothing unless you `.await` or poll them"]
943pub struct AsyncWrite<'a, D, SA>
944where
945  D: Keyed,
946  SA: SerializerAdapter<D>,
947{
948  writer: &'a DataWriter<D, SA>,
949  writer_command: Option<WriterCommand>,
950  sequence_number: SequenceNumber,
951  timeout: Option<duration::Duration>,
952  timeout_instant: Instant,
953  sample: Option<D>,
954}
955
956// This is required, because AsyncWrite contains "D".
957// TODO: Is it ok to promise Unpin here?
958impl<D, SA> Unpin for AsyncWrite<'_, D, SA>
959where
960  D: Keyed,
961  SA: SerializerAdapter<D>,
962{
963}
964
965impl<D, SA> Future for AsyncWrite<'_, D, SA>
966where
967  D: Keyed,
968  SA: SerializerAdapter<D>,
969{
970  type Output = WriteResult<SampleIdentity, D>;
971
972  fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
973    match self.writer_command.take() {
974      Some(wc) => {
975        match self.writer.cc_upload.try_send(wc) {
976          Ok(()) => {
977            self.writer.refresh_manual_liveliness();
978            Poll::Ready(Ok(SampleIdentity {
979              writer_guid: self.writer.my_guid,
980              sequence_number: self.sequence_number,
981            }))
982          }
983          Err(TrySendError::Full(wc)) => {
984            *self.writer.cc_upload_waker.lock().unwrap() = Some(cx.waker().clone());
985            if Instant::now() < self.timeout_instant {
986              // Put our command back
987              self.writer_command = Some(wc);
988              Poll::Pending
989            } else {
990              // TODO: unwrap
991              Poll::Ready(Err(WriteError::WouldBlock {
992                data: self.sample.take().unwrap(),
993              }))
994            }
995          }
996          Err(other_err) => {
997            warn!(
998              "Failed to write new data: topic={:?}  reason={:?}  timeout={:?}",
999              self.writer.my_topic.name(),
1000              other_err,
1001              self.timeout
1002            );
1003            // TODO: Is this (undo) the right thing to do, if there are
1004            // several futures in progress? (Can this result in confused numbering?)
1005            self.writer.undo_sequence_number();
1006            Poll::Ready(Err(WriteError::Poisoned {
1007              reason: format!("{other_err}"),
1008              data: self.sample.take().unwrap(),
1009            }))
1010          }
1011        }
1012      }
1013      None => {
1014        // the dog ate my homework
1015        // this should not happen
1016        Poll::Ready(Err(WriteError::Internal {
1017          reason: "someone stole my WriterCommand".to_owned(),
1018        }))
1019      }
1020    }
1021  }
1022}
1023
1024// A future for an asynchronous operation of waiting for acknowledgements.
1025// Handles both the sending of the WaitForAcknowledgments command and
1026// the waiting for the acknowledgements.
1027#[must_use = "futures do nothing unless you `.await` or poll them"]
1028pub enum AsyncWaitForAcknowledgments<'a, D, SA>
1029where
1030  D: Keyed,
1031  SA: SerializerAdapter<D>,
1032{
1033  Waiting {
1034    ack_wait_receiver: StatusChannelReceiver<()>,
1035  },
1036  Done,
1037  WaitingSendCommand {
1038    writer: &'a DataWriter<D, SA>,
1039    ack_wait_receiver: StatusChannelReceiver<()>,
1040    ack_wait_sender: StatusChannelSender<()>,
1041  },
1042  Fail(WriteError<()>),
1043}
1044
1045impl<D, SA> Future for AsyncWaitForAcknowledgments<'_, D, SA>
1046where
1047  D: Keyed,
1048  SA: SerializerAdapter<D>,
1049{
1050  type Output = WriteResult<bool, ()>;
1051
1052  fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1053    match *self {
1054      AsyncWaitForAcknowledgments::Done => Poll::Ready(Ok(true)),
1055      AsyncWaitForAcknowledgments::Fail(_) => {
1056        let mut dummy = AsyncWaitForAcknowledgments::Done;
1057        core::mem::swap(&mut dummy, &mut self);
1058        match dummy {
1059          AsyncWaitForAcknowledgments::Fail(e) => Poll::Ready(Err(e)),
1060          _ => unreachable!(),
1061        }
1062      }
1063      AsyncWaitForAcknowledgments::Waiting {
1064        ref ack_wait_receiver,
1065      } => {
1066        match Pin::new(&mut ack_wait_receiver.as_async_status_stream()).poll_next(cx) {
1067          Poll::Pending => Poll::Pending,
1068
1069          // this should not really happen, but let's judge that as a "no"
1070          Poll::Ready(None) => Poll::Ready(Ok(false)),
1071
1072          // Poll::Ready(Some(Err(_read_error)))
1073          //   // RecvError means the sending side has disconnected.
1074          //   // We assume this would only be because the event loop thread is dead.
1075          //   => Poll::Ready(Err(WriteError::Poisoned{ reason: "RecvError".to_string(), data:()})),
1076          Poll::Ready(Some(())) => Poll::Ready(Ok(true)),
1077          // There is no timeout support here, so we never really
1078          // return Ok(false)
1079        }
1080      }
1081      AsyncWaitForAcknowledgments::WaitingSendCommand { .. } => {
1082        let mut dummy = AsyncWaitForAcknowledgments::Done;
1083        core::mem::swap(&mut dummy, &mut self);
1084        let (writer, ack_wait_receiver, ack_wait_sender) = match dummy {
1085          AsyncWaitForAcknowledgments::WaitingSendCommand {
1086            writer,
1087            ack_wait_receiver,
1088            ack_wait_sender,
1089          } => (writer, ack_wait_receiver, ack_wait_sender),
1090          _ => unreachable!(),
1091        };
1092
1093        match writer
1094          .cc_upload
1095          .try_send(WriterCommand::WaitForAcknowledgments {
1096            all_acked: ack_wait_sender,
1097          }) {
1098          Ok(()) => {
1099            *self = AsyncWaitForAcknowledgments::Waiting { ack_wait_receiver };
1100            Poll::Pending
1101          }
1102
1103          Err(TrySendError::Full(WriterCommand::WaitForAcknowledgments {
1104            all_acked: ack_wait_sender,
1105          })) => {
1106            *self = AsyncWaitForAcknowledgments::WaitingSendCommand {
1107              writer,
1108              ack_wait_receiver,
1109              ack_wait_sender,
1110            };
1111            Poll::Pending
1112          }
1113          Err(TrySendError::Full(_other_writer_command)) =>
1114          // We are sending WaitForAcknowledgments, so the channel
1115          // should return only that, if any.
1116          {
1117            unreachable!()
1118          }
1119          Err(e) => Poll::Ready(Err(WriteError::Poisoned {
1120            reason: format!("{e}"),
1121            data: (),
1122          })),
1123        }
1124      }
1125    }
1126  }
1127}
1128
1129impl<D, SA> DataWriter<D, SA>
1130where
1131  D: Keyed,
1132  SA: SerializerAdapter<D>,
1133{
1134  pub async fn async_write(
1135    &self,
1136    data: D,
1137    source_timestamp: Option<Timestamp>,
1138  ) -> WriteResult<(), D> {
1139    match self
1140      .async_write_with_options(data, WriteOptions::from(source_timestamp))
1141      .await
1142    {
1143      Ok(_sample_identity) => Ok(()),
1144      Err(e) => Err(e),
1145    }
1146  }
1147
1148  pub async fn async_write_with_options(
1149    &self,
1150    data: D,
1151    write_options: WriteOptions,
1152  ) -> WriteResult<SampleIdentity, D> {
1153    // Construct a future for an async write operation and await for its completion
1154
1155    let send_buffer = match SA::to_bytes(&data) {
1156      Ok(s) => s,
1157      Err(e) => {
1158        return Err(WriteError::Serialization {
1159          reason: format!("{e}"),
1160          data,
1161        })
1162      }
1163    };
1164
1165    let dds_data = DDSData::new(SerializedPayload::new_from_bytes(
1166      SA::output_encoding(),
1167      send_buffer,
1168    ));
1169    let sequence_number = self.next_sequence_number();
1170    let writer_command = WriterCommand::DDSData {
1171      ddsdata: dds_data,
1172      write_options,
1173      sequence_number,
1174    };
1175
1176    let timeout = self.qos().reliable_max_blocking_time();
1177
1178    let write_future = AsyncWrite {
1179      writer: self,
1180      writer_command: Some(writer_command),
1181      sequence_number,
1182      timeout,
1183      timeout_instant: std::time::Instant::now()
1184        + timeout
1185          .map(|t| t.to_std())
1186          .unwrap_or(crate::dds::helpers::TIMEOUT_FALLBACK.to_std()),
1187      sample: Some(data),
1188    };
1189    write_future.await
1190  }
1191
1192  /// Like the synchronous version.
1193  /// But there is no timeout. Use asyncs to bring your own timeout.
1194  pub async fn async_wait_for_acknowledgments(&self) -> WriteResult<bool, ()> {
1195    match &self.qos_policy.reliability {
1196      None | Some(Reliability::BestEffort) => Ok(true),
1197      Some(Reliability::Reliable { .. }) => {
1198        // Construct a future for an async operation to first send the
1199        // WaitForAcknowledgments command and then wait for the
1200        // acknowledgements. Await for this future to complete.
1201
1202        let (ack_wait_sender, ack_wait_receiver) = sync_status_channel::<()>(1).unwrap(); // TODO: remove unwrap
1203
1204        let async_ack_wait = AsyncWaitForAcknowledgments::WaitingSendCommand {
1205          writer: self,
1206          ack_wait_receiver,
1207          ack_wait_sender,
1208        };
1209        async_ack_wait.await
1210      }
1211    }
1212  }
1213} // impl
1214
#[cfg(test)]
mod tests {
  use std::thread;

  use byteorder::LittleEndian;
  use log::info;

  use super::*;
  use crate::{
    dds::{key::Key, participant::DomainParticipant},
    structure::topic_kind::TopicKind,
    test::random_data::*,
  };

  // The writer type exercised by all tests in this module.
  type TestDataWriter = DataWriter<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>;

  // Writes two samples, the second one with an explicit source timestamp.
  #[test]
  fn dw_write_test() {
    let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
    let qos = QosPolicies::qos_none();
    let publisher = domain_participant
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = domain_participant
      .create_topic(
        "Aasii".to_string(),
        "Huh?".to_string(),
        &qos,
        TopicKind::WithKey,
      )
      .expect("Failed to create topic");

    let data_writer: TestDataWriter = publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

    let mut data = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };

    data_writer
      .write(data.clone(), None)
      .expect("Unable to write data");

    data.a = 5;
    let timestamp = Timestamp::now();
    data_writer
      .write(data, Some(timestamp))
      .expect("Unable to write data with timestamp");

    // TODO: verify that data is sent/written correctly
  }

  // Writes a sample and then disposes the instance identified by its key.
  #[test]
  fn dw_dispose_test() {
    let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
    let qos = QosPolicies::qos_none();
    let publisher = domain_participant
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = domain_participant
      .create_topic(
        "Aasii".to_string(),
        "Huh?".to_string(),
        &qos,
        TopicKind::WithKey,
      )
      .expect("Failed to create topic");

    let data_writer: TestDataWriter = publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

    let data = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };

    let key = &data.key().hash_key(false);
    info!("key: {key:?}");

    data_writer
      .write(data.clone(), None)
      .expect("Unable to write data");

    thread::sleep(Duration::from_millis(100));
    data_writer
      .dispose(&data.key(), None)
      .expect("Unable to dispose data");

    // TODO: verify that dispose is sent correctly
  }

  // Waiting for acknowledgments on a writer without Reliable QoS.
  #[test]
  fn dw_wait_for_ack_test() {
    let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
    let qos = QosPolicies::qos_none();
    let publisher = domain_participant
      .create_publisher(&qos)
      .expect("Failed to create publisher");
    let topic = domain_participant
      .create_topic(
        "Aasii".to_string(),
        "Huh?".to_string(),
        &qos,
        TopicKind::WithKey,
      )
      .expect("Failed to create topic");

    let data_writer: TestDataWriter = publisher
      .create_datawriter(&topic, None)
      .expect("Failed to create datawriter");

    let data = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };

    data_writer.write(data, None).expect("Unable to write data");

    let res = data_writer
      .wait_for_acknowledgments(Duration::from_secs(2))
      .unwrap();
    // We should get "true" immediately, because we have no Reliable QoS.
    assert!(res);
  }
}