use std::net::SocketAddr;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio::time::timeout;
use super::DccEvent;
/// Waits for a peer to connect to `listener`, then runs a DCC CHAT session on
/// the accepted connection.
///
/// * `id` - identifier echoed back in every [`DccEvent`] emitted for this chat.
/// * `listener` - already-bound listener; exactly one connection is accepted.
/// * `timeout_dur` - maximum time to wait for the incoming connection.
/// * `event_tx` - channel on which chat events are reported.
/// * `line_rx` - outgoing lines to write to the peer once connected.
///
/// On success a `ChatConnected` event is sent and the session runs until it
/// ends. If accepting fails or the timeout elapses, a single `ChatError` event
/// is sent instead. Failures to deliver an event are deliberately ignored.
pub async fn listen_for_chat(
    id: String,
    listener: TcpListener,
    timeout_dur: Duration,
    event_tx: mpsc::Sender<DccEvent>,
    line_rx: mpsc::Receiver<String>,
) {
    let accept_result = timeout(timeout_dur, listener.accept()).await;

    // Only one connection is ever accepted, so release the listening socket
    // now rather than keeping the port bound for the lifetime of the chat.
    drop(listener);

    match accept_result {
        // NOTE(review): the peer address is not checked against the expected
        // remote user here - confirm whether the caller validates it.
        Ok(Ok((stream, _peer_addr))) => {
            let _ = event_tx
                .send(DccEvent::ChatConnected { id: id.clone() })
                .await;
            run_chat_session(id, stream, event_tx, line_rx).await;
        }
        Ok(Err(e)) => {
            let _ = event_tx
                .send(DccEvent::ChatError {
                    id,
                    error: format!("DCC CHAT accept error: {e}"),
                })
                .await;
        }
        Err(_elapsed) => {
            let _ = event_tx
                .send(DccEvent::ChatError {
                    id,
                    error: "DCC CHAT request timed out".to_owned(),
                })
                .await;
        }
    }
}
pub async fn connect_for_chat(
id: String,
addr: SocketAddr,
timeout_dur: Duration,
event_tx: mpsc::Sender<DccEvent>,
line_rx: mpsc::Receiver<String>,
) {
let connect_result = timeout(timeout_dur, TcpStream::connect(addr)).await;
match connect_result {
Ok(Ok(stream)) => {
let _ = event_tx
.send(DccEvent::ChatConnected { id: id.clone() })
.await;
run_chat_session(id, stream, event_tx, line_rx).await;
}
Ok(Err(e)) => {
let _ = event_tx
.send(DccEvent::ChatError {
id,
error: format!("DCC CHAT connect error: {e}"),
})
.await;
}
Err(_elapsed) => {
let _ = event_tx
.send(DccEvent::ChatError {
id,
error: "DCC CHAT connect timed out".to_owned(),
})
.await;
}
}
}
/// Drives an established DCC CHAT connection until either direction finishes.
///
/// The stream is split into two halves, each handled by its own spawned task:
///
/// * the reader turns every non-empty incoming line into a `ChatMessage`, or a
///   `ChatAction` when the line is wrapped as `\x01ACTION <text>\x01`, and
///   reports `ChatClosed` on end-of-stream or on a read error;
/// * the writer sends each string received on `line_rx` followed by `\n`, and
///   reports `ChatClosed` if writing or flushing fails.
///
/// As soon as one task finishes the other is aborted and the function returns.
/// Failures to deliver an event on `event_tx` are ignored.
async fn run_chat_session(
    id: String,
    stream: TcpStream,
    event_tx: mpsc::Sender<DccEvent>,
    mut line_rx: mpsc::Receiver<String>,
) {
    let (read_half, mut write_half) = stream.into_split();

    let reader_id = id.clone();
    let reader_tx = event_tx.clone();
    let mut reader_handle = tokio::spawn(async move {
        let mut reader = BufReader::new(read_half);
        // Lines are read as raw bytes and decoded lossily: `read_line` fails
        // on any non-UTF-8 byte, which would close the chat as soon as a peer
        // sends text in a legacy encoding.
        // NOTE(review): line length is not bounded; a peer that never sends a
        // newline makes this buffer grow - consider adding a cap.
        let mut raw: Vec<u8> = Vec::new();
        loop {
            raw.clear();
            match reader.read_until(b'\n', &mut raw).await {
                // Zero bytes read: the peer closed the connection.
                Ok(0) => {
                    let _ = reader_tx
                        .send(DccEvent::ChatClosed {
                            id: reader_id,
                            reason: None,
                        })
                        .await;
                    break;
                }
                Ok(_) => {
                    let text = String::from_utf8_lossy(&raw)
                        .trim_end_matches(['\n', '\r'])
                        .to_owned();
                    if text.is_empty() {
                        continue;
                    }
                    let event = match text
                        .strip_prefix("\x01ACTION ")
                        .and_then(|rest| rest.strip_suffix('\x01'))
                    {
                        Some(action_text) => DccEvent::ChatAction {
                            id: reader_id.clone(),
                            text: action_text.to_owned(),
                        },
                        None => DccEvent::ChatMessage {
                            id: reader_id.clone(),
                            text,
                        },
                    };
                    let _ = reader_tx.send(event).await;
                }
                Err(e) => {
                    let _ = reader_tx
                        .send(DccEvent::ChatClosed {
                            id: reader_id,
                            reason: Some(format!("read error: {e}")),
                        })
                        .await;
                    break;
                }
            }
        }
    });

    let writer_id = id;
    let writer_tx = event_tx;
    let mut writer_handle = tokio::spawn(async move {
        // The loop ends without emitting an event once every sender for
        // `line_rx` has been dropped.
        while let Some(mut data) = line_rx.recv().await {
            data.push('\n');
            let failure = match write_half.write_all(data.as_bytes()).await {
                Err(e) => Some(format!("write error: {e}")),
                Ok(()) => write_half
                    .flush()
                    .await
                    .err()
                    .map(|e| format!("flush error: {e}")),
            };
            if let Some(reason) = failure {
                let _ = writer_tx
                    .send(DccEvent::ChatClosed {
                        id: writer_id,
                        reason: Some(reason),
                    })
                    .await;
                return;
            }
        }
    });

    // `biased` polls the branches in the order written, so a finished reader
    // is noticed before a finished writer. Whichever task ends first, the
    // other one is aborted.
    tokio::select! {
        biased;
        _ = &mut reader_handle => {
            writer_handle.abort();
        }
        _ = &mut writer_handle => {
            reader_handle.abort();
        }
    }
}
/// Parses a port-range setting into a `(low, high)` pair.
///
/// Accepted forms, after trimming surrounding whitespace:
///
/// * `""` or `"0"` -> `(0, 0)`
/// * a single port, e.g. `"5000"` -> `(5000, 5000)`
/// * two ports separated by a dash and/or whitespace, e.g. `"1025-65535"`,
///   `"1025 65535"` or `"1025 - 65535"` -> `(1025, 65535)`
///
/// Anything that does not parse as `u16` values yields `(0, 0)`. The two
/// bounds are returned in the order given; they are not sorted.
pub fn parse_port_range(s: &str) -> (u16, u16) {
    let s = s.trim();
    if s.is_empty() || s == "0" {
        return (0, 0);
    }

    // Try the dash first: splitting "1025 - 65535" on its first space would
    // leave "- 65535" as the upper bound, which never parses.
    let pair = s
        .split_once('-')
        .or_else(|| s.split_once(char::is_whitespace));

    match pair {
        Some((lo, hi)) => match (lo.trim().parse::<u16>(), hi.trim().parse::<u16>()) {
            (Ok(lo), Ok(hi)) => (lo, hi),
            _ => (0, 0),
        },
        None => s.parse::<u16>().map_or((0, 0), |port| (port, port)),
    }
}
// Unit tests for `parse_port_range` plus one end-to-end chat session test
// over a loopback TCP connection.
#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;
use tokio::net::TcpListener;
/// The literal "0" maps to the (0, 0) pair.
#[test]
fn parse_port_range_zero() {
assert_eq!(parse_port_range("0"), (0, 0));
}
/// An empty string maps to the (0, 0) pair.
#[test]
fn parse_port_range_empty() {
assert_eq!(parse_port_range(""), (0, 0));
}
/// A single port is returned as both bounds.
#[test]
fn parse_port_range_single() {
assert_eq!(parse_port_range("5000"), (5000, 5000));
}
/// Two ports separated by a space form a range.
#[test]
fn parse_port_range_space() {
assert_eq!(parse_port_range("1025 65535"), (1025, 65535));
}
/// Two ports separated by a dash form a range.
#[test]
fn parse_port_range_dash() {
assert_eq!(parse_port_range("1025-65535"), (1025, 65535));
}
/// Connects a client (`connect_for_chat`) to a server-side session
/// (`run_chat_session`) and checks that a plain message, an ACTION and the
/// close notification are each reported as the expected event.
#[tokio::test]
async fn chat_session_connect_and_exchange() {
// Bind to port 0 so the OS picks a free loopback port.
let server_listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
.await
.expect("bind listener");
let server_addr = server_listener.local_addr().expect("local_addr");
// Each side gets an event channel (session -> test) and a line channel
// (test -> session).
let (client_event_tx, mut client_event_rx) = mpsc::channel::<DccEvent>(256);
let (client_line_tx, client_line_rx) = mpsc::channel::<String>(256);
let (server_event_tx, mut server_event_rx) = mpsc::channel::<DccEvent>(256);
let (server_line_tx, server_line_rx) = mpsc::channel::<String>(256);
// Client side: dial the listener in the background.
tokio::spawn(connect_for_chat(
"client".to_owned(),
server_addr,
Duration::from_secs(5),
client_event_tx,
client_line_rx,
));
// Server side: accept manually and start the session directly, so no
// ChatConnected event is emitted for "server".
let (server_stream, _) = server_listener.accept().await.expect("server accept");
tokio::spawn(run_chat_session(
"server".to_owned(),
server_stream,
server_event_tx,
server_line_rx,
));
// The client reports the established connection first.
let event = client_event_rx.recv().await.expect("ChatConnected");
assert!(
matches!(event, DccEvent::ChatConnected { ref id } if id == "client"),
"expected ChatConnected, got {event:?}",
);
// A plain line written by the server arrives at the client as ChatMessage.
server_line_tx
.send("hello from server".to_owned())
.await
.expect("send plain");
let event = client_event_rx.recv().await.expect("ChatMessage");
assert!(
matches!(&event, DccEvent::ChatMessage { id, text }
if id == "client" && text == "hello from server"),
"expected ChatMessage, got {event:?}",
);
// A \x01ACTION ...\x01 line written by the client arrives at the server as
// ChatAction with the wrapper stripped.
client_line_tx
.send("\x01ACTION waves\x01".to_owned())
.await
.expect("send action");
let event = server_event_rx.recv().await.expect("ChatAction");
assert!(
matches!(&event, DccEvent::ChatAction { id, text }
if id == "server" && text == "waves"),
"expected ChatAction, got {event:?}",
);
// Dropping the only sender ends the server's writer loop, which ends the
// server session; the client is then expected to observe end-of-stream
// and report ChatClosed without a reason.
drop(server_line_tx);
let event = client_event_rx.recv().await.expect("ChatClosed");
assert!(
matches!(event, DccEvent::ChatClosed { ref id, reason: None } if id == "client"),
"expected ChatClosed with no reason, got {event:?}",
);
}
}