use std::io;
use serde::de::DeserializeOwned;
use mio::{Evented, Poll, PollOpt, Ready, Token};
use crate::{
dds::{
data_types::GUID,
no_key::datasample::DataSample,
qos::{HasQoSPolicy, QosPolicies},
readcondition::ReadCondition,
traits::serde_adapters::no_key::DeserializerAdapter,
values::result::Result,
with_key::{datareader as datareader_with_key, datasample::DataSample as WithKeyDataSample},
},
serialization::CDRDeserializerAdapter,
structure::entity::RTPSEntity,
};
use super::wrappers::{DAWrapper, NoKeyWrapper};
pub type DataReaderCdr<D> = DataReader<D, CDRDeserializerAdapter<D>>;
pub struct DataReader<D: DeserializeOwned, DA: DeserializerAdapter<D> = CDRDeserializerAdapter<D>> {
keyed_datareader: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
}
impl<D: 'static, DA> DataReader<D, DA>
where
D: DeserializeOwned,
DA: DeserializerAdapter<D>,
{
pub(crate) fn from_keyed(
keyed: datareader_with_key::DataReader<NoKeyWrapper<D>, DAWrapper<DA>>,
) -> Self {
Self {
keyed_datareader: keyed,
}
}
pub fn read(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<DataSample<&D>>> {
let values: Vec<WithKeyDataSample<&NoKeyWrapper<D>>> =
self.keyed_datareader.read(max_samples, read_condition)?;
let mut result = Vec::with_capacity(values.len());
for ks in values {
if let Some(s) = DataSample::<D>::from_with_key_ref(ks) {
result.push(s);
}
}
Ok(result)
}
pub fn take(
&mut self,
max_samples: usize,
read_condition: ReadCondition,
) -> Result<Vec<DataSample<D>>> {
let values: Vec<WithKeyDataSample<NoKeyWrapper<D>>> =
self.keyed_datareader.take(max_samples, read_condition)?;
let mut result = Vec::with_capacity(values.len());
for ks in values {
if let Some(s) = DataSample::<D>::from_with_key(ks) {
result.push(s);
}
}
Ok(result)
}
pub fn read_next_sample(&mut self) -> Result<Option<DataSample<&D>>> {
let mut ds = self.read(1, ReadCondition::not_read())?;
Ok(ds.pop())
}
pub fn take_next_sample(&mut self) -> Result<Option<DataSample<D>>> {
let mut ds = self.take(1, ReadCondition::not_read())?;
Ok(ds.pop())
}
pub fn iterator(&mut self) -> Result<impl Iterator<Item = &D>> {
Ok(
self
.read(std::usize::MAX, ReadCondition::not_read())?
.into_iter()
.map(|ds| ds.value),
)
}
pub fn conditional_iterator(
&mut self,
read_condition: ReadCondition,
) -> Result<impl Iterator<Item = &D>> {
Ok(
self
.read(std::usize::MAX, read_condition)?
.into_iter()
.map(|ds| ds.value),
)
}
pub fn into_iterator(&mut self) -> Result<impl Iterator<Item = D>> {
Ok(
self
.take(std::usize::MAX, ReadCondition::not_read())?
.into_iter()
.map(|ds| ds.value),
)
}
pub fn into_conditional_iterator(
&mut self,
read_condition: ReadCondition,
) -> Result<impl Iterator<Item = D>> {
Ok(
self
.take(std::usize::MAX, read_condition)?
.into_iter()
.map(|ds| ds.value),
)
}
}
impl<D, DA> Evented for DataReader<D, DA>
where
D: DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
self
.keyed_datareader
.notification_receiver
.register(poll, token, interest, opts)
}
fn reregister(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self
.keyed_datareader
.notification_receiver
.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> io::Result<()> {
self.keyed_datareader.notification_receiver.deregister(poll)
}
}
impl<D, DA> HasQoSPolicy for DataReader<D, DA>
where
D: DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn qos(&self) -> QosPolicies {
self.keyed_datareader.qos()
}
}
impl<D, DA> RTPSEntity for DataReader<D, DA>
where
D: DeserializeOwned,
DA: DeserializerAdapter<D>,
{
fn guid(&self) -> GUID {
self.keyed_datareader.guid()
}
}