cubic_protocol_server/
server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use tokio::net::{TcpListener, TcpStream};
5use tokio::*;
6use cubic_protocol::packet::PacketState;
7use crate::connection::Connection;
8use crate::handler::{ConnectionHandler, ReadHandler};
9use crate::read::ReadStreamQueue;
10use crate::write::WriteStreamQueue;
11
12pub struct ProtocolServerDeclare<
13    H: ReadHandler + Sized + Send + Sync + 'static,
14    C: ConnectionHandler + Sized + Send + Sync + 'static,
15> {
16    pub host: String,
17    pub read_handler: H,
18    pub connection_handler: C,
19}
20
21pub struct ProtocolServerRuntime {
22    pub running: AtomicBool,
23}
24
25pub struct ProtocolServerTask {
26    pub runtime: Arc<ProtocolServerRuntime>,
27    pub task: task::JoinHandle<io::Result<()>>,
28}
29
30pub fn run_server<
31    H: ReadHandler + Sized + Send + Sync + 'static,
32    C: ConnectionHandler + Sized + Send + Sync + 'static
33>(declare: ProtocolServerDeclare<H, C>) -> ProtocolServerTask {
34    let runtime = Arc::new(
35        ProtocolServerRuntime {
36            running: AtomicBool::new(true),
37        }
38    );
39    let task_runtime = runtime.clone();
40    ProtocolServerTask {
41        task: tokio::spawn(async move {
42            run_server_runtime(declare, task_runtime).await
43        }),
44        runtime,
45    }
46}
47
48const CHANNEL_BUFFER_SIZE: usize = 128;
49const READ_BUFFER_SIZE: usize = 1024;
50
51async fn run_server_runtime<
52    H: ReadHandler + Sized + Send + Sync + 'static,
53    C: ConnectionHandler + Sized + Send + Sync + 'static
54>(declare: ProtocolServerDeclare<H, C>, runtime: Arc<ProtocolServerRuntime>) -> io::Result<()> {
55    let declare = Arc::new(declare);
56    let listener = TcpListener::bind(&declare.host).await?;
57    while runtime.running.load(Ordering::Acquire) {
58        let (stream, addr) = listener.accept().await?;
59        let declare = declare.clone();
60        let runtime = runtime.clone();
61        tokio::spawn(async move { run_connection(declare, runtime, stream, addr).await });
62    }
63    Ok(())
64}
65
66async fn run_connection<
67    H: ReadHandler + Sized + Send + Sync + 'static,
68    C: ConnectionHandler + Sized + Send + Sync + 'static
69>(declare: Arc<ProtocolServerDeclare<H, C>>, runtime: Arc<ProtocolServerRuntime>, stream: TcpStream, addr: SocketAddr) {
70    let (read_half, write_half) = stream.into_split();
71    let (sender, receiver) =
72        sync::mpsc::channel(CHANNEL_BUFFER_SIZE);
73    let connection = Arc::new(Connection::new(addr, sender));
74    declare.connection_handler.handle_connection(connection.clone());
75    let mut read_queue = ReadStreamQueue::<READ_BUFFER_SIZE>::new(read_half);
76    {
77        let write_queue = WriteStreamQueue { write_half, receiver };
78        let connection = connection.clone();
79        let declare = declare.clone();
80        tokio::spawn(async move { write_queue.run(connection, declare).await });
81    }
82    let mut state = PacketState::Handshake;
83    while runtime.running.load(Ordering::Acquire) {
84        if let Err(err) = read_queue.next_packet().await {
85            log::debug!("Received error while getting next packet: {:?}", err);
86            break;
87        }
88        if let Err(err) = declare.read_handler.handle(
89            connection.clone(), &mut state, &mut read_queue).await {
90            log::debug!("Received error while handling next packet: {:?}", err);
91            break;
92        }
93    }
94    let _ = connection.close().await;
95}