lasprs 0.8.0

Library for Acoustic Signal Processing (Rust edition, with optional Python bindings via pyo3)
Documentation
//! All sources for a signal generator. Sine waves, sweeps, noise, etc.
use super::noise::{ColoredNoise, WhiteNoise};
use super::sweep::{SweepParams, SweepType};
use crate::config::*;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};

/// Placeholder sampling frequency \[Hz\] to pass along when the real sampling
/// frequency is not yet known at the time a source is constructed.
///
/// Sources take a new sampling frequency through their `reset` method, so a
/// source that was built with this value can be given the right one later.
// Within this file the constant is only referenced by the constructors that
// are compiled with the `python-bindings` feature; presumably that is why the
// `dead_code` lint is silenced here.
#[allow(dead_code)]
pub const DUMMY_SAMPLING_FREQ: Flt = 48000.;

/// Ratio between circumference and radius of a circle, i.e. 2π. The sine
/// source uses it to keep its running phase wrapped.
// NOTE(review): the lowercase name deviates from the SCREAMING_SNAKE_CASE
// convention for constants; renaming it means touching every user in this file.
const twopi: Flt = 2.0 * pi;
use crate::config::*;
use anyhow::{bail, Result};
use rand::prelude::*;

/// Signal source for a signal generator. A signal source is capable of creating
/// new signal data.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Clone, Debug)]
pub struct Source {
    // Type-erased implementation (sine, sweep, noise, silence). `Clone` can be
    // derived for this struct because `Box<dyn SourceImpl>` is made cloneable
    // through `SourceImpl::clone_dyn`.
    src: Box<dyn SourceImpl>,
}
impl Source {
    /// Build a source that produces a sine wave.
    ///
    /// # Args
    ///
    /// - `fs`: Sampling frequency \[Hz\]. If it is not known yet, pass any
    ///   sensible value: when the [Siggen] runs in a [StreamMgr], the
    ///   [StreamMgr] calls [Siggen::reset] to install the right sampling
    ///   frequency.
    /// - `freq`: Frequency of the sine wave \[Hz\].
    ///
    /// # Errors
    ///
    /// Forwards the error of the sine constructor, which rejects a
    /// non-positive `fs` and a `freq` at or above the Nyquist frequency.
    pub fn newSine(fs: Flt, freq: Flt) -> Result<Source> {
        let sine = Sine::new(fs, freq)?;
        Ok(Self {
            src: Box::new(sine),
        })
    }

    /// Build a source of silence: it does not output any dynamic signal at
    /// all.
    pub fn newSilence() -> Source {
        let silence = Silence {};
        Self {
            src: Box::new(silence),
        }
    }

    /// Build a white noise source.
    ///
    /// # Args
    ///
    /// - `fs`: Sampling frequency \[Hz\]. If it is not known yet, pass any
    ///   sensible value: when the [Siggen] runs in a [StreamMgr], the
    ///   [StreamMgr] calls [Siggen::reset] to install the right sampling
    ///   frequency.
    /// - `interrupt_period`: When given AND > 0, the noise source is switched
    ///   on and off with periods of this length \[s\].
    pub fn newWhiteNoise(fs: Flt, interrupt_period: Option<Flt>) -> Source {
        let noise = WhiteNoise::new(fs, interrupt_period);
        Self {
            src: Box::new(noise),
        }
    }

    /// Build a pink noise source.
    ///
    /// # Args
    ///
    /// - `fs`: Sampling frequency \[Hz\]. If it is not known yet, pass any
    ///   sensible value: when the [Siggen] runs in a [StreamMgr], the
    ///   [StreamMgr] calls [Siggen::reset] to install the right sampling
    ///   frequency.
    /// - `interrupt_period`: When given AND > 0, the noise source is switched
    ///   on and off with periods of this length \[s\].
    pub fn newPinkNoise(fs: Flt, interrupt_period: Option<Flt>) -> Source {
        let noise = ColoredNoise::newPinkNoise(fs, interrupt_period);
        Self {
            src: Box::new(noise),
        }
    }

    /// Build a sine sweep source.
    ///
    /// # Args
    ///
    /// - `fs`: Sampling frequency \[Hz\]. If it is not known yet, pass any
    ///   sensible value: when the [Siggen] runs in a [StreamMgr], the
    ///   [StreamMgr] calls [Siggen::reset] to install the right sampling
    ///   frequency.
    /// - `fl`: Lower frequency \[Hz\]
    /// - `fu`: Upper frequency \[Hz\]
    /// - `sweep_time`: Duration of a single sweep \[s\]
    /// - `quiet_time`: Time of silence between the end of one sweep and the
    ///   start of the next \[s\]
    /// - `sweep_type`: The type of the sweep, see [SweepType].
    ///
    /// # Errors
    ///
    /// Forwards the error raised while setting up the sweep parameters.
    pub fn newSweep(
        fs: Flt,
        fl: Flt,
        fu: Flt,
        sweep_time: Flt,
        quiet_time: Flt,
        sweep_type: SweepType,
    ) -> Result<Source> {
        let sweep = Sweep::new(fs, fl, fu, sweep_time, quiet_time, sweep_type)?;
        Ok(Self {
            src: Box::new(sweep),
        })
    }
}

#[cfg(feature = "python-bindings")]
#[cfg_attr(feature = "python-bindings", pymethods)]
impl Source {
    // Python-facing static constructors. Each one is exported under the name
    // of the Rust constructor it forwards to.

    // Sine wave; errors of the Rust constructor are passed on through `?`.
    #[staticmethod]
    #[pyo3(name = "newSine")]
    fn newSine_py(fs: Flt, freq: Flt) -> PyResult<Source> {
        let source = Self::newSine(fs, freq)?;
        Ok(source)
    }

    // Silence.
    #[pyo3(name = "newSilence")]
    #[staticmethod]
    fn newSilence_py() -> Source {
        Self::newSilence()
    }

    // White noise. No sampling frequency is asked for here; the placeholder
    // value is filled in instead.
    #[staticmethod]
    #[pyo3(name = "newWhiteNoise", signature=(interrupt_period=None))]
    fn newWhiteNoise_py(interrupt_period: Option<Flt>) -> Source {
        let fs = DUMMY_SAMPLING_FREQ;
        Self::newWhiteNoise(fs, interrupt_period)
    }

    // Pink noise. No sampling frequency is asked for here; the placeholder
    // value is filled in instead.
    #[staticmethod]
    #[pyo3(name = "newPinkNoise", signature=(interrupt_period=None))]
    fn newPinkNoise_py(interrupt_period: Option<Flt>) -> Source {
        let fs = DUMMY_SAMPLING_FREQ;
        Self::newPinkNoise(fs, interrupt_period)
    }

    // Sine sweep; errors of the Rust constructor are passed on through `?`.
    #[staticmethod]
    #[pyo3(name = "newSweep")]
    fn newSweep_py(
        fs: Flt,
        fl: Flt,
        fu: Flt,
        sweep_time: Flt,
        quiet_time: Flt,
        sweep_type: SweepType,
    ) -> PyResult<Source> {
        let source = Self::newSweep(fs, fl, fu, sweep_time, quiet_time, sweep_type)?;
        Ok(source)
    }
}

#[derive(Clone, Debug)]
/// Silence source. The simplest source there is: it only ever outputs zeros
/// and carries no state.
struct Silence {}

impl SourceImpl for Silence {
    /// Overwrite every sample handed in through `sig` with zero.
    fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
        for sample in sig {
            *sample = 0.0;
        }
    }

    /// Nothing to do: silence has no state and ignores the sampling frequency.
    fn reset(&mut self, _fs: Flt) {}

    /// Boxed copy of this source, used to clone the trait object.
    fn clone_dyn(&self) -> Box<dyn SourceImpl> {
        let copy = self.clone();
        Box::new(copy)
    }
}

/// Sine wave, with configurable frequency
#[derive(Clone, Debug)]
struct Sine {
    /// Sampling frequency \[Hz\]
    fs: Flt,
    /// Phase at which the next sample is evaluated \[rad\]. It is wrapped
    /// modulo 2π after every generated sample.
    phase: Flt,
    /// Angular frequency of the signal \[rad/s\], i.e. 2π times the frequency
    /// in Hz that was passed to the constructor.
    omg: Flt,
}
impl Sine {
    /// Create a new sine source signal, starting at zero phase.
    ///
    /// # Args
    ///
    /// * `fs` - Sampling frequency \[Hz\]. Has to be finite and larger than 0.
    /// * `freq` - Frequency of the sine wave \[Hz\]. Has to be finite and
    ///   smaller than the Nyquist frequency `fs / 2`. Finite negative values
    ///   are accepted.
    ///
    /// # Errors
    ///
    /// Fails when `fs` is NaN, infinite, zero or negative, when `freq` is NaN
    /// or infinite, or when `freq` is not below the Nyquist frequency.
    pub fn new(fs: Flt, freq: Flt) -> Result<Sine> {
        // A plain `fs <= 0.` check lets NaN through (every comparison with NaN
        // is false) and accepts +inf as well, so test for finiteness first.
        if !fs.is_finite() || fs <= 0. {
            bail!("Invalid sampling frequency");
        }
        // Same story for the frequency: NaN and -inf would slip past the
        // Nyquist comparison below and make the generator emit NaN.
        if !freq.is_finite() {
            bail!("Invalid frequency of sine wave");
        }
        if freq >= fs / 2. {
            bail!("Frequency of sine wave should be smaller than Nyquist frequency");
        }

        Ok(Sine {
            fs,
            phase: 0.0,
            // Store the angular frequency [rad/s]
            omg: 2.0 * pi * freq,
        })
    }
}
impl SourceImpl for Sine {
    /// Fill `sig` with consecutive samples of the sine wave, continuing from
    /// the phase stored by the previous call. With a sampling frequency that
    /// is zero or negative, zeros are written instead.
    fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
        if self.fs <= 0.0 {
            for sample in sig {
                *sample = 0.0;
            }
            return;
        }

        // Phase advance per sample; constant for the duration of this call.
        let phase_step = self.omg / self.fs;
        for sample in sig {
            *sample = self.phase.sin();
            // Advance and wrap, so the stored phase does not keep growing.
            self.phase = (self.phase + phase_step) % twopi;
        }
    }

    /// Restart at zero phase and adopt `fs` as the new sampling frequency.
    fn reset(&mut self, fs: Flt) {
        self.phase = 0.0;
        self.fs = fs;
    }

    /// Boxed copy of this source, used to clone the trait object.
    fn clone_dyn(&self) -> Box<dyn SourceImpl> {
        let copy = self.clone();
        Box::new(copy)
    }
}

// Sweep source: plays back a pre-computed sweep buffer over and over.
#[cfg_attr(feature = "python-bindings", pyclass)]
#[derive(Debug, Clone)]
struct Sweep {
    // Sweep settings; the buffer below is (re)generated from these.
    params: SweepParams,
    // Generated time-periodic buffer
    gen: Dcol,
    // Position in `gen` of the next sample to output. It is reduced modulo the
    // buffer length at the end of every generate call.
    N: usize,
}
impl Sweep {
    /// Set up a sweep source: the arguments are validated by the sweep
    /// parameter constructor, the sweep buffer is generated from the resulting
    /// parameters, and playback starts at the first sample of that buffer.
    ///
    /// # Errors
    ///
    /// Forwards the error of the sweep parameter constructor.
    fn new(
        fs: Flt,
        fl_: Flt,
        fu_: Flt,
        sweep_time: Flt,
        quiet_time: Flt,
        sweeptype: SweepType,
    ) -> Result<Self> {
        let sweep_params = SweepParams::new(fs, fl_, fu_, sweep_time, quiet_time, sweeptype)?;
        let buffer = sweep_params.getSignal();

        Ok(Self {
            params: sweep_params,
            gen: buffer,
            N: 0,
        })
    }
}
// Cyclic playback of the pre-computed sweep buffer.
impl SourceImpl for Sweep {
    /// Copy the next samples of the sweep buffer into `sig`, wrapping around to
    /// the start of the buffer when its end is reached. The position is
    /// remembered, so the next call continues where this one stopped.
    ///
    /// When the sweep buffer is empty, zeros are written instead.
    fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>) {
        let period = self.gen.len();
        if period == 0 {
            // Without this guard the modulo at the end of this function would
            // panic on a zero divisor. There is nothing to play: output silence.
            for sample in sig {
                *sample = 0.0;
            }
            return;
        }

        // NOTE(review): `as_slice()` presumably only returns `None` for a
        // buffer that is not laid out contiguously -- confirm for `Dcol`.
        let sweep_iter = self.gen.as_slice().unwrap().iter().cycle().skip(self.N);
        for (out, sweep_sample) in sig.zip(sweep_iter) {
            *out = *sweep_sample;
            self.N += 1;
        }
        // Modulo number of samples in generator
        self.N %= period;
    }

    /// Regenerate the sweep buffer for the sampling frequency `fs` and restart
    /// playback at its first sample.
    fn reset(&mut self, fs: Flt) {
        self.gen = self.params.reset(fs);
        self.N = 0;
    }

    /// Boxed copy of this source, used to clone the trait object.
    fn clone_dyn(&self) -> Box<dyn SourceImpl> {
        Box::new(self.clone())
    }
}

impl Deref for Source {
    type Target = Box<dyn SourceImpl>;
    fn deref(&self) -> &Self::Target {
        &self.src
    }
}
impl DerefMut for Source {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.src
    }
}

/// Source for the signal generator. Implementations are sine waves, sweeps, noise.
///
/// The trait is used as a trait object (`Box<dyn SourceImpl>`), which is why
/// cloning goes through [SourceImpl::clone_dyn] rather than through `Clone`.
pub trait SourceImpl: Send + Sync + Debug {
    /// Generate the 'pure' source signal. Output is placed inside the `sig`
    /// argument: implementations write their samples through the mutable
    /// references that `sig` yields.
    fn genSignal_unscaled(&mut self, sig: &mut dyn ExactSizeIterator<Item = &mut Flt>);
    /// Reset the source state, i.e. set phase to 0, etc. `fs` is the sampling
    /// frequency the source has to use from here on.
    fn reset(&mut self, fs: Flt);
    /// Used to make the Siggen struct cloneable: returns a boxed copy of the
    /// concrete source.
    fn clone_dyn(&self) -> Box<dyn SourceImpl>;
}
impl Clone for Box<dyn SourceImpl> {
    fn clone(&self) -> Self {
        self.clone_dyn()
    }
}