use std::time::Duration;
use anyhow::{Context, Result};
use donglora_client::{AnyTransport, RadioConfig, Response, Transport};
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use crate::packet::{RadioPacket, snr_grade};
/// Where the radio configuration in use for a session came from.
///
/// Derives `PartialEq`/`Eq` so callers can compare values directly with `==`
/// or `assert_eq!` instead of pattern-matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigSource {
    /// The dongle is running the configuration this application asked for:
    /// either it already matched, or it was applied with `set_config`.
    Ours,
    /// The connection goes through a mux whose configuration differs from the
    /// requested one; the mux's configuration was accepted as-is.
    Mux,
}
/// Events the radio thread reports over the event channel returned by `spawn`.
pub enum RadioEvent {
/// A packet received from the dongle whose SNR grade passed the
/// `should_forward` check.
Packet(RadioPacket),
/// A session is up and receiving. Carries the active radio configuration,
/// where that configuration came from, and a short device label
/// (`"mux"`, `"usb"`, or the file name of the serial port path).
Connected(RadioConfig, ConfigSource, String),
/// The current session ended with an error; sent before the radio thread
/// sleeps for its backoff delay and tries to reconnect.
Disconnected,
}
/// Starts the radio worker on a dedicated OS thread named `"radio"`.
///
/// Returns the receiving end of a bounded (256-slot) [`RadioEvent`] channel
/// and the sending end of a bounded (64-slot) channel of raw payloads; the
/// worker passes every payload it receives to the dongle for transmission.
///
/// # Errors
/// Fails if the operating system cannot create the thread.
pub fn spawn(config: RadioConfig, port: Option<String>) -> Result<(mpsc::Receiver<RadioEvent>, mpsc::Sender<Vec<u8>>)> {
    let (events_in, events_out) = mpsc::channel::<RadioEvent>(256);
    let (payloads_in, payloads_out) = mpsc::channel::<Vec<u8>>(64);

    // The worker owns the producing side of the event channel and the
    // consuming side of the payload channel; the caller gets the other halves.
    let worker = move || radio_loop(config, port, events_in, payloads_out);
    std::thread::Builder::new()
        .name(String::from("radio"))
        .spawn(worker)
        .context("failed to spawn radio thread")?;

    Ok((events_out, payloads_in))
}
/// Body of the radio thread: runs dongle sessions back to back, reconnecting
/// with exponential backoff (250 ms doubling up to 5 s) after every failure.
///
/// The function returns, ending the thread, in two cases:
/// * `connect_and_run` returns `Ok(())`, which it does once the event channel
///   is closed, or
/// * a session failed and the `Disconnected` notification cannot be delivered
///   because the event receiver is gone. Without this check a vanished
///   consumer combined with an unreachable dongle would leave the thread
///   retrying forever, since the closed-channel check inside
///   `connect_and_run` is only reached after a successful connect.
// Thread entry point: it takes ownership of everything it needs for its lifetime.
#[allow(clippy::needless_pass_by_value)]
fn radio_loop(
    config: RadioConfig,
    port: Option<String>,
    event_tx: mpsc::Sender<RadioEvent>,
    mut tx_recv: mpsc::Receiver<Vec<u8>>,
) {
    let initial_backoff = Duration::from_millis(250);
    let max_backoff = Duration::from_secs(5);
    let mut backoff = initial_backoff;
    loop {
        let started = std::time::Instant::now();
        match connect_and_run(&config, port.as_deref(), &event_tx, &mut tx_recv) {
            Ok(()) => {
                info!("radio thread exiting");
                return;
            }
            Err(e) => {
                error!("radio error: {e:#}");
                // `blocking_send` fails only when the receiving half has been
                // closed or dropped: nobody is listening, so stop retrying.
                if event_tx.blocking_send(RadioEvent::Disconnected).is_err() {
                    info!("event receiver closed; radio thread exiting");
                    return;
                }
                // An attempt that lasted more than 5 s starts the backoff
                // sequence over instead of continuing to escalate it.
                if started.elapsed() > Duration::from_secs(5) {
                    backoff = initial_backoff;
                }
                info!("reconnecting in {backoff:?}");
                std::thread::sleep(backoff);
                backoff = (backoff * 2).min(max_backoff);
            }
        }
    }
}
fn connect_and_run(
config: &RadioConfig,
port: Option<&str>,
event_tx: &mpsc::Sender<RadioEvent>,
tx_recv: &mut mpsc::Receiver<Vec<u8>>,
) -> Result<()> {
let timeout = Duration::from_secs(10);
info!("connecting to dongle...");
let mut client = donglora_client::connect(port, timeout)?;
let is_mux = matches!(client.transport(), AnyTransport::Mux(_));
let device = if is_mux { "mux".to_string() } else { port.map_or_else(|| "usb".to_string(), shorten_path) };
info!("[radio] connected and validated: {device}");
info!("[radio] negotiating config...");
let (active_config, config_source) = match negotiate_config(&mut client, config, is_mux) {
Ok(result) => {
info!("[radio] config negotiated: source={:?}, config={}", result.1, format_radio_config(&result.0));
result
}
Err(e) => {
info!("[radio] config negotiation FAILED: {e:#}");
return Err(e);
}
};
info!("[radio] sending start_rx...");
match client.start_rx() {
Ok(()) => info!("[radio] start_rx OK"),
Err(e) => {
info!("[radio] start_rx FAILED: {e:#}");
return Err(e);
}
}
let _ = event_tx.blocking_send(RadioEvent::Connected(active_config, config_source, device));
client.transport_mut().set_timeout(Duration::from_millis(100))?;
info!("[radio] entering main loop");
let liveness_interval = Duration::from_secs(2);
let mut last_activity = std::time::Instant::now();
loop {
while let Ok(payload) = tx_recv.try_recv() {
debug!("TX {} bytes", payload.len());
if let Err(e) = client.transmit(&payload, None) {
warn!("transmit error: {e:#}");
return Err(e);
}
last_activity = std::time::Instant::now();
}
match client.recv() {
Ok(Some(Response::RxPacket { rssi, snr, payload })) => {
last_activity = std::time::Instant::now();
let grade = snr_grade(snr, active_config.sf);
if grade.should_forward() {
debug!("RX {} bytes rssi={rssi} snr={snr} grade={grade}", payload.len());
let pkt = RadioPacket { rssi, snr, payload };
if event_tx.blocking_send(RadioEvent::Packet(pkt)).is_err() {
return Ok(());
}
} else {
debug!("RX drop {} bytes rssi={rssi} snr={snr} grade={grade}", payload.len());
}
}
Ok(Some(_)) => {
last_activity = std::time::Instant::now();
}
Ok(None) => {}
Err(e) if is_timeout_error(&e) => {}
Err(e) => return Err(e),
}
if last_activity.elapsed() >= liveness_interval {
let _ = client.transport_mut().set_timeout(Duration::from_secs(2));
match client.ping() {
Ok(()) => {
last_activity = std::time::Instant::now();
}
Err(e) => {
warn!("liveness ping failed: {e:#}");
return Err(e);
}
}
let _ = client.transport_mut().set_timeout(Duration::from_millis(100));
}
if event_tx.is_closed() {
return Ok(());
}
}
}
/// Decides which radio configuration the session will run with.
///
/// * `get_config` fails: try to apply `desired`; if that fails too, return an
///   error that reports both failures.
/// * The dongle's configuration matches `desired`: keep it (`Ours`).
/// * It differs and we are behind a mux: accept the mux's configuration (`Mux`).
/// * It differs on a direct connection: apply `desired` (`Ours`).
///
/// # Errors
/// Returns an error when a required `set_config` call fails.
fn negotiate_config<T: Transport>(
    client: &mut donglora_client::Client<T>,
    desired: &RadioConfig,
    is_mux: bool,
) -> Result<(RadioConfig, ConfigSource)> {
    info!("[negotiate] get_config...");
    let current = match client.get_config() {
        Ok(cfg) => cfg,
        Err(e) => {
            // Could not read the current configuration: fall back to forcing ours.
            info!("[negotiate] get_config FAILED: {e:#}");
            info!("[negotiate] trying set_config...");
            if let Err(e2) = client.set_config(*desired) {
                info!("[negotiate] set_config FAILED: {e2:#}");
                anyhow::bail!("GetConfig failed ({e:#}) and SetConfig also failed ({e2:#})");
            }
            info!("[negotiate] set_config OK");
            return Ok((*desired, ConfigSource::Ours));
        }
    };

    if configs_match(&current, desired) {
        info!("[negotiate] get_config OK (matches desired): {}", format_radio_config(&current));
        return Ok((current, ConfigSource::Ours));
    }

    if is_mux {
        info!("[negotiate] get_config OK (mux config differs, accepting): {}", format_radio_config(&current));
        return Ok((current, ConfigSource::Mux));
    }

    info!("[negotiate] get_config OK (serial config differs, setting ours)");
    client.set_config(*desired)?;
    info!("[negotiate] set_config OK");
    Ok((*desired, ConfigSource::Ours))
}
/// Reduces a device path to its final component for display
/// (`"/dev/ttyACM0"` becomes `"ttyACM0"`).
///
/// Paths without a final file-name component (for example `""` or `".."`)
/// are returned unchanged.
fn shorten_path(path: &str) -> String {
    match std::path::Path::new(path).file_name() {
        Some(name) => name.to_string_lossy().into_owned(),
        None => path.to_string(),
    }
}
/// Returns `true` when the two configurations agree on every compared field:
/// frequency, bandwidth, spreading factor, coding rate, sync word, TX power,
/// preamble length and CAD setting.
fn configs_match(a: &RadioConfig, b: &RadioConfig) -> bool {
    // Project each config onto a tuple of the compared fields and compare those.
    let fields = |c: &RadioConfig| {
        (c.freq_hz, c.bw, c.sf, c.cr, c.sync_word, c.tx_power_dbm, c.preamble_len, c.cad)
    };
    fields(a) == fields(b)
}
/// Returns `true` when `e` downcasts to a `std::io::Error` whose kind is
/// `WouldBlock` or `TimedOut`, i.e. a read that simply saw no data in time.
fn is_timeout_error(e: &anyhow::Error) -> bool {
    use std::io::ErrorKind;

    match e.downcast_ref::<std::io::Error>() {
        Some(io_err) => matches!(io_err.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut),
        None => false,
    }
}
#[must_use]
pub const fn format_bandwidth(bw: donglora_client::Bandwidth) -> &'static str {
match bw {
donglora_client::Bandwidth::Khz7 => "7.8 kHz",
donglora_client::Bandwidth::Khz10 => "10.4 kHz",
donglora_client::Bandwidth::Khz15 => "15.6 kHz",
donglora_client::Bandwidth::Khz20 => "20.8 kHz",
donglora_client::Bandwidth::Khz31 => "31.25 kHz",
donglora_client::Bandwidth::Khz41 => "41.7 kHz",
donglora_client::Bandwidth::Khz62 => "62.5 kHz",
donglora_client::Bandwidth::Khz125 => "125 kHz",
donglora_client::Bandwidth::Khz250 => "250 kHz",
donglora_client::Bandwidth::Khz500 => "500 kHz",
}
}
/// Renders a radio configuration as a compact one-line summary, e.g.
/// `910.525MHz SF7 BW62.5 kHz CR4/5 SW0x3444 TX:22 dBm Pre:16 CAD:on`.
///
/// Display rules: TX power equal to `TX_POWER_MAX` is shown as `max`, a
/// preamble length of `0` is shown as `16`, and any non-zero `cad` value is
/// shown as `on`.
#[must_use]
pub fn format_radio_config(config: &RadioConfig) -> String {
    let freq_mhz = f64::from(config.freq_hz) / 1e6;
    let bw = format_bandwidth(config.bw);
    let power = if config.tx_power_dbm != donglora_client::TX_POWER_MAX {
        format!("{} dBm", config.tx_power_dbm)
    } else {
        "max".to_string()
    };
    let preamble = match config.preamble_len {
        0 => 16,
        len => len,
    };
    let cad = match config.cad {
        0 => "off",
        _ => "on",
    };
    format!(
        "{freq_mhz:.3}MHz SF{} BW{bw} CR4/{} SW0x{:04X} TX:{power} Pre:{preamble} CAD:{cad}",
        config.sf, config.cr, config.sync_word
    )
}
/// Unit tests for the pure helpers in this module (config comparison, error
/// classification, path shortening and formatting). Nothing here touches
/// real hardware.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Baseline configuration shared by the tests below.
    fn default_config() -> RadioConfig {
        RadioConfig {
            freq_hz: 910_525_000,
            bw: donglora_client::Bandwidth::Khz62,
            sf: 7,
            cr: 5,
            sync_word: 0x3444,
            tx_power_dbm: 22,
            preamble_len: 16,
            cad: 1,
        }
    }

    #[test]
    fn configs_match_identical() {
        let a = default_config();
        let b = default_config();
        assert!(configs_match(&a, &b));
    }

    /// Changing any single compared field must break the match.
    #[test]
    fn configs_match_differs_each_field() {
        let base = default_config();

        let mut c = base;
        c.freq_hz = 915_000_000;
        assert!(!configs_match(&base, &c));

        // Bandwidth was previously the one compared field this test skipped.
        let mut c = base;
        c.bw = donglora_client::Bandwidth::Khz125;
        assert!(!configs_match(&base, &c));

        let mut c = base;
        c.sf = 12;
        assert!(!configs_match(&base, &c));

        let mut c = base;
        c.cr = 8;
        assert!(!configs_match(&base, &c));

        let mut c = base;
        c.sync_word = 0x1234;
        assert!(!configs_match(&base, &c));

        let mut c = base;
        c.tx_power_dbm = 10;
        assert!(!configs_match(&base, &c));

        let mut c = base;
        c.preamble_len = 8;
        assert!(!configs_match(&base, &c));

        let mut c = base;
        c.cad = 0;
        assert!(!configs_match(&base, &c));
    }

    #[test]
    fn is_timeout_would_block() {
        let e = anyhow::Error::from(std::io::Error::new(std::io::ErrorKind::WouldBlock, "test"));
        assert!(is_timeout_error(&e));
    }

    #[test]
    fn is_timeout_timed_out() {
        let e = anyhow::Error::from(std::io::Error::new(std::io::ErrorKind::TimedOut, "test"));
        assert!(is_timeout_error(&e));
    }

    #[test]
    fn is_timeout_false_for_other() {
        let e = anyhow::Error::from(std::io::Error::new(std::io::ErrorKind::ConnectionReset, "test"));
        assert!(!is_timeout_error(&e));
    }

    #[test]
    fn shorten_path_extracts_filename() {
        assert_eq!(shorten_path("/dev/ttyACM0"), "ttyACM0");
        assert_eq!(shorten_path("ttyACM0"), "ttyACM0");
    }

    /// Covers every `Bandwidth` variant handled by `format_bandwidth`.
    #[test]
    fn format_bandwidth_all_variants() {
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz7), "7.8 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz10), "10.4 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz15), "15.6 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz20), "20.8 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz31), "31.25 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz41), "41.7 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz62), "62.5 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz125), "125 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz250), "250 kHz");
        assert_eq!(format_bandwidth(donglora_client::Bandwidth::Khz500), "500 kHz");
    }

    #[test]
    fn format_radio_config_typical() {
        let cfg = default_config();
        let s = format_radio_config(&cfg);
        assert!(s.contains("910.525MHz"));
        assert!(s.contains("SF7"));
        assert!(s.contains("CR4/5"));
        assert!(s.contains("0x3444"));
        assert!(s.contains("CAD:on"), "cad=1 should show CAD:on: {s}");
    }

    #[test]
    fn format_radio_config_cad_off() {
        let mut cfg = default_config();
        cfg.cad = 0;
        let s = format_radio_config(&cfg);
        assert!(s.contains("CAD:off"), "cad=0 should show CAD:off: {s}");
    }

    #[test]
    fn format_radio_config_max_power() {
        let mut cfg = default_config();
        cfg.tx_power_dbm = donglora_client::TX_POWER_MAX;
        let s = format_radio_config(&cfg);
        assert!(s.contains("TX:max"));
        assert!(!s.contains("dBm"), "max power should not contain 'dBm': {s}");
    }

    #[test]
    fn format_radio_config_non_max_power() {
        let mut cfg = default_config();
        cfg.tx_power_dbm = 22;
        // Guard: the test is only meaningful if 22 is not the "max" sentinel.
        assert_ne!(cfg.tx_power_dbm, donglora_client::TX_POWER_MAX);
        let s = format_radio_config(&cfg);
        assert!(s.contains("22 dBm"), "non-max power should contain '22 dBm': {s}");
        assert!(!s.contains("max"), "non-max power should not contain 'max': {s}");
    }

    /// A preamble length of 0 is displayed as 16.
    #[test]
    fn format_radio_config_zero_preamble() {
        let mut cfg = default_config();
        cfg.preamble_len = 0;
        let s = format_radio_config(&cfg);
        assert!(s.contains("Pre:16"));
    }
}