use std::{
io,
pin::Pin,
task::{Context, Poll},
};
use futures::stream::{FusedStream, Stream};
use crate::{
dds::{
adapters::no_key::{DefaultDecoder, DeserializerAdapter},
no_key::datasample::DataSample,
qos::{HasQoSPolicy, QosPolicies},
readcondition::ReadCondition,
result::ReadResult,
statusevents::DataReaderStatus,
with_key::{
datareader as datareader_with_key,
datasample::{DataSample as WithKeyDataSample, Sample},
BareDataReaderStream as WithKeyBareDataReaderStream, DataReader as WithKeyDataReader,
DataReaderEventStream as WithKeyDataReaderEventStream,
DataReaderStream as WithKeyDataReaderStream,
},
},
serialization::CDRDeserializerAdapter,
structure::entity::RTPSEntity,
StatusEvented, GUID,
};
use super::wrappers::{DAWrapper, NoKeyWrapper};
/// Shorthand for a [`DataReader`] whose deserializer adapter is
/// [`CDRDeserializerAdapter`].
pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
/// Reader for data of type `D`, implemented as a thin wrapper around a
/// with_key `DataReader`.
///
/// The wrapped reader is instantiated with `NoKeyWrapper<D>` as its data type
/// and `DAWrapper<DA>` as its deserializer adapter. When `DA` is not given it
/// defaults to `CDRDeserializerAdapter<D>`.
pub struct DataReader<D, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
// The with_key reader that every operation on this type is forwarded to.
keyed_datareader: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
}
impl<D: 'static, DA> DataReader<D, DA>
where
    DA: DeserializerAdapter<D>,
{
    /// Builds a no_key `DataReader` around an existing with_key reader.
    ///
    /// `keyed` is stored unchanged as the wrapped reader; nothing else is
    /// done at construction time.
    pub(crate) fn from_keyed(
        keyed: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
    ) -> Self {
        let keyed_datareader = keyed;
        Self { keyed_datareader }
    }
}
impl<D: 'static, DA> DataReader<D, DA>
where
    DA: DefaultDecoder<D>,
{
    /// Reads up to `max_samples` samples selected by `read_condition` and
    /// returns them by reference.
    ///
    /// The request is forwarded to the wrapped with_key reader. Each keyed
    /// sample is then converted with `DataSample::from_with_key_ref`; samples
    /// for which that conversion yields `None` are left out of the result.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped reader's `read` reports.
    pub fn read(
        &mut self,
        max_samples: usize,
        read_condition: ReadCondition,
    ) -> ReadResult<Vec<DataSample<&D>>> {
        let keyed_samples: Vec<WithKeyDataSample<&NoKeyWrapper<D>>> =
            self.keyed_datareader.read(max_samples, read_condition)?;
        // Reserve for the worst case up front: every keyed sample converts.
        let mut samples = Vec::with_capacity(keyed_samples.len());
        samples.extend(
            keyed_samples
                .into_iter()
                .filter_map(DataSample::<D>::from_with_key_ref),
        );
        Ok(samples)
    }

    /// Takes up to `max_samples` samples selected by `read_condition` and
    /// returns them by value.
    ///
    /// The request is forwarded to the wrapped with_key reader's `take`. Each
    /// keyed sample is converted with `DataSample::from_with_key`; samples for
    /// which that conversion yields `None` are left out of the result.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped reader's `take` reports.
    pub fn take(
        &mut self,
        max_samples: usize,
        read_condition: ReadCondition,
    ) -> ReadResult<Vec<DataSample<D>>> {
        let keyed_samples: Vec<WithKeyDataSample<NoKeyWrapper<D>>> =
            self.keyed_datareader.take(max_samples, read_condition)?;
        // Reserve for the worst case up front: every keyed sample converts.
        let mut samples = Vec::with_capacity(keyed_samples.len());
        samples.extend(
            keyed_samples
                .into_iter()
                .filter_map(DataSample::<D>::from_with_key),
        );
        Ok(samples)
    }

    /// Reads at most one sample, by reference, using
    /// `ReadCondition::not_read()`.
    ///
    /// Returns `Ok(None)` when [`read`](Self::read) produced no sample.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`read`](Self::read).
    pub fn read_next_sample(&mut self) -> ReadResult<Option<DataSample<&D>>> {
        self.read(1, ReadCondition::not_read())
            .map(|mut samples| samples.pop())
    }

    /// Takes at most one sample, by value, using `ReadCondition::not_read()`.
    ///
    /// Returns `Ok(None)` when [`take`](Self::take) produced no sample.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`take`](Self::take).
    pub fn take_next_sample(&mut self) -> ReadResult<Option<DataSample<D>>> {
        self.take(1, ReadCondition::not_read())
            .map(|mut samples| samples.pop())
    }

    /// Returns an iterator over references to the values of the samples
    /// obtained from `read(usize::MAX, ReadCondition::not_read())`.
    ///
    /// Only the `value` of each sample is yielded; the rest of the
    /// `DataSample` is discarded.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`read`](Self::read).
    pub fn iterator(&mut self) -> ReadResult<impl Iterator<Item = &D>> {
        let samples = self.read(usize::MAX, ReadCondition::not_read())?;
        Ok(samples.into_iter().map(|sample| sample.value))
    }

    /// Same as [`iterator`](Self::iterator), but with a caller-supplied
    /// `read_condition` instead of `ReadCondition::not_read()`.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`read`](Self::read).
    pub fn conditional_iterator(
        &mut self,
        read_condition: ReadCondition,
    ) -> ReadResult<impl Iterator<Item = &D>> {
        let samples = self.read(usize::MAX, read_condition)?;
        Ok(samples.into_iter().map(|sample| sample.value))
    }

    /// Returns an iterator over the owned values of the samples obtained from
    /// `take(usize::MAX, ReadCondition::not_read())`.
    ///
    /// Only the `value` of each sample is yielded; the rest of the
    /// `DataSample` is discarded.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`take`](Self::take).
    pub fn into_iterator(&mut self) -> ReadResult<impl Iterator<Item = D>> {
        let samples = self.take(usize::MAX, ReadCondition::not_read())?;
        Ok(samples.into_iter().map(|sample| sample.value))
    }

    /// Same as [`into_iterator`](Self::into_iterator), but with a
    /// caller-supplied `read_condition` instead of `ReadCondition::not_read()`.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`take`](Self::take).
    pub fn into_conditional_iterator(
        &mut self,
        read_condition: ReadCondition,
    ) -> ReadResult<impl Iterator<Item = D>> {
        let samples = self.take(usize::MAX, read_condition)?;
        Ok(samples.into_iter().map(|sample| sample.value))
    }

    /// Consumes the reader and turns it into a [`BareDataReaderStream`] that
    /// wraps the with_key reader's `async_bare_sample_stream()`.
    pub fn async_bare_sample_stream(self) -> BareDataReaderStream<D, DA> {
        let keyed_stream = self.keyed_datareader.async_bare_sample_stream();
        BareDataReaderStream { keyed_stream }
    }

    /// Consumes the reader and turns it into a [`DataReaderStream`] that
    /// wraps the with_key reader's `async_sample_stream()`.
    pub fn async_sample_stream(self) -> DataReaderStream<D, DA> {
        let keyed_stream = self.keyed_datareader.async_sample_stream();
        DataReaderStream { keyed_stream }
    }
}
/// mio 0.6 integration: every operation is delegated to the wrapped with_key
/// reader.
impl<D, DA> mio_06::Evented for DataReader<D, DA>
where
    DA: DeserializerAdapter<D>,
{
    /// Registers the wrapped reader with `poll`, passing all arguments through
    /// unchanged.
    fn register(
        &self,
        poll: &mio_06::Poll,
        token: mio_06::Token,
        interest: mio_06::Ready,
        opts: mio_06::PollOpt,
    ) -> io::Result<()> {
        let inner = &self.keyed_datareader;
        inner.register(poll, token, interest, opts)
    }

    /// Re-registers the wrapped reader with `poll`, passing all arguments
    /// through unchanged.
    fn reregister(
        &self,
        poll: &mio_06::Poll,
        token: mio_06::Token,
        interest: mio_06::Ready,
        opts: mio_06::PollOpt,
    ) -> io::Result<()> {
        let inner = &self.keyed_datareader;
        inner.reregister(poll, token, interest, opts)
    }

    /// Deregisters the wrapped reader from `poll`.
    fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
        let inner = &self.keyed_datareader;
        inner.deregister(poll)
    }
}
impl<D, DA> mio_08::event::Source for DataReader<D, DA>
where
DA: DeserializerAdapter<D>,
{
fn register(
&mut self,
registry: &mio_08::Registry,
token: mio_08::Token,
interests: mio_08::Interest,
) -> io::Result<()> {
<WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::register(
&mut self.keyed_datareader,
registry,
token,
interests,
)
}
fn reregister(
&mut self,
registry: &mio_08::Registry,
token: mio_08::Token,
interests: mio_08::Interest,
) -> io::Result<()> {
<WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::reregister(
&mut self.keyed_datareader,
registry,
token,
interests,
)
}
fn deregister(&mut self, registry: &mio_08::Registry) -> io::Result<()> {
<WithKeyDataReader<NoKeyWrapper<D>, DAWrapper<DA>> as mio_08::event::Source>::deregister(
&mut self.keyed_datareader,
registry,
)
}
}
use crate::no_key::SimpleDataReaderEventStream;
/// Status-event access for the no_key reader; everything is answered by the
/// wrapped with_key reader.
impl<'a, D, DA> StatusEvented<'a, DataReaderStatus, SimpleDataReaderEventStream<'a, D, DA>>
    for DataReader<D, DA>
where
    D: 'static,
    DA: DeserializerAdapter<D>,
{
    /// Returns the wrapped reader's mio 0.6 status event source.
    fn as_status_evented(&mut self) -> &dyn mio_06::Evented {
        let keyed = &mut self.keyed_datareader;
        keyed.as_status_evented()
    }

    /// Returns the wrapped reader's mio 0.8 status event source.
    fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
        let keyed = &mut self.keyed_datareader;
        keyed.as_status_source()
    }

    /// Wraps the with_key reader's async status stream into a
    /// `SimpleDataReaderEventStream` via its `from_keyed` constructor.
    fn as_async_status_stream(&'a self) -> SimpleDataReaderEventStream<'a, D, DA> {
        let keyed_events = self.keyed_datareader.as_async_status_stream();
        SimpleDataReaderEventStream::from_keyed(keyed_events)
    }

    /// Forwards to the wrapped reader's `try_recv_status` and returns its
    /// result unchanged.
    fn try_recv_status(&self) -> Option<DataReaderStatus> {
        let keyed = &self.keyed_datareader;
        keyed.try_recv_status()
    }
}
impl<D, DA> HasQoSPolicy for DataReader<D, DA>
where
    D: 'static,
    DA: DeserializerAdapter<D>,
{
    /// Returns the QoS policies reported by the wrapped with_key reader.
    fn qos(&self) -> QosPolicies {
        let keyed = &self.keyed_datareader;
        keyed.qos()
    }
}
impl<D, DA> RTPSEntity for DataReader<D, DA>
where
    D: 'static,
    DA: DeserializerAdapter<D>,
{
    /// Returns the GUID reported by the wrapped with_key reader.
    fn guid(&self) -> GUID {
        let keyed = &self.keyed_datareader;
        keyed.guid()
    }
}
/// Async stream adapter that wraps a with_key bare data reader stream.
///
/// Its `Stream` implementation yields `ReadResult<D>` items, i.e. the data
/// values themselves without a surrounding `DataSample`. `DA` defaults to
/// `CDRDeserializerAdapter<D>`.
pub struct BareDataReaderStream<
D: 'static,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// The with_key stream that is polled to produce this stream's items.
keyed_stream: WithKeyBareDataReaderStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
impl<D, DA> BareDataReaderStream<D, DA>
where
    D: 'static,
    DA: DeserializerAdapter<D>,
{
    /// Creates a [`DataReaderEventStream`] wrapping the event stream obtained
    /// from the underlying with_key stream's `async_event_stream()`.
    pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
        let keyed_stream = self.keyed_stream.async_event_stream();
        DataReaderEventStream { keyed_stream }
    }
}
// Explicitly marks `BareDataReaderStream` as `Unpin` for every `D: 'static`
// and `DA: DeserializerAdapter<D>`. The `poll_next` implementation of this
// type unwraps its `Pin<&mut Self>` with `Pin::into_inner`, which is only
// available for `Unpin` targets.
impl<D, DA> Unpin for BareDataReaderStream<D, DA>
where
D: 'static,
DA: DeserializerAdapter<D>,
{
}
impl<D, DA> Stream for BareDataReaderStream<D, DA>
where
    D: 'static,
    DA: DefaultDecoder<D>,
{
    type Item = ReadResult<D>;

    /// Polls the wrapped with_key stream and unwraps the next data value.
    ///
    /// * `Sample::Value` items are yielded as `Ok` with the inner value `d`.
    /// * `Sample::Dispose` items are skipped and the wrapped stream is polled
    ///   again within the same call.
    /// * Errors, end-of-stream and `Pending` are passed through as they are.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            let polled = Pin::new(&mut this.keyed_stream).poll_next(cx);
            return match polled {
                // Dispose notifications carry no value: poll again.
                Poll::Ready(Some(Ok(Sample::Dispose(_)))) => continue,
                Poll::Ready(Some(Ok(Sample::Value(wrapped)))) => Poll::Ready(Some(Ok(wrapped.d))),
                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => Poll::Ready(None),
                Poll::Pending => Poll::Pending,
            };
        }
    }
}
impl<D, DA> FusedStream for BareDataReaderStream<D, DA>
where
    D: 'static,
    DA: DefaultDecoder<D>,
{
    /// Always returns `false`: this stream never reports itself as terminated.
    fn is_terminated(&self) -> bool {
        false
    }
}
/// Async stream adapter that wraps a with_key data reader stream.
///
/// Its `Stream` implementation yields `ReadResult<DataSample<D>>` items.
/// `DA` defaults to `CDRDeserializerAdapter<D>`.
pub struct DataReaderStream<
D: 'static,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// The with_key stream that is polled to produce this stream's items.
keyed_stream: WithKeyDataReaderStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
impl<D, DA> DataReaderStream<D, DA>
where
    D: 'static,
    DA: DeserializerAdapter<D>,
{
    /// Creates a [`DataReaderEventStream`] wrapping the event stream obtained
    /// from the underlying with_key stream's `async_event_stream()`.
    pub fn async_event_stream(&self) -> DataReaderEventStream<D, DA> {
        let keyed_stream = self.keyed_stream.async_event_stream();
        DataReaderEventStream { keyed_stream }
    }
}
// Explicitly marks `DataReaderStream` as `Unpin` for every `D: 'static` and
// `DA: DeserializerAdapter<D>`. The `poll_next` implementation of this type
// unwraps its `Pin<&mut Self>` with `Pin::into_inner`, which is only
// available for `Unpin` targets.
impl<D, DA> Unpin for DataReaderStream<D, DA>
where
D: 'static,
DA: DeserializerAdapter<D>,
{
}
impl<D, DA> Stream for DataReaderStream<D, DA>
where
    D: 'static,
    DA: DefaultDecoder<D>,
{
    type Item = ReadResult<DataSample<D>>;

    /// Polls the wrapped with_key stream and converts the next sample.
    ///
    /// * Samples whose value is `Sample::Dispose` are skipped and the wrapped
    ///   stream is polled again within the same call.
    /// * Other samples are converted with `DataSample::from_with_key`. A
    ///   successful conversion is yielded as `Ok`; if the conversion yields
    ///   `None`, this method reports end-of-stream (`Poll::Ready(None)`).
    /// * Errors, end-of-stream and `Pending` are passed through as they are.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            let keyed_sample = match Pin::new(&mut this.keyed_stream).poll_next(cx) {
                Poll::Ready(Some(Ok(sample))) => sample,
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            };

            // Dispose notifications carry no value: poll again.
            if matches!(keyed_sample.value(), Sample::Dispose(_)) {
                continue;
            }

            // `Some(sample)` becomes `Some(Ok(sample))`; `None` ends the stream.
            return Poll::Ready(DataSample::<D>::from_with_key(keyed_sample).map(Ok));
        }
    }
}
impl<D, DA> FusedStream for DataReaderStream<D, DA>
where
    D: 'static,
    DA: DefaultDecoder<D>,
{
    /// Always returns `false`: this stream never reports itself as terminated.
    fn is_terminated(&self) -> bool {
        false
    }
}
/// Async stream adapter that wraps a with_key data reader event stream.
///
/// Its `Stream` implementation yields `DataReaderStatus` items. `DA` defaults
/// to `CDRDeserializerAdapter<D>`.
pub struct DataReaderEventStream<
D: 'static,
DA: DeserializerAdapter<D> + 'static = CDRDeserializerAdapter<D>,
> {
// The with_key event stream that is polled to produce this stream's items.
keyed_stream: WithKeyDataReaderEventStream<NoKeyWrapper<D>, DAWrapper<DA>>,
}
impl<D, DA> Stream for DataReaderEventStream<D, DA>
where
    D: 'static,
    DA: DeserializerAdapter<D>,
{
    type Item = DataReaderStatus;

    /// Polls the wrapped with_key event stream and returns its result
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        Pin::new(&mut this.keyed_stream).poll_next(cx)
    }
}
impl<D, DA> FusedStream for DataReaderEventStream<D, DA>
where
    D: 'static,
    DA: DeserializerAdapter<D>,
{
    /// Always returns `false`: this stream never reports itself as terminated.
    fn is_terminated(&self) -> bool {
        false
    }
}