use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use donglora_protocol::{Info, LoRaConfig, Modulation};
use tracing::{debug, info};
use crate::discovery;
use crate::dongle::{Dongle, TransportKind};
use crate::errors::{ClientError, ClientResult};
use crate::session::Session;
#[cfg(unix)]
use crate::transport::UnixSocketTransport;
use crate::transport::{AnyTransport, SerialTransport, TcpTransport, Transport};
/// Process-wide flag, set once any mux transport (TCP or Unix socket) has
/// been connected by the functions in this file. `connect_with` and
/// `try_connect_with` read it before considering a direct serial connection.
static USED_MUX: AtomicBool = AtomicBool::new(false);
/// Timeout used when `ConnectOptions` carries no explicit timeout, and by the
/// `mux_unix_connect` / `mux_tcp_connect` shortcuts.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2);
/// Builder-style options consumed by the `*_with` connect functions.
///
/// Note that the derived `Default` leaves `keepalive` at `false`, whereas
/// `ConnectOptions::new()` starts with `keepalive` set to `true`.
#[derive(Debug, Clone, Default)]
pub struct ConnectOptions {
/// Explicit serial port; when set, `connect_with` / `try_connect_with` open
/// it directly and skip mux lookup and device discovery.
port: Option<String>,
/// Timeout handed to the connect handshake; `DEFAULT_TIMEOUT` when `None`.
timeout: Option<Duration>,
/// Modulation to apply right after connecting (only if `auto_configure`).
config: Option<Modulation>,
/// Gate for applying `config` during the connect handshake.
auto_configure: bool,
/// Forwarded unchanged to `Dongle::new`.
// NOTE(review): presumably enables a background keepalive in `Dongle` — confirm there.
keepalive: bool,
}
impl ConnectOptions {
    /// Selects an explicit serial port. When set, `connect_with` and
    /// `try_connect_with` open it directly instead of probing for a mux or
    /// discovering a device.
    #[must_use]
    pub fn port(self, path: impl Into<String>) -> Self {
        Self { port: Some(path.into()), ..self }
    }

    /// Overrides the timeout used for the connect handshake.
    #[must_use]
    pub fn timeout(self, timeout: Duration) -> Self {
        Self { timeout: Some(timeout), ..self }
    }

    /// Stores a modulation to apply after connecting and switches
    /// auto-configuration on.
    #[must_use]
    pub fn config(self, modulation: Modulation) -> Self {
        Self { config: Some(modulation), auto_configure: true, ..self }
    }

    /// Turns auto-configuration on or off. With no stored modulation,
    /// enabling it applies nothing.
    #[must_use]
    pub fn auto_configure(self, enabled: bool) -> Self {
        Self { auto_configure: enabled, ..self }
    }

    /// Sets the keepalive flag that is passed on to `Dongle::new`.
    #[must_use]
    pub fn keepalive(self, enabled: bool) -> Self {
        Self { keepalive: enabled, ..self }
    }
}
impl ConnectOptions {
    /// Creates the recommended starting options: everything at its
    /// `Default` value except `keepalive`, which is switched on.
    #[must_use]
    pub fn new() -> Self {
        Self::default().keepalive(true)
    }
}
/// Returns the path where the mux Unix socket is expected, without checking
/// that it exists.
///
/// Precedence: the `DONGLORA_MUX` environment variable verbatim, then
/// `$XDG_RUNTIME_DIR/donglora/mux.sock`, then `/tmp/donglora-mux.sock`.
#[must_use]
pub fn default_socket_path() -> String {
    std::env::var("DONGLORA_MUX")
        // XDG_RUNTIME_DIR is only consulted when DONGLORA_MUX is unusable.
        .or_else(|_| std::env::var("XDG_RUNTIME_DIR").map(|xdg| format!("{xdg}/donglora/mux.sock")))
        .unwrap_or_else(|_| "/tmp/donglora-mux.sock".to_string())
}
/// Locates an existing mux socket on disk.
///
/// * If `DONGLORA_MUX` is set it is authoritative: the result is that path
///   when it exists and `None` otherwise (no further fallbacks are tried).
/// * Otherwise `$XDG_RUNTIME_DIR/donglora/mux.sock` is checked, followed by
///   `/tmp/donglora-mux.sock`.
///
/// Returns `None` when no candidate exists.
#[must_use]
pub fn find_mux_socket() -> Option<String> {
    fn exists(candidate: &str) -> bool {
        Path::new(candidate).exists()
    }

    if let Ok(env) = std::env::var("DONGLORA_MUX") {
        return exists(&env).then_some(env);
    }

    std::env::var("XDG_RUNTIME_DIR")
        .ok()
        .map(|xdg| format!("{xdg}/donglora/mux.sock"))
        .filter(|p| exists(p))
        .or_else(|| {
            let p = "/tmp/donglora-mux.sock";
            exists(p).then(|| p.to_string())
        })
}
/// Connects using the options from `ConnectOptions::new()`; see
/// `connect_with` for how the transport is chosen.
pub async fn connect() -> ClientResult<Dongle> {
    let defaults = ConnectOptions::new();
    connect_with(defaults).await
}
/// Connects to a dongle, choosing the transport in this order:
///
/// 1. `opts.port`, if set: that serial port is opened directly.
/// 2. If a mux was already used in this process: `connect_mux_sticky`
///    (direct serial is not considered).
/// 3. The TCP mux named by `DONGLORA_MUX_TCP`, if it accepts a connection.
/// 4. On Unix targets, a mux socket located by `find_mux_socket`.
/// 5. A serial port from `discovery::find_port`, or, when none is found,
///    the port produced by awaiting `discovery::wait_for_device`.
///
/// # Errors
/// Propagates failures from opening the chosen transport and from the
/// handshake performed in `finalize`.
pub async fn connect_with(opts: ConnectOptions) -> ClientResult<Dongle> {
    let timeout = opts.timeout.unwrap_or(DEFAULT_TIMEOUT);

    // An explicit port bypasses every other mechanism.
    if let Some(port) = opts.port.as_deref() {
        debug!("opening serial port {port}");
        let serial = AnyTransport::Serial(SerialTransport::open(port)?);
        let kind = TransportKind::Serial(port.to_string());
        return finalize(serial, kind, &opts, timeout).await;
    }

    if USED_MUX.load(Ordering::Relaxed) {
        return connect_mux_sticky(&opts, timeout).await;
    }

    if let Some((tcp, endpoint)) = try_tcp_env(timeout).await {
        USED_MUX.store(true, Ordering::Relaxed);
        let kind = TransportKind::MuxTcp(endpoint);
        return finalize(AnyTransport::Tcp(tcp), kind, &opts, timeout).await;
    }

    #[cfg(unix)]
    if let Some(path) = find_mux_socket() {
        debug!("connecting to Unix mux at {path}");
        let unix = UnixSocketTransport::connect(&path).await?;
        // Mark the mux as used only after the socket connected.
        USED_MUX.store(true, Ordering::Relaxed);
        let kind = TransportKind::MuxUnix(path);
        return finalize(AnyTransport::Unix(unix), kind, &opts, timeout).await;
    }

    let port = if let Some(found) = discovery::find_port() {
        found
    } else {
        discovery::wait_for_device().await
    };
    debug!("opening serial port {port}");
    let serial = SerialTransport::open(&port)?;
    finalize(AnyTransport::Serial(serial), TransportKind::Serial(port), &opts, timeout).await
}
/// Single-attempt connect using the options from `ConnectOptions::new()`;
/// see `try_connect_with`.
pub async fn try_connect() -> ClientResult<Dongle> {
    let defaults = ConnectOptions::new();
    try_connect_with(defaults).await
}
/// Makes one connection attempt and reports an error instead of waiting when
/// nothing is available.
///
/// Transport order:
///
/// 1. `opts.port`, if set: that serial port is opened directly.
/// 2. The TCP mux named by `DONGLORA_MUX_TCP`, if it accepts a connection.
/// 3. If a mux was already used in this process ("sticky" mode): the Unix
///    mux socket found by `find_mux_socket`, or an error. Direct serial is
///    never used in this mode.
/// 4. Otherwise, on Unix targets, a mux socket found by `find_mux_socket`.
/// 5. Otherwise a serial port from `discovery::find_port`.
///
/// # Errors
/// * `ClientError::Other` when sticky mode finds no mux, when a Unix mux
///   would be required on a non-Unix target, or when no mux and no USB
///   device is found.
/// * Any error from opening the transport or from the `finalize` handshake.
pub async fn try_connect_with(opts: ConnectOptions) -> ClientResult<Dongle> {
    let timeout = opts.timeout.unwrap_or(DEFAULT_TIMEOUT);

    if let Some(port) = opts.port.as_deref() {
        let transport = SerialTransport::open(port)?;
        return finalize(AnyTransport::Serial(transport), TransportKind::Serial(port.to_string()), &opts, timeout)
            .await;
    }

    // The TCP endpoint is tried before the sticky check so that a process
    // whose mux connection went over TCP can reconnect the same way. The
    // sticky branch used to look only for a Unix socket, so after a TCP mux
    // connection every later call failed wherever no Unix socket existed
    // (and always on non-Unix targets). This matches `connect_mux_sticky`,
    // which also tries TCP first.
    if let Some((transport, endpoint)) = try_tcp_env(timeout).await {
        USED_MUX.store(true, Ordering::Relaxed);
        return finalize(AnyTransport::Tcp(transport), TransportKind::MuxTcp(endpoint), &opts, timeout).await;
    }

    if USED_MUX.load(Ordering::Relaxed) {
        let path =
            find_mux_socket().ok_or_else(|| ClientError::Other("mux not available (waiting for restart)".into()))?;
        #[cfg(unix)]
        {
            let transport = UnixSocketTransport::connect(&path).await?;
            return finalize(AnyTransport::Unix(transport), TransportKind::MuxUnix(path), &opts, timeout).await;
        }
        #[cfg(not(unix))]
        {
            let _ = path;
            return Err(ClientError::Other("Unix mux requires a unix target".into()));
        }
    }

    #[cfg(unix)]
    if let Some(path) = find_mux_socket() {
        let transport = UnixSocketTransport::connect(&path).await?;
        USED_MUX.store(true, Ordering::Relaxed);
        return finalize(AnyTransport::Unix(transport), TransportKind::MuxUnix(path), &opts, timeout).await;
    }

    let port = discovery::find_port()
        .ok_or_else(|| ClientError::Other("no DongLoRa device found (no mux, no USB device)".into()))?;
    let transport = SerialTransport::open(&port)?;
    finalize(AnyTransport::Serial(transport), TransportKind::Serial(port), &opts, timeout).await
}
/// Mux-only connect using the options from `ConnectOptions::new()`; see
/// `connect_mux_auto_with`.
pub async fn connect_mux_auto() -> ClientResult<Dongle> {
    let defaults = ConnectOptions::new();
    connect_mux_auto_with(defaults).await
}
/// Connects through a mux only; a direct serial connection is never tried
/// and `opts.port` is not consulted.
///
/// The TCP endpoint from `DONGLORA_MUX_TCP` is attempted first. On Unix
/// targets the socket located by `find_mux_socket` is used next.
///
/// # Errors
/// * `ClientError::Other` when no mux socket is found (Unix) or when the
///   target has no Unix socket support and TCP did not connect.
/// * Any error from connecting the socket or from the `finalize` handshake.
pub async fn connect_mux_auto_with(opts: ConnectOptions) -> ClientResult<Dongle> {
    let timeout = opts.timeout.unwrap_or(DEFAULT_TIMEOUT);

    if let Some((tcp, endpoint)) = try_tcp_env(timeout).await {
        USED_MUX.store(true, Ordering::Relaxed);
        let kind = TransportKind::MuxTcp(endpoint);
        return finalize(AnyTransport::Tcp(tcp), kind, &opts, timeout).await;
    }

    #[cfg(unix)]
    {
        let Some(path) = find_mux_socket() else {
            return Err(ClientError::Other("no mux socket found".into()));
        };
        let unix = UnixSocketTransport::connect(&path).await?;
        USED_MUX.store(true, Ordering::Relaxed);
        finalize(AnyTransport::Unix(unix), TransportKind::MuxUnix(path), &opts, timeout).await
    }
    #[cfg(not(unix))]
    Err(ClientError::Other("mux-only mode requires Unix socket support or DONGLORA_MUX_TCP".into()))
}
/// Connects to the mux listening on the Unix socket at `path`, using
/// `ConnectOptions::new()` and `DEFAULT_TIMEOUT` for the handshake.
///
/// Marks the mux as used for the rest of the process once the socket has
/// connected.
///
/// # Errors
/// Any error from connecting the socket or from the `finalize` handshake.
#[cfg(unix)]
pub async fn mux_unix_connect(path: &str) -> ClientResult<Dongle> {
    let unix = UnixSocketTransport::connect(path).await?;
    USED_MUX.store(true, Ordering::Relaxed);

    let kind = TransportKind::MuxUnix(path.to_string());
    let opts = ConnectOptions::new();
    finalize(AnyTransport::Unix(unix), kind, &opts, DEFAULT_TIMEOUT).await
}
/// Connects to the mux at `host:port` over TCP, using
/// `ConnectOptions::new()` and `DEFAULT_TIMEOUT` for both the TCP connect
/// and the handshake.
///
/// Marks the mux as used for the rest of the process once the TCP
/// connection is up.
///
/// # Errors
/// Any error from the TCP connect or from the `finalize` handshake.
pub async fn mux_tcp_connect(host: &str, port: u16) -> ClientResult<Dongle> {
    let tcp = TcpTransport::connect(host, port, DEFAULT_TIMEOUT).await?;
    USED_MUX.store(true, Ordering::Relaxed);

    let kind = TransportKind::MuxTcp(format!("{host}:{port}"));
    let opts = ConnectOptions::new();
    finalize(AnyTransport::Tcp(tcp), kind, &opts, DEFAULT_TIMEOUT).await
}
/// Reconnects through the mux after it has been used once in this process.
///
/// The TCP endpoint from `DONGLORA_MUX_TCP` is attempted a single time. If
/// that yields nothing:
///
/// * on Unix targets, the path from `default_socket_path` is polled every
///   500 ms until it exists (logging once that it is waiting), then
///   connected; the TCP endpoint is not retried while polling;
/// * on other targets, an error is returned.
///
/// # Errors
/// * `ClientError::Other` on non-Unix targets when TCP did not connect.
/// * Any error from connecting the socket once its path exists, or from
///   the `finalize` handshake.
async fn connect_mux_sticky(opts: &ConnectOptions, timeout: Duration) -> ClientResult<Dongle> {
    if let Some((tcp, endpoint)) = try_tcp_env(timeout).await {
        let kind = TransportKind::MuxTcp(endpoint);
        return finalize(AnyTransport::Tcp(tcp), kind, opts, timeout).await;
    }

    #[cfg(unix)]
    {
        let path = default_socket_path();
        let mut announced = false;
        while !Path::new(&path).exists() {
            // Log only on the first miss to avoid flooding while waiting.
            if !announced {
                info!("waiting for mux at {path} ...");
                announced = true;
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
        let unix = UnixSocketTransport::connect(&path).await?;
        finalize(AnyTransport::Unix(unix), TransportKind::MuxUnix(path), opts, timeout).await
    }
    #[cfg(not(unix))]
    Err(ClientError::Other("no mux endpoint available".into()))
}
/// Attempts a TCP mux connection to the endpoint named by the
/// `DONGLORA_MUX_TCP` environment variable.
///
/// Returns the connected transport together with its `host:port` label, or
/// `None` when the variable is unset, cannot be parsed by
/// `parse_tcp_endpoint`, or the connection attempt fails (the failure is
/// logged at debug level, not returned).
async fn try_tcp_env(timeout: Duration) -> Option<(TcpTransport, String)> {
    let Ok(tcp) = std::env::var("DONGLORA_MUX_TCP") else {
        return None;
    };
    let Some((host, port)) = parse_tcp_endpoint(&tcp) else {
        return None;
    };

    let t = match TcpTransport::connect(&host, port, timeout).await {
        Err(e) => {
            debug!("DONGLORA_MUX_TCP connect failed: {e}");
            return None;
        }
        Ok(t) => t,
    };
    debug!("connected to TCP mux at {host}:{port}");
    Some((t, format!("{host}:{port}")))
}
/// Splits a `host:port` endpoint string into its parts.
///
/// The split happens at the last `:`. An empty host (`":7000"`) or a bare
/// port (`"7000"`) resolves to `localhost`. Returns `None` when the port
/// part is not a valid `u16`.
fn parse_tcp_endpoint(addr: &str) -> Option<(String, u16)> {
    // No colon at all means the whole string is the port.
    let (host_part, port_part) = match addr.rsplit_once(':') {
        Some(split) => split,
        None => ("", addr),
    };
    let port = port_part.parse::<u16>().ok()?;
    let host = if host_part.is_empty() { "localhost" } else { host_part };
    Some((host.to_string(), port))
}
/// Turns an open transport into a ready `Dongle`.
///
/// Spawns a `Session` on the transport, pings the device, fetches its
/// `Info`, and, when `opts.auto_configure` is set and a modulation is
/// stored, validates that modulation with `prepare_config` and applies it.
/// The configuration reported back by the device (if any was applied) is
/// handed to `Dongle::new` along with `opts.keepalive`.
///
/// # Errors
/// Any error from the ping, the info query, `prepare_config`, or the
/// set-config command.
async fn finalize<T: Transport>(
    transport: T,
    kind: TransportKind,
    opts: &ConnectOptions,
    timeout: Duration,
) -> ClientResult<Dongle> {
    let session = Session::spawn(transport);
    session.ping(timeout).await?;
    let info = session.get_info(timeout).await?;

    let applied = match (opts.auto_configure, opts.config) {
        (true, Some(requested)) => {
            let prepared = prepare_config(&info, requested)?;
            Some(session.set_config(prepared, timeout).await?.current)
        }
        // Either auto-configure is off or there is nothing to apply.
        _ => None,
    };

    Ok(Dongle::new(session, info, kind, applied, opts.keepalive))
}
/// Validates and adjusts a requested modulation against the device `info`.
///
/// LoRa configurations go through `prepare_lora_config`; every other
/// modulation is returned unchanged.
///
/// # Errors
/// Whatever `prepare_lora_config` reports for a LoRa configuration.
pub(crate) fn prepare_config(info: &Info, modulation: Modulation) -> ClientResult<Modulation> {
    match modulation {
        Modulation::LoRa(cfg) => prepare_lora_config(info, cfg).map(Modulation::LoRa),
        other => Ok(other),
    }
}
/// Checks a LoRa configuration against the limits advertised in `info` and
/// clamps the TX power into the device's range.
///
/// * Frequency must lie within `[info.freq_min_hz, info.freq_max_hz]`.
/// * The spreading factor must have its bit (index = SF) set in
///   `info.supported_sf_bitmap`.
/// * The bandwidth must have its bit (index = `cfg.bw.as_u8()`) set in
///   `info.supported_bw_bitmap`.
/// * TX power outside `[tx_power_min_dbm, tx_power_max_dbm]` is not an
///   error; it is clamped to the nearest bound and logged at info level.
///
/// # Errors
/// `ClientError::ConfigNotSupported` when the frequency, spreading factor
/// or bandwidth is not supported by the device.
fn prepare_lora_config(info: &Info, mut cfg: LoRaConfig) -> ClientResult<LoRaConfig> {
    // `1u16 << index` overflows for index >= 16 (panic in debug builds,
    // wrapped shift in release, which would test the wrong bit). An index
    // that does not fit the 16-bit bitmap is simply "not supported".
    let bit_set = |bitmap: u16, index: u8| -> bool {
        1u16.checked_shl(u32::from(index)).map_or(false, |mask| bitmap & mask != 0)
    };

    if !(info.freq_min_hz..=info.freq_max_hz).contains(&cfg.freq_hz) {
        return Err(ClientError::ConfigNotSupported {
            reason: format!(
                "frequency {} Hz outside device range [{}, {}] Hz",
                cfg.freq_hz, info.freq_min_hz, info.freq_max_hz
            ),
        });
    }

    if !bit_set(info.supported_sf_bitmap, cfg.sf) {
        let supported: Vec<u8> = (0u8..16).filter(|&i| bit_set(info.supported_sf_bitmap, i)).collect();
        return Err(ClientError::ConfigNotSupported {
            reason: format!("SF{} not supported by this device (supports SF{:?})", cfg.sf, supported),
        });
    }

    let bw_bit = cfg.bw.as_u8();
    if !bit_set(info.supported_bw_bitmap, bw_bit) {
        return Err(ClientError::ConfigNotSupported {
            reason: format!(
                "bandwidth {:?} (bit {}) not in supported_bw_bitmap 0x{:04X}",
                cfg.bw, bw_bit, info.supported_bw_bitmap
            ),
        });
    }

    // TX power is clamped rather than rejected.
    if cfg.tx_power_dbm > info.tx_power_max_dbm {
        info!(requested = cfg.tx_power_dbm, device_max = info.tx_power_max_dbm, "clamping tx_power_dbm to device max");
        cfg.tx_power_dbm = info.tx_power_max_dbm;
    } else if cfg.tx_power_dbm < info.tx_power_min_dbm {
        info!(requested = cfg.tx_power_dbm, device_min = info.tx_power_min_dbm, "clamping tx_power_dbm to device min");
        cfg.tx_power_dbm = info.tx_power_min_dbm;
    }

    Ok(cfg)
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use donglora_protocol::{
        FskConfig, LoRaBandwidth, LoRaCodingRate, LoRaHeaderMode, MAX_MCU_UID_LEN, MAX_RADIO_UID_LEN, RadioChipId,
    };

    /// SF bitmap with bits 5..=12 set (SF5 through SF12).
    const SUB_GHZ_ALL_SF: u16 = 0x1FE0;
    /// SF bitmap with bits 6..=12 set (SF6 through SF12, no SF5).
    const SX127X_SF: u16 = 0x1FC0;
    /// Bandwidth bitmap with bits 0..=9 set.
    const SUB_GHZ_BW: u16 = 0x03FF;

    /// Builds a device `Info` with the given limits; every other field is a
    /// fixed placeholder.
    fn info(tx_min: i8, tx_max: i8, freq_min: u32, freq_max: u32, sf_bm: u16, bw_bm: u16) -> Info {
        Info {
            proto_major: 1,
            proto_minor: 0,
            fw_major: 0,
            fw_minor: 0,
            fw_patch: 0,
            radio_chip_id: RadioChipId::Sx1262.as_u16(),
            capability_bitmap: donglora_protocol::cap::LORA,
            supported_sf_bitmap: sf_bm,
            supported_bw_bitmap: bw_bm,
            max_payload_bytes: 255,
            rx_queue_capacity: 32,
            tx_queue_capacity: 1,
            freq_min_hz: freq_min,
            freq_max_hz: freq_max,
            tx_power_min_dbm: tx_min,
            tx_power_max_dbm: tx_max,
            mcu_uid_len: 0,
            mcu_uid: [0u8; MAX_MCU_UID_LEN],
            radio_uid_len: 0,
            radio_uid: [0u8; MAX_RADIO_UID_LEN],
        }
    }

    /// `Info` for an 863–928 MHz device with the full SF and BW bitmaps.
    fn sub_ghz(tx_min: i8, tx_max: i8) -> Info {
        info(tx_min, tx_max, 863_000_000, 928_000_000, SUB_GHZ_ALL_SF, SUB_GHZ_BW)
    }

    /// Builds a LoRa config varying only the fields the tests care about.
    fn lora(freq_hz: u32, sf: u8, bw: LoRaBandwidth, tx_power_dbm: i8) -> LoRaConfig {
        LoRaConfig {
            freq_hz,
            sf,
            bw,
            cr: LoRaCodingRate::Cr4_5,
            preamble_len: 8,
            sync_word: 0x3444,
            tx_power_dbm,
            header_mode: LoRaHeaderMode::Explicit,
            payload_crc: true,
            iq_invert: false,
        }
    }

    /// Runs `prepare_config` on a LoRa config that is expected to be accepted.
    fn accepted(i: &Info, cfg: LoRaConfig) -> LoRaConfig {
        match prepare_config(i, Modulation::LoRa(cfg)).unwrap() {
            Modulation::LoRa(out) => out,
            _ => panic!("expected LoRa"),
        }
    }

    /// Runs `prepare_config` on a LoRa config that is expected to be rejected.
    fn rejected(i: &Info, cfg: LoRaConfig) -> ClientError {
        prepare_config(i, Modulation::LoRa(cfg)).unwrap_err()
    }

    #[test]
    fn tx_power_above_max_clamps_down() {
        let out = accepted(&sub_ghz(-9, 20), lora(915_000_000, 7, LoRaBandwidth::Khz125, 30));
        assert_eq!(out.tx_power_dbm, 20);
        assert_eq!(out.freq_hz, 915_000_000);
    }

    #[test]
    fn tx_power_below_min_clamps_up() {
        let out = accepted(&sub_ghz(2, 20), lora(915_000_000, 7, LoRaBandwidth::Khz125, -30));
        assert_eq!(out.tx_power_dbm, 2);
    }

    #[test]
    fn tx_power_in_range_unchanged() {
        let out = accepted(&sub_ghz(-9, 22), lora(915_000_000, 7, LoRaBandwidth::Khz125, 17));
        assert_eq!(out.tx_power_dbm, 17);
    }

    #[test]
    fn freq_out_of_range_rejected() {
        let err = rejected(&sub_ghz(-9, 22), lora(300_000_000, 7, LoRaBandwidth::Khz125, 14));
        assert!(matches!(err, ClientError::ConfigNotSupported { ref reason } if reason.contains("frequency")));
    }

    #[test]
    fn sf5_rejected_on_sx127x_bitmap() {
        let i = info(2, 20, 863_000_000, 1_020_000_000, SX127X_SF, SUB_GHZ_BW);
        let err = rejected(&i, lora(915_000_000, 5, LoRaBandwidth::Khz125, 14));
        assert!(matches!(err, ClientError::ConfigNotSupported { ref reason } if reason.contains("SF5")));
    }

    #[test]
    fn bw_not_in_bitmap_rejected() {
        let err = rejected(&sub_ghz(-9, 22), lora(915_000_000, 7, LoRaBandwidth::Khz200, 14));
        assert!(matches!(err, ClientError::ConfigNotSupported { ref reason } if reason.contains("bandwidth")));
    }

    #[test]
    fn non_lora_modulation_passes_through() {
        // The frequency is far outside the device range on purpose: non-LoRa
        // modulations are not validated by `prepare_config`.
        let fsk = FskConfig {
            freq_hz: 50_000_000,
            bitrate_bps: 50_000,
            freq_dev_hz: 25_000,
            rx_bw: 0,
            preamble_len: 16,
            sync_word_len: 0,
            sync_word: [0u8; 8],
        };
        let out = prepare_config(&sub_ghz(-9, 22), Modulation::FskGfsk(fsk)).unwrap();
        assert!(matches!(out, Modulation::FskGfsk(_)));
    }
}