network_protocol/transport/
remote.rs1use tokio::net::{TcpListener, TcpStream};
27use tokio_util::codec::Framed;
28use crate::core::codec::PacketCodec;
29use crate::core::packet::Packet;
30use crate::error::Result;
31use futures::StreamExt;
32use futures::SinkExt;
33use std::net::SocketAddr;
34use tracing::{info, error, debug, instrument};
35
36#[instrument(skip(addr), fields(address = %addr))]
38pub async fn start_server(addr: &str) -> Result<()> {
39 let listener = TcpListener::bind(addr).await?;
40
41 info!(address = %addr, "Server listening");
42
43 loop {
44 let (stream, peer) = listener.accept().await?;
45 tokio::spawn(async move {
46 if let Err(e) = handle_connection(stream, peer).await {
47 error!(error = %e, peer = %peer, "Connection error");
48 }
49 });
50 }
51}
52
53#[instrument(skip(stream), fields(peer = %peer))]
55async fn handle_connection(stream: TcpStream, peer: SocketAddr) -> Result<()> {
56 let mut framed = Framed::new(stream, PacketCodec);
57
58 info!("Client connected");
59
60 while let Some(packet) = framed.next().await {
61 match packet {
62 Ok(pkt) => {
63 debug!(bytes = pkt.payload.len(), "Packet received");
64 on_packet(pkt, &mut framed).await?;
65 }
66 Err(e) => {
67 error!(error = %e, "Protocol error");
68 break;
69 }
70 }
71 }
72
73 info!("Client disconnected");
74 Ok(())
75}
76
77#[instrument(skip(framed), fields(packet_version = pkt.version, payload_size = pkt.payload.len()))]
79async fn on_packet(pkt: Packet, framed: &mut Framed<TcpStream, PacketCodec>) -> Result<()> {
80 let response = Packet {
82 version: pkt.version,
83 payload: pkt.payload,
84 };
85
86 framed.send(response).await?;
87 Ok(())
88}
89
90#[instrument(skip(addr), fields(address = %addr))]
92pub async fn connect(addr: &str) -> Result<Framed<TcpStream, PacketCodec>> {
93 let stream = TcpStream::connect(addr).await?;
94 let framed = Framed::new(stream, PacketCodec);
95 Ok(framed)
96}