1use std::{io, marker::PhantomData};
2
3use mio::{Evented, Poll, PollOpt, Ready, Token};
4use futures::{
5 pin_mut,
6 stream::{FusedStream, StreamExt},
7 Future,
8};
9use rustdds::{
10 dds::{ReadError, ReadResult, WriteResult},
11 serialization::CdrDeserializeSeedDecoder,
12 *,
13};
14use serde::{de::DeserializeOwned, Serialize};
15
16use super::{gid::Gid, message_info::MessageInfo, node::Node};
17
18pub struct Publisher<M: Serialize> {
23 datawriter: no_key::DataWriterCdr<M>,
24}
25
26impl<M: Serialize> Publisher<M> {
27 pub(crate) fn new(datawriter: no_key::DataWriterCdr<M>) -> Publisher<M> {
29 Publisher { datawriter }
30 }
31
32 pub fn publish(&self, message: M) -> WriteResult<(), M> {
33 self.datawriter.write(message, Some(Timestamp::now()))
34 }
35
36 pub fn assert_liveliness(&self) -> WriteResult<(), ()> {
45 self.datawriter.assert_liveliness()
46 }
47
48 pub fn guid(&self) -> rustdds::GUID {
49 self.datawriter.guid()
50 }
51
52 pub fn gid(&self) -> Gid {
53 self.guid().into()
54 }
55
56 pub fn get_subscription_count(&self, my_node: &Node) -> usize {
61 my_node.get_subscription_count(self.guid())
62 }
63
64 pub fn wait_for_subscription(&self, my_node: &Node) -> impl Future<Output = ()> + Send {
70 my_node.wait_for_reader(self.guid())
71 }
72
73 pub async fn async_publish(&self, message: M) -> WriteResult<(), M> {
74 self
75 .datawriter
76 .async_write(message, Some(Timestamp::now()))
77 .await
78 }
79
80 #[allow(dead_code)] pub(crate) async fn async_publish_with_options(
82 &self,
83 message: M,
84 wo: WriteOptions,
85 ) -> dds::WriteResult<rustdds::rpc::SampleIdentity, M> {
86 self.datawriter.async_write_with_options(message, wo).await
87 }
88}
89pub struct Subscription<M> {
100 datareader: no_key::SimpleDataReaderCdr<M>,
101}
102
103impl<M> Subscription<M>
104where
105 M: 'static,
106{
107 pub(crate) fn new(datareader: no_key::SimpleDataReaderCdr<M>) -> Subscription<M> {
109 Subscription { datareader }
110 }
111
112 pub fn take_seed<'de, S>(&self, seed: S) -> ReadResult<Option<(M, MessageInfo)>>
113 where
114 S: serde::de::DeserializeSeed<'de, Value = M> + Clone,
115 M: 'static,
116 {
117 self.datareader.drain_read_notifications();
118 let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
119 let ds: Option<no_key::DeserializedCacheChange<M>> =
120 self.datareader.try_take_one_with(decoder)?;
121 Ok(ds.map(dcc_to_value_and_messageinfo))
122 }
123
124 pub fn async_stream_seed<'a, 'de, S>(
126 &'a self,
127 seed: S,
128 ) -> impl FusedStream<Item = ReadResult<(M, MessageInfo)>> + 'a
129 where
130 S: serde::de::DeserializeSeed<'de, Value = M> + Clone + 'a,
131 M: 'static,
132 {
133 let decoder = CdrDeserializeSeedDecoder::new(seed, PhantomData::<()>);
134 self
135 .datareader
136 .as_async_stream_with(decoder)
137 .map(|result| result.map(dcc_to_value_and_messageinfo))
138 }
139}
140
141impl<M: 'static + DeserializeOwned> Subscription<M> {
142 pub fn take(&self) -> ReadResult<Option<(M, MessageInfo)>> {
143 self.datareader.drain_read_notifications();
144 let ds: Option<no_key::DeserializedCacheChange<M>> = self.datareader.try_take_one()?;
145 Ok(ds.map(dcc_to_value_and_messageinfo))
146 }
147
148 pub async fn async_take(&self) -> ReadResult<(M, MessageInfo)> {
149 let async_stream = self.datareader.as_async_stream();
150 pin_mut!(async_stream);
151 match async_stream.next().await {
152 Some(Err(e)) => Err(e),
153 Some(Ok(ds)) => Ok(dcc_to_value_and_messageinfo(ds)),
154 None => {
156 read_error_internal!("async_take(): SimpleDataReader value stream unexpectedly ended!")
157 }
158 }
159 }
160
161 pub fn async_stream(&self) -> impl FusedStream<Item = ReadResult<(M, MessageInfo)>> + '_ {
163 self
164 .datareader
165 .as_async_stream()
166 .map(|result| result.map(dcc_to_value_and_messageinfo))
167 }
168}
169
170impl<M> Subscription<M>
171where
172 M: 'static,
173{
174 pub fn guid(&self) -> rustdds::GUID {
175 self.datareader.guid()
176 }
177
178 pub fn gid(&self) -> Gid {
179 self.guid().into()
180 }
181
182 pub fn get_publisher_count(&self, my_node: &Node) -> usize {
187 my_node.get_publisher_count(self.guid())
188 }
189
190 pub fn wait_for_publisher(&self, my_node: &Node) -> impl Future<Output = ()> + Send {
196 my_node.wait_for_writer(self.guid())
197 }
198}
199
200#[inline]
202fn dcc_to_value_and_messageinfo<M>(dcc: no_key::DeserializedCacheChange<M>) -> (M, MessageInfo) {
203 let mi = MessageInfo::from(&dcc);
204 (dcc.into_value(), mi)
205}
206
207impl<D> Evented for Subscription<D>
208where
209 D: DeserializeOwned,
210{
211 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
214 self.datareader.register(poll, token, interest, opts)
215 }
216
217 fn reregister(
218 &self,
219 poll: &Poll,
220 token: Token,
221 interest: Ready,
222 opts: PollOpt,
223 ) -> io::Result<()> {
224 self.datareader.reregister(poll, token, interest, opts)
225 }
226
227 fn deregister(&self, poll: &Poll) -> io::Result<()> {
228 self.datareader.deregister(poll)
229 }
230}