use donglora_bridge::{config, gossip, radio, rate_limit, router, setup, tui};
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use iroh::SecretKey;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use crate::rate_limit::RateLimiter;
use crate::router::Stats;
#[derive(Parser)]
#[command(name = "donglora-bridge", about = "LoRa gossip bridge using iroh")]
struct Cli {
#[arg(long)]
config: Option<String>,
#[arg(long)]
log_only: bool,
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(Subcommand)]
enum Commands {
Config,
}
#[tokio::main]
#[allow(clippy::too_many_lines)]
async fn main() -> Result<()> {
let cli = Cli::parse();
let config_path = match &cli.config {
Some(p) => std::path::PathBuf::from(p),
None => config::config_path()?,
};
if matches!(cli.command, Some(Commands::Config)) {
let existing = if config_path.exists() { config::load_config(&config_path).ok() } else { None };
return setup::run_wizard(&config_path, existing.as_ref());
}
if !config_path.exists() {
println!(" No config found. Let's set one up!\n");
return setup::run_wizard(&config_path, None);
}
let cfg =
config::load_config(&config_path).with_context(|| format!("loading config from {}", config_path.display()))?;
setup_logging(cli.log_only, &cfg);
let secret_key = SecretKey::generate();
let node_id = secret_key.public();
info!("donglora-bridge starting, ephemeral id: {}", node_id.fmt_short());
let radio_config = cfg.radio.to_lora_config()?;
let retry_policy = cfg.tx.to_retry_policy()?;
let cr_denom: u8 = match radio_config.cr {
donglora_client::LoRaCodingRate::Cr4_5 => 5,
donglora_client::LoRaCodingRate::Cr4_6 => 6,
donglora_client::LoRaCodingRate::Cr4_7 => 7,
donglora_client::LoRaCodingRate::Cr4_8 => 8,
};
let rate_limiter = RateLimiter::from_radio_config(
radio_config.sf,
radio_config.bw,
cr_denom,
radio_config.preamble_len,
cfg.bridge.rate_limit_pps,
);
info!("rate limiter: {:.1} pps", rate_limiter.rate_pps());
let (swarm, gossip_event_rx, gossip_frame_tx) = gossip::Gossip::new(secret_key, &cfg.bridge.passphrase).await?;
let swarm = Arc::new(swarm);
let (radio_event_rx, radio_tx, _radio_handle) = radio::spawn(radio_config, cfg.radio.port.clone(), retry_policy)?;
let stats = Arc::new(Stats::default());
let (log_tx, log_rx) = mpsc::channel(512);
let (config_watch_tx, config_watch_rx) = tokio::sync::watch::channel(router::RadioConfigInfo {
active: radio_config,
requested: radio_config,
source: radio::ConfigSource::Ours,
device: String::new(),
connected: false,
});
let cancel = CancellationToken::new();
let start_time = Instant::now();
let router_stats = stats.clone();
let router_cancel = cancel.clone();
let router_our_id = node_id;
let dedup_window = std::time::Duration::from_secs(cfg.bridge.dedup_window_secs);
let tx_queue_size = cfg.bridge.tx_queue_size;
tokio::spawn(async move {
router::run(
router_our_id,
dedup_window,
tx_queue_size,
radio_config,
rate_limiter,
radio_event_rx,
radio_tx,
gossip_event_rx,
gossip_frame_tx,
&router_stats,
log_tx,
config_watch_tx,
)
.await;
router_cancel.cancel();
});
{
let cancel = cancel.clone();
tokio::spawn(async move {
let sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate());
match sigterm {
Ok(mut sigterm) => {
tokio::select! {
_ = tokio::signal::ctrl_c() => info!("received SIGINT"),
_ = sigterm.recv() => info!("received SIGTERM"),
}
}
Err(e) => {
warn!("failed to register SIGTERM handler: {e:#}");
let _ = tokio::signal::ctrl_c().await;
info!("received SIGINT");
}
}
cancel.cancel();
});
}
if cli.log_only {
info!("running in headless mode");
let log_cancel = cancel.clone();
tokio::spawn(headless_packet_logger(log_rx, log_cancel));
cancel.cancelled().await;
} else {
let terminal = tui::run(config_watch_rx, swarm.clone(), stats, log_rx, cancel.clone(), start_time);
if let Ok(mut term) = terminal {
tui::restore_terminal(&mut term);
}
cancel.cancel();
if tokio::time::timeout(Duration::from_secs(2), swarm.shutdown()).await.is_err() {
warn!("graceful shutdown timed out — exiting anyway");
}
info!("donglora-bridge stopped");
return Ok(());
}
swarm.shutdown().await;
info!("donglora-bridge stopped");
Ok(())
}
async fn headless_packet_logger(mut log_rx: mpsc::Receiver<router::PacketLogEntry>, cancel: CancellationToken) {
use router::{PacketAction, PacketDirection, TxRetryState};
loop {
tokio::select! {
() = cancel.cancelled() => return,
entry = log_rx.recv() => {
let Some(e) = entry else { return; };
let dir = match e.direction {
PacketDirection::RadioIn => "RF→NET",
PacketDirection::GossipIn => "NET→RF",
};
let hash_str = hex::encode(&e.hash[..4]);
let rssi_str = e.rssi.map_or_else(|| "-".to_string(), |r| r.to_string());
let snr_str = e.snr.map_or_else(|| "-".to_string(), |s| s.to_string());
if let Some(r) = e.tx_retry {
match r.state {
TxRetryState::Retrying(reason) => {
tracing::info!(
target: "packet",
"{} retry hash={} attempt={}/{} reason={}",
dir, hash_str, r.attempt, r.total_attempts, reason,
);
}
TxRetryState::Succeeded if r.attempt > 1 => {
tracing::info!(
target: "packet",
"{} sent hash={} size={}B attempts={}/{}",
dir, hash_str, e.size, r.attempt, r.total_attempts,
);
}
TxRetryState::Failed => {
tracing::warn!(
target: "packet",
"{} FAILED hash={} size={}B attempts={}/{}",
dir, hash_str, e.size, r.attempt, r.total_attempts,
);
}
_ => {}
}
continue;
}
match e.action {
PacketAction::Bridged => {
tracing::info!(
target: "packet",
"{} bridged hash={} size={}B rssi={} snr={}",
dir, hash_str, e.size, rssi_str, snr_str,
);
}
PacketAction::DroppedDedup => {
tracing::info!(
target: "packet",
"{} dedup hash={} size={}B rssi={} snr={}",
dir, hash_str, e.size, rssi_str, snr_str,
);
}
PacketAction::DroppedRateLimit => {
tracing::warn!(
target: "packet",
"{} ratelim hash={} size={}B",
dir, hash_str, e.size,
);
}
PacketAction::DroppedQueueFull => {
tracing::warn!(
target: "packet",
"{} QFULL hash={} size={}B",
dir, hash_str, e.size,
);
}
}
}
}
}
}
fn setup_logging(log_only: bool, cfg: &config::Config) {
let env_filter = || {
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
if log_only {
tracing_subscriber::fmt().with_env_filter(env_filter()).init();
} else {
let log_path = cfg
.bridge
.log_file
.clone()
.unwrap_or_else(|| config::default_log_path().unwrap_or_else(|_| "/tmp/donglora-bridge.log".into()));
if let Some(parent) = log_path.parent()
&& let Err(e) = std::fs::create_dir_all(parent)
{
eprintln!("warning: failed to create log directory {}: {e}", parent.display());
}
let file_appender = tracing_appender::rolling::never(
log_path.parent().unwrap_or_else(|| std::path::Path::new("/tmp")),
log_path.file_name().unwrap_or_else(|| std::ffi::OsStr::new("donglora-bridge.log")),
);
tracing_subscriber::fmt().with_env_filter(env_filter()).with_writer(file_appender).with_ansi(false).init();
}
}