use futures::Stream;
use num_complex::Complex;
use soapysdr::{Args, Device, Direction, Error as SoapyError};
use crate::{Gain, GainElementName, error};
/// Settings used to open and configure a SoapySDR device for receiving.
///
/// The readers in this file consume these values when they create the device
/// and its RX stream.
#[derive(Debug, Clone, PartialEq)]
pub struct SoapyConfig {
/// Device argument string handed to `Device::new` to select the hardware.
pub args: String,
/// Center frequency passed to `set_frequency` (presumably Hz — confirm against SoapySDR docs).
pub center_freq: f64,
/// Sample rate passed to `set_sample_rate` (presumably samples per second — confirm).
pub sample_rate: f64,
/// Index of the RX channel that is configured and streamed.
pub channel: usize,
/// Gain strategy: automatic, one overall manual value, or per-element values.
pub gain: Gain,
/// When `true`, the readers write the device setting `"biastee"` = `"true"`.
pub bias_tee: bool,
}
impl SoapyConfig {
    /// Creates a configuration from the three mandatory settings.
    ///
    /// The optional settings start out as RX channel `0`, [`Gain::Auto`] and
    /// bias tee disabled; callers can overwrite the public fields afterwards.
    pub fn new(args: String, center_freq: f64, sample_rate: f64) -> Self {
        // Defaults for everything the caller did not specify.
        let channel = 0;
        let gain = Gain::Auto;
        let bias_tee = false;

        Self {
            args,
            center_freq,
            sample_rate,
            channel,
            gain,
            bias_tee,
        }
    }
}
/// Blocking reader that pulls `Complex<i16>` samples from a SoapySDR RX stream
/// and hands them out as `Complex<f32>` chunks through its `Iterator` impl.
pub struct SoapySdrReader {
/// RX stream that `SoapySdrReader::new` creates and activates.
stream: soapysdr::RxStream<Complex<i16>>,
/// Scratch buffer the stream reads into; allocated with `mtu` elements in `new`.
buf: Vec<Complex<i16>>,
/// Start of the region of `buf` that has not been returned yet.
pos: usize,
/// One past the last valid sample in `buf` from the most recent read.
end: usize,
}
impl SoapySdrReader {
    /// Opens the device described by `config`, applies frequency, sample rate and
    /// gain settings, then creates and activates an RX stream on `config.channel`.
    ///
    /// Gain handling depends on `config.gain`:
    /// * `Gain::Auto` turns the device's automatic gain mode on.
    /// * `Gain::Manual` turns automatic gain off and sets the overall gain.
    /// * `Gain::Elements` turns automatic gain off and sets each named element.
    ///   Elements the device does not list are skipped with a warning on stderr.
    ///
    /// Enabling the bias tee is best effort: a failure is reported on stderr but
    /// does not abort construction.
    ///
    /// # Errors
    ///
    /// Returns an error if the device cannot be opened, if any frequency, sample
    /// rate or gain call fails, or if the stream cannot be created, queried for
    /// its MTU, or activated.
    pub fn new(config: &SoapyConfig) -> error::Result<Self> {
        let device = Device::new(config.args.as_str())?;
        device.set_frequency(Direction::Rx, config.channel, config.center_freq, ())?;
        device.set_sample_rate(Direction::Rx, config.channel, config.sample_rate)?;

        // Queried up front so unsupported gain elements can be reported, not rejected.
        let supported = device.list_gains(Direction::Rx, config.channel)?;
        match &config.gain {
            Gain::Auto => {
                device.set_gain_mode(Direction::Rx, config.channel, true)?;
            }
            Gain::Manual(value) => {
                device.set_gain_mode(Direction::Rx, config.channel, false)?;
                device.set_gain(Direction::Rx, config.channel, *value)?;
            }
            Gain::Elements(elements) => {
                device.set_gain_mode(Direction::Rx, config.channel, false)?;
                for elem in elements.iter() {
                    let gain_name = match &elem.name {
                        GainElementName::Tuner => "TUNER",
                        GainElementName::Lna => "LNA",
                        GainElementName::Mix => "MIX",
                        GainElementName::Vga => "VGA",
                        GainElementName::Pga => "PGA",
                        GainElementName::Custom(name) => name.as_str(),
                    };
                    // Compare by borrow; no temporary `String` per element.
                    let is_supported = supported.iter().any(|s| s.as_str() == gain_name);
                    if !is_supported {
                        eprintln!(
                            "Warning: Gain element '{}' not supported by device (supported: {:?}), skipping...",
                            gain_name, supported
                        );
                    } else {
                        device.set_gain_element(
                            Direction::Rx,
                            config.channel,
                            gain_name,
                            elem.value_db,
                        )?;
                    }
                }
            }
        }

        if config.bias_tee {
            // Best effort: not every device exposes this setting, so a failure is
            // surfaced as a warning instead of being silently dropped or made fatal.
            if let Err(e) = device.write_setting("biastee", "true") {
                eprintln!("Warning: Failed to enable bias tee: {}", e);
            }
        }

        let mut stream = device.rx_stream::<Complex<i16>>(&[config.channel])?;
        // The read buffer is sized to the stream's MTU.
        let mtu = stream.mtu()?;
        stream.activate(None)?;
        Ok(Self {
            stream,
            buf: vec![Complex::new(0, 0); mtu],
            pos: 0,
            end: 0,
        })
    }
}
impl Iterator for SoapySdrReader {
    type Item = error::Result<Vec<Complex<f32>>>;

    /// Returns the next chunk of samples converted to `f32`.
    ///
    /// When the internal buffer has been fully handed out, one blocking stream
    /// read (timeout argument `5_000_000`, presumably microseconds — confirm
    /// against the soapysdr docs) refills it. A zero-length read ends the
    /// iteration; a read error is yielded as `Some(Err(..))`.
    fn next(&mut self) -> Option<Self::Item> {
        // Dividing by 2^15 maps the i16 range onto [-1.0, 1.0).
        let scale = (1 << 15) as f32;

        if self.pos >= self.end {
            let filled = match self.stream.read(&mut [&mut self.buf], 5_000_000) {
                Ok(0) => return None,
                Ok(count) => count,
                Err(err) => return Some(Err(err.into())),
            };
            self.pos = 0;
            self.end = filled;
        }

        let pending = &self.buf[self.pos..self.end];
        let mut chunk = Vec::with_capacity(pending.len());
        for sample in pending {
            chunk.push(Complex::new(
                sample.re as f32 / scale,
                sample.im as f32 / scale,
            ));
        }

        // Everything buffered has now been handed out.
        self.pos = self.end;
        Some(Ok(chunk))
    }
}
/// Asynchronous reader that yields sample chunks produced by a background thread.
pub struct AsyncSoapySdrReader {
/// Receiving end of the channel the reader thread sends converted chunks (or errors) into.
rx: tokio::sync::mpsc::Receiver<error::Result<Vec<Complex<f32>>>>,
/// Handle of the spawned reader thread; stored here but never joined in this file.
_handle: std::thread::JoinHandle<()>,
}
impl AsyncSoapySdrReader {
    /// Spawns a dedicated OS thread that opens and configures the device described
    /// by `config`, then continuously reads samples and forwards them through a
    /// bounded channel (capacity 32 chunks).
    ///
    /// This call blocks until the thread reports whether initialization succeeded.
    /// Gain handling mirrors `config.gain`: automatic mode, one overall manual
    /// value, or per-element values. Elements the device does not list are skipped
    /// with a warning on stderr. Enabling the bias tee is best effort: a failure
    /// is reported on stderr but does not abort initialization.
    ///
    /// After a successful start the thread runs until one of the following:
    /// * the receiver is dropped (sending fails),
    /// * a read error occurs (forwarded as the last item), or
    /// * a zero-length read occurs (forwarded as one empty chunk).
    ///
    /// # Errors
    ///
    /// Returns the initialization error reported by the thread, or a generic
    /// device error if the thread ended without reporting a result.
    pub fn new(config: &SoapyConfig) -> error::Result<Self> {
        let (tx, rx) = tokio::sync::mpsc::channel::<error::Result<Vec<Complex<f32>>>>(32);
        // One-shot style channel used only to report the outcome of initialization.
        let (tx_init, rx_init) = std::sync::mpsc::channel::<error::Result<()>>();
        let cfg = config.clone();
        let handle = std::thread::spawn(move || {
            // Immediately-invoked closure so `?` can be used for every setup step.
            let init_res = (|| -> error::Result<(Device, soapysdr::RxStream<Complex<i16>>)> {
                let device = Device::new(cfg.args.as_str())?;
                device.set_frequency(Direction::Rx, cfg.channel, cfg.center_freq, ())?;
                device.set_sample_rate(Direction::Rx, cfg.channel, cfg.sample_rate)?;

                // Queried up front so unsupported gain elements can be reported, not rejected.
                let supported = device.list_gains(Direction::Rx, cfg.channel)?;
                match &cfg.gain {
                    Gain::Auto => {
                        device.set_gain_mode(Direction::Rx, cfg.channel, true)?;
                    }
                    Gain::Manual(value) => {
                        device.set_gain_mode(Direction::Rx, cfg.channel, false)?;
                        device.set_gain(Direction::Rx, cfg.channel, *value)?;
                    }
                    Gain::Elements(elements) => {
                        device.set_gain_mode(Direction::Rx, cfg.channel, false)?;
                        for elem in elements.iter() {
                            let gain_name = match &elem.name {
                                GainElementName::Tuner => "TUNER",
                                GainElementName::Lna => "LNA",
                                GainElementName::Mix => "MIX",
                                GainElementName::Vga => "VGA",
                                GainElementName::Pga => "PGA",
                                GainElementName::Custom(name) => name.as_str(),
                            };
                            // Compare by borrow; no temporary `String` per element.
                            let is_supported =
                                supported.iter().any(|s| s.as_str() == gain_name);
                            if !is_supported {
                                eprintln!(
                                    "Warning: Gain element '{}' not supported by device (supported: {:?}), skipping...",
                                    gain_name, supported
                                );
                            } else {
                                device.set_gain_element(
                                    Direction::Rx,
                                    cfg.channel,
                                    gain_name,
                                    elem.value_db,
                                )?;
                            }
                        }
                    }
                }

                if cfg.bias_tee {
                    // Best effort: surface the failure as a warning instead of
                    // silently dropping it or making it fatal.
                    if let Err(e) = device.write_setting("biastee", "true") {
                        eprintln!("Warning: Failed to enable bias tee: {}", e);
                    }
                }

                let mut stream = device.rx_stream::<Complex<i16>>(&[cfg.channel])?;
                stream.activate(None)?;
                Ok((device, stream))
            })();

            match init_res {
                // `_device` stays bound for the whole read loop so it is not dropped early.
                Ok((_device, mut stream)) => {
                    let _ = tx_init.send(Ok(()));
                    // Fall back to a fixed buffer size if the MTU cannot be queried.
                    let mtu = stream.mtu().unwrap_or(16384);
                    let mut buffer = vec![Complex::new(0, 0); mtu];
                    loop {
                        match stream.read(&mut [&mut buffer], 5_000_000) {
                            Ok(len) => {
                                if len == 0 {
                                    // Forward one empty chunk, then stop; returning
                                    // drops `tx`, which closes the channel.
                                    let _ = tx.blocking_send(Ok(Vec::new()));
                                    return;
                                }
                                // Dividing by 2^15 maps the i16 range onto [-1.0, 1.0).
                                let samples: Vec<Complex<f32>> = buffer[..len]
                                    .iter()
                                    .map(|c| {
                                        Complex::new(
                                            c.re as f32 / (1 << 15) as f32,
                                            c.im as f32 / (1 << 15) as f32,
                                        )
                                    })
                                    .collect();
                                // A send error means the receiver is gone; stop reading.
                                if tx.blocking_send(Ok(samples)).is_err() {
                                    return;
                                }
                            }
                            Err(e) => {
                                let _ = tx.blocking_send(Err(e.into()));
                                return;
                            }
                        }
                    }
                }
                Err(e) => {
                    let _ = tx_init.send(Err(e));
                }
            }
        });

        match rx_init.recv() {
            Ok(Ok(())) => Ok(Self {
                rx,
                _handle: handle,
            }),
            Ok(Err(e)) => Err(e),
            // The thread ended (for example by panicking) before reporting a result.
            Err(_) => Err(error::Error::device("Failed to initialize SoapySDR device")),
        }
    }
}
impl Stream for AsyncSoapySdrReader {
    type Item = error::Result<Vec<Complex<f32>>>;

    /// Polls the channel fed by the reader thread.
    ///
    /// The channel's poll result is returned unchanged: `Ready(Some(..))` for a
    /// chunk or error, `Ready(None)` once the channel is closed and drained, and
    /// `Pending` while nothing is available yet.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let reader = self.get_mut();
        reader.rx.poll_recv(cx)
    }
}
/// Lists the devices SoapySDR reports for the given argument filter string.
///
/// This forwards `args` to `soapysdr::enumerate` and returns its result.
///
/// # Errors
///
/// Returns the `SoapyError` produced by the underlying enumeration call.
pub fn enumerate_devices(args: &str) -> Result<Vec<Args>, SoapyError> {
    let found = soapysdr::enumerate(args)?;
    Ok(found)
}