dust_dds/dds_async/
data_writer.rs

1use tracing::warn;
2
3use super::{condition::StatusConditionAsync, publisher::PublisherAsync};
4use crate::{
5    builtin_topics::SubscriptionBuiltinTopicData,
6    dcps::{
7        actor::ActorAddress,
8        domain_participant_actor::poll_timeout,
9        domain_participant_actor_mail::{
10            DomainParticipantMail, MessageServiceMail, WriterServiceMail,
11        },
12        listeners::data_writer_listener::DataWriterListenerActor,
13        status_condition_actor::StatusConditionActor,
14    },
15    dds_async::topic_description::TopicDescriptionAsync,
16    infrastructure::{
17        error::{DdsError, DdsResult},
18        instance::InstanceHandle,
19        qos::{DataWriterQos, QosKind},
20        status::{
21            LivelinessLostStatus, OfferedDeadlineMissedStatus, OfferedIncompatibleQosStatus,
22            PublicationMatchedStatus, StatusKind,
23        },
24        time::{Duration, Time},
25        type_support::DdsSerialize,
26    },
27    publication::data_writer_listener::DataWriterListener,
28    runtime::{ChannelSend, DdsRuntime, OneshotReceive},
29};
30use alloc::{boxed::Box, string::String, vec::Vec};
31use core::marker::PhantomData;
32
33/// Async version of [`DataWriter`](crate::publication::data_writer::DataWriter).
34pub struct DataWriterAsync<R: DdsRuntime, Foo> {
35    handle: InstanceHandle,
36    status_condition_address: ActorAddress<R, StatusConditionActor<R>>,
37    publisher: PublisherAsync<R>,
38    topic: TopicDescriptionAsync<R>,
39    phantom: PhantomData<Foo>,
40}
41
42impl<R: DdsRuntime, Foo> Clone for DataWriterAsync<R, Foo> {
43    fn clone(&self) -> Self {
44        Self {
45            handle: self.handle,
46            status_condition_address: self.status_condition_address.clone(),
47            publisher: self.publisher.clone(),
48            topic: self.topic.clone(),
49            phantom: self.phantom,
50        }
51    }
52}
53
54impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
55    pub(crate) fn new(
56        handle: InstanceHandle,
57        status_condition_address: ActorAddress<R, StatusConditionActor<R>>,
58        publisher: PublisherAsync<R>,
59        topic: TopicDescriptionAsync<R>,
60    ) -> Self {
61        Self {
62            handle,
63            status_condition_address,
64            publisher,
65            topic,
66            phantom: PhantomData,
67        }
68    }
69
70    pub(crate) fn participant_address(&self) -> &R::ChannelSender<DomainParticipantMail<R>> {
71        self.publisher.participant_address()
72    }
73
74    pub(crate) fn change_foo_type<T>(self) -> DataWriterAsync<R, T> {
75        DataWriterAsync {
76            handle: self.handle,
77            status_condition_address: self.status_condition_address,
78            publisher: self.publisher,
79            topic: self.topic,
80            phantom: PhantomData,
81        }
82    }
83}
84
85impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo>
86where
87    Foo: DdsSerialize,
88{
89    /// Async version of [`register_instance`](crate::publication::data_writer::DataWriter::register_instance).
90    #[tracing::instrument(skip(self, instance))]
91    pub async fn register_instance(&self, instance: &Foo) -> DdsResult<Option<InstanceHandle>> {
92        let timestamp = self
93            .get_publisher()
94            .get_participant()
95            .get_current_time()
96            .await?;
97        self.register_instance_w_timestamp(instance, timestamp)
98            .await
99    }
100
101    /// Async version of [`register_instance_w_timestamp`](crate::publication::data_writer::DataWriter::register_instance_w_timestamp).
102    #[tracing::instrument(skip(self, _instance))]
103    pub async fn register_instance_w_timestamp(
104        &self,
105        _instance: &Foo,
106        timestamp: Time,
107    ) -> DdsResult<Option<InstanceHandle>> {
108        todo!()
109    }
110
111    /// Async version of [`unregister_instance`](crate::publication::data_writer::DataWriter::unregister_instance).
112    #[tracing::instrument(skip(self, instance))]
113    pub async fn unregister_instance(
114        &self,
115        instance: &Foo,
116        handle: Option<InstanceHandle>,
117    ) -> DdsResult<()> {
118        let timestamp = self
119            .get_publisher()
120            .get_participant()
121            .get_current_time()
122            .await?;
123        self.unregister_instance_w_timestamp(instance, handle, timestamp)
124            .await
125    }
126
127    /// Async version of [`unregister_instance_w_timestamp`](crate::publication::data_writer::DataWriter::unregister_instance_w_timestamp).
128    #[tracing::instrument(skip(self, instance))]
129    pub async fn unregister_instance_w_timestamp(
130        &self,
131        instance: &Foo,
132        handle: Option<InstanceHandle>,
133        timestamp: Time,
134    ) -> DdsResult<()> {
135        let (reply_sender, reply_receiver) = R::oneshot();
136        let serialized_data = instance.serialize_data()?;
137        self.participant_address()
138            .send(DomainParticipantMail::Writer(
139                WriterServiceMail::UnregisterInstance {
140                    publisher_handle: self.publisher.get_instance_handle().await,
141                    data_writer_handle: self.handle,
142                    serialized_data,
143                    timestamp,
144                    reply_sender,
145                },
146            ))
147            .await?;
148        reply_receiver.receive().await?
149    }
150
151    /// Async version of [`get_key_value`](crate::publication::data_writer::DataWriter::get_key_value).
152    #[tracing::instrument(skip(self, _key_holder))]
153    pub async fn get_key_value(
154        &self,
155        _key_holder: &mut Foo,
156        _handle: InstanceHandle,
157    ) -> DdsResult<()> {
158        todo!()
159    }
160
161    /// Async version of [`lookup_instance`](crate::publication::data_writer::DataWriter::lookup_instance).
162    #[tracing::instrument(skip(self, instance))]
163    pub async fn lookup_instance(&self, instance: &Foo) -> DdsResult<Option<InstanceHandle>> {
164        let (reply_sender, reply_receiver) = R::oneshot();
165        let serialized_data = instance.serialize_data()?;
166        self.participant_address()
167            .send(DomainParticipantMail::Writer(
168                WriterServiceMail::LookupInstance {
169                    publisher_handle: self.publisher.get_instance_handle().await,
170                    data_writer_handle: self.handle,
171                    serialized_data,
172                    reply_sender,
173                },
174            ))
175            .await?;
176        reply_receiver.receive().await?
177    }
178
179    /// Async version of [`write`](crate::publication::data_writer::DataWriter::write).
180    #[tracing::instrument(skip(self, data))]
181    pub async fn write(&self, data: &Foo, handle: Option<InstanceHandle>) -> DdsResult<()> {
182        let timestamp = self
183            .get_publisher()
184            .get_participant()
185            .get_current_time()
186            .await?;
187        self.write_w_timestamp(data, handle, timestamp).await
188    }
189
190    /// Async version of [`write_w_timestamp`](crate::publication::data_writer::DataWriter::write_w_timestamp).
191    #[tracing::instrument(skip(self, data))]
192    pub async fn write_w_timestamp(
193        &self,
194        data: &Foo,
195        handle: Option<InstanceHandle>,
196        timestamp: Time,
197    ) -> DdsResult<()> {
198        let (reply_sender, reply_receiver) = R::oneshot();
199        let serialized_data = data.serialize_data()?;
200        self.participant_address()
201            .send(DomainParticipantMail::Writer(
202                WriterServiceMail::WriteWTimestamp {
203                    participant_address: self.participant_address().clone(),
204                    publisher_handle: self.publisher.get_instance_handle().await,
205                    data_writer_handle: self.handle,
206                    serialized_data,
207                    timestamp,
208                    reply_sender,
209                },
210            ))
211            .await?;
212        reply_receiver.receive().await?
213    }
214
215    /// Async version of [`dispose`](crate::publication::data_writer::DataWriter::dispose).
216    #[tracing::instrument(skip(self, data))]
217    pub async fn dispose(&self, data: &Foo, handle: Option<InstanceHandle>) -> DdsResult<()> {
218        let timestamp = self
219            .get_publisher()
220            .get_participant()
221            .get_current_time()
222            .await?;
223        self.dispose_w_timestamp(data, handle, timestamp).await
224    }
225
226    /// Async version of [`dispose_w_timestamp`](crate::publication::data_writer::DataWriter::dispose_w_timestamp).
227    #[tracing::instrument(skip(self, data))]
228    pub async fn dispose_w_timestamp(
229        &self,
230        data: &Foo,
231        handle: Option<InstanceHandle>,
232        timestamp: Time,
233    ) -> DdsResult<()> {
234        let (reply_sender, reply_receiver) = R::oneshot();
235        let serialized_data = data.serialize_data()?;
236        self.participant_address()
237            .send(DomainParticipantMail::Writer(
238                WriterServiceMail::DisposeWTimestamp {
239                    publisher_handle: self.publisher.get_instance_handle().await,
240                    data_writer_handle: self.handle,
241                    serialized_data,
242                    timestamp,
243                    reply_sender,
244                },
245            ))
246            .await?;
247        reply_receiver.receive().await?
248    }
249}
250
251impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
252    /// Async version of [`wait_for_acknowledgments`](crate::publication::data_writer::DataWriter::wait_for_acknowledgments).
253    #[tracing::instrument(skip(self))]
254    pub async fn wait_for_acknowledgments(&self, max_wait: Duration) -> DdsResult<()> {
255        let publisher_handle = self.get_publisher().get_instance_handle().await;
256        let timer_handle = self
257            .get_publisher()
258            .get_participant()
259            .timer_handle()
260            .clone();
261        let participant_address = self.participant_address().clone();
262        let data_writer_handle = self.handle;
263
264        poll_timeout(
265            timer_handle,
266            max_wait.into(),
267            Box::pin(async move {
268                loop {
269                    let (reply_sender, reply_receiver) = R::oneshot();
270                    participant_address
271                        .send(DomainParticipantMail::Message(
272                            MessageServiceMail::AreAllChangesAcknowledged {
273                                publisher_handle,
274                                data_writer_handle,
275                                reply_sender,
276                            },
277                        ))
278                        .await
279                        .ok();
280                    let reply = reply_receiver.receive().await;
281                    match reply {
282                        Ok(are_changes_acknowledged) => match are_changes_acknowledged {
283                            Ok(true) => return Ok(()),
284                            Ok(false) => (),
285                            Err(e) => return Err(e),
286                        },
287                        Err(_) => return Err(DdsError::Error(String::from("Channel error"))),
288                    }
289                }
290            }),
291        )
292        .await?
293    }
294
295    /// Async version of [`get_liveliness_lost_status`](crate::publication::data_writer::DataWriter::get_liveliness_lost_status).
296    #[tracing::instrument(skip(self))]
297    pub async fn get_liveliness_lost_status(&self) -> DdsResult<LivelinessLostStatus> {
298        todo!()
299    }
300
301    /// Async version of [`get_offered_deadline_missed_status`](crate::publication::data_writer::DataWriter::get_offered_deadline_missed_status).
302    #[tracing::instrument(skip(self))]
303    pub async fn get_offered_deadline_missed_status(
304        &self,
305    ) -> DdsResult<OfferedDeadlineMissedStatus> {
306        let (reply_sender, reply_receiver) = R::oneshot();
307        self.participant_address()
308            .send(DomainParticipantMail::Writer(
309                WriterServiceMail::GetOfferedDeadlineMissedStatus {
310                    publisher_handle: self.publisher.get_instance_handle().await,
311                    data_writer_handle: self.handle,
312                    reply_sender,
313                },
314            ))
315            .await?;
316        reply_receiver.receive().await?
317    }
318
319    /// Async version of [`get_offered_incompatible_qos_status`](crate::publication::data_writer::DataWriter::get_offered_incompatible_qos_status).
320    #[tracing::instrument(skip(self))]
321    pub async fn get_offered_incompatible_qos_status(
322        &self,
323    ) -> DdsResult<OfferedIncompatibleQosStatus> {
324        todo!()
325    }
326
327    /// Async version of [`get_publication_matched_status`](crate::publication::data_writer::DataWriter::get_publication_matched_status).
328    #[tracing::instrument(skip(self))]
329    pub async fn get_publication_matched_status(&self) -> DdsResult<PublicationMatchedStatus> {
330        let (reply_sender, reply_receiver) = R::oneshot();
331        self.participant_address()
332            .send(DomainParticipantMail::Writer(
333                WriterServiceMail::GetPublicationMatchedStatus {
334                    publisher_handle: self.publisher.get_instance_handle().await,
335                    data_writer_handle: self.handle,
336                    reply_sender,
337                },
338            ))
339            .await?;
340        reply_receiver.receive().await?
341    }
342
343    /// Async version of [`get_topic`](crate::publication::data_writer::DataWriter::get_topic).
344    #[tracing::instrument(skip(self))]
345    pub fn get_topic(&self) -> TopicDescriptionAsync<R> {
346        self.topic.clone()
347    }
348
349    /// Async version of [`get_publisher`](crate::publication::data_writer::DataWriter::get_publisher).
350    #[tracing::instrument(skip(self))]
351    pub fn get_publisher(&self) -> PublisherAsync<R> {
352        self.publisher.clone()
353    }
354
355    /// Async version of [`assert_liveliness`](crate::publication::data_writer::DataWriter::assert_liveliness).
356    #[tracing::instrument(skip(self))]
357    pub async fn assert_liveliness(&self) -> DdsResult<()> {
358        todo!()
359    }
360
361    /// Async version of [`get_matched_subscription_data`](crate::publication::data_writer::DataWriter::get_matched_subscription_data).
362    #[tracing::instrument(skip(self))]
363    pub async fn get_matched_subscription_data(
364        &self,
365        subscription_handle: InstanceHandle,
366    ) -> DdsResult<SubscriptionBuiltinTopicData> {
367        let (reply_sender, reply_receiver) = R::oneshot();
368        self.participant_address()
369            .send(DomainParticipantMail::Writer(
370                WriterServiceMail::GetMatchedSubscriptionData {
371                    publisher_handle: self.publisher.get_instance_handle().await,
372                    data_writer_handle: self.handle,
373                    subscription_handle,
374                    reply_sender,
375                },
376            ))
377            .await?;
378        reply_receiver.receive().await?
379    }
380
381    /// Async version of [`get_matched_subscriptions`](crate::publication::data_writer::DataWriter::get_matched_subscriptions).
382    #[tracing::instrument(skip(self))]
383    pub async fn get_matched_subscriptions(&self) -> DdsResult<Vec<InstanceHandle>> {
384        let (reply_sender, reply_receiver) = R::oneshot();
385        self.participant_address()
386            .send(DomainParticipantMail::Writer(
387                WriterServiceMail::GetMatchedSubscriptions {
388                    publisher_handle: self.publisher.get_instance_handle().await,
389                    data_writer_handle: self.handle,
390                    reply_sender,
391                },
392            ))
393            .await?;
394        reply_receiver.receive().await?
395    }
396}
397
398impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
399    /// Async version of [`set_qos`](crate::publication::data_writer::DataWriter::set_qos).
400    #[tracing::instrument(skip(self))]
401    pub async fn set_qos(&self, qos: QosKind<DataWriterQos>) -> DdsResult<()> {
402        let (reply_sender, reply_receiver) = R::oneshot();
403        self.participant_address()
404            .send(DomainParticipantMail::Writer(
405                WriterServiceMail::SetDataWriterQos {
406                    publisher_handle: self.publisher.get_instance_handle().await,
407                    data_writer_handle: self.handle,
408                    qos,
409                    reply_sender,
410                },
411            ))
412            .await?;
413        reply_receiver.receive().await?
414    }
415
416    /// Async version of [`get_qos`](crate::publication::data_writer::DataWriter::get_qos).
417    #[tracing::instrument(skip(self))]
418    pub async fn get_qos(&self) -> DdsResult<DataWriterQos> {
419        let (reply_sender, reply_receiver) = R::oneshot();
420        self.participant_address()
421            .send(DomainParticipantMail::Writer(
422                WriterServiceMail::GetDataWriterQos {
423                    publisher_handle: self.publisher.get_instance_handle().await,
424                    data_writer_handle: self.handle,
425                    reply_sender,
426                },
427            ))
428            .await?;
429        reply_receiver.receive().await?
430    }
431
432    /// Async version of [`get_statuscondition`](crate::publication::data_writer::DataWriter::get_statuscondition).
433    #[tracing::instrument(skip(self))]
434    pub fn get_statuscondition(&self) -> StatusConditionAsync<R> {
435        StatusConditionAsync::new(
436            self.status_condition_address.clone(),
437            self.publisher.get_participant().clock_handle().clone(),
438        )
439    }
440
441    /// Async version of [`get_status_changes`](crate::publication::data_writer::DataWriter::get_status_changes).
442    #[tracing::instrument(skip(self))]
443    pub async fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
444        todo!()
445    }
446
447    /// Async version of [`enable`](crate::publication::data_writer::DataWriter::enable).
448    #[tracing::instrument(skip(self))]
449    pub async fn enable(&self) -> DdsResult<()> {
450        let (reply_sender, reply_receiver) = R::oneshot();
451        self.participant_address()
452            .send(DomainParticipantMail::Writer(
453                WriterServiceMail::EnableDataWriter {
454                    publisher_handle: self.publisher.get_instance_handle().await,
455                    data_writer_handle: self.handle,
456                    participant_address: self.participant_address().clone(),
457                    reply_sender,
458                },
459            ))
460            .await?;
461        reply_receiver.receive().await?
462    }
463
464    /// Async version of [`get_instance_handle`](crate::publication::data_writer::DataWriter::get_instance_handle).
465    #[tracing::instrument(skip(self))]
466    pub async fn get_instance_handle(&self) -> InstanceHandle {
467        self.handle
468    }
469}
470impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
471    /// Async version of [`set_listener`](crate::publication::data_writer::DataWriter::set_listener).
472    #[tracing::instrument(skip(self, a_listener))]
473    pub async fn set_listener(
474        &self,
475        a_listener: Option<impl DataWriterListener<R, Foo> + Send + 'static>,
476        mask: &[StatusKind],
477    ) -> DdsResult<()> {
478        let (reply_sender, reply_receiver) = R::oneshot();
479        let listener_sender = a_listener.map(|l| {
480            DataWriterListenerActor::spawn(
481                l,
482                self.get_publisher().get_participant().spawner_handle(),
483            )
484        });
485        self.participant_address()
486            .send(DomainParticipantMail::Writer(
487                WriterServiceMail::SetListener {
488                    publisher_handle: self.publisher.get_instance_handle().await,
489                    data_writer_handle: self.handle,
490                    listener_sender,
491                    listener_mask: mask.to_vec(),
492                    reply_sender,
493                },
494            ))
495            .await?;
496        reply_receiver.receive().await?
497    }
498}