cubic_protocol_server/
server.rs1use std::net::SocketAddr;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use tokio::net::{TcpListener, TcpStream};
5use tokio::*;
6use cubic_protocol::packet::PacketState;
7use crate::connection::Connection;
8use crate::handler::{ConnectionHandler, ReadHandler};
9use crate::read::ReadStreamQueue;
10use crate::write::WriteStreamQueue;
11
12pub struct ProtocolServerDeclare<
13 H: ReadHandler + Sized + Send + Sync + 'static,
14 C: ConnectionHandler + Sized + Send + Sync + 'static,
15> {
16 pub host: String,
17 pub read_handler: H,
18 pub connection_handler: C,
19}
20
/// Run state shared between the caller, the accept loop and every
/// connection task.
pub struct ProtocolServerRuntime {
    /// `true` while the server should keep going.
    ///
    /// The accept loop and each connection's read loop read this flag with
    /// `Ordering::Acquire` at the top of every iteration and stop once it is
    /// `false`.
    pub running: AtomicBool,
}
24
25pub struct ProtocolServerTask {
26 pub runtime: Arc<ProtocolServerRuntime>,
27 pub task: task::JoinHandle<io::Result<()>>,
28}
29
30pub fn run_server<
31 H: ReadHandler + Sized + Send + Sync + 'static,
32 C: ConnectionHandler + Sized + Send + Sync + 'static
33>(declare: ProtocolServerDeclare<H, C>) -> ProtocolServerTask {
34 let runtime = Arc::new(
35 ProtocolServerRuntime {
36 running: AtomicBool::new(true),
37 }
38 );
39 let task_runtime = runtime.clone();
40 ProtocolServerTask {
41 task: tokio::spawn(async move {
42 run_server_runtime(declare, task_runtime).await
43 }),
44 runtime,
45 }
46}
47
/// Capacity passed to `sync::mpsc::channel` when a connection's outgoing
/// channel is created.
const CHANNEL_BUFFER_SIZE: usize = 128;
/// Const generic argument used for each connection's `ReadStreamQueue`.
const READ_BUFFER_SIZE: usize = 1024;
50
51async fn run_server_runtime<
52 H: ReadHandler + Sized + Send + Sync + 'static,
53 C: ConnectionHandler + Sized + Send + Sync + 'static
54>(declare: ProtocolServerDeclare<H, C>, runtime: Arc<ProtocolServerRuntime>) -> io::Result<()> {
55 let declare = Arc::new(declare);
56 let listener = TcpListener::bind(&declare.host).await?;
57 while runtime.running.load(Ordering::Acquire) {
58 let (stream, addr) = listener.accept().await?;
59 let declare = declare.clone();
60 let runtime = runtime.clone();
61 tokio::spawn(async move { run_connection(declare, runtime, stream, addr).await });
62 }
63 Ok(())
64}
65
66async fn run_connection<
67 H: ReadHandler + Sized + Send + Sync + 'static,
68 C: ConnectionHandler + Sized + Send + Sync + 'static
69>(declare: Arc<ProtocolServerDeclare<H, C>>, runtime: Arc<ProtocolServerRuntime>, stream: TcpStream, addr: SocketAddr) {
70 let (read_half, write_half) = stream.into_split();
71 let (sender, receiver) =
72 sync::mpsc::channel(CHANNEL_BUFFER_SIZE);
73 let connection = Arc::new(Connection::new(addr, sender));
74 declare.connection_handler.handle_connection(connection.clone());
75 let mut read_queue = ReadStreamQueue::<READ_BUFFER_SIZE>::new(read_half);
76 {
77 let write_queue = WriteStreamQueue { write_half, receiver };
78 let connection = connection.clone();
79 let declare = declare.clone();
80 tokio::spawn(async move { write_queue.run(connection, declare).await });
81 }
82 let mut state = PacketState::Handshake;
83 while runtime.running.load(Ordering::Acquire) {
84 if let Err(err) = read_queue.next_packet().await {
85 log::debug!("Received error while getting next packet: {:?}", err);
86 break;
87 }
88 if let Err(err) = declare.read_handler.handle(
89 connection.clone(), &mut state, &mut read_queue).await {
90 log::debug!("Received error while handling next packet: {:?}", err);
91 break;
92 }
93 }
94 let _ = connection.close().await;
95}