use std::collections::{HashSet, VecDeque};
use std::pin::Pin;
use crate::pcap::PcapWriter;
use crate::state::{self, Event, ReturnEvent};
use crate::{Addr, Packet, PacketType};
use anyhow::{Context, Error, Result, bail};
use log::{debug, error, trace, warn};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio_serial::SerialPortBuilderExt;
pub enum PortType {
Serial(tokio_serial::SerialStream),
Tcp(tokio::net::TcpStream),
}
#[allow(clippy::doc_markdown)]
pub async fn connect_kiss_endpoint(endpoint: &str) -> Result<PortType> {
if let Some(addr) = endpoint.strip_prefix("tcp://") {
if addr.is_empty() {
bail!("empty TCP KISS endpoint");
}
return Ok(PortType::Tcp(
tokio::net::TcpStream::connect(addr)
.await
.with_context(|| format!("connecting to TCP KISS endpoint {endpoint:?}"))?,
));
}
if let Some(path) = endpoint.strip_prefix("serial://") {
if path.is_empty() {
bail!("empty serial KISS endpoint");
}
return Ok(PortType::Serial(
tokio_serial::new(path, 9600)
.open_native_async()
.with_context(|| format!("opening serial KISS endpoint {endpoint:?}"))?,
));
}
bail!("KISS endpoint must start with tcp:// or serial://");
}
impl tokio::io::AsyncRead for PortType {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match *self {
PortType::Serial(ref mut x) => Pin::new(x).poll_read(cx, buf),
PortType::Tcp(ref mut x) => Pin::new(x).poll_read(cx, buf),
}
}
}
impl tokio::io::AsyncWrite for PortType {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<std::io::Result<usize>> {
match *self {
PortType::Serial(ref mut x) => Pin::new(x).poll_write(cx, buf),
PortType::Tcp(ref mut x) => Pin::new(x).poll_write(cx, buf),
}
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match *self {
PortType::Serial(ref mut x) => Pin::new(x).poll_flush(cx),
PortType::Tcp(ref mut x) => Pin::new(x).poll_flush(cx),
}
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match *self {
PortType::Serial(ref mut x) => Pin::new(x).poll_shutdown(cx),
PortType::Tcp(ref mut x) => Pin::new(x).poll_shutdown(cx),
}
}
}
pub struct ConnectionBuilder {
me: Addr,
extended: Option<bool>,
capture: Option<std::path::PathBuf>,
port: PortType,
t3v: Option<std::time::Duration>,
srt: Option<std::time::Duration>,
mtu: Option<usize>,
experiments: HashSet<crate::Experiment>,
}
impl ConnectionBuilder {
pub fn new(me: Addr, port: PortType) -> Result<Self> {
Ok(Self {
me,
extended: None,
capture: None,
t3v: None,
srt: None,
mtu: None,
port,
experiments: HashSet::new(),
})
}
#[must_use]
pub fn extended(mut self, ext: Option<bool>) -> ConnectionBuilder {
self.extended = ext;
self
}
#[must_use]
pub fn capture(mut self, path: std::path::PathBuf) -> ConnectionBuilder {
self.capture = Some(path);
self
}
#[must_use]
pub fn srt_default(mut self, v: std::time::Duration) -> ConnectionBuilder {
self.srt = Some(v);
self
}
#[must_use]
pub fn t3v(mut self, v: std::time::Duration) -> ConnectionBuilder {
self.t3v = Some(v);
self
}
#[must_use]
pub fn enable_experiment(mut self, ex: crate::Experiment) -> ConnectionBuilder {
self.experiments.insert(ex);
self
}
#[must_use]
pub fn mtu(mut self, v: usize) -> ConnectionBuilder {
self.mtu = Some(v);
self
}
#[must_use]
fn create_data(&self) -> state::Data {
let mut data = state::Data::new(self.me.clone());
if let Some(v) = self.srt {
data.srt_default(v);
}
if let Some(v) = self.t3v {
data.t3v(v);
}
if let Some(v) = self.mtu {
data.mtu(v);
}
for ex in &self.experiments {
data.enable_experiment(*ex);
}
data
}
pub async fn connect(self, peer: Addr) -> Result<Client> {
let mut cli = Client::internal_new(
self.create_data(),
self.port,
self.extended.unwrap_or(false),
);
if let Some(capture) = self.capture {
cli.capture(capture)?;
}
cli.connect(peer).await
}
pub async fn accept(self) -> Result<Client> {
let mut data = self.create_data();
data.able_to_establish = true;
let mut cli = Client::internal_new(data, self.port, false);
if let Some(capture) = self.capture {
cli.capture(capture)?;
}
debug!("rax25: Awaiting incoming SABM");
loop {
cli.wait_event().await?;
if cli.state.is_state_connected() {
return Ok(cli);
}
}
}
}
struct KissPort {
incoming_kiss: VecDeque<u8>,
incoming_frames: VecDeque<Packet>,
ext: bool,
port: PortType,
}
impl KissPort {
async fn process(&mut self, mut pcap: Option<&mut PcapWriter>) -> Result<()> {
let mut buf = [0; 1024];
loop {
match self.port.read(&mut buf).await {
Ok(0) => {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"EOF from KISS port",
)
.into());
}
Ok(n) => {
trace!("Read {n} bytes from serial port");
let buf = &buf[..n];
self.incoming_kiss.extend(buf);
self.extract_packets(pcap.as_deref_mut());
if !self.incoming_frames.is_empty() {
return Ok(());
}
}
Err(e) => return Err(e.into()),
}
}
}
#[must_use]
fn len(&self) -> usize {
self.incoming_frames.len()
}
#[must_use]
fn pop_frame(&mut self) -> Option<Packet> {
self.incoming_frames.pop_front()
}
fn extract_packets(&mut self, pcap: Option<&mut PcapWriter>) {
self.incoming_frames
.extend(kisser_read(&mut self.incoming_kiss, Some(self.ext), pcap));
}
async fn write(&mut self, packet: &Packet) -> Result<()> {
let bytes = packet.serialize(self.ext);
let frame = crate::escape(&bytes);
self.port.write_all(&frame).await?;
self.port.flush().await?;
Ok(())
}
}
pub struct Client {
state: Box<dyn state::State>,
data: state::Data,
eof: bool,
incoming: VecDeque<u8>,
kissport: KissPort,
pcap: Option<PcapWriter>,
}
#[must_use]
fn kisser_read(
ibuf: &mut VecDeque<u8>,
ext: Option<bool>,
mut pcap: Option<&mut PcapWriter>,
) -> Vec<Packet> {
let mut ret = Vec::new();
while let Some((a, b)) = crate::find_frame(ibuf) {
if b - a < 14 {
ibuf.drain(..=a);
continue;
}
let pb: Vec<_> = ibuf.iter().skip(a + 2).take(b - a - 2).copied().collect();
ibuf.drain(..b);
let pb = crate::unescape(&pb);
match Packet::parse(&pb, ext) {
Ok(packet) => {
trace!("rax25: parsed {packet:?}");
if let PacketType::Sabme(_) = packet.packet_type {
}
if false {
trace!(
"BYTES: {:?}",
Packet::parse(&packet.serialize(ext.unwrap_or(false)), ext)
);
assert_eq!(&packet.serialize(ext.unwrap_or(false)), &pb);
}
if let Some(f) = &mut pcap
&& let Err(e) = f.write(&pb)
{
error!("Failed to write to pcap: {e}");
}
ret.push(packet);
}
Err(e) => {
debug!("rax25: Failed to parse packet: {e:?}");
}
}
}
ret
}
impl Client {
#[must_use]
fn internal_new(data: state::Data, port: PortType, ext: bool) -> Self {
Self {
eof: false,
incoming: VecDeque::new(),
kissport: KissPort {
port,
incoming_kiss: VecDeque::new(),
incoming_frames: VecDeque::new(),
ext,
},
state: state::new(),
data,
pcap: None,
}
}
async fn connect(mut self, peer: Addr) -> Result<Self> {
self.actions(Event::Connect {
addr: peer,
ext: self.kissport.ext,
})
.await?;
loop {
self.wait_event().await?;
trace!("rax25: State after waiting: {}", self.state.name());
if self.state.is_state_connected() {
return Ok(self);
}
if self.state.is_state_disconnected() {
return Err(Error::msg("connection timed out"));
}
}
}
fn capture(&mut self, filename: std::path::PathBuf) -> Result<()> {
let pcap = PcapWriter::create(filename)?;
self.pcap = Some(pcap);
Ok(())
}
async fn wait_event(&mut self) -> Result<()> {
trace!(
"rax25: Waiting for event. {} packets ready",
self.kissport.len()
);
let state_name = self.state.name();
while let Some(p) = self.kissport.pop_frame() {
if p.dst.call() != self.data.me.call() {
trace!("rax25: Skipping packet not for {:?}", self.data.me.call());
continue;
}
if let Some(peer) = &self.data.peer
&& peer.call() != p.src.call()
{
trace!(
"rax25: Skipping packet not from {peer:?} but {:?}",
p.src.call()
);
continue;
}
trace!("rax25: processing packet {:?}", p.packet_type);
self.actions_packet(&p).await?;
trace!(
"rax25: post packet: {} {:?} {:?}",
self.state.name(),
self.data.t1.remaining(),
self.data.t3.remaining()
);
}
if !self.incoming.is_empty() {
return Ok(());
}
if self.state.name() != state_name {
if self.state.is_state_connected() {
self.kissport.ext = self.data.modulus.is_extended();
}
return Ok(());
}
let (t1, t3) = self.timer_13();
tokio::pin!(t1);
tokio::pin!(t3);
tokio::select! {
() = &mut t1 => {
debug!("rax25: async con event: T1");
self.actions(Event::T1).await?;
},
() = &mut t3 => {
debug!("rax25: async con event: T3");
self.actions(Event::T3).await?;
},
res = self.kissport.process(self.pcap.as_mut()) => {
if let Err(e) = res {
warn!("Error reading from serial port: {e:?}");
return Err(e);
}
},
}
debug!(
"rax25: async con post state: {} {:?} {:?}",
self.state.name(),
self.data.t1.remaining(),
self.data.t3.remaining()
);
Ok(())
}
async fn actions_packet(&mut self, packet: &Packet) -> Result<()> {
match &packet.packet_type {
PacketType::Sabm(p) => self.actions(state::Event::Sabm(p.clone(), packet.src.clone())),
PacketType::Sabme(p) => {
self.actions(state::Event::Sabme(p.clone(), packet.src.clone()))
}
PacketType::Ua(ua) => self.actions(state::Event::Ua(ua.clone())),
PacketType::Disc(p) => self.actions(state::Event::Disc(p.clone())),
PacketType::Rnr(p) => self.actions(state::Event::Rnr(p.clone())),
PacketType::Rej(p) => self.actions(state::Event::Rej(p.clone())),
PacketType::Srej(p) => self.actions(state::Event::Srej(p.clone())),
PacketType::Frmr(p) => self.actions(state::Event::Frmr(p.clone())),
PacketType::Xid(p) => {
self.actions(state::Event::Xid(p.clone(), packet.command_response))
}
PacketType::Ui(p) => self.actions(state::Event::Ui(p.clone(), packet.command_response)),
PacketType::Test(p) => {
self.actions(state::Event::Test(p.clone(), packet.command_response))
}
PacketType::Dm(p) => self.actions(state::Event::Dm(p.clone())),
PacketType::Rr(rr) => {
self.actions(state::Event::Rr(rr.clone(), packet.command_response))
}
PacketType::Iframe(iframe) => self.actions(state::Event::Iframe(
iframe.clone(),
packet.command_response,
)),
}
.await
}
pub async fn disconnect(mut self) -> Result<()> {
trace!("rax25: Disconnecting while in state {}", self.state.name());
self.actions(Event::Disconnect).await?;
match self.state.name().as_str() {
"AwaitingRelease" => loop {
match self.wait_event().await {
Ok(()) => {}
Err(e) if is_error_eof(&e) => return Ok(()),
Err(e) => return Err(e),
}
if self.state.is_state_disconnected() {
break Ok(());
}
},
"Disconnected" => Ok(()),
other => {
trace!("Disconnect request in unexpected state {other}");
Ok(())
}
}
}
fn sync_disconnect(&mut self) {
if !self.state.is_state_disconnected() {
trace!("TODO: sync_disconnect");
}
}
pub async fn write(&mut self, data: &[u8]) -> Result<()> {
self.actions(Event::Data(data.to_vec())).await
}
fn timer_13(&self) -> (tokio::time::Sleep, tokio::time::Sleep) {
let timer1 = tokio::time::sleep(
self.data
.t1
.remaining()
.unwrap_or(std::time::Duration::from_hours(24)),
);
let timer3 = tokio::time::sleep(
self.data
.t3
.remaining()
.unwrap_or(std::time::Duration::from_hours(24)),
);
(timer1, timer3)
}
pub async fn read(&mut self) -> Result<Vec<u8>> {
loop {
self.wait_event().await?;
if self.incoming.is_empty() && self.eof {
return Ok(vec![]);
}
if !self.incoming.is_empty() {
let ret: Vec<_> = self.incoming.iter().copied().collect();
self.incoming.clear();
return Ok(ret);
}
}
}
async fn actions(&mut self, event: Event) -> Result<()> {
let (state, actions) = state::handle(&*self.state, &mut self.data, &event);
if let Some(state) = state {
let _ = std::mem::replace(&mut self.state, state);
}
for act in actions {
match &act {
ReturnEvent::DlError(e) => warn!("DLError: {e:?}"),
ReturnEvent::Data(res) => {
trace!("rax25: ReturnEvent::Data {act:?}");
match res {
state::Res::None => {}
state::Res::EOF => self.eof = true,
state::Res::Some(d) => self.incoming.extend(d),
}
}
ReturnEvent::Packet(p) => {
self.kissport.ext = self.data.ext();
self.kissport.write(p).await?;
if let Some(f) = &mut self.pcap {
f.write(&p.serialize(self.data.ext()))?;
}
}
}
}
Ok(())
}
}
impl Drop for Client {
fn drop(&mut self) {
self.sync_disconnect();
}
}
fn is_error_eof(e: &Error) -> bool {
let Some(io_err) = e.chain().find_map(|e| e.downcast_ref::<std::io::Error>()) else {
return false;
};
io_err.kind() == std::io::ErrorKind::UnexpectedEof
}