use std::pin::Pin;
use std::sync::Arc;
use futures::{Stream, StreamExt};
use tokio::runtime::Runtime;
use crate::TimeSpec;
use crate::core::{self, StreamResult};
use crate::error::Result;
use crate::parameter_value::{GetParameterTuple, GetParameterValue};
pub struct Subscription {
inner: core::Subscription,
rt: Arc<Runtime>,
}
impl Subscription {
pub(crate) fn new(inner: core::Subscription, rt: Arc<Runtime>) -> Self {
Self { inner, rt }
}
pub(crate) fn into_inner(self) -> core::Subscription {
self.inner
}
pub fn id(&self) -> u32 {
self.inner.id()
}
pub fn name(&self) -> &str {
self.inner.name()
}
pub fn read<V>(&self) -> Option<(TimeSpec, V)>
where
V: GetParameterTuple,
{
self.inner.read()
}
pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
where
V: GetParameterValue + Default,
{
self.inner.read_all()
}
pub fn notify<F>(&self, cb: F)
where
F: Fn(&core::Subscription) + Send + Sync + 'static,
{
self.inner.notify(cb);
}
pub fn latest<V>(&self) -> Result<(TimeSpec, V)>
where
V: GetParameterTuple,
{
self.rt.block_on(self.inner.latest::<V>())
}
pub fn iter<V>(&self, capacity: usize) -> StreamIter<V>
where
V: GetParameterTuple + Send + 'static,
{
StreamIter {
rt: Arc::clone(&self.rt),
stream: Box::pin(self.inner.stream::<V>(capacity)),
}
}
}
impl Clone for Subscription {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
rt: Arc::clone(&self.rt),
}
}
}
pub struct StreamIter<V>
where
V: GetParameterTuple + Send + 'static,
{
rt: Arc<Runtime>,
stream: Pin<Box<dyn Stream<Item = StreamResult<V>> + Send>>,
}
impl<V> Iterator for StreamIter<V>
where
V: GetParameterTuple + Send + 'static,
{
type Item = StreamResult<V>;
fn next(&mut self) -> Option<Self::Item> {
self.rt.block_on(self.stream.next())
}
}