use futures::Stream;
use industrial_io::{Buffer, Channel};
use num_complex::Complex;
use pluto_sdr::pluto::{Pluto, RX};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use crate::{Gain, error};
/// Size handed to `create_buffer_rx` when either reader allocates its receive
/// buffer (presumably a count of samples per refill — confirm against the
/// `pluto_sdr` API).
const DEFAULT_BUFFER_SIZE: usize = 32768;
/// Connection and tuning parameters used by the PlutoSDR readers in this file.
#[derive(Debug, Clone, PartialEq)]
pub struct PlutoConfig {
/// URI passed to `Pluto::connect` to locate the device.
pub uri: String,
/// Receive LO frequency passed to `set_lo_rx` (presumably in Hz — confirm).
pub center_freq: i64,
/// Sampling frequency passed to `set_sampling_freq`; the RF bandwidth
/// setting is derived from this value as well.
pub sample_rate: i64,
/// Requested receive gain; the readers translate it into a `set_hwgain` call.
pub gain: Gain,
}
impl PlutoConfig {
pub fn new(address: String, center_freq: i64, sample_rate: i64, gain: Gain) -> Self {
Self {
uri: address,
center_freq,
sample_rate,
gain,
}
}
}
/// Blocking PlutoSDR reader that yields chunks of complex samples through its
/// `Iterator` implementation.
pub struct PlutoSdrReader {
/// Receive buffer that is refilled from the device before each read.
buffer: Buffer,
/// First element of the channel pair returned by `rx_ch0()`; read as the I component.
rx_i: Channel,
/// Second element of the channel pair returned by `rx_ch0()`; read as the Q component.
rx_q: Channel,
/// Raw I samples from the most recent refill.
i_samples: Vec<i16>,
/// Raw Q samples from the most recent refill.
q_samples: Vec<i16>,
/// Index of the next sample in `i_samples`/`q_samples` that has not been yielded yet.
pos: usize,
}
impl PlutoSdrReader {
pub fn new(config: &PlutoConfig) -> error::Result<Self> {
let pluto = Pluto::connect(&config.uri).expect("Failed to connect to Pluto");
let _ = pluto.set_sampling_freq(config.sample_rate);
let _ = pluto.set_lo_rx(config.center_freq);
let bandwidth = config.sample_rate;
let _ = pluto.set_rf_bandwidth(bandwidth, RX);
match config.gain {
Gain::Manual(gain_db) => {
let _ = pluto.set_hwgain(gain_db, RX);
}
Gain::Auto => {
let _ = pluto.set_hwgain(40.0, RX);
}
Gain::Elements(_) => {
eprintln!(
"Warning: PlutoSDR does not support element-based gain control, using auto gain"
);
let _ = pluto.set_hwgain(40.0, RX);
}
}
let (rx_i, rx_q) = pluto.rx_ch0();
rx_i.enable();
rx_q.enable();
let buffer = pluto.create_buffer_rx(DEFAULT_BUFFER_SIZE).unwrap();
Ok(Self {
buffer,
rx_i,
rx_q,
i_samples: Vec::new(),
q_samples: Vec::new(),
pos: 0,
})
}
fn refill_buffer(&mut self) -> error::Result<()> {
self.buffer
.refill()
.map_err(|e| format!("Buffer refill failed: {:?}", e))?;
self.i_samples = self
.rx_i
.read::<i16>(&self.buffer)
.map_err(|e| format!("Failed to read I samples: {:?}", e))?;
self.q_samples = self
.rx_q
.read::<i16>(&self.buffer)
.map_err(|e| format!("Failed to read Q samples: {:?}", e))?;
self.pos = 0;
Ok(())
}
}
impl Iterator for PlutoSdrReader {
    type Item = error::Result<Vec<Complex<f32>>>;

    /// Yields the next chunk of complex samples.
    ///
    /// When every buffered I/Q pair has been consumed the device buffer is
    /// refilled first. Each raw `i16` component is divided by `2048.0`
    /// (presumably the full scale of a 12-bit ADC — confirm).
    ///
    /// Returns `Some(Err(_))` when the refill fails and `None` when a refill
    /// succeeded but produced no I/Q pairs.
    fn next(&mut self) -> Option<Self::Item> {
        // Only `min(I, Q)` samples can be paired, so that length — not the I
        // length alone — decides whether the cached data is exhausted.
        // Checking only the I length would end the iterator for good whenever
        // Q came back shorter than I.
        let buffered = self.i_samples.len().min(self.q_samples.len());
        if self.pos >= buffered
            && let Err(e) = self.refill_buffer()
        {
            return Some(Err(error::Error::PlutoSdr(e.to_string())));
        }

        let end = self.i_samples.len().min(self.q_samples.len());
        if self.pos >= end {
            return None;
        }

        let samples: Vec<Complex<f32>> = self.i_samples[self.pos..end]
            .iter()
            .zip(&self.q_samples[self.pos..end])
            .map(|(&i, &q)| Complex::new(f32::from(i) / 2048.0, f32::from(q) / 2048.0))
            .collect();
        self.pos = end;
        Some(Ok(samples))
    }
}
/// Asynchronous PlutoSDR reader: a blocking task reads from the device and
/// forwards sample chunks over a channel that this type exposes as a `Stream`.
pub struct AsyncPlutoSdrReader {
/// Receiving end of the channel fed by the blocking reader task.
receiver: mpsc::Receiver<error::Result<Vec<Complex<f32>>>>,
/// Handle of the task started with `spawn_blocking`; stored but never
/// awaited in this file.
_handle: tokio::task::JoinHandle<()>,
}
impl AsyncPlutoSdrReader {
pub async fn new(config: &PlutoConfig) -> error::Result<Self> {
let config = config.clone();
let (tx, rx) = mpsc::channel::<error::Result<Vec<Complex<f32>>>>(32);
let handle = tokio::task::spawn_blocking(move || {
let result = Self::run_pluto_rx(config, tx);
if let Err(e) = result {
eprintln!("Pluto SDR error: {}", e);
}
});
Ok(Self {
receiver: rx,
_handle: handle,
})
}
fn run_pluto_rx(
config: PlutoConfig,
tx: mpsc::Sender<error::Result<Vec<Complex<f32>>>>,
) -> error::Result<()> {
let pluto = Pluto::connect(&config.uri).expect("Failed to connect to Pluto");
let _ = pluto.set_sampling_freq(config.sample_rate);
let _ = pluto.set_lo_rx(config.center_freq);
let bandwidth = (config.sample_rate as f64 * 0.8) as i64;
let _ = pluto.set_rf_bandwidth(bandwidth, RX);
match config.gain {
Gain::Manual(gain_db) => {
let _ = pluto.set_hwgain(gain_db, RX);
}
Gain::Auto => {
let _ = pluto.set_hwgain(40.0, RX);
}
Gain::Elements(_) => {
eprintln!(
"Warning: PlutoSDR does not support element-based gain control, using auto gain"
);
let _ = pluto.set_hwgain(40.0, RX);
}
}
let (rx_i, rx_q) = pluto.rx_ch0();
rx_i.enable();
rx_q.enable();
let mut buffer = pluto
.create_buffer_rx(DEFAULT_BUFFER_SIZE)
.map_err(|e| format!("Failed to create buffer: {:?}", e))?;
loop {
if let Err(e) = buffer.refill() {
let _ = tx.blocking_send(Err(std::io::Error::other(format!(
"Buffer refill failed: {:?}",
e
))
.into()));
break;
}
let i_samples = rx_i
.read::<i16>(&buffer)
.map_err(|e| format!("Failed to read I samples: {:?}", e))
.unwrap_or_default();
let q_samples = rx_q
.read::<i16>(&buffer)
.map_err(|e| format!("Failed to read Q samples: {:?}", e))
.unwrap_or_default();
let mut samples = Vec::with_capacity(i_samples.len().min(q_samples.len()));
for (i, q) in i_samples.iter().zip(q_samples.iter()) {
let i_norm = *i as f32 / 2048.0; let q_norm = *q as f32 / 2048.0;
samples.push(Complex::new(i_norm, q_norm));
}
if tx.blocking_send(Ok(samples)).is_err() {
break;
}
}
Ok(())
}
}
impl Stream for AsyncPlutoSdrReader {
    type Item = error::Result<Vec<Complex<f32>>>;

    /// Polls the internal channel for the next chunk produced by the
    /// blocking reader task.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.receiver.poll_recv(cx)
    }
}