use dialectic::prelude::*;
use dialectic_tokio_serde::codec::LengthDelimitedCodec;
use dialectic_tokio_serde_bincode::Bincode;
use colored::*;
use std::{error::Error, fmt::Debug, future::Future, io, process};
use structopt::StructOpt;
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, Stdin, Stdout},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpListener, TcpStream,
},
};
#[allow(unused)]
pub async fn prompt<T, E, R, W>(
prompt: &str,
input: &mut R,
output: &mut W,
mut parse: impl FnMut(&str) -> Result<T, E>,
) -> io::Result<T>
where
R: AsyncBufRead + Unpin,
W: AsyncWrite + Unpin,
{
loop {
output.write_all(prompt.as_bytes()).await?;
output.flush().await?;
let mut line = String::new();
if 0 == input.read_line(&mut line).await? {
break Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected end of input",
));
}
match parse(line.trim()) {
Ok(t) => break Ok(t),
Err(_) => continue,
}
}
}
pub type TcpChan<S> = dialectic_tokio_serde::SymmetricalChan<
S,
Bincode,
LengthDelimitedCodec,
OwnedWriteHalf,
OwnedReadHalf,
>;
fn wrap_socket<P>(socket: TcpStream, max_length: usize) -> TcpChan<P>
where
P: Session,
{
let (rx, tx) = socket.into_split();
let (tx, rx) = dialectic_tokio_serde_bincode::length_delimited(tx, rx, 4, max_length);
P::wrap(tx, rx)
}
#[derive(Debug, Clone, StructOpt)]
struct TcpAppOptions {
#[structopt(subcommand)]
party: Party,
}
#[derive(Debug, Clone, StructOpt)]
enum Party {
Client {
#[structopt(short, long, default_value = "5000")]
port: u16,
#[structopt(short, long, default_value = "127.0.0.1")]
address: String,
},
Server {
#[structopt(short, long, default_value = "5000")]
port: u16,
#[structopt(short, long, default_value = "127.0.0.1")]
address: String,
},
}
pub async fn demo<P, Server, ServerFuture, ServerResult, Client, ClientFuture, ClientResult>(
server: &'static Server,
client: &'static Client,
max_length: usize,
) where
P: Session,
P::Dual: Session,
Server: Fn(TcpChan<P>) -> ServerFuture + Sync + 'static,
Client: Fn(BufReader<Stdin>, Stdout, TcpChan<P::Dual>) -> ClientFuture + Sync + 'static,
ServerFuture:
Future<Output = Result<ServerResult, Box<dyn Error>>> + std::marker::Send + 'static,
ClientFuture:
Future<Output = Result<ClientResult, Box<dyn Error>>> + std::marker::Send + 'static,
ServerResult: Debug,
ClientResult: Debug,
{
use Party::*;
let options = TcpAppOptions::from_args();
if let Err(e) = match options.party {
Server { port, ref address } => {
listen_on::<P, _, _, _>(address, port, max_length, server).await
}
Client { port, ref address } => {
connect_to::<P::Dual, _, _, _>(address, port, max_length, client).await
}
} {
let party_name = match options.party {
Server { .. } => "server",
Client { .. } => "client",
};
eprintln!("{}", format!("[{}] Error: {}", party_name, e).red());
process::exit(1);
}
}
async fn listen_on<P, F, Fut, T>(
address: &str,
port: u16,
max_length: usize,
interaction: &'static F,
) -> Result<(), Box<dyn Error>>
where
F: Fn(TcpChan<P>) -> Fut + Sync + 'static,
Fut: Future<Output = Result<T, Box<dyn Error>>> + std::marker::Send,
P: Session,
T: Debug,
{
let listener = TcpListener::bind((address, port)).await?;
println!(
"{}",
format!("[server] Listening on {:?}", listener.local_addr().unwrap()).blue()
);
loop {
let (socket, addr) = listener.accept().await?;
tokio::spawn(async move {
let _ = interaction(wrap_socket::<P>(socket, max_length))
.await
.map(|result| {
println!(
"{}",
format!("[server] {} - Result: {:?}", addr, result).green()
)
})
.map_err(|err| {
eprintln!("{}", format!("[server] {} - Error: {}", addr, err).red())
});
});
}
}
async fn connect_to<P, F, Fut, T>(
address: &str,
port: u16,
max_length: usize,
interaction: &'static F,
) -> Result<(), Box<dyn Error>>
where
F: Fn(BufReader<Stdin>, Stdout, TcpChan<P>) -> Fut + Sync + 'static,
Fut: Future<Output = Result<T, Box<dyn Error>>> + std::marker::Send,
P: Session,
T: Debug,
{
let socket = TcpStream::connect((address, port)).await?;
println!(
"{}",
format!(
"[client] Connected to {:?} from {:?}",
socket.peer_addr().unwrap(),
socket.local_addr().unwrap()
)
.blue()
);
let stdin = BufReader::new(tokio::io::stdin());
let stdout = tokio::io::stdout();
let result = interaction(stdin, stdout, wrap_socket::<P>(socket, max_length)).await?;
println!("{}", format!("[client] Result: {:?}", result).green());
Ok(())
}