use std::io;
use serde::{de::DeserializeOwned};
use mio::{Poll, Token, Ready, PollOpt, Evented};
use crate::{
structure::{
entity::RTPSEntity ,
},
};
use crate::dds::{traits::serde_adapters::*, values::result::*, qos::*,
readcondition::*, data_types::*, };
use crate::dds::with_key::datareader as datareader_with_key;
use crate::dds::with_key::datasample::DataSample as WithKeyDataSample;
use crate::serialization::CDRDeserializerAdapter;
use crate::dds::no_key::datasample::DataSample;
use super::{
wrappers::{NoKeyWrapper, SAWrapper},
};
/// Reader for data of type `D` that is implemented on top of a with-key
/// `DataReader`.
///
/// The payload type is wrapped in `NoKeyWrapper<D>` and the deserializer
/// adapter in `SAWrapper<DA>` before being handed to the keyed reader; all
/// operations on this type are forwarded to that inner reader.
///
/// `DA` selects the deserializer adapter and defaults to
/// `CDRDeserializerAdapter<D>`.
pub struct DataReader<D, DA = CDRDeserializerAdapter<D>>
where
  D: DeserializeOwned,
  DA: DeserializerAdapter<D>,
{
  /// The wrapped keyed reader that does the actual work.
  keyed_datareader: datareader_with_key::DataReader<NoKeyWrapper<D>, SAWrapper<DA>>,
}
impl<D: 'static, DA> DataReader<D, DA>
where
  D: DeserializeOwned,
  DA: DeserializerAdapter<D>,
{
  /// Wraps an already constructed keyed reader into a `DataReader`.
  pub(crate) fn from_keyed(
    keyed: datareader_with_key::DataReader<NoKeyWrapper<D>, SAWrapper<DA>>,
  ) -> DataReader<D, DA> {
    Self {
      keyed_datareader: keyed,
    }
  }

  /// Reads samples from the underlying keyed reader and returns them as
  /// borrowed `DataSample<&D>` values.
  ///
  /// `max_samples` and `read_condition` are forwarded unchanged to the keyed
  /// reader's `read`. Every keyed sample is converted with
  /// `DataSample::from_with_key_ref`; samples for which that conversion
  /// yields `None` are left out of the result.
  ///
  /// # Errors
  ///
  /// Propagates any error returned by the keyed reader's `read`.
  pub fn read(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
  ) -> Result<Vec<DataSample<&D>>> {
    let keyed_samples: Vec<WithKeyDataSample<&NoKeyWrapper<D>>> =
      self.keyed_datareader.read(max_samples, read_condition)?;

    // Reserve for the worst case where every keyed sample converts.
    let mut samples: Vec<DataSample<&D>> = Vec::with_capacity(keyed_samples.len());
    samples.extend(
      keyed_samples
        .into_iter()
        .filter_map(|keyed| DataSample::<D>::from_with_key_ref(keyed)),
    );
    Ok(samples)
  }

  /// Takes samples from the underlying keyed reader and returns them as
  /// owned `DataSample<D>` values.
  ///
  /// `max_samples` and `read_condition` are forwarded unchanged to the keyed
  /// reader's `take`. Every keyed sample is converted with
  /// `DataSample::from_with_key`; samples for which that conversion yields
  /// `None` are left out of the result.
  ///
  /// # Errors
  ///
  /// Propagates any error returned by the keyed reader's `take`.
  pub fn take(
    &mut self,
    max_samples: usize,
    read_condition: ReadCondition,
  ) -> Result<Vec<DataSample<D>>> {
    let keyed_samples: Vec<WithKeyDataSample<NoKeyWrapper<D>>> =
      self.keyed_datareader.take(max_samples, read_condition)?;

    // Reserve for the worst case where every keyed sample converts.
    let mut samples: Vec<DataSample<D>> = Vec::with_capacity(keyed_samples.len());
    samples.extend(
      keyed_samples
        .into_iter()
        .filter_map(|keyed| DataSample::<D>::from_with_key(keyed)),
    );
    Ok(samples)
  }

  /// Calls [`read`](Self::read) with a limit of one sample and
  /// `ReadCondition::not_read()`, returning the last element of the resulting
  /// vector, or `None` if it is empty.
  ///
  /// # Errors
  ///
  /// Propagates any error returned by [`read`](Self::read).
  pub fn read_next_sample(&mut self) -> Result<Option<DataSample<&D>>> {
    self
      .read(1, ReadCondition::not_read())
      .map(|mut samples| samples.pop())
  }

  /// Calls [`take`](Self::take) with a limit of one sample and
  /// `ReadCondition::not_read()`, returning the last element of the resulting
  /// vector, or `None` if it is empty.
  ///
  /// # Errors
  ///
  /// Propagates any error returned by [`take`](Self::take).
  pub fn take_next_sample(&mut self) -> Result<Option<DataSample<D>>> {
    self
      .take(1, ReadCondition::not_read())
      .map(|mut samples| samples.pop())
  }

  /// Returns an iterator over borrowed values of the samples obtained from
  /// [`read`](Self::read) with no sample limit (`usize::MAX`) and
  /// `ReadCondition::not_read()`.
  ///
  /// Only the `value` field of each sample is yielded. The samples are
  /// collected by `read` before the iterator is returned.
  ///
  /// # Errors
  ///
  /// Propagates any error returned by [`read`](Self::read).
  pub fn iterator(&mut self) -> Result<impl Iterator<Item = &D>> {
    let samples = self.read(std::usize::MAX, ReadCondition::not_read())?;
    Ok(samples.into_iter().map(|sample| sample.value))
  }

  /// Same as [`iterator`](Self::iterator), but with a caller-supplied
  /// `read_condition` forwarded to [`read`](Self::read).
  ///
  /// # Errors
  ///
  /// Propagates any error returned by [`read`](Self::read).
  pub fn conditional_iterator(
    &mut self,
    read_condition: ReadCondition,
  ) -> Result<impl Iterator<Item = &D>> {
    let samples = self.read(std::usize::MAX, read_condition)?;
    Ok(samples.into_iter().map(|sample| sample.value))
  }

  /// Returns an iterator over owned values of the samples obtained from
  /// [`take`](Self::take) with no sample limit (`usize::MAX`) and
  /// `ReadCondition::not_read()`.
  ///
  /// Only the `value` field of each sample is yielded. The samples are
  /// collected by `take` before the iterator is returned.
  ///
  /// # Errors
  ///
  /// Propagates any error returned by [`take`](Self::take).
  pub fn into_iterator(&mut self) -> Result<impl Iterator<Item = D>> {
    let samples = self.take(std::usize::MAX, ReadCondition::not_read())?;
    Ok(samples.into_iter().map(|sample| sample.value))
  }

  /// Same as [`into_iterator`](Self::into_iterator), but with a
  /// caller-supplied `read_condition` forwarded to [`take`](Self::take).
  ///
  /// # Errors
  ///
  /// Propagates any error returned by [`take`](Self::take).
  pub fn into_conditional_iterator(
    &mut self,
    read_condition: ReadCondition,
  ) -> Result<impl Iterator<Item = D>> {
    let samples = self.take(std::usize::MAX, read_condition)?;
    Ok(samples.into_iter().map(|sample| sample.value))
  }
}
/// Makes the reader usable as a `mio` event source by forwarding every
/// registration call to the `notification_receiver` of the wrapped keyed
/// reader.
impl<D: DeserializeOwned, DA: DeserializerAdapter<D>> Evented for DataReader<D, DA> {
  /// Registers the keyed reader's `notification_receiver` with `poll`.
  fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
    let receiver = &self.keyed_datareader.notification_receiver;
    receiver.register(poll, token, interest, opts)
  }

  /// Re-registers the keyed reader's `notification_receiver` with `poll`.
  fn reregister(
    &self,
    poll: &Poll,
    token: Token,
    interest: Ready,
    opts: PollOpt,
  ) -> io::Result<()> {
    let receiver = &self.keyed_datareader.notification_receiver;
    receiver.reregister(poll, token, interest, opts)
  }

  /// Deregisters the keyed reader's `notification_receiver` from `poll`.
  fn deregister(&self, poll: &Poll) -> io::Result<()> {
    let receiver = &self.keyed_datareader.notification_receiver;
    receiver.deregister(poll)
  }
}
/// QoS access is delegated to the wrapped keyed reader.
impl<D: DeserializeOwned, DA: DeserializerAdapter<D>> HasQoSPolicy for DataReader<D, DA> {
  /// Returns whatever the wrapped keyed reader reports from its `get_qos`.
  fn get_qos(&self) -> QosPolicies {
    let keyed = &self.keyed_datareader;
    keyed.get_qos()
  }
}
/// Entity identity is delegated to the wrapped keyed reader.
impl<D: DeserializeOwned, DA: DeserializerAdapter<D>> RTPSEntity for DataReader<D, DA> {
  /// Returns whatever the wrapped keyed reader reports from its `get_guid`.
  fn get_guid(&self) -> GUID {
    let keyed = &self.keyed_datareader;
    keyed.get_guid()
  }
}