futuresdr 0.0.41

An Experimental Async SDR Runtime for Heterogeneous Architectures.
Documentation
use crate::runtime::buffer::SendBufferWriter;
use crate::runtime::dev::prelude::*;
use cpal::BufferSize;
use cpal::Stream;
use cpal::StreamConfig;
use cpal::traits::DeviceTrait;
use cpal::traits::HostTrait;
use cpal::traits::StreamTrait;

/// Audio Source.
///
/// Captures `f32` audio samples from the default input device.
///
/// # Stream Inputs
///
/// No stream inputs.
///
/// # Stream Outputs
///
/// `output`: Interleaved audio samples.
///
/// # Usage
/// ```no_run
/// use futuresdr::blocks::audio::AudioSource;
///
/// let source = AudioSource::new(48_000, 2)?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Block)]
pub struct AudioSource<O = DefaultCpuWriter<f32>>
where
    O: CpuBufferWriter<Item = f32>,
{
    #[output]
    output: O,
    output_channels: u16,
    sample_rate: u32,
    channels: u16,
    stream: Option<Stream>,
    rx: Option<mpsc::Receiver<Vec<f32>>>,
    buff: Option<(Vec<f32>, usize)>,
}

// cpal::Stream is !Send
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl<O> Send for AudioSource<O> where O: CpuBufferWriter<Item = f32> + SendBufferWriter {}

const QUEUE_SIZE: usize = 5;

impl AudioSource<DefaultCpuWriter<f32>> {
    /// Create AudioSource block with the default stream buffer.
    pub fn new(sample_rate: u32, channels: u16) -> Result<Self> {
        Self::with_buffer(sample_rate, channels)
    }
}

impl<O> AudioSource<O>
where
    O: CpuBufferWriter<Item = f32>,
{
    fn supports_config(sample_rate: u32, channels: u16) -> bool {
        let Some(device) = cpal::default_host().default_input_device() else {
            return false;
        };

        let Ok(configs) = device.supported_input_configs() else {
            return false;
        };

        configs.into_iter().any(|config| {
            config.channels() == channels
                && sample_rate >= config.min_sample_rate()
                && sample_rate <= config.max_sample_rate()
        })
    }

    /// Create AudioSource block with a custom stream buffer.
    pub fn with_buffer(sample_rate: u32, channels: u16) -> Result<Self> {
        let input_channels = if Self::supports_config(sample_rate, channels) {
            channels
        } else if channels == 1 && Self::supports_config(sample_rate, 2) {
            warn!(
                "audio source requested mono input at {} Hz, but only stereo is supported; using channel 0",
                sample_rate
            );
            2
        } else {
            return Err(Error::InvalidParameter.into());
        };

        Ok(AudioSource {
            output: O::default(),
            output_channels: channels,
            sample_rate,
            channels: input_channels,
            stream: None,
            rx: None,
            buff: None,
        })
    }
}

#[doc(hidden)]
impl<O> Kernel for AudioSource<O>
where
    O: CpuBufferWriter<Item = f32>,
{
    async fn init(&mut self, _mo: &mut MessageOutputs, _b: &mut BlockMeta) -> Result<()> {
        let device = cpal::default_host()
            .default_input_device()
            .expect("no input device available");

        let config = StreamConfig {
            channels: self.channels,
            sample_rate: self.sample_rate,
            buffer_size: BufferSize::Default,
        };

        let (tx, rx) = mpsc::channel(QUEUE_SIZE);

        let stream = device.build_input_stream(
            &config,
            move |data, _| {
                let data = data.to_owned();
                // Keep the audio callback non-blocking; if the queue is full, drop the newest
                // chunk instead of stalling the input thread.
                match tx.try_send(data) {
                    Ok(()) | Err(mpsc::TrySendError::Full(_)) => {}
                    Err(mpsc::TrySendError::Disconnected(_)) => {
                        panic!("audio source channel receiver dropped unexpectedly");
                    }
                }
            },
            move |err| {
                panic!("cpal stream error {err:?}");
            },
            None,
        )?;

        stream.play()?;

        self.rx = Some(rx);
        self.stream = Some(stream);

        Ok(())
    }

    async fn work(
        &mut self,
        io: &mut WorkIo,
        _mo: &mut MessageOutputs,
        _meta: &mut BlockMeta,
    ) -> Result<()> {
        if let Some((buff, mut full)) = self.buff.take() {
            let o = self.output.slice();
            if self.output_channels == 1 && self.channels == 2 {
                let n = std::cmp::min(o.len(), (buff.len() - full) / 2);

                for j in 0..n {
                    o[j] = buff[full + 2 * j];
                }

                full += 2 * n;
                self.output.produce(n);
            } else {
                let n = std::cmp::min(o.len(), buff.len() - full);

                for (i, v) in o.iter_mut().take(n).enumerate() {
                    *v = buff[full + i]
                }

                full += n;
                self.output.produce(n);
            }

            if buff.len() == full {
                io.call_again = true;
                self.buff = None;
            } else {
                self.buff = Some((buff, full));
            }
        } else if let Some(v) = self.rx.as_mut().unwrap().recv().await {
            io.call_again = true;
            self.buff = Some((v, 0));
        } else {
            io.finished = true;
        }

        Ok(())
    }
}