use std::fmt;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::string::String;
use std::time::{Duration, Instant};
use futures::io::{AsyncRead, AsyncWrite};
use futures::sink::{Sink, SinkExt};
use futures::stream::{Stream, StreamExt};
use futures::task::{noop_waker_ref, Context, Poll};
use futures_codec::Framed;
use async_std::net::TcpStream;
use async_std::prelude::*;
use super::controlstream;
use super::controlstream::{ControlSink, ControlStreamEvents, ControlStreamResults};
use super::data::{DataIn, DataOut};
use super::dataevent::{DataEvent, DataEventStream};
use crate::arq::{ConnectionFailedReason, ConnectionInfo};
use crate::framing::data::TncDataFraming;
use crate::protocol::command;
use crate::protocol::command::Command;
use crate::protocol::constants::{CommandID, ProtocolMode};
use crate::protocol::response::{CommandOk, CommandResult, ConnectionStateChange};
use crate::tnc::{DiscoveredPeer, PingAck, PingFailedReason, TncError, TncResult};
const DATA_PORT_OFFSET: u16 = 1;
const DEFAULT_TIMEOUT_COMMAND: Duration = Duration::from_millis(20000);
const TIMEOUT_DISCONNECT: Duration = Duration::from_secs(60);
const TIMEOUT_PING: Duration = Duration::from_secs(5);
pub enum ConnectionInfoOrPeerDiscovery {
Connection(ConnectionInfo),
PeerDiscovery(DiscoveredPeer),
}
pub struct AsyncTnc<I>
where
I: AsyncRead + AsyncWrite + Unpin + Send,
{
data_stream: DataEventStream<Framed<I, TncDataFraming>, ControlStreamEvents<I>>,
control_in_res: ControlStreamResults<I>,
control_out: ControlSink<I>,
control_timeout: Duration,
disconnect_progress: DisconnectProgress,
}
pub type AsyncTncTcp = AsyncTnc<TcpStream>;
impl<I: 'static> AsyncTnc<I>
where
I: AsyncRead + AsyncWrite + Unpin + Send,
{
pub async fn new<S>(control_addr: &SocketAddr, mycall: S) -> TncResult<AsyncTnc<TcpStream>>
where
S: Into<String>,
{
let data_addr = SocketAddr::new(control_addr.ip(), control_addr.port() + DATA_PORT_OFFSET);
let stream_control: TcpStream = TcpStream::connect(control_addr).await?;
let stream_data: TcpStream = TcpStream::connect(&data_addr).await?;
let mut out = AsyncTnc::new_from_streams(stream_control, stream_data, mycall);
match out.initialize().await {
Ok(()) => {
info!("Initialized ARDOP TNC at {}", &control_addr);
Ok(out)
}
Err(e) => {
error!(
"Unable to initialize ARDOP TNC at {}: {}",
&control_addr, &e
);
Err(e)
}
}
}
pub(crate) fn new_from_streams<S>(stream_control: I, stream_data: I, mycall: S) -> AsyncTnc<I>
where
S: Into<String>,
{
let (control_in_evt, control_in_res, control_out) =
controlstream::controlstream(stream_control);
let data_inout = Framed::new(stream_data, TncDataFraming::new());
AsyncTnc {
data_stream: DataEventStream::new(mycall, data_inout, control_in_evt),
control_in_res,
control_out,
control_timeout: DEFAULT_TIMEOUT_COMMAND,
disconnect_progress: DisconnectProgress::NoProgress,
}
}
pub fn mycall(&self) -> &String {
self.data_stream.state().mycall()
}
pub fn control_timeout(&self) -> &Duration {
&self.control_timeout
}
pub fn set_control_timeout(&mut self, timeout: Duration) {
self.control_timeout = timeout;
}
pub fn data_stream_sink(
&mut self,
) -> &mut (impl Stream<Item = DataEvent> + Sink<DataOut, Error = io::Error> + Unpin) {
&mut self.data_stream
}
pub async fn command<F>(&mut self, cmd: Command<F>) -> TncResult<()>
where
F: fmt::Display,
{
match execute_command(
&mut self.control_out,
&mut self.control_in_res,
&self.control_timeout,
cmd,
)
.await
{
Ok(cmdok) => {
debug!("Command {} OK", cmdok.0);
Ok(())
}
Err(e) => {
warn!("Command failed: {}", &e);
Err(e)
}
}
}
pub async fn await_clear(&mut self, clear_time: Duration, max_wait: Duration) -> TncResult<()> {
if clear_time == Duration::from_secs(0) {
warn!("Busy detector is DISABLED. Assuming channel is clear.");
return Ok(());
}
let mut ctx = Context::from_waker(noop_waker_ref());
loop {
match Pin::new(&mut self.data_stream).poll_next(&mut ctx) {
Poll::Pending => break,
Poll::Ready(None) => return Err(connection_reset_err().into()),
Poll::Ready(Some(_evt)) => continue,
}
}
if self.data_stream.state().clear_time() > clear_time {
info!("Channel is CLEAR and READY for use at +0.0 seconds");
return Ok(());
}
info!(
"Waiting for a clear channel: {:0.1} seconds within {:0.1} seconds...",
clear_time.as_millis() as f32 / 1000.0f32,
max_wait.as_millis() as f32 / 1000.0f32
);
let wait_start = Instant::now();
loop {
let wait_elapsed = wait_start.elapsed();
if wait_elapsed >= max_wait {
info!("Timed out while waiting for clear channel.");
return Err(TncError::TimedOut);
}
let mut wait_remaining = max_wait - wait_elapsed;
if !self.data_stream.state().busy() {
let clear_elapsed = self.data_stream.state().clear_time();
if clear_elapsed >= clear_time {
info!(
"Channel is CLEAR and READY for use at +{:0.1} seconds",
wait_elapsed.as_millis() as f32 / 1000.0f32
);
break;
}
let clear_remaining = clear_time - clear_elapsed;
wait_remaining = wait_remaining.min(clear_remaining)
}
match self.next_state_change_timeout(wait_remaining).await {
Err(TncError::TimedOut) => continue,
Err(_e) => return Err(_e),
Ok(ConnectionStateChange::Busy(busy)) => {
if busy {
info!(
"Channel is BUSY at +{:0.1} seconds",
wait_start.elapsed().as_millis() as f32 / 1000.0f32
);
} else {
info!(
"Channel is CLEAR at +{:0.1} seconds",
wait_start.elapsed().as_millis() as f32 / 1000.0f32
);
}
}
_ => continue,
}
}
Ok(())
}
pub async fn ping<S>(
&mut self,
target: S,
attempts: u16,
clear_time: Duration,
clear_max_wait: Duration,
) -> TncResult<Result<PingAck, PingFailedReason>>
where
S: Into<String>,
{
if attempts <= 0 {
return Ok(Err(PingFailedReason::NoAnswer));
}
match self.await_clear(clear_time, clear_max_wait).await {
Ok(_ok) => {}
Err(TncError::TimedOut) => return Ok(Err(PingFailedReason::Busy)),
Err(e) => return Err(e),
}
let target_string = target.into();
info!("Pinging {} ({} attempts)...", &target_string, attempts);
self.command(command::ping(target_string.clone(), attempts))
.await?;
let timeout_seconds = attempts as u64 * TIMEOUT_PING.as_secs();
let start = Instant::now();
loop {
match self.next_state_change_timeout(TIMEOUT_PING.clone()).await {
Err(TncError::TimedOut) => { }
Ok(ConnectionStateChange::PingAck(snr, quality)) => {
let ack = PingAck::new(target_string, snr, quality);
info!("{}", &ack);
return Ok(Ok(ack));
}
Err(e) => return Err(e),
_ => { }
}
if start.elapsed().as_secs() >= timeout_seconds {
break;
}
}
info!("Ping {}: ping timeout", &target_string);
Ok(Err(PingFailedReason::NoAnswer))
}
pub async fn connect<S>(
&mut self,
target: S,
bw: u16,
bw_forced: bool,
attempts: u16,
clear_time: Duration,
clear_max_wait: Duration,
) -> TncResult<Result<ConnectionInfo, ConnectionFailedReason>>
where
S: Into<String>,
{
let target_string = target.into();
match self.await_clear(clear_time, clear_max_wait).await {
Ok(_clear) => { }
Err(TncError::TimedOut) => return Ok(Err(ConnectionFailedReason::Busy)),
Err(e) => return Err(e),
}
self.command(command::listen(false)).await?;
self.command(command::protocolmode(ProtocolMode::ARQ))
.await?;
self.command(command::arqbw(bw, bw_forced)).await?;
info!(
"Connecting to {}: dialing at {} Hz BW...",
&target_string, bw
);
match self
.command(command::arqcall(target_string.clone(), attempts))
.await
{
Ok(()) => { }
Err(e) => {
error!(
"Connection to {} failed: TNC rejected request: {}.",
&target_string, &e
);
return Err(e);
}
}
loop {
match self.next_state_change().await? {
ConnectionStateChange::Connected(info) => {
info!("CONNECTED {}", &info);
return Ok(Ok(info));
}
ConnectionStateChange::Failed(fail) => {
info!("Connection to {} failed: {}", &target_string, &fail);
return Ok(Err(fail));
}
ConnectionStateChange::Closed => {
info!("Connection to {} failed: not connected", &target_string);
return Ok(Err(ConnectionFailedReason::NoAnswer));
}
_ => { }
}
}
}
pub async fn listen(&mut self, bw: u16, bw_forced: bool) -> TncResult<ConnectionInfo> {
loop {
match self.listen_monitor(bw, bw_forced).await? {
ConnectionInfoOrPeerDiscovery::Connection(conn_info) => return Ok(conn_info),
_ => continue,
}
}
}
pub async fn monitor(&mut self) -> TncResult<DiscoveredPeer> {
self.command(command::protocolmode(ProtocolMode::ARQ))
.await?;
self.command(command::listen(true)).await?;
info!("Monitoring for available peers...");
loop {
match self.next_state_change().await? {
ConnectionStateChange::IdentityFrame(call, grid) => {
let peer = DiscoveredPeer::new(call, None, grid);
info!("ID frame: {}", peer);
return Ok(peer);
}
ConnectionStateChange::Ping(src, _dst, snr, _quality) => {
let peer = DiscoveredPeer::new(src, Some(snr), None);
info!("Ping: {}", peer);
return Ok(peer);
}
_ => continue,
}
}
}
pub async fn listen_monitor(
&mut self,
bw: u16,
bw_forced: bool,
) -> TncResult<ConnectionInfoOrPeerDiscovery> {
self.command(command::protocolmode(ProtocolMode::ARQ))
.await?;
self.command(command::arqbw(bw, bw_forced)).await?;
self.command(command::listen(true)).await?;
info!("Listening for {} at {} Hz...", self.mycall(), bw);
loop {
match self.next_state_change().await? {
ConnectionStateChange::Connected(info) => {
info!("CONNECTED {}", &info);
self.command(command::listen(false)).await?;
return Ok(ConnectionInfoOrPeerDiscovery::Connection(info));
}
ConnectionStateChange::Failed(fail) => {
info!("Incoming connection failed: {}", fail);
}
ConnectionStateChange::IdentityFrame(call, grid) => {
let peer = DiscoveredPeer::new(call, None, grid);
info!("ID frame: {}", peer);
return Ok(ConnectionInfoOrPeerDiscovery::PeerDiscovery(peer));
}
ConnectionStateChange::Ping(src, _dst, snr, _quality) => {
let peer = DiscoveredPeer::new(src, Some(snr), None);
info!("Ping: {}", peer);
return Ok(ConnectionInfoOrPeerDiscovery::PeerDiscovery(peer));
}
ConnectionStateChange::Closed => {
info!("Incoming connection failed: not connected");
}
_ => continue,
}
}
}
pub async fn disconnect(&mut self) {
match self.command(command::disconnect()).await {
Ok(()) => { }
Err(_e) => return,
};
for _i in 0..2 {
match self
.next_state_change_timeout(TIMEOUT_DISCONNECT.clone())
.await
{
Err(_timeout) => {
warn!("Disconnect timed out. Trying to abort.");
let _ = self.command(command::abort()).await;
continue;
}
Ok(ConnectionStateChange::Closed) => {
break;
}
_ => { }
}
}
}
pub fn poll_disconnect(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
match ready!(execute_disconnect(
&mut self.disconnect_progress,
cx,
&mut self.control_out,
&mut self.control_in_res,
&mut self.data_stream,
)) {
Ok(()) => Poll::Ready(Ok(())),
Err(TncError::IoError(e)) => Poll::Ready(Err(e)),
Err(_tnc_err) => Poll::Ready(Err(connection_reset_err())),
}
}
pub async fn version(&mut self) -> TncResult<String> {
let cmd = command::version();
let vers = execute_command(
&mut self.control_out,
&mut self.control_in_res,
&self.control_timeout,
cmd,
)
.await?;
match vers.1 {
None => Ok("".to_owned()),
Some(v) => Ok(v),
}
}
async fn initialize(&mut self) -> TncResult<()> {
self.data_stream.state_mut().reset();
self.command(command::initialize()).await?;
self.command(command::listen(false)).await?;
self.command(command::protocolmode(ProtocolMode::FEC))
.await?;
self.command(command::mycall(self.mycall().clone())).await?;
self.command(command::busyblock(true)).await?;
Ok(())
}
async fn next_state_change(&mut self) -> TncResult<ConnectionStateChange> {
self.next_state_change_timeout(Duration::from_secs(0)).await
}
async fn next_state_change_timeout(
&mut self,
timeout: Duration,
) -> TncResult<ConnectionStateChange> {
loop {
let res = if timeout == Duration::from_secs(0) {
self.data_stream.next().await
} else {
self.data_stream.next().timeout(timeout.clone()).await?
};
match res {
None => return Err(TncError::IoError(connection_reset_err())),
Some(DataEvent::Event(event)) => return Ok(event),
Some(DataEvent::Data(DataIn::IDF(peer_call, peer_grid))) => {
return Ok(ConnectionStateChange::IdentityFrame(peer_call, peer_grid))
}
Some(_data) => { }
}
}
}
}
impl<I> Unpin for AsyncTnc<I> where I: AsyncRead + AsyncWrite + Unpin + Send {}
async fn execute_command<'d, W, R, F>(
outp: &'d mut W,
inp: &'d mut R,
timeout: &'d Duration,
cmd: Command<F>,
) -> TncResult<CommandOk>
where
W: Sink<String> + Unpin,
R: Stream<Item = CommandResult> + Unpin,
F: fmt::Display,
{
let send_raw = cmd.to_string();
debug!("Sending TNC command: {}", &send_raw);
let _ = outp.send(send_raw).timeout(timeout.clone()).await?;
match inp.next().timeout(timeout.clone()).await {
Err(_timeout) => Err(TncError::IoError(connection_timeout_err())),
Ok(None) => Err(TncError::IoError(connection_reset_err())),
Ok(Some(Err(badcmd))) => Err(TncError::CommandFailed(badcmd)),
Ok(Some(Ok((in_id, msg)))) => {
if in_id == *cmd.command_id() {
Ok((in_id, msg))
} else {
Err(TncError::IoError(command_response_invalid_err()))
}
}
}
}
#[derive(Debug, Eq, PartialEq, Clone)]
enum DisconnectProgress {
NoProgress,
SentDisconnect,
AckDisconnect,
}
fn execute_disconnect<K, S, E, Z>(
state: &mut DisconnectProgress,
cx: &mut Context<'_>,
ctrl_out: &mut K,
ctrl_in: &mut S,
evt_in: &mut E,
) -> Poll<TncResult<()>>
where
K: Sink<String, Error = Z> + Unpin,
S: Stream<Item = CommandResult> + Unpin,
E: Stream<Item = DataEvent> + Unpin,
crate::tnc::TncError: std::convert::From<Z>,
{
loop {
match state {
DisconnectProgress::NoProgress => {
ready!(Pin::new(&mut *ctrl_out).poll_ready(cx))?;
*state = DisconnectProgress::SentDisconnect;
Pin::new(&mut *ctrl_out).start_send(format!("{}", command::disconnect()))?;
}
DisconnectProgress::SentDisconnect => {
ready!(Pin::new(&mut *ctrl_out).poll_flush(cx))?;
match ready!(Pin::new(&mut *ctrl_in).poll_next(cx)) {
None => {
*state = DisconnectProgress::NoProgress;
return Poll::Ready(Err(TncError::IoError(connection_reset_err())));
}
Some(Err(_e)) => {
*state = DisconnectProgress::NoProgress;
return Poll::Ready(Ok(()));
}
Some(Ok((CommandID::DISCONNECT, _msg))) => {
*state = DisconnectProgress::AckDisconnect;
}
Some(_ok) => {
}
}
}
DisconnectProgress::AckDisconnect => {
match ready!(Pin::new(&mut *evt_in).poll_next(cx)) {
None => {
*state = DisconnectProgress::NoProgress;
return Poll::Ready(Err(TncError::IoError(connection_reset_err())));
}
Some(DataEvent::Event(ConnectionStateChange::Closed)) => {
*state = DisconnectProgress::NoProgress;
return Poll::Ready(Ok(()));
}
Some(_dataevt) => { }
}
}
}
}
}
fn connection_reset_err() -> io::Error {
io::Error::new(
io::ErrorKind::ConnectionReset,
"Lost connection to ARDOP TNC",
)
}
fn connection_timeout_err() -> io::Error {
io::Error::new(
io::ErrorKind::TimedOut,
"Lost connection to ARDOP TNC: command timed out",
)
}
fn command_response_invalid_err() -> io::Error {
io::Error::new(
io::ErrorKind::InvalidData,
"TNC sent an unsolicited or invalid command response",
)
}
#[cfg(test)]
mod test {
use super::*;
use futures::channel::mpsc;
use futures::io::Cursor;
use futures::sink;
use futures::stream;
use futures::task;
use crate::protocol::constants::CommandID;
#[test]
fn test_execute_command_good_response() {
async_std::task::block_on(async {
let cmd_out = command::listen(true);
let res_in: Vec<CommandResult> = vec![Ok((CommandID::LISTEN, None))];
let mut sink_out = sink::drain();
let mut stream_in = stream::iter(res_in.into_iter());
let timeout = Duration::from_secs(10);
let res = execute_command(&mut sink_out, &mut stream_in, &timeout, cmd_out).await;
match res {
Ok((CommandID::LISTEN, None)) => assert!(true),
_ => assert!(false),
}
})
}
#[test]
fn test_execute_command_bad_response() {
async_std::task::block_on(async {
let cmd_out = command::listen(true);
let res_in: Vec<CommandResult> = vec![Ok((CommandID::VERSION, None))];
let mut sink_out = sink::drain();
let mut stream_in = stream::iter(res_in.into_iter());
let timeout = Duration::from_secs(10);
let res = execute_command(&mut sink_out, &mut stream_in, &timeout, cmd_out).await;
match res {
Err(TncError::IoError(e)) => assert_eq!(io::ErrorKind::InvalidData, e.kind()),
_ => assert!(false),
}
})
}
#[test]
fn test_execute_command_eof() {
async_std::task::block_on(async {
let cmd_out = command::listen(true);
let res_in: Vec<CommandResult> = vec![];
let mut sink_out = sink::drain();
let mut stream_in = stream::iter(res_in.into_iter());
let timeout = Duration::from_secs(10);
let res = execute_command(&mut sink_out, &mut stream_in, &timeout, cmd_out).await;
match res {
Err(TncError::IoError(e)) => assert_eq!(e.kind(), io::ErrorKind::ConnectionReset),
_ => assert!(false),
}
})
}
#[test]
fn test_execute_command_timeout() {
async_std::task::block_on(async {
let cmd_out = command::listen(true);
let mut sink_out = sink::drain();
let mut stream_in = stream::once(futures::future::pending());
let timeout = Duration::from_micros(2);
let res = execute_command(&mut sink_out, &mut stream_in, &timeout, cmd_out).await;
match res {
Err(TncError::IoError(e)) => assert_eq!(io::ErrorKind::TimedOut, e.kind()),
_ => assert!(false),
}
})
}
#[test]
fn test_streams() {
async_std::task::block_on(async {
let stream_ctrl = Cursor::new(b"BUSY FALSE\rINPUTPEAKS BLAH\rREJECTEDBW\r".to_vec());
let stream_data = Cursor::new(b"\x00\x08ARQHELLO\x00\x0BIDFID: W1AW".to_vec());
let mut tnc = AsyncTnc::new_from_streams(stream_ctrl, stream_data, "W1AW");
futures::executor::block_on(async {
match tnc.data_stream_sink().next().await {
Some(DataEvent::Data(_d)) => assert!(true),
_ => assert!(false),
}
match tnc.data_stream_sink().next().await {
Some(DataEvent::Data(DataIn::IDF(_i0, _i1))) => assert!(true),
_ => assert!(false),
}
match tnc.data_stream_sink().next().await {
Some(DataEvent::Event(ConnectionStateChange::Busy(false))) => assert!(true),
_ => assert!(false),
}
match tnc.data_stream_sink().next().await {
Some(DataEvent::Event(ConnectionStateChange::Failed(
ConnectionFailedReason::IncompatibleBandwidth,
))) => assert!(true),
_ => assert!(false),
}
assert!(tnc.data_stream_sink().next().await.is_none());
});
})
}
#[test]
fn test_execute_disconnect() {
async_std::task::block_on(async {
let mut cx = Context::from_waker(task::noop_waker_ref());
let (mut ctrl_out_snd, mut ctrl_out_rx) = mpsc::unbounded();
let (ctrl_in_snd, mut ctrl_in_rx) = mpsc::unbounded();
let (evt_in_snd, mut evt_in_rx) = mpsc::unbounded();
let mut state = DisconnectProgress::NoProgress;
ctrl_in_snd
.unbounded_send(Err("not from state".to_owned()))
.unwrap();
match execute_disconnect(
&mut state,
&mut cx,
&mut ctrl_out_snd,
&mut ctrl_in_rx,
&mut evt_in_rx,
) {
Poll::Ready(Ok(())) => assert!(true),
_ => assert!(false),
}
assert_eq!(DisconnectProgress::NoProgress, state);
state = DisconnectProgress::NoProgress;
match execute_disconnect(
&mut state,
&mut cx,
&mut ctrl_out_snd,
&mut ctrl_in_rx,
&mut evt_in_rx,
) {
Poll::Pending => assert!(true),
_ => assert!(false),
}
assert_eq!(DisconnectProgress::SentDisconnect, state);
let _ = ctrl_out_rx.try_next().unwrap();
ctrl_in_snd
.unbounded_send(Ok((CommandID::DISCONNECT, None)))
.unwrap();
match execute_disconnect(
&mut state,
&mut cx,
&mut ctrl_out_snd,
&mut ctrl_in_rx,
&mut evt_in_rx,
) {
Poll::Pending => assert!(true),
_ => assert!(false),
}
assert_eq!(DisconnectProgress::AckDisconnect, state);
evt_in_snd
.unbounded_send(DataEvent::Event(ConnectionStateChange::SendBuffer(0)))
.unwrap();
evt_in_snd
.unbounded_send(DataEvent::Event(ConnectionStateChange::Closed))
.unwrap();
match execute_disconnect(
&mut state,
&mut cx,
&mut ctrl_out_snd,
&mut ctrl_in_rx,
&mut evt_in_rx,
) {
Poll::Ready(Ok(())) => assert!(true),
_ => assert!(false),
}
assert_eq!(DisconnectProgress::NoProgress, state);
})
}
}