1pub mod address_pool;
2mod connection;
3
4use std::net::{IpAddr, SocketAddr};
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::auth::AuthServer;
9use crate::server::connection::QuincyConnection;
10use crate::users_file::UsersFileServerAuthenticator;
11use bytes::Bytes;
12use dashmap::DashMap;
13use futures::stream::FuturesUnordered;
14use futures::StreamExt;
15use quincy::config::ServerConfig;
16use quincy::network::socket::bind_socket;
17use quincy::Result;
18use quinn::{Endpoint, VarInt};
19use tokio::signal;
20use tokio::sync::mpsc::{channel, Receiver, Sender};
21
22use self::address_pool::AddressPool;
23use quincy::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
24use quincy::network::interface::{Interface, InterfaceIO};
25use quincy::network::packet::Packet;
26use quincy::utils::tasks::abort_all;
27use tracing::{debug, info, warn};
28
29type ConnectionQueues = Arc<DashMap<IpAddr, Sender<Bytes>>>;
30
31pub struct QuincyServer {
33 config: ServerConfig,
34 connection_queues: ConnectionQueues,
35 address_pool: Arc<AddressPool>,
36}
37
38impl QuincyServer {
39 pub fn new(config: ServerConfig) -> Result<Self> {
44 let address_pool = AddressPool::new(config.tunnel_network);
45
46 Ok(Self {
47 config,
48 connection_queues: Arc::new(DashMap::new()),
49 address_pool: Arc::new(address_pool),
50 })
51 }
52
53 pub async fn run<I: InterfaceIO>(&self) -> Result<()> {
55 let interface: Interface<I> = Interface::create(
56 self.config.tunnel_network,
57 self.config.connection.mtu,
58 Some(self.config.tunnel_network.network()),
59 self.config.interface_name.clone(),
60 None,
61 None,
62 )?;
63 let interface = Arc::new(interface);
64
65 let authenticator = Box::new(UsersFileServerAuthenticator::new(
66 &self.config.authentication,
67 self.address_pool.clone(),
68 )?);
69 let auth_server = AuthServer::new(
70 authenticator,
71 self.config.tunnel_network,
72 Duration::from_secs(self.config.connection.connection_timeout_s),
73 );
74
75 let (sender, receiver) = channel(PACKET_CHANNEL_SIZE);
76
77 let mut tasks = FuturesUnordered::new();
78
79 tasks.extend([
80 tokio::spawn(Self::process_outbound_traffic(
81 interface.clone(),
82 self.connection_queues.clone(),
83 )),
84 tokio::spawn(Self::process_inbound_traffic(
85 self.connection_queues.clone(),
86 interface,
87 receiver,
88 self.config.isolate_clients,
89 )),
90 ]);
91
92 let handler_task = self.handle_connections(auth_server, sender);
93
94 let result = tokio::select! {
95 handler_task_result = handler_task => handler_task_result,
96 Some(task_result) = tasks.next() => task_result?,
97 };
98
99 let _ = abort_all(tasks).await;
100
101 result
102 }
103
104 async fn handle_connections(
110 &self,
111 auth_server: AuthServer,
112 ingress_queue: Sender<Packet>,
113 ) -> Result<()> {
114 let endpoint = self.create_quinn_endpoint()?;
115
116 info!(
117 "Starting connection handler: {}",
118 endpoint.local_addr().expect("Endpoint has a local address")
119 );
120
121 let mut authentication_tasks = FuturesUnordered::new();
122 let mut connection_tasks = FuturesUnordered::new();
123
124 loop {
125 tokio::select! {
126 Some(handshake) = endpoint.accept() => {
128 let client_ip = handshake.remote_address().ip();
129
130 debug!(
131 "Received incoming connection from '{}'",
132 client_ip
133 );
134
135 let quic_connection = match handshake.await {
136 Ok(connection) => connection,
137 Err(e) => {
138 warn!("Connection handshake with client '{client_ip}' failed: {e}");
139 continue;
140 }
141 };
142
143 let connection = QuincyConnection::new(
144 quic_connection,
145 ingress_queue.clone(),
146 );
147
148 authentication_tasks.push(
149 connection.authenticate(&auth_server)
150 );
151 }
152
153 Some(connection) = authentication_tasks.next() => {
155 let connection = match connection {
156 Ok(connection) => connection,
157 Err(e) => {
158 warn!("Failed to authenticate client: {e}");
159 continue;
160 }
161 };
162
163 let client_address = connection.client_address()?.addr();
164 let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE);
165
166 connection_tasks.push(tokio::spawn(connection.run(connection_receiver)));
167 self.connection_queues.insert(client_address, connection_sender);
168 }
169
170 Some(connection) = connection_tasks.next() => {
172 let (connection, err) = connection?;
173 let client_address = &connection.client_address()?.addr();
174
175 self.connection_queues.remove(client_address);
176 self.address_pool.release_address(client_address);
177 warn!("Connection with client {client_address} has encountered an error: {err}");
178 }
179
180 _ = signal::ctrl_c() => {
182 info!("Received shutdown signal, shutting down");
183 let _ = abort_all(connection_tasks).await;
184
185 endpoint.close(VarInt::from_u32(0x01), "Server shutdown".as_bytes());
186
187 return Ok(());
188 }
189 }
190 }
191 }
192
193 fn create_quinn_endpoint(&self) -> Result<Endpoint> {
195 let quinn_config = self.config.as_quinn_server_config()?;
196
197 let socket = bind_socket(
198 SocketAddr::new(self.config.bind_address, self.config.bind_port),
199 self.config.connection.send_buffer_size as usize,
200 self.config.connection.recv_buffer_size as usize,
201 self.config.reuse_socket,
202 )?;
203
204 let endpoint_config = self.config.connection.as_endpoint_config()?;
205 let endpoint = Endpoint::new(
206 endpoint_config,
207 Some(quinn_config),
208 socket,
209 QUINN_RUNTIME.clone(),
210 )?;
211
212 Ok(endpoint)
213 }
214
215 async fn process_outbound_traffic(
222 interface: Arc<Interface<impl InterfaceIO>>,
223 connection_queues: ConnectionQueues,
224 ) -> Result<()> {
225 debug!("Started tunnel outbound traffic task (interface -> connection queue)");
226
227 loop {
228 let packet = interface.read_packet().await?;
229 let dest_addr = match packet.destination() {
230 Ok(addr) => addr,
231 Err(e) => {
232 warn!("Received packet with malformed header structure: {e}");
233 continue;
234 }
235 };
236
237 debug!("Destination address for packet: {dest_addr}");
238
239 let connection_queue = match connection_queues.get(&dest_addr) {
240 Some(connection_queue) => connection_queue,
241 None => continue,
242 };
243
244 debug!("Found connection for IP {dest_addr}");
245
246 connection_queue.send(packet.into()).await?;
247 }
248 }
249
250 async fn process_inbound_traffic(
258 connection_queues: ConnectionQueues,
259 interface: Arc<Interface<impl InterfaceIO>>,
260 ingress_queue: Receiver<Packet>,
261 isolate_clients: bool,
262 ) -> Result<()> {
263 debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");
264
265 if isolate_clients {
266 relay_isolated(connection_queues, interface, ingress_queue).await
267 } else {
268 relay_unisolated(connection_queues, interface, ingress_queue).await
269 }
270 }
271}
272
273#[inline]
274async fn relay_isolated(
275 connection_queues: ConnectionQueues,
276 interface: Arc<Interface<impl InterfaceIO>>,
277 mut ingress_queue: Receiver<Packet>,
278) -> Result<()> {
279 loop {
280 let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
281 ingress_queue
282 .recv_many(&mut packets, PACKET_BUFFER_SIZE)
283 .await;
284
285 let filtered_packets = packets
286 .into_iter()
287 .filter(|packet| {
288 let dest_addr = match packet.destination() {
289 Ok(addr) => addr,
290 Err(e) => {
291 warn!("Received packet with malformed header structure: {e}");
292 return false;
293 }
294 };
295 !connection_queues.contains_key(&dest_addr)
296 })
297 .collect::<Vec<_>>();
298
299 interface.write_packets(filtered_packets).await?;
300 }
301}
302
303#[inline]
304async fn relay_unisolated(
305 connection_queues: ConnectionQueues,
306 interface: Arc<Interface<impl InterfaceIO>>,
307 mut ingress_queue: Receiver<Packet>,
308) -> Result<()> {
309 loop {
310 let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
311
312 ingress_queue
313 .recv_many(&mut packets, PACKET_BUFFER_SIZE)
314 .await;
315
316 for packet in packets {
317 let dest_addr = match packet.destination() {
318 Ok(addr) => addr,
319 Err(e) => {
320 warn!("Received packet with malformed header structure: {e}");
321 continue;
322 }
323 };
324
325 match connection_queues.get(&dest_addr) {
326 Some(connection_queue) => connection_queue.send(packet.into()).await?,
328 None => interface.write_packet(packet).await?,
330 }
331 }
332 }
333}