futuresdr 0.0.41

An Experimental Async SDR Runtime for Heterogeneous Architectures.
Documentation
use std::pin::Pin;

use crate::runtime::Timer;
use crate::runtime::dev::prelude::*;
use web_time::Instant;

/// Limit sample rate.
///
/// # Stream Inputs
///
/// `input`: Input samples.
///
/// # Stream Outputs
///
/// `output`: Rate-limited output samples.
///
/// # Usage
/// ```
/// use futuresdr::blocks::Throttle;
///
/// let throttle = Throttle::<u8>::new(1_000_000.0);
/// ```
#[derive(Block)]
pub struct Throttle<
    T: Copy + Send + 'static,
    I: CpuBufferReader<Item = T> = DefaultCpuReader<T>,
    O: CpuBufferWriter<Item = T> = DefaultCpuWriter<T>,
> {
    #[input]
    input: I,
    #[output]
    output: O,
    rate: f64,
    t_init: Instant,
    n_items: usize,
    timer: Option<Timer>,
}

impl<T, I, O> Throttle<T, I, O>
where
    T: Copy + Send + 'static,
    I: CpuBufferReader<Item = T>,
    O: CpuBufferWriter<Item = T>,
{
    /// Creates a new Throttle block which will throttle to the specified rate.
    pub fn new(rate: f64) -> Self {
        Self {
            input: I::default(),
            output: O::default(),
            rate,
            t_init: Instant::now(),
            n_items: 0,
            timer: None,
        }
    }
}

#[cfg(not(target_arch = "wasm32"))]
#[doc(hidden)]
impl<T, I, O> Kernel for Throttle<T, I, O>
where
    T: Copy + Send + 'static,
    I: CpuBufferReader<Item = T>,
    O: CpuBufferWriter<Item = T>,
{
    type BlockOn = Timer;

    fn block_on(&mut self) -> Option<Pin<&mut Timer>> {
        self.timer.as_mut().map(Pin::new)
    }

    async fn work(
        &mut self,
        io: &mut WorkIo,
        _mo: &mut MessageOutputs,
        _meta: &mut BlockMeta,
    ) -> Result<()> {
        let i = self.input.slice();
        let o = self.output.slice();
        let i_len = i.len();

        let now = Instant::now();
        let target_items = (now - self.t_init).as_secs_f64() * self.rate;
        let target_items = target_items.floor() as usize;
        let remaining_items = target_items - self.n_items;

        let m = *[remaining_items, i.len(), o.len()]
            .iter()
            .min()
            .unwrap_or(&0);

        if m != 0 {
            o[..m].copy_from_slice(&i[..m]);
            self.n_items += m;
            self.input.consume(m);
            self.output.produce(m);
        }

        if self.input.finished() && i_len == m {
            io.finished = true;
        }

        self.timer = Some(Timer::after(std::time::Duration::from_millis(100)));
        io.block_on();

        Ok(())
    }

    async fn init(&mut self, _mo: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
        self.t_init = Instant::now();
        self.n_items = 0;
        Ok(())
    }
}

#[cfg(target_arch = "wasm32")]
#[doc(hidden)]
impl<T, I, O> Kernel for Throttle<T, I, O>
where
    T: Copy + Send + 'static,
    I: CpuBufferReader<Item = T>,
    O: CpuBufferWriter<Item = T>,
{
    type BlockOn = Timer;

    fn block_on(&mut self) -> Option<Pin<&mut Timer>> {
        self.timer.as_mut().map(Pin::new)
    }

    async fn work(
        &mut self,
        io: &mut WorkIo,
        _mo: &mut MessageOutputs,
        _meta: &mut BlockMeta,
    ) -> Result<()> {
        let i = self.input.slice();
        let o = self.output.slice();
        let i_len = i.len();

        let now = Instant::now();
        let target_items = (now - self.t_init).as_secs_f64() * self.rate;
        let target_items = target_items.floor() as usize;
        let remaining_items = target_items - self.n_items;

        let m = *[remaining_items, i.len(), o.len()]
            .iter()
            .min()
            .unwrap_or(&0);

        if m != 0 {
            o[..m].copy_from_slice(&i[..m]);
            self.n_items += m;
            self.input.consume(m);
            self.output.produce(m);
        }

        if self.input.finished() && i_len == m {
            io.finished = true;
        }

        self.timer = Some(Timer::after(std::time::Duration::from_millis(100)));
        io.block_on();

        Ok(())
    }

    async fn init(&mut self, _mo: &mut MessageOutputs, _meta: &mut BlockMeta) -> Result<()> {
        self.t_init = Instant::now();
        self.n_items = 0;
        Ok(())
    }
}