use std::path::PathBuf;
use crate::common::format_version_to_display;
use crate::input::{parse_gamestate, Game, InitOptions};
use crate::net::common::StreamType;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, UnixListener};
use tokio::runtime::Runtime;
use tokio::sync::broadcast;
pub fn block_on_play(props: StreamType) -> std::io::Result<()> {
let rt = Runtime::new()?;
rt.block_on(play(props));
Ok(())
}
pub async fn play(props: StreamType) {
match parse_gamestate() {
Ok(input) => {
let mut options_passthrough = input.options.clone();
options_passthrough
.features_with_version
.insert("tcp-play-rust".to_string(), format_version_to_display());
println!("{}\r", serde_json::to_string(&options_passthrough).unwrap());
let (tx, _) = broadcast::channel::<Game>(10);
match props {
StreamType::Tcp(bind_addr) => {
tokio::spawn(create_tcp_server(
bind_addr,
tx.clone(),
options_passthrough.clone(),
));
}
StreamType::Socket(socket_path) => {
tokio::spawn(create_socket_server(
socket_path,
tx.clone(),
options_passthrough.clone(),
));
}
}
brodcast_lines(input.lines, tx.clone()).unwrap();
}
Err(e) => {
eprintln!("Error occurred while parsing stdin: \"{}\"", e);
std::process::exit(exitcode::DATAERR);
}
}
}
fn brodcast_lines(
lines: Box<dyn Iterator<Item = Game>>,
tx: broadcast::Sender<Game>,
) -> std::io::Result<()> {
for parsed_line in lines {
println!("{}\r", serde_json::to_string(&parsed_line).unwrap());
let _ = tx.send(parsed_line);
}
Ok(())
}
async fn create_tcp_server(
bind_addr: String,
tx: tokio::sync::broadcast::Sender<Game>,
init_options: InitOptions,
) -> std::io::Result<()> {
match TcpListener::bind(&bind_addr).await {
Ok(listener) => loop {
match listener.accept().await {
Ok((tcp_stream, _socket_addr)) => {
let tx = tx.clone();
let init_options = init_options.clone();
tokio::spawn(async move {
let _ = handle_client_task(tcp_stream, tx, init_options).await;
});
}
Err(_) => {}
}
},
Err(err) => {
if err.kind() == std::io::ErrorKind::AddrInUse {
eprintln!("{} already in use.\r", bind_addr);
std::process::exit(exitcode::IOERR);
} else {
eprintln!("Failed to launch TCP server.\r\n{:?}", err);
std::process::exit(exitcode::IOERR);
}
}
}
}
async fn create_socket_server(
socket_path: PathBuf,
tx: tokio::sync::broadcast::Sender<Game>,
init_options: InitOptions,
) -> std::io::Result<()> {
let listener = UnixListener::bind(socket_path)?;
loop {
match listener.accept().await {
Ok((socket_stream, _socket_addr)) => {
let tx = tx.clone();
let init_options = init_options.clone();
tokio::spawn(async move {
let _ = handle_client_task(socket_stream, tx, init_options).await;
});
}
Err(_) => {}
}
}
}
async fn handle_client_task(
mut stream: impl AsyncWriteExt + std::marker::Unpin,
tx: tokio::sync::broadcast::Sender<Game>,
init_options: InitOptions,
) -> std::io::Result<()> {
let mut rx = tx.subscribe();
stream
.write_all(format!("{}\r\n", serde_json::to_string(&init_options).unwrap()).as_bytes())
.await
.unwrap();
loop {
tokio::select! {
Ok(parsed_line) = rx.recv() => {
match stream.write_all(format!("{}\r\n", serde_json::to_string(&parsed_line).unwrap()).as_bytes()).await {
Ok(_) => {
continue;
}
Err(_) => {
break;
}
}
}
}
}
Ok(())
}