1use tracing::warn;
2
3use super::{condition::StatusConditionAsync, publisher::PublisherAsync};
4use crate::{
5 builtin_topics::SubscriptionBuiltinTopicData,
6 dcps::{
7 actor::ActorAddress,
8 domain_participant_actor::poll_timeout,
9 domain_participant_actor_mail::{
10 DomainParticipantMail, MessageServiceMail, WriterServiceMail,
11 },
12 listeners::data_writer_listener::DataWriterListenerActor,
13 status_condition_actor::StatusConditionActor,
14 },
15 dds_async::topic_description::TopicDescriptionAsync,
16 infrastructure::{
17 error::{DdsError, DdsResult},
18 instance::InstanceHandle,
19 qos::{DataWriterQos, QosKind},
20 status::{
21 LivelinessLostStatus, OfferedDeadlineMissedStatus, OfferedIncompatibleQosStatus,
22 PublicationMatchedStatus, StatusKind,
23 },
24 time::{Duration, Time},
25 type_support::DdsSerialize,
26 },
27 publication::data_writer_listener::DataWriterListener,
28 runtime::{ChannelSend, DdsRuntime, OneshotReceive},
29};
30use alloc::{boxed::Box, string::String, vec::Vec};
31use core::marker::PhantomData;
32
33pub struct DataWriterAsync<R: DdsRuntime, Foo> {
35 handle: InstanceHandle,
36 status_condition_address: ActorAddress<R, StatusConditionActor<R>>,
37 publisher: PublisherAsync<R>,
38 topic: TopicDescriptionAsync<R>,
39 phantom: PhantomData<Foo>,
40}
41
42impl<R: DdsRuntime, Foo> Clone for DataWriterAsync<R, Foo> {
43 fn clone(&self) -> Self {
44 Self {
45 handle: self.handle,
46 status_condition_address: self.status_condition_address.clone(),
47 publisher: self.publisher.clone(),
48 topic: self.topic.clone(),
49 phantom: self.phantom,
50 }
51 }
52}
53
54impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
55 pub(crate) fn new(
56 handle: InstanceHandle,
57 status_condition_address: ActorAddress<R, StatusConditionActor<R>>,
58 publisher: PublisherAsync<R>,
59 topic: TopicDescriptionAsync<R>,
60 ) -> Self {
61 Self {
62 handle,
63 status_condition_address,
64 publisher,
65 topic,
66 phantom: PhantomData,
67 }
68 }
69
70 pub(crate) fn participant_address(&self) -> &R::ChannelSender<DomainParticipantMail<R>> {
71 self.publisher.participant_address()
72 }
73
74 pub(crate) fn change_foo_type<T>(self) -> DataWriterAsync<R, T> {
75 DataWriterAsync {
76 handle: self.handle,
77 status_condition_address: self.status_condition_address,
78 publisher: self.publisher,
79 topic: self.topic,
80 phantom: PhantomData,
81 }
82 }
83}
84
85impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo>
86where
87 Foo: DdsSerialize,
88{
89 #[tracing::instrument(skip(self, instance))]
91 pub async fn register_instance(&self, instance: &Foo) -> DdsResult<Option<InstanceHandle>> {
92 let timestamp = self
93 .get_publisher()
94 .get_participant()
95 .get_current_time()
96 .await?;
97 self.register_instance_w_timestamp(instance, timestamp)
98 .await
99 }
100
101 #[tracing::instrument(skip(self, _instance))]
103 pub async fn register_instance_w_timestamp(
104 &self,
105 _instance: &Foo,
106 timestamp: Time,
107 ) -> DdsResult<Option<InstanceHandle>> {
108 todo!()
109 }
110
111 #[tracing::instrument(skip(self, instance))]
113 pub async fn unregister_instance(
114 &self,
115 instance: &Foo,
116 handle: Option<InstanceHandle>,
117 ) -> DdsResult<()> {
118 let timestamp = self
119 .get_publisher()
120 .get_participant()
121 .get_current_time()
122 .await?;
123 self.unregister_instance_w_timestamp(instance, handle, timestamp)
124 .await
125 }
126
127 #[tracing::instrument(skip(self, instance))]
129 pub async fn unregister_instance_w_timestamp(
130 &self,
131 instance: &Foo,
132 handle: Option<InstanceHandle>,
133 timestamp: Time,
134 ) -> DdsResult<()> {
135 let (reply_sender, reply_receiver) = R::oneshot();
136 let serialized_data = instance.serialize_data()?;
137 self.participant_address()
138 .send(DomainParticipantMail::Writer(
139 WriterServiceMail::UnregisterInstance {
140 publisher_handle: self.publisher.get_instance_handle().await,
141 data_writer_handle: self.handle,
142 serialized_data,
143 timestamp,
144 reply_sender,
145 },
146 ))
147 .await?;
148 reply_receiver.receive().await?
149 }
150
151 #[tracing::instrument(skip(self, _key_holder))]
153 pub async fn get_key_value(
154 &self,
155 _key_holder: &mut Foo,
156 _handle: InstanceHandle,
157 ) -> DdsResult<()> {
158 todo!()
159 }
160
161 #[tracing::instrument(skip(self, instance))]
163 pub async fn lookup_instance(&self, instance: &Foo) -> DdsResult<Option<InstanceHandle>> {
164 let (reply_sender, reply_receiver) = R::oneshot();
165 let serialized_data = instance.serialize_data()?;
166 self.participant_address()
167 .send(DomainParticipantMail::Writer(
168 WriterServiceMail::LookupInstance {
169 publisher_handle: self.publisher.get_instance_handle().await,
170 data_writer_handle: self.handle,
171 serialized_data,
172 reply_sender,
173 },
174 ))
175 .await?;
176 reply_receiver.receive().await?
177 }
178
179 #[tracing::instrument(skip(self, data))]
181 pub async fn write(&self, data: &Foo, handle: Option<InstanceHandle>) -> DdsResult<()> {
182 let timestamp = self
183 .get_publisher()
184 .get_participant()
185 .get_current_time()
186 .await?;
187 self.write_w_timestamp(data, handle, timestamp).await
188 }
189
190 #[tracing::instrument(skip(self, data))]
192 pub async fn write_w_timestamp(
193 &self,
194 data: &Foo,
195 handle: Option<InstanceHandle>,
196 timestamp: Time,
197 ) -> DdsResult<()> {
198 let (reply_sender, reply_receiver) = R::oneshot();
199 let serialized_data = data.serialize_data()?;
200 self.participant_address()
201 .send(DomainParticipantMail::Writer(
202 WriterServiceMail::WriteWTimestamp {
203 participant_address: self.participant_address().clone(),
204 publisher_handle: self.publisher.get_instance_handle().await,
205 data_writer_handle: self.handle,
206 serialized_data,
207 timestamp,
208 reply_sender,
209 },
210 ))
211 .await?;
212 reply_receiver.receive().await?
213 }
214
215 #[tracing::instrument(skip(self, data))]
217 pub async fn dispose(&self, data: &Foo, handle: Option<InstanceHandle>) -> DdsResult<()> {
218 let timestamp = self
219 .get_publisher()
220 .get_participant()
221 .get_current_time()
222 .await?;
223 self.dispose_w_timestamp(data, handle, timestamp).await
224 }
225
226 #[tracing::instrument(skip(self, data))]
228 pub async fn dispose_w_timestamp(
229 &self,
230 data: &Foo,
231 handle: Option<InstanceHandle>,
232 timestamp: Time,
233 ) -> DdsResult<()> {
234 let (reply_sender, reply_receiver) = R::oneshot();
235 let serialized_data = data.serialize_data()?;
236 self.participant_address()
237 .send(DomainParticipantMail::Writer(
238 WriterServiceMail::DisposeWTimestamp {
239 publisher_handle: self.publisher.get_instance_handle().await,
240 data_writer_handle: self.handle,
241 serialized_data,
242 timestamp,
243 reply_sender,
244 },
245 ))
246 .await?;
247 reply_receiver.receive().await?
248 }
249}
250
251impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
252 #[tracing::instrument(skip(self))]
254 pub async fn wait_for_acknowledgments(&self, max_wait: Duration) -> DdsResult<()> {
255 let publisher_handle = self.get_publisher().get_instance_handle().await;
256 let timer_handle = self
257 .get_publisher()
258 .get_participant()
259 .timer_handle()
260 .clone();
261 let participant_address = self.participant_address().clone();
262 let data_writer_handle = self.handle;
263
264 poll_timeout(
265 timer_handle,
266 max_wait.into(),
267 Box::pin(async move {
268 loop {
269 let (reply_sender, reply_receiver) = R::oneshot();
270 participant_address
271 .send(DomainParticipantMail::Message(
272 MessageServiceMail::AreAllChangesAcknowledged {
273 publisher_handle,
274 data_writer_handle,
275 reply_sender,
276 },
277 ))
278 .await
279 .ok();
280 let reply = reply_receiver.receive().await;
281 match reply {
282 Ok(are_changes_acknowledged) => match are_changes_acknowledged {
283 Ok(true) => return Ok(()),
284 Ok(false) => (),
285 Err(e) => return Err(e),
286 },
287 Err(_) => return Err(DdsError::Error(String::from("Channel error"))),
288 }
289 }
290 }),
291 )
292 .await?
293 }
294
295 #[tracing::instrument(skip(self))]
297 pub async fn get_liveliness_lost_status(&self) -> DdsResult<LivelinessLostStatus> {
298 todo!()
299 }
300
301 #[tracing::instrument(skip(self))]
303 pub async fn get_offered_deadline_missed_status(
304 &self,
305 ) -> DdsResult<OfferedDeadlineMissedStatus> {
306 let (reply_sender, reply_receiver) = R::oneshot();
307 self.participant_address()
308 .send(DomainParticipantMail::Writer(
309 WriterServiceMail::GetOfferedDeadlineMissedStatus {
310 publisher_handle: self.publisher.get_instance_handle().await,
311 data_writer_handle: self.handle,
312 reply_sender,
313 },
314 ))
315 .await?;
316 reply_receiver.receive().await?
317 }
318
319 #[tracing::instrument(skip(self))]
321 pub async fn get_offered_incompatible_qos_status(
322 &self,
323 ) -> DdsResult<OfferedIncompatibleQosStatus> {
324 todo!()
325 }
326
327 #[tracing::instrument(skip(self))]
329 pub async fn get_publication_matched_status(&self) -> DdsResult<PublicationMatchedStatus> {
330 let (reply_sender, reply_receiver) = R::oneshot();
331 self.participant_address()
332 .send(DomainParticipantMail::Writer(
333 WriterServiceMail::GetPublicationMatchedStatus {
334 publisher_handle: self.publisher.get_instance_handle().await,
335 data_writer_handle: self.handle,
336 reply_sender,
337 },
338 ))
339 .await?;
340 reply_receiver.receive().await?
341 }
342
343 #[tracing::instrument(skip(self))]
345 pub fn get_topic(&self) -> TopicDescriptionAsync<R> {
346 self.topic.clone()
347 }
348
349 #[tracing::instrument(skip(self))]
351 pub fn get_publisher(&self) -> PublisherAsync<R> {
352 self.publisher.clone()
353 }
354
355 #[tracing::instrument(skip(self))]
357 pub async fn assert_liveliness(&self) -> DdsResult<()> {
358 todo!()
359 }
360
361 #[tracing::instrument(skip(self))]
363 pub async fn get_matched_subscription_data(
364 &self,
365 subscription_handle: InstanceHandle,
366 ) -> DdsResult<SubscriptionBuiltinTopicData> {
367 let (reply_sender, reply_receiver) = R::oneshot();
368 self.participant_address()
369 .send(DomainParticipantMail::Writer(
370 WriterServiceMail::GetMatchedSubscriptionData {
371 publisher_handle: self.publisher.get_instance_handle().await,
372 data_writer_handle: self.handle,
373 subscription_handle,
374 reply_sender,
375 },
376 ))
377 .await?;
378 reply_receiver.receive().await?
379 }
380
381 #[tracing::instrument(skip(self))]
383 pub async fn get_matched_subscriptions(&self) -> DdsResult<Vec<InstanceHandle>> {
384 let (reply_sender, reply_receiver) = R::oneshot();
385 self.participant_address()
386 .send(DomainParticipantMail::Writer(
387 WriterServiceMail::GetMatchedSubscriptions {
388 publisher_handle: self.publisher.get_instance_handle().await,
389 data_writer_handle: self.handle,
390 reply_sender,
391 },
392 ))
393 .await?;
394 reply_receiver.receive().await?
395 }
396}
397
398impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
399 #[tracing::instrument(skip(self))]
401 pub async fn set_qos(&self, qos: QosKind<DataWriterQos>) -> DdsResult<()> {
402 let (reply_sender, reply_receiver) = R::oneshot();
403 self.participant_address()
404 .send(DomainParticipantMail::Writer(
405 WriterServiceMail::SetDataWriterQos {
406 publisher_handle: self.publisher.get_instance_handle().await,
407 data_writer_handle: self.handle,
408 qos,
409 reply_sender,
410 },
411 ))
412 .await?;
413 reply_receiver.receive().await?
414 }
415
416 #[tracing::instrument(skip(self))]
418 pub async fn get_qos(&self) -> DdsResult<DataWriterQos> {
419 let (reply_sender, reply_receiver) = R::oneshot();
420 self.participant_address()
421 .send(DomainParticipantMail::Writer(
422 WriterServiceMail::GetDataWriterQos {
423 publisher_handle: self.publisher.get_instance_handle().await,
424 data_writer_handle: self.handle,
425 reply_sender,
426 },
427 ))
428 .await?;
429 reply_receiver.receive().await?
430 }
431
432 #[tracing::instrument(skip(self))]
434 pub fn get_statuscondition(&self) -> StatusConditionAsync<R> {
435 StatusConditionAsync::new(
436 self.status_condition_address.clone(),
437 self.publisher.get_participant().clock_handle().clone(),
438 )
439 }
440
441 #[tracing::instrument(skip(self))]
443 pub async fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
444 todo!()
445 }
446
447 #[tracing::instrument(skip(self))]
449 pub async fn enable(&self) -> DdsResult<()> {
450 let (reply_sender, reply_receiver) = R::oneshot();
451 self.participant_address()
452 .send(DomainParticipantMail::Writer(
453 WriterServiceMail::EnableDataWriter {
454 publisher_handle: self.publisher.get_instance_handle().await,
455 data_writer_handle: self.handle,
456 participant_address: self.participant_address().clone(),
457 reply_sender,
458 },
459 ))
460 .await?;
461 reply_receiver.receive().await?
462 }
463
464 #[tracing::instrument(skip(self))]
466 pub async fn get_instance_handle(&self) -> InstanceHandle {
467 self.handle
468 }
469}
470impl<R: DdsRuntime, Foo> DataWriterAsync<R, Foo> {
471 #[tracing::instrument(skip(self, a_listener))]
473 pub async fn set_listener(
474 &self,
475 a_listener: Option<impl DataWriterListener<R, Foo> + Send + 'static>,
476 mask: &[StatusKind],
477 ) -> DdsResult<()> {
478 let (reply_sender, reply_receiver) = R::oneshot();
479 let listener_sender = a_listener.map(|l| {
480 DataWriterListenerActor::spawn(
481 l,
482 self.get_publisher().get_participant().spawner_handle(),
483 )
484 });
485 self.participant_address()
486 .send(DomainParticipantMail::Writer(
487 WriterServiceMail::SetListener {
488 publisher_handle: self.publisher.get_instance_handle().await,
489 data_writer_handle: self.handle,
490 listener_sender,
491 listener_mask: mask.to_vec(),
492 reply_sender,
493 },
494 ))
495 .await?;
496 reply_receiver.receive().await?
497 }
498}