#![allow(clippy::similar_names)]
use anyhow::Result;
use clap::Parser;
use log::{debug, info, warn};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio_serial::SerialPortBuilderExt;
use rax25::r#async::PortType;
use agw::r#async::AGWServer;
#[derive(Clone, Copy, Debug, Eq, PartialEq, clap::ValueEnum)]
enum LogLevel {
Error,
Warn,
Info,
Debug,
Trace,
}
#[derive(Parser)]
struct Opt {
#[arg(long, short)]
dev: Option<std::path::PathBuf>,
#[arg(long, short)]
kiss: Option<String>,
#[arg(long, short, default_value_t = 9600)]
baud: u32,
#[arg(long, short)]
listen: String,
#[arg(short = 'v', long = "log-level", value_enum, default_value = "info")]
log_level: LogLevel,
}
async fn handle_client(mut stream: AGWServer, mut modem: ModemLink) -> Result<()> {
loop {
tokio::select! {
packet = stream.recv() => {
let packet = match packet {
Ok(p) => p,
Err(agw::Error::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
};
debug!("Got packet from AGW client: {packet:?}");
#[allow(clippy::single_match)]
match packet {
agw::Packet::VersionQuery => {
stream.send(&agw::Packet::VersionReply {
major: 2005, minor: 127,
}).await?;
},
_ => {}
}
}
packet = modem.recv() => {
let packet = packet?;
debug!("Got packet from modem: {packet:?}");
modem.send(vec![]).await?;
}
}
}
info!("Connection ended normally");
Ok(())
}
#[derive(Clone)]
struct ModemLink {
tx: mpsc::Sender<Vec<u8>>,
}
impl ModemLink {
#[allow(clippy::unused_async)]
async fn recv(&mut self) -> Result<Vec<u8>> {
Ok(vec![])
}
async fn send<T: Into<Vec<u8>>>(
&mut self,
data: T,
) -> Result<(), mpsc::error::SendError<Vec<u8>>> {
self.tx.send(data.into()).await
}
}
struct Modem {
portr: tokio::io::ReadHalf<PortType>,
portw: tokio::io::WriteHalf<PortType>,
to_tx: mpsc::Receiver<Vec<u8>>,
distribute: Vec<mpsc::Sender<agw::Packet>>,
}
impl Modem {
#[must_use]
fn new(port: PortType) -> (Self, ModemLink) {
let (portr, portw) = tokio::io::split(port);
let (tx, to_tx) = mpsc::channel(10); (
Self {
portr,
portw,
to_tx,
distribute: vec![],
},
ModemLink { tx },
)
}
async fn run(&mut self) -> Result<()> {
const MAX_BUF: usize = 8192; let mut buf = [0u8; 1024];
let mut to_modem = Vec::new();
loop {
tokio::select! {
n = self.portw.write(&to_modem), if !to_modem.is_empty() => {
let n = n?;
to_modem.drain(..n);
}
packet = self.to_tx.recv() => {
match packet {
Some(packet) => {
to_modem.extend(&packet);
}
None => break
}
}
n = self.portr.read(&mut buf), if to_modem.len()<MAX_BUF => {
let n = n?;
to_modem.extend(&buf[..n]);
for _ in &self.distribute {
}
}
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opt = Opt::parse();
stderrlog::new()
.module(module_path!())
.module("agw")
.quiet(false)
.show_module_names(true)
.verbosity(opt.log_level as usize)
.timestamp(stderrlog::Timestamp::Second)
.init()
.unwrap();
info!("Starting up");
let mut port = if let Some(dev) = opt.dev {
PortType::Serial(tokio_serial::new(dev.to_str().unwrap(), opt.baud).open_native_async()?)
} else if let Some(kiss) = opt.kiss {
PortType::Tcp(tokio::net::TcpStream::connect(kiss).await?)
} else {
panic!()
};
if false {
let packet = rax25::Packet::ui(
rax25::Addr::new("M0THC-1")?,
rax25::Addr::new("M0THC-2")?,
b"hello world in the world",
);
let serial = rax25::escape(&packet.serialize(false));
port.write_all(&serial).await?;
port.flush().await?;
}
let (mut modem, link) = Modem::new(port);
let listener = tokio::net::TcpListener::bind(&opt.listen).await?;
tokio::spawn(async move { modem.run().await });
loop {
let (stream, peer) = listener.accept().await?;
let stream = AGWServer::new(stream);
info!("{peer}: Connected");
let link = link.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(stream, link).await {
warn!("Client task failed: {e:?}");
}
});
}
}