use crate::core::codec::PacketCodec;
use crate::core::packet::Packet;
use crate::error::Result;
use futures::SinkExt;
use futures::StreamExt;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::Framed;
use tracing::{debug, error, info, instrument};
#[instrument(skip(addr), fields(address = %addr))]
pub async fn start_server(addr: &str) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
info!(address = %addr, "Server listening");
loop {
let (stream, peer) = listener.accept().await?;
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, peer).await {
error!(error = %e, peer = %peer, "Connection error");
}
});
}
}
#[instrument(skip(stream), fields(peer = %peer))]
async fn handle_connection(stream: TcpStream, peer: SocketAddr) -> Result<()> {
let mut framed = Framed::new(stream, PacketCodec);
info!("Client connected");
while let Some(packet) = framed.next().await {
match packet {
Ok(pkt) => {
debug!(bytes = pkt.payload.len(), "Packet received");
on_packet(pkt, &mut framed).await?;
}
Err(e) => {
error!(error = %e, "Protocol error");
break;
}
}
}
info!("Client disconnected");
Ok(())
}
/// Echoes `pkt` back to the sender over `framed`.
///
/// The reply carries the same `version` and `payload` as the packet that was
/// received.
///
/// # Errors
///
/// Returns an error if sending the reply through `framed` fails.
#[instrument(skip(framed), fields(packet_version = pkt.version, payload_size = pkt.payload.len()))]
async fn on_packet(pkt: Packet, framed: &mut Framed<TcpStream, PacketCodec>) -> Result<()> {
    // The reply is field-for-field the received packet, so it is forwarded
    // as-is instead of being rebuilt.
    framed.send(pkt).await?;
    Ok(())
}
/// Opens a TCP connection to `addr` and wraps it in a `Framed` transport
/// that uses `PacketCodec`.
///
/// # Errors
///
/// Returns an error if the TCP connection cannot be established.
#[instrument(skip(addr), fields(address = %addr))]
pub async fn connect(addr: &str) -> Result<Framed<TcpStream, PacketCodec>> {
    Ok(Framed::new(TcpStream::connect(addr).await?, PacketCodec))
}