use futures::Stream;
use num_complex::Complex;
use rs_spy::{Airspy, AirspyGainStages, IqConverter, RECOMMENDED_BUFFER_SIZE};
use tracing::{debug, info, warn};
use crate::{Gain, GainElementName, error};
/// Preset used to translate a single overall gain value into device gain.
///
/// For `Gain::Manual` this picks between the device's linearity and
/// sensitivity gain setters (and the matching lookup tables).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AirspyGainMode {
    /// Use the sensitivity gain preset.
    Sensitivity,
    /// Use the linearity gain preset.
    Linearity,
}

// Kept as a hand-written impl; the lint suggesting `#[derive(Default)]` is silenced on purpose.
#[allow(clippy::derivable_impls)]
impl Default for AirspyGainMode {
    /// The default preset is [`AirspyGainMode::Sensitivity`].
    fn default() -> Self {
        Self::Sensitivity
    }
}
/// Identifies which Airspy device should be opened.
#[derive(Debug, Clone, PartialEq)]
pub enum DeviceSelector {
    /// Open the device at this enumeration index.
    Index(usize),
    /// Open the device whose reported serial number equals this value.
    Serial(u64),
}

impl Default for DeviceSelector {
    /// Selects the device at enumeration index 0.
    fn default() -> Self {
        Self::Index(0)
    }
}
/// Settings used to open and configure an Airspy device.
///
/// Both `AirspySdrReader::new` and `AsyncAirspySdrReader::new` consume this.
#[derive(Debug, Clone, PartialEq)]
pub struct AirspyConfig {
/// Which device to open (by enumeration index or by serial number).
pub device: DeviceSelector,
/// Center frequency handed to the device's `set_freq` (presumably Hz — confirm against `rs_spy`).
pub center_freq: u32,
/// Requested IQ sample rate in Hz, handed to `set_sample_rate_for_iq`.
pub sample_rate: u32,
/// Overall gain setting; ignored when any of `lna_gain`, `mixer_gain` or `vga_gain` is set.
pub gain: Gain,
/// Value passed to the device's `set_rf_bias`.
pub bias_tee: bool,
/// Value passed to the device's `set_packing`. The readers in this file do not
/// unpack 12-bit data and only log a warning when this is enabled.
pub packing: bool,
/// Explicit LNA gain override; values above 14 are clamped to 14.
pub lna_gain: Option<u8>,
/// Explicit mixer gain override; values above 15 are clamped to 15.
pub mixer_gain: Option<u8>,
/// Explicit VGA gain override; values above 15 are clamped to 15.
pub vga_gain: Option<u8>,
/// Preset (linearity or sensitivity) used when `gain` is `Gain::Manual`.
pub gain_mode: AirspyGainMode,
}
impl AirspyConfig {
pub fn new(device_index: usize, center_freq: u32, sample_rate: u32, gain: Gain) -> Self {
Self {
device: DeviceSelector::Index(device_index),
center_freq,
sample_rate,
gain,
bias_tee: false,
packing: false,
lna_gain: None,
mixer_gain: None,
vga_gain: None,
gain_mode: AirspyGainMode::default(),
}
}
pub fn new_with_serial(serial: u64, center_freq: u32, sample_rate: u32, gain: Gain) -> Self {
Self {
device: DeviceSelector::Serial(serial),
center_freq,
sample_rate,
gain,
bias_tee: false,
packing: false,
lna_gain: None,
mixer_gain: None,
vga_gain: None,
gain_mode: AirspyGainMode::default(),
}
}
}
/// Identification data read from an Airspy device.
#[derive(Debug, Clone, PartialEq)]
pub struct AirspyDeviceInfo {
    /// Enumeration index of the device (as used with `Airspy::open_by_index`).
    pub index: usize,
    /// Numeric board identifier reported by the device.
    pub board_id: u32,
    /// Firmware version string reported by the device.
    pub firmware_version: String,
    /// The two part-id words reported by the device.
    pub part_id: [u32; 2],
    /// Serial number reported by the device.
    pub serial_number: u64,
    /// Sample rates the device reports as supported.
    pub supported_sample_rates: Vec<u32>,
}

impl AirspyDeviceInfo {
    /// Returns a human-readable board name for `board_id`.
    ///
    /// Identifiers not present in the table map to `"AIRSPY (Unknown)"`.
    pub fn board_name(&self) -> &'static str {
        const KNOWN_BOARDS: [(u32, &str); 3] =
            [(0, "AIRSPY"), (1, "AIRSPY MINI"), (2, "AIRSPY HF+")];

        KNOWN_BOARDS
            .iter()
            .find(|entry| entry.0 == self.board_id)
            .map(|entry| entry.1)
            .unwrap_or("AIRSPY (Unknown)")
    }
}
/// Enumerates Airspy devices and collects identification data for each one.
///
/// Every enumerated index is opened briefly to query it. A device that cannot
/// be opened is logged and left out of the result. A query that fails on an
/// opened device falls back to a zero / empty value instead of failing the
/// whole listing.
///
/// # Errors
/// Returns a device error when the enumeration itself fails.
pub fn list_devices() -> error::Result<Vec<AirspyDeviceInfo>> {
    let device_strings = Airspy::list_devices()
        .map_err(|e| error::Error::device(format!("Failed to enumerate Airspy devices: {}", e)))?;

    let devices: Vec<AirspyDeviceInfo> = device_strings
        .iter()
        .enumerate()
        .filter_map(|(index, _)| {
            let device = match Airspy::open_by_index(index) {
                Ok(opened) => opened,
                Err(e) => {
                    warn!(index, error = %e, "Could not open Airspy device");
                    return None;
                }
            };

            // Best-effort queries, issued in a fixed order.
            let board_id = device.board_id().unwrap_or(0);
            let firmware_version = device.version().unwrap_or_default();
            let (part_id_1, part_id_2, serial_number) =
                device.board_partid_serialno().unwrap_or((0, 0, 0));
            let supported_sample_rates = device.supported_sample_rates().unwrap_or_default();

            Some(AirspyDeviceInfo {
                index,
                board_id,
                firmware_version,
                part_id: [part_id_1, part_id_2],
                serial_number,
                supported_sample_rates,
            })
        })
        .collect();

    Ok(devices)
}
fn open_device_with_selector(selector: &DeviceSelector) -> error::Result<Airspy> {
match selector {
DeviceSelector::Index(idx) => {
Airspy::open_by_index(*idx).map_err(|e| error::Error::device(e.to_string()))
}
DeviceSelector::Serial(serial) => {
let devices = list_devices()?;
for dev in devices {
if dev.serial_number == *serial {
return Airspy::open_by_index(dev.index)
.map_err(|e| error::Error::device(e.to_string()));
}
}
Err(error::Error::device(format!(
"No Airspy device found with serial 0x{:016X}",
serial
)))
}
}
}
fn configure_gain(
device: &Airspy,
gain: &Gain,
gain_mode: AirspyGainMode,
lna: Option<u8>,
mixer: Option<u8>,
vga: Option<u8>,
) -> error::Result<()> {
if let Some(lna_val) = lna {
device
.set_lna_gain(lna_val.min(14))
.map_err(|e| error::Error::device(format!("Failed to set LNA gain: {}", e)))?;
}
if let Some(mixer_val) = mixer {
device
.set_mixer_gain(mixer_val.min(15))
.map_err(|e| error::Error::device(format!("Failed to set Mixer gain: {}", e)))?;
}
if let Some(vga_val) = vga {
device
.set_vga_gain(vga_val.min(15))
.map_err(|e| error::Error::device(format!("Failed to set VGA gain: {}", e)))?;
}
if lna.is_some() || mixer.is_some() || vga.is_some() {
device
.set_lna_agc(false)
.map_err(|e| error::Error::device(format!("Failed to disable LNA AGC: {}", e)))?;
device
.set_mixer_agc(false)
.map_err(|e| error::Error::device(format!("Failed to disable Mixer AGC: {}", e)))?;
return Ok(());
}
match gain {
Gain::Auto => {
device
.set_lna_agc(true)
.map_err(|e| error::Error::device(format!("Failed to enable LNA AGC: {}", e)))?;
device
.set_mixer_agc(true)
.map_err(|e| error::Error::device(format!("Failed to enable Mixer AGC: {}", e)))?;
device
.set_vga_gain(8)
.map_err(|e| error::Error::device(format!("Failed to set VGA gain: {}", e)))?;
}
Gain::Manual(db) => {
let level = ((*db / 50.0) * 21.0).clamp(0.0, 21.0) as u8;
match gain_mode {
AirspyGainMode::Linearity => {
device.set_linearity_gain(level).map_err(|e| {
error::Error::device(format!("Failed to set linearity gain: {}", e))
})?;
info!(level, "Airspy linearity gain set");
}
AirspyGainMode::Sensitivity => {
device.set_sensitivity_gain(level).map_err(|e| {
error::Error::device(format!("Failed to set sensitivity gain: {}", e))
})?;
info!(level, "Airspy sensitivity gain set");
}
}
}
Gain::Elements(elements) => {
device
.set_lna_agc(false)
.map_err(|e| error::Error::device(format!("Failed to disable LNA AGC: {}", e)))?;
device
.set_mixer_agc(false)
.map_err(|e| error::Error::device(format!("Failed to disable Mixer AGC: {}", e)))?;
for element in elements {
match &element.name {
GainElementName::Lna => {
let gain = (element.value_db as u8).min(14);
device.set_lna_gain(gain).map_err(|e| {
error::Error::device(format!("Failed to set LNA gain: {}", e))
})?;
}
GainElementName::Mix => {
let gain = (element.value_db as u8).min(15);
device.set_mixer_gain(gain).map_err(|e| {
error::Error::device(format!("Failed to set Mixer gain: {}", e))
})?;
}
GainElementName::Vga => {
let gain = (element.value_db as u8).min(15);
device.set_vga_gain(gain).map_err(|e| {
error::Error::device(format!("Failed to set VGA gain: {}", e))
})?;
}
other => {
warn!(element = ?other, "Airspy does not support gain element, ignoring");
}
}
}
}
}
Ok(())
}
/// Blocking Airspy reader that yields blocks of complex samples through `Iterator`.
pub struct AirspySdrReader {
// Opened, configured and streaming device handle.
device: Airspy,
// Raw byte buffer filled by `read_sync` (allocated with `RECOMMENDED_BUFFER_SIZE` bytes).
buf: Vec<u8>,
// Scratch buffer of converted real samples (`RECOMMENDED_BUFFER_SIZE / 2` entries).
float_buf: Vec<f32>,
// Converter turning the real sample stream into complex IQ samples.
iq_converter: IqConverter,
// Whether packing was requested; only used to log a warning on reads.
packing: bool,
}
/// Converts little-endian 16-bit raw samples into floats.
///
/// Each pair of bytes is read as a little-endian `u16` and mapped with
/// `(raw - 2048) / 2048`, so a raw value of 2048 becomes `0.0`.
///
/// Conversion stops at whichever runs out first: complete byte pairs in
/// `bytes`, or free slots in `out`. A trailing odd byte is ignored and slots
/// beyond the converted range are left untouched. The function therefore never
/// panics on a short `out` buffer.
///
/// Returns the number of samples actually written to `out`.
fn convert_airspy_real_to_f32(bytes: &[u8], out: &mut [f32]) -> usize {
    let mut written = 0;
    // `zip` bounds the loop by the shorter side, so `out` can never be overrun.
    for (slot, pair) in out.iter_mut().zip(bytes.chunks_exact(2)) {
        let raw = u16::from_le_bytes([pair[0], pair[1]]);
        *slot = (f32::from(raw) - 2048.0) / 2048.0;
        written += 1;
    }
    written
}
impl AirspySdrReader {
pub fn new(config: &AirspyConfig) -> error::Result<Self> {
let device = open_device_with_selector(&config.device)?;
device
.set_freq(config.center_freq)
.map_err(|e| error::Error::device(format!("Failed to set frequency: {}", e)))?;
configure_gain(
&device,
&config.gain,
config.gain_mode,
config.lna_gain,
config.mixer_gain,
config.vga_gain,
)?;
device
.set_rf_bias(config.bias_tee)
.map_err(|e| error::Error::device(format!("Failed to set RF bias: {}", e)))?;
device
.set_packing(config.packing)
.map_err(|e| error::Error::device(format!("Failed to set packing: {}", e)))?;
let mut sample_rate_set = false;
match device.set_sample_rate_for_iq(config.sample_rate) {
Ok(()) => sample_rate_set = true,
Err(_e) => {
}
}
device
.start_rx()
.map_err(|e| error::Error::device(format!("Failed to start RX: {}", e)))?;
if !sample_rate_set {
match device.set_sample_rate_for_iq(config.sample_rate) {
Ok(()) => {}
Err(e) => {
let supported = device.supported_sample_rates().unwrap_or_default();
return Err(error::Error::device(format!(
"Failed to set sample rate after start_rx: {}. \
Supported rates: {:?}. \
Requested IQ: {} Hz",
e, supported, config.sample_rate
)));
}
}
}
let max_samples = RECOMMENDED_BUFFER_SIZE / 2;
Ok(Self {
device,
buf: vec![0u8; RECOMMENDED_BUFFER_SIZE],
float_buf: vec![0.0f32; max_samples],
iq_converter: IqConverter::new(),
packing: config.packing,
})
}
pub fn device_info(&self) -> error::Result<AirspyDeviceInfo> {
let board_id = self
.device
.board_id()
.map_err(|e| error::Error::device(e.to_string()))?;
let firmware_version = self
.device
.version()
.map_err(|e| error::Error::device(e.to_string()))?;
let (part_id_1, part_id_2, serial_number) = self
.device
.board_partid_serialno()
.map_err(|e| error::Error::device(e.to_string()))?;
let supported_sample_rates = self
.device
.supported_sample_rates()
.map_err(|e| error::Error::device(e.to_string()))?;
Ok(AirspyDeviceInfo {
index: 0, board_id,
firmware_version,
part_id: [part_id_1, part_id_2],
serial_number,
supported_sample_rates,
})
}
}
impl Iterator for AirspySdrReader {
type Item = error::Result<Vec<Complex<f32>>>;
fn next(&mut self) -> Option<Self::Item> {
match self.device.read_sync(&mut self.buf) {
Ok(bytes_read) => {
if bytes_read == 0 {
return None; }
if self.packing {
warn!("12-bit unpacking not yet implemented, data may be incorrect");
}
let num_samples =
convert_airspy_real_to_f32(&self.buf[..bytes_read], &mut self.float_buf);
let samples = self
.iq_converter
.process_to_complex(&mut self.float_buf[..num_samples]);
Some(Ok(samples))
}
Err(e) => Some(Err(error::Error::device(format!("Read error: {}", e)))),
}
}
}
/// Stops streaming when the reader goes out of scope.
impl Drop for AirspySdrReader {
fn drop(&mut self) {
// Best effort: nothing useful can be done with a failure during drop, so the result is discarded.
let _ = self.device.stop_rx();
}
}
/// Runtime adjustment request accepted by `AsyncAirspySdrReader::adjust`.
#[derive(Debug, Clone)]
pub enum AirspyMessage {
/// Retune to the given center frequency (forwarded to the control handle's `tune`).
Frequency(u32),
/// Apply a new gain setting (converted to per-stage values with `gain_to_stages`).
Gain(Gain),
}
/// Translates a gain setting into explicit per-stage values.
///
/// * `Gain::Auto` enables both AGC loops and uses `vga` (15 if absent) for
///   the VGA stage.
/// * `Gain::Manual(db)` scales `db` from a 0..=50 range onto level 0..=21 with
///   the same expression `configure_gain` uses, so a given value selects the
///   same level at start-up and at runtime. The level then indexes the
///   linearity or sensitivity table chosen by `gain_mode`.
/// * `Gain::Elements` starts from `lna` / `mixer` / `vga` (14 / 12 / 13 if
///   absent) and overwrites each stage named by an element, clamped to
///   14 / 15 / 15. Elements for other stages are ignored.
fn gain_to_stages(
    gain: &Gain,
    gain_mode: AirspyGainMode,
    lna: Option<u8>,
    mixer: Option<u8>,
    vga: Option<u8>,
) -> AirspyGainStages {
    // Per-level stage values; index 0 is the highest gain level, index 21 the lowest.
    const LINEARITY_VGA: [u8; 22] = [
        13, 12, 11, 11, 11, 11, 11, 10, 10, 10, 10, 10, 10, 10, 10, 10, 9, 8, 7, 6, 5, 4,
    ];
    const LINEARITY_MIXER: [u8; 22] = [
        12, 12, 11, 9, 8, 7, 6, 6, 5, 0, 0, 1, 0, 0, 2, 2, 1, 1, 1, 1, 0, 0,
    ];
    const LINEARITY_LNA: [u8; 22] = [
        14, 14, 14, 13, 12, 10, 9, 9, 8, 9, 8, 6, 5, 3, 1, 0, 0, 0, 0, 0, 0, 0,
    ];
    const SENSITIVITY_VGA: [u8; 22] = [
        13, 12, 11, 10, 9, 8, 7, 6, 5, 5, 5, 5, 5, 4, 4, 4, 4, 4, 4, 4, 4, 4,
    ];
    const SENSITIVITY_MIXER: [u8; 22] = [
        12, 12, 12, 12, 11, 10, 10, 9, 9, 8, 7, 4, 4, 4, 3, 2, 2, 1, 0, 0, 0, 0,
    ];
    const SENSITIVITY_LNA: [u8; 22] = [
        14, 14, 14, 14, 14, 14, 14, 14, 14, 13, 12, 12, 9, 9, 8, 7, 6, 5, 3, 2, 1, 0,
    ];

    match gain {
        // NOTE(review): `configure_gain` sets VGA to 8 for `Gain::Auto`, while the
        // fallback here is 15 — confirm which value is intended.
        Gain::Auto => AirspyGainStages {
            lna_agc: true,
            mixer_agc: true,
            lna: 0,
            mixer: 0,
            vga: vga.unwrap_or(15),
        },
        Gain::Manual(db) => {
            // Same float mapping as `configure_gain`; truncating `db` to an integer
            // before scaling would pick a lower level for fractional values.
            let level = ((*db / 50.0) * 21.0).clamp(0.0, 21.0) as u8;
            // The clamp keeps `level` within 0..=21, so the subtraction cannot underflow.
            let idx = usize::from(21 - level);
            let (lna_table, mixer_table, vga_table) = match gain_mode {
                AirspyGainMode::Linearity => (&LINEARITY_LNA, &LINEARITY_MIXER, &LINEARITY_VGA),
                AirspyGainMode::Sensitivity => {
                    (&SENSITIVITY_LNA, &SENSITIVITY_MIXER, &SENSITIVITY_VGA)
                }
            };
            AirspyGainStages {
                lna_agc: false,
                mixer_agc: false,
                lna: lna_table[idx],
                mixer: mixer_table[idx],
                vga: vga_table[idx],
            }
        }
        Gain::Elements(elements) => {
            let mut stages = AirspyGainStages {
                lna_agc: false,
                mixer_agc: false,
                lna: lna.unwrap_or(14),
                mixer: mixer.unwrap_or(12),
                vga: vga.unwrap_or(13),
            };
            for elem in elements {
                match &elem.name {
                    GainElementName::Lna => stages.lna = (elem.value_db as u8).min(14),
                    GainElementName::Mix => stages.mixer = (elem.value_db as u8).min(15),
                    GainElementName::Vga => stages.vga = (elem.value_db as u8).min(15),
                    // Stages the Airspy does not have are skipped.
                    _ => {}
                }
            }
            stages
        }
    }
}
/// Airspy reader that delivers sample blocks as a `Stream`, fed by a background thread.
pub struct AsyncAirspySdrReader {
// Receiving end of the channel the background thread pushes converted sample blocks into.
rx: tokio::sync::mpsc::Receiver<error::Result<Vec<Complex<f32>>>>,
// Control handle used to retune and change gain while streaming.
ctrl: rs_spy::transport::AsyncReadControlHandle,
// Gain preset from the configuration, used when converting `Gain` messages to stage values.
gain_mode: AirspyGainMode,
// Handle of the background thread; stored but never joined in this file.
_handle: std::thread::JoinHandle<()>,
}
impl AsyncAirspySdrReader {
pub fn new(config: &AirspyConfig) -> error::Result<Self> {
let (tx, rx) = tokio::sync::mpsc::channel::<error::Result<Vec<Complex<f32>>>>(32);
let cfg = config.clone();
let device = open_device_with_selector(&cfg.device)?;
if let Ok(rates) = device.supported_sample_rates() {
info!(rates = ?rates, "Airspy supported sample rates");
}
if let Ok((_, _, sn)) = device.board_partid_serialno() {
info!(serial = format!("0x{:016X}", sn), "Airspy serial number");
}
device
.set_freq(cfg.center_freq)
.map_err(|e| error::Error::device(format!("Failed to set frequency: {}", e)))?;
configure_gain(
&device,
&cfg.gain,
cfg.gain_mode,
cfg.lna_gain,
cfg.mixer_gain,
cfg.vga_gain,
)?;
device
.set_rf_bias(cfg.bias_tee)
.map_err(|e| error::Error::device(format!("Failed to set RF bias: {}", e)))?;
device
.set_packing(cfg.packing)
.map_err(|e| error::Error::device(format!("Failed to set packing: {}", e)))?;
let mut sample_rate_set = false;
if device.set_sample_rate_for_iq(cfg.sample_rate).is_ok() {
sample_rate_set = true;
}
device
.start_rx()
.map_err(|e| error::Error::device(format!("Failed to start RX: {}", e)))?;
if !sample_rate_set {
device
.set_sample_rate_for_iq(cfg.sample_rate)
.map_err(|e| error::Error::device(format!("Failed to set sample rate: {}", e)))?;
}
let async_reader = device
.into_multi_transfer_reader(15, RECOMMENDED_BUFFER_SIZE)
.map_err(|e| error::Error::device(format!("Failed to start async reader: {}", e)))?;
let ctrl = async_reader.control_handle();
let handle = std::thread::spawn(move || {
let reader = async_reader;
let max_samples = RECOMMENDED_BUFFER_SIZE / 2;
let mut float_buf = vec![0.0f32; max_samples];
let mut iq_converter = IqConverter::new();
let mut chunk_count = 0usize;
while let Some(chunk_res) = reader.recv() {
match chunk_res {
Ok(bytes) => {
if bytes.is_empty() {
continue;
}
if cfg.packing {
warn!("12-bit unpacking not yet implemented");
}
let num_samples = convert_airspy_real_to_f32(&bytes, &mut float_buf);
let samples =
iq_converter.process_to_complex(&mut float_buf[..num_samples]);
chunk_count += 1;
if chunk_count <= 5 {
debug!(
chunk = chunk_count,
bytes = bytes.len(),
num_samples,
iq_samples = samples.len(),
"Airspy sending chunk"
);
}
if tx.blocking_send(Ok(samples)).is_err() {
debug!("Airspy channel closed, exiting");
break;
}
}
Err(e) => {
let _ = tx
.blocking_send(Err(error::Error::device(format!("Read error: {}", e))));
break;
}
}
}
debug!(chunks = chunk_count, "Airspy background thread exiting");
});
Ok(Self {
rx,
ctrl,
gain_mode: cfg.gain_mode,
_handle: handle,
})
}
pub fn tune(&self, center_freq: u32) -> error::Result<()> {
self.adjust(AirspyMessage::Frequency(center_freq))
}
pub fn adjust(&self, message: AirspyMessage) -> error::Result<()> {
match message {
AirspyMessage::Frequency(freq) => self
.ctrl
.tune(freq)
.map_err(|e| error::Error::device(format!("Airspy async tune failed: {}", e))),
AirspyMessage::Gain(gain) => {
let stages = gain_to_stages(&gain, self.gain_mode, None, None, None);
self.ctrl.set_gain(stages).map_err(|e| {
error::Error::device(format!("Airspy async set_gain failed: {}", e))
})
}
}
}
}
impl Stream for AsyncAirspySdrReader {
    type Item = error::Result<Vec<Complex<f32>>>;

    /// Polls the channel fed by the background thread.
    ///
    /// The channel's poll result is forwarded unchanged: a ready item, end of
    /// stream once the channel is closed and drained, or pending.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.rx.poll_recv(cx)
    }
}
/// Returns the enumeration index of the first device, which is always 0.
pub fn get_first_device_index() -> usize {
    const FIRST_DEVICE_INDEX: usize = 0;
    FIRST_DEVICE_INDEX
}
#[cfg(test)]
mod tests {
    use super::*;

    // Values shared by the configuration tests.
    const CENTER_FREQ: u32 = 100_000_000;
    const SAMPLE_RATE: u32 = 6_000_000;
    const SERIAL: u64 = 0x35AC63DC2D8C7A4F;

    /// The default selector picks the device at index 0.
    #[test]
    fn test_device_selector_default() {
        let selector = DeviceSelector::default();
        assert_eq!(selector, DeviceSelector::Index(0));
    }

    /// `AirspyConfig::new` stores its arguments and leaves the switches off.
    #[test]
    fn test_airspy_config_new() {
        let cfg = AirspyConfig::new(0, CENTER_FREQ, SAMPLE_RATE, Gain::Auto);
        assert_eq!(cfg.device, DeviceSelector::Index(0));
        assert_eq!(cfg.center_freq, 100_000_000);
        assert_eq!(cfg.sample_rate, 6_000_000);
        assert_eq!(cfg.gain, Gain::Auto);
        assert!(!cfg.bias_tee);
        assert!(!cfg.packing);
    }

    /// `AirspyConfig::new_with_serial` selects the device by serial number.
    #[test]
    fn test_airspy_config_with_serial() {
        let cfg = AirspyConfig::new_with_serial(SERIAL, CENTER_FREQ, SAMPLE_RATE, Gain::Auto);
        assert_eq!(cfg.device, DeviceSelector::Serial(0x35AC63DC2D8C7A4F));
    }

    /// Board id 1 is reported as the Airspy Mini.
    #[test]
    fn test_device_info_board_name() {
        let mini = AirspyDeviceInfo {
            index: 0,
            board_id: 1,
            firmware_version: "test".to_string(),
            part_id: [0, 0],
            serial_number: 0,
            supported_sample_rates: vec![],
        };
        assert_eq!(mini.board_name(), "AIRSPY MINI");
    }
}