network_protocol/transport/
remote.rs

1//! # Remote Transport Layer
2//! 
3//! This file is part of the Network Protocol project.
4//!
5//! It defines the remote transport layer for network communication,
6//! including TCP-based client-server interactions.
7//!
8//! The remote transport layer is responsible for handling the actual data transmission
9//! between nodes in a network, ensuring that packets are sent and received correctly.
10//! 
11//! It abstracts the underlying network details,
12//! allowing higher-level protocol logic to focus on message routing and processing.
13//! 
14//! The remote transport layer is designed to be modular and extensible,
15//! supporting various transport mechanisms such as TCP, UDP, and custom protocols.
16//! 
17//! ## Responsibilities
18//! - Send and receive packets over the network
19//! - Handle connection management
20//! - Provide a unified interface for different transport protocols
21//! 
//! This module moves protocol packets across the network; parsing and
//! serialization are delegated to the `PacketCodec` that frames each stream.
24//! 
25//! It is designed to be efficient, minimal, and easy to integrate into the protocol layer.
26use tokio::net::{TcpListener, TcpStream};
27use tokio_util::codec::Framed;
28use crate::core::codec::PacketCodec;
29use crate::core::packet::Packet;
30use crate::error::Result;
31use futures::StreamExt;
32use futures::SinkExt;
33use std::net::SocketAddr;
34use tracing::{info, error, debug, instrument};
35
36/// Starts a TCP server at the given address
37#[instrument(skip(addr), fields(address = %addr))]
38pub async fn start_server(addr: &str) -> Result<()> {
39    let listener = TcpListener::bind(addr).await?;
40
41    info!(address = %addr, "Server listening");
42
43    loop {
44        let (stream, peer) = listener.accept().await?;
45        tokio::spawn(async move {
46            if let Err(e) = handle_connection(stream, peer).await {
47                error!(error = %e, peer = %peer, "Connection error");
48            }
49        });
50    }
51}
52
53/// Handles a single client connection
54#[instrument(skip(stream), fields(peer = %peer))]
55async fn handle_connection(stream: TcpStream, peer: SocketAddr) -> Result<()> {
56    let mut framed = Framed::new(stream, PacketCodec);
57
58    info!("Client connected");
59
60    while let Some(packet) = framed.next().await {
61        match packet {
62            Ok(pkt) => {
63                debug!(bytes = pkt.payload.len(), "Packet received");
64                on_packet(pkt, &mut framed).await?;
65            }
66            Err(e) => {
67                error!(error = %e, "Protocol error");
68                break;
69            }
70        }
71    }
72
73    info!("Client disconnected");
74    Ok(())
75}
76
77/// Placeholder: handles incoming packets
78#[instrument(skip(framed), fields(packet_version = pkt.version, payload_size = pkt.payload.len()))]
79async fn on_packet(pkt: Packet, framed: &mut Framed<TcpStream, PacketCodec>) -> Result<()> {
80    // Echo the packet back
81    let response = Packet {
82        version: pkt.version,
83        payload: pkt.payload,
84    };
85
86    framed.send(response).await?;
87    Ok(())
88}
89
90/// Connect to a remote server and return the framed transport
91#[instrument(skip(addr), fields(address = %addr))]
92pub async fn connect(addr: &str) -> Result<Framed<TcpStream, PacketCodec>> {
93    let stream = TcpStream::connect(addr).await?;
94    let framed = Framed::new(stream, PacketCodec);
95    Ok(framed)
96}