use desperado::{IqAsyncSource, IqFormat, IqSource, Result};
use futures::stream::StreamExt;
use futures::Stream as FuturesStream;
use std::collections::VecDeque;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
#[cfg(any(feature = "rtlsdr", feature = "soapy", feature = "hackrf"))]
use desperado::DeviceConfig;
#[cfg(any(feature = "rtlsdr", feature = "soapy", feature = "airspy"))]
use desperado::Gain;
#[cfg(feature = "airspy")]
use desperado::airspy::{
AirspyConfig, AirspyGainMode, AsyncAirspySdrReader, DeviceSelector as AirspyDeviceSelector,
};
#[cfg(feature = "hackrf")]
use desperado::hackrf::HackRfConfig;
#[cfg(feature = "rtlsdr")]
use desperado::rtlsdr::{DeviceSelector, RtlSdrConfig};
#[cfg(feature = "soapy")]
use desperado::soapy::SoapyConfig;
use crate::dsp::ais::{AisDemodulatedMessage, AisDemodulator};
use crate::dsp::sample_rate::SampleRateAdapter;
const DEFAULT_CHUNK_SAMPLES: usize = 8192;
const AIS_FREQ: u32 = 162_000_000;
#[cfg(feature = "rtlsdr")]
const RTLSDR_GAIN: f64 = 49.6; #[cfg(feature = "soapy")]
const SOAPY_GAIN: f64 = 49.6;
pub struct AisIqSource {
source: IqSource,
demodulator: AisDemodulator,
adapter: SampleRateAdapter,
message_buffer: VecDeque<AisDemodulatedMessage>,
}
impl AisIqSource {
pub fn new(source: IqSource, sample_rate: u32) -> Self {
Self {
source,
demodulator: AisDemodulator::new(),
adapter: SampleRateAdapter::new(sample_rate),
message_buffer: VecDeque::new(),
}
}
pub fn from_file<P: AsRef<Path>>(path: P, sample_rate: u32, format: IqFormat) -> Result<Self> {
let source =
IqSource::from_file(path, AIS_FREQ, sample_rate, DEFAULT_CHUNK_SAMPLES, format)?;
Ok(Self::new(source, sample_rate))
}
pub fn from_tcp(addr: &str, port: u16, sample_rate: u32, format: IqFormat) -> Result<Self> {
let source = IqSource::from_tcp(
addr,
port,
AIS_FREQ,
sample_rate,
DEFAULT_CHUNK_SAMPLES,
format,
)?;
Ok(Self::new(source, sample_rate))
}
#[cfg(any(feature = "rtlsdr", feature = "soapy"))]
pub fn from_device_config(config: DeviceConfig, sample_rate: u32) -> Result<Self> {
let source = IqSource::from_device_config(config)?;
Ok(Self::new(source, sample_rate))
}
#[cfg(feature = "rtlsdr")]
pub fn from_rtlsdr(
device: DeviceSelector,
sample_rate: u32,
gain: Option<f64>,
bias_tee: bool,
) -> Result<Self> {
let rtlsdr_config = RtlSdrConfig {
device,
center_freq: AIS_FREQ,
sample_rate,
gain: gain.map(Gain::Manual).unwrap_or(Gain::Manual(RTLSDR_GAIN)),
bias_tee,
freq_correction_ppm: 0,
};
let config = DeviceConfig::RtlSdr(rtlsdr_config);
Self::from_device_config(config, sample_rate)
}
#[cfg(feature = "soapy")]
pub fn from_soapy(
args: &str,
channel: usize,
sample_rate: u32,
gain: Option<f64>,
bias_tee: bool,
) -> Result<Self> {
let soapy_config = SoapyConfig {
args: args.to_string(),
center_freq: AIS_FREQ as f64,
sample_rate: sample_rate as f64,
channel,
gain: gain.map(Gain::Manual).unwrap_or(Gain::Manual(SOAPY_GAIN)),
bias_tee,
};
let config = DeviceConfig::Soapy(soapy_config);
Self::from_device_config(config, sample_rate)
}
pub fn next_message(&mut self) -> Option<Result<AisDemodulatedMessage>> {
if let Some(msg) = self.message_buffer.pop_front() {
return Some(Ok(msg));
}
loop {
match self.source.next() {
Some(Ok(samples)) => {
let samples_96k = self.adapter.process(&samples);
let messages = self.demodulator.demodulate(&samples_96k);
self.message_buffer.extend(messages);
if let Some(msg) = self.message_buffer.pop_front() {
return Some(Ok(msg));
}
}
Some(Err(e)) => {
return Some(Err(e));
}
None => {
return None;
}
}
}
}
}
impl Iterator for AisIqSource {
type Item = Result<AisDemodulatedMessage>;
fn next(&mut self) -> Option<Self::Item> {
self.next_message()
}
}
pub struct AisAsyncIqSource {
pub handle: tokio::task::JoinHandle<()>,
rx: mpsc::Receiver<Result<AisDemodulatedMessage>>,
}
impl AisAsyncIqSource {
pub fn new(
handle: tokio::task::JoinHandle<()>,
rx: mpsc::Receiver<Result<AisDemodulatedMessage>>,
) -> Self {
Self { handle, rx }
}
pub fn from_file<P: AsRef<Path>>(
path: P,
sample_rate: u32,
format: IqFormat,
) -> impl std::future::Future<Output = Result<AisAsyncIqSource>> {
let (tx, rx) = mpsc::channel::<Result<AisDemodulatedMessage>>(32);
async move {
let source = IqAsyncSource::from_file(
path,
AIS_FREQ,
sample_rate,
DEFAULT_CHUNK_SAMPLES,
format,
)
.await?;
let handle = spawn_demodulator_task(source, tx, sample_rate);
Ok(Self { handle, rx })
}
}
pub fn from_stdin(sample_rate: u32, format: IqFormat) -> AisAsyncIqSource {
let (tx, rx) = mpsc::channel::<Result<AisDemodulatedMessage>>(32);
let source =
IqAsyncSource::from_stdin(AIS_FREQ, sample_rate, DEFAULT_CHUNK_SAMPLES, format);
let handle = spawn_demodulator_task(source, tx, sample_rate);
AisAsyncIqSource { handle, rx }
}
pub fn from_tcp(
addr: &str,
port: u16,
sample_rate: u32,
format: IqFormat,
) -> impl std::future::Future<Output = Result<AisAsyncIqSource>> + '_ {
let addr = addr.to_string();
let (tx, rx) = mpsc::channel::<Result<AisDemodulatedMessage>>(32);
async move {
let source = IqAsyncSource::from_tcp(
&addr,
port,
AIS_FREQ,
sample_rate,
DEFAULT_CHUNK_SAMPLES,
format,
)
.await?;
let handle = spawn_demodulator_task(source, tx, sample_rate);
Ok(AisAsyncIqSource { handle, rx })
}
}
#[cfg(any(feature = "rtlsdr", feature = "soapy", feature = "hackrf"))]
pub fn from_device_config(
config: DeviceConfig,
sample_rate: u32,
) -> impl std::future::Future<Output = Result<AisAsyncIqSource>> {
let (tx, rx) = mpsc::channel::<Result<AisDemodulatedMessage>>(32);
async move {
let source = IqAsyncSource::from_device_config(&config).await?;
let handle = spawn_demodulator_task(source, tx, sample_rate);
Ok(AisAsyncIqSource { handle, rx })
}
}
#[cfg(feature = "rtlsdr")]
pub fn from_rtlsdr(
device: DeviceSelector,
sample_rate: u32,
gain: Gain,
bias_tee: bool,
) -> impl std::future::Future<Output = Result<AisAsyncIqSource>> {
let rtlsdr_config = RtlSdrConfig {
device,
center_freq: AIS_FREQ,
sample_rate,
gain,
bias_tee,
freq_correction_ppm: 0,
};
let config = DeviceConfig::RtlSdr(rtlsdr_config);
Self::from_device_config(config, sample_rate)
}
#[cfg(feature = "soapy")]
pub fn from_soapy(
args: &str,
sample_rate: u32,
gain: Gain,
bias_tee: bool,
) -> impl std::future::Future<Output = Result<AisAsyncIqSource>> {
let soapy_config = SoapyConfig {
args: args.to_string(),
center_freq: AIS_FREQ as f64,
sample_rate: sample_rate as f64,
channel: 0,
gain,
bias_tee,
};
let config = DeviceConfig::Soapy(soapy_config);
Self::from_device_config(config, sample_rate)
}
#[cfg(feature = "airspy")]
pub fn from_airspy(
device: AirspyDeviceSelector,
sample_rate: u32,
gain: Gain,
bias_tee: bool,
lna_gain: Option<u8>,
mixer_gain: Option<u8>,
vga_gain: Option<u8>,
) -> impl std::future::Future<Output = Result<AisAsyncIqSource>> {
let airspy_config = AirspyConfig {
device,
center_freq: AIS_FREQ,
sample_rate,
gain,
bias_tee,
packing: false,
lna_gain,
mixer_gain,
vga_gain,
gain_mode: AirspyGainMode::default(),
};
let (tx, rx) = mpsc::channel::<Result<AisDemodulatedMessage>>(32);
async move {
let source = IqAsyncSource::Airspy(AsyncAirspySdrReader::new(&airspy_config)?);
let handle = spawn_demodulator_task(source, tx, sample_rate);
Ok(AisAsyncIqSource { handle, rx })
}
}
#[cfg(feature = "hackrf")]
pub fn from_hackrf(
device_index: usize,
sample_rate: u32,
gain: Gain,
amp_enable: bool,
bias_tee: bool,
) -> impl std::future::Future<Output = Result<AisAsyncIqSource>> {
let hackrf_config = HackRfConfig {
device_index,
center_freq: AIS_FREQ as u64,
sample_rate,
gain,
amp_enable,
bias_tee,
};
let config = DeviceConfig::HackRf(hackrf_config);
Self::from_device_config(config, sample_rate)
}
}
impl FuturesStream for AisAsyncIqSource {
type Item = Result<AisDemodulatedMessage>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.rx.poll_recv(cx)
}
}
pub fn spawn_demodulator_task(
mut source: IqAsyncSource,
tx: mpsc::Sender<Result<AisDemodulatedMessage>>,
sample_rate: u32,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut adapter = SampleRateAdapter::new(sample_rate);
let mut demodulator = AisDemodulator::new();
loop {
match source.next().await {
Some(Ok(samples)) => {
let samples_96k = adapter.process(&samples);
let messages = demodulator.demodulate(&samples_96k);
for msg in messages {
if tx.send(Ok(msg)).await.is_err() {
return;
}
}
}
Some(Err(e)) => {
let _ = tx.send(Err(e)).await;
return;
}
None => return,
}
}
})
}