motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).
Documentation
//! Blocking wrapper around [`crate::core::Subscription`].
//!
//! `read` / `read_all` / `notify` / `id` / `name` are forwarded
//! directly — they're sync on the async type too, so no runtime is
//! involved. `latest` uses the shared `Runtime` handed in by
//! [`super::Subscribe`] to block on the async future. The async
//! `stream()` is exposed through [`Subscription::iter`] as a blocking
//! [`Iterator`] — each `.next()` call blocks on the next broadcast
//! frame via the same runtime.

use std::pin::Pin;
use std::sync::Arc;

use futures::{Stream, StreamExt};
use tokio::runtime::Runtime;

use crate::TimeSpec;
use crate::core::{self, StreamResult};
use crate::error::Result;
use crate::parameter_value::{GetParameterTuple, GetParameterValue};

/// Blocking handle to a subscription.
///
/// Pairs the async [`core::Subscription`] with a shared handle to the
/// Tokio runtime that is used to drive its futures to completion from
/// synchronous code.
pub struct Subscription {
    /// The wrapped async subscription that every method delegates to.
    inner: core::Subscription,
    /// Shared runtime handle used for the `block_on` calls.
    // NOTE(review): fields drop in declaration order, so `inner` is
    // dropped before this handle — confirm whether anything relies on it.
    rt: Arc<Runtime>,
}

impl Subscription {
    pub(crate) fn new(inner: core::Subscription, rt: Arc<Runtime>) -> Self {
        Self { inner, rt }
    }

    pub(crate) fn into_inner(self) -> core::Subscription {
        self.inner
    }

    pub fn id(&self) -> u32 {
        self.inner.id()
    }

    pub fn name(&self) -> &str {
        self.inner.name()
    }

    pub fn read<V>(&self) -> Option<(TimeSpec, V)>
    where
        V: GetParameterTuple,
    {
        self.inner.read()
    }

    pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
    where
        V: GetParameterValue + Default,
    {
        self.inner.read_all()
    }

    pub fn notify<F>(&self, cb: F)
    where
        F: Fn(&core::Subscription) + Send + Sync + 'static,
    {
        self.inner.notify(cb);
    }

    /// Blocking analogue of [`core::Subscription::latest`].
    pub fn latest<V>(&self) -> Result<(TimeSpec, V)>
    where
        V: GetParameterTuple,
    {
        self.rt.block_on(self.inner.latest::<V>())
    }

    /// Blocking iterator over the broadcast ring. Each `next()`
    /// call blocks on the next item via the shared runtime. The
    /// iterator ends (returns `None`) when the broadcast channel
    /// closes — i.e. when every `Subscription` clone is dropped.
    pub fn iter<V>(&self, capacity: usize) -> StreamIter<V>
    where
        V: GetParameterTuple + Send + 'static,
    {
        StreamIter {
            rt: Arc::clone(&self.rt),
            stream: Box::pin(self.inner.stream::<V>(capacity)),
        }
    }
}

impl Clone for Subscription {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            rt: Arc::clone(&self.rt),
        }
    }
}

/// Blocking iterator returned by [`Subscription::iter`].
///
/// Holds a boxed async stream together with a handle to the runtime
/// that is used to block on each item.
pub struct StreamIter<V>
where
    V: GetParameterTuple + Send + 'static,
{
    /// Runtime on which each `next()` call blocks.
    rt: Arc<Runtime>,
    /// Pinned, type-erased stream yielding `StreamResult<V>` items.
    stream: Pin<Box<dyn Stream<Item = StreamResult<V>> + Send>>,
}

impl<V: GetParameterTuple + Send + 'static> Iterator for StreamIter<V> {
    type Item = StreamResult<V>;

    /// Blocks the calling thread on the shared runtime until the
    /// underlying stream produces its next item, or returns `None` once
    /// the stream is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // The future borrows only `self.stream`, so `self.rt` stays free
        // to drive it.
        let pending = self.stream.next();
        self.rt.block_on(pending)
    }
}