1pub mod address_pool;
2mod connection;
3pub mod session;
4
5#[cfg(feature = "metrics")]
6mod metrics;
7
8use std::collections::HashMap;
9use std::net::{IpAddr, SocketAddr};
10use std::sync::Arc;
11#[cfg(feature = "metrics")]
12use std::time::Duration;
13use std::time::Instant;
14
15use bytes::Bytes;
16use dashmap::DashMap;
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use quinn::{Endpoint, VarInt};
20use tokio::signal;
21use tokio::sync::mpsc::error::TrySendError;
22use tokio::sync::mpsc::{Receiver, Sender, channel};
23use tracing::{debug, info, warn};
24
25use crate::server::address_pool::AddressPoolManager;
26use crate::server::connection::{Assigned, QuincyConnection};
27use crate::server::session::{ConnectionSession, UserSessionRegistry};
28use crate::users::UsersFile;
29use quincy::Result;
30use quincy::config::{
31 AddressRange, AllowedNoiseKeys, NoiseKeyExchange, ServerConfig, ServerProtocolConfig,
32};
33use quincy::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
34use quincy::network::interface::{ActiveInterface, Interface, InterfaceIO};
35use quincy::network::packet::Packet;
36use quincy::network::socket::bind_socket;
37use quincy::utils::tasks::abort_all;
38
39type ConnectionQueues = Arc<DashMap<IpAddr, Sender<Bytes>>>;
41
42struct AssignmentResult {
44 result: Result<QuincyConnection<Assigned>>,
45 quic_connection: quinn::Connection,
46}
47
48pub struct QuincyServer {
50 config: ServerConfig,
51 connection_queues: ConnectionQueues,
52 address_pool: Arc<AddressPoolManager>,
53 users: Arc<UsersFile>,
54 session_registry: Arc<UserSessionRegistry>,
55}
56
57impl QuincyServer {
58 pub fn new(config: ServerConfig) -> Result<Self> {
65 let users = UsersFile::load(&config.users_file)?;
66
67 let user_pools: HashMap<String, Vec<AddressRange>> = users
68 .users
69 .iter()
70 .filter(|(_, entry)| !entry.address_pool.is_empty())
71 .map(|(name, entry)| (name.clone(), entry.address_pool.clone()))
72 .collect();
73
74 let address_pool = AddressPoolManager::new(config.tunnel_network, user_pools)?;
75
76 Ok(Self {
77 config,
78 connection_queues: Arc::new(DashMap::new()),
79 address_pool: Arc::new(address_pool),
80 users: Arc::new(users),
81 session_registry: Arc::new(UserSessionRegistry::new()),
82 })
83 }
84
85 pub async fn run<I: InterfaceIO>(&self) -> Result<()> {
87 let interface: Interface<I> = Interface::create(
88 self.config.tunnel_network,
89 self.config.connection.mtu,
90 Some(self.config.tunnel_network.network()),
91 self.config.interface_name.clone(),
92 None,
93 None,
94 None,
95 )?;
96 let interface = Arc::new(interface.configure()?);
97
98 #[cfg(feature = "metrics")]
99 if self.config.metrics.enabled {
100 use crate::server::metrics::init_metrics;
101
102 init_metrics(&self.config.metrics)?;
103 }
104
105 let (sender, receiver) = channel(PACKET_CHANNEL_SIZE);
106
107 let mut tasks = FuturesUnordered::new();
108
109 tasks.extend([
110 tokio::spawn(Self::process_outbound_traffic(
111 interface.clone(),
112 self.connection_queues.clone(),
113 )),
114 tokio::spawn(Self::process_inbound_traffic(
115 self.connection_queues.clone(),
116 interface,
117 receiver,
118 self.config.isolate_clients,
119 )),
120 ]);
121
122 let handler_task = self.handle_connections(sender);
123
124 let result = tokio::select! {
125 handler_task_result = handler_task => handler_task_result,
126 Some(task_result) = tasks.next() => task_result?,
127 };
128
129 let _ = abort_all(tasks).await;
130
131 result
132 }
133
134 async fn handle_connections(&self, ingress_queue: Sender<Packet>) -> Result<()> {
139 let endpoint = self.create_quinn_endpoint()?;
140
141 info!(
142 "Starting connection handler: {}",
143 endpoint.local_addr().expect("Endpoint has a local address")
144 );
145
146 let protocol = Arc::new(self.config.protocol.clone());
147 let server_address = self.config.tunnel_network;
148 let users = self.users.clone();
149 let address_pool = self.address_pool.clone();
150 let session_registry = self.session_registry.clone();
151
152 let mut assignment_tasks = FuturesUnordered::new();
153 let mut connection_tasks = FuturesUnordered::new();
154
155 loop {
156 tokio::select! {
157 Some(handshake) = endpoint.accept() => {
159 let client_ip = handshake.remote_address().ip();
160
161 debug!(
162 "Received incoming connection from '{}'",
163 client_ip
164 );
165
166 let quic_connection = match handshake.await {
167 Ok(connection) => connection,
168 Err(e) => {
169 warn!("Connection handshake with client '{client_ip}' failed: {e}");
170 continue;
171 }
172 };
173
174 let quic_connection_clone = quic_connection.clone();
175 let connection = QuincyConnection::new(
176 quic_connection,
177 ingress_queue.clone(),
178 );
179
180 let connection = match connection.identify(&protocol, &users) {
182 Ok(conn) => conn,
183 Err(e) => {
184 warn!("Failed to identify client: {e}");
185 quic_connection_clone.close(VarInt::from_u32(0x02), "Session establishment failed".as_bytes());
186 continue;
187 }
188 };
189
190 let address_pool = address_pool.clone();
191 let server_addr = server_address;
192
193 assignment_tasks.push(async move {
194 let result = connection.assign_ip(&address_pool, server_addr).await;
195 AssignmentResult {
196 result,
197 quic_connection: quic_connection_clone,
198 }
199 });
200 }
201
202 Some(assignment) = assignment_tasks.next() => {
204 let connection = match assignment.result {
205 Ok(connection) => connection,
206 Err(e) => {
207 warn!("Failed to assign IP to client: {e}");
208 assignment.quic_connection.close(
209 VarInt::from_u32(0x02),
210 "Session establishment failed".as_bytes(),
211 );
212 continue;
213 }
214 };
215
216 let client_address = connection.client_address();
217 let username = connection.username().to_string();
218
219 let bandwidth_limit = self
222 .users
223 .users
224 .get(&username)
225 .and_then(|entry| entry.bandwidth_limit)
226 .or(self.config.default_bandwidth_limit);
227
228 let rate_limiter = session_registry.add_connection(
230 &username,
231 ConnectionSession {
232 client_address,
233 connected_at: Instant::now(),
234 },
235 bandwidth_limit,
236 );
237
238 let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE);
239
240 connection_tasks.push(tokio::spawn(connection.run(
241 connection_receiver,
242 rate_limiter,
243 #[cfg(feature = "metrics")]
244 Duration::from_secs(self.config.metrics.reporting_interval_s),
245 )));
246 self.connection_queues
247 .insert(client_address.addr(), connection_sender);
248 }
249
250 Some(connection) = connection_tasks.next() => {
252 let (connection, err) = connection?;
253 let username = connection.username();
254 let client_address = connection.client_address();
255
256 self.connection_queues.remove(&client_address.addr());
257 self.address_pool.release_address(username, &client_address.addr());
258 session_registry.remove_connection(username, &client_address);
259
260 warn!(
261 "Connection with client {} (user '{username}') has encountered an error: {err}",
262 client_address.addr()
263 );
264 }
265
266 _ = signal::ctrl_c() => {
268 info!("Received shutdown signal, shutting down");
269 let _ = abort_all(connection_tasks).await;
270
271 endpoint.close(VarInt::from_u32(0x01), "Server shutdown".as_bytes());
272
273 return Ok(());
274 }
275 }
276 }
277 }
278
279 fn create_quinn_endpoint(&self) -> Result<Endpoint> {
281 let (allowed_keys, allowed_fingerprints) = match &self.config.protocol {
283 ServerProtocolConfig::Noise(noise) => {
284 let keys = match noise.key_exchange {
285 NoiseKeyExchange::Standard => Some(AllowedNoiseKeys::Standard(
286 self.users.collect_noise_public_keys(),
287 )),
288 NoiseKeyExchange::Hybrid => Some(AllowedNoiseKeys::Hybrid(
289 self.users.collect_noise_pq_public_keys(),
290 )),
291 };
292 (keys, None)
293 }
294 ServerProtocolConfig::Tls(_) => (None, Some(self.users.collect_cert_fingerprints())),
295 };
296
297 let quinn_config = self
298 .config
299 .as_quinn_server_config(allowed_keys, allowed_fingerprints)?;
300
301 let socket = bind_socket(
302 SocketAddr::new(self.config.bind_address, self.config.bind_port),
303 self.config.connection.send_buffer_size as usize,
304 self.config.connection.recv_buffer_size as usize,
305 self.config.reuse_socket,
306 )?;
307
308 let endpoint_config = self
309 .config
310 .connection
311 .as_endpoint_config(self.config.noise_key_exchange())?;
312 let endpoint = Endpoint::new(
313 endpoint_config,
314 Some(quinn_config),
315 socket,
316 QUINN_RUNTIME.clone(),
317 )?;
318
319 Ok(endpoint)
320 }
321
322 async fn process_outbound_traffic(
328 interface: Arc<ActiveInterface<impl InterfaceIO>>,
329 connection_queues: ConnectionQueues,
330 ) -> Result<()> {
331 debug!("Started tunnel outbound traffic task (interface -> connection queue)");
332
333 loop {
334 let packet = interface.read_packet().await?;
335 let dest_addr = match packet.destination() {
336 Ok(addr) => addr,
337 Err(e) => {
338 warn!("Received packet with malformed header structure: {e}");
339 continue;
340 }
341 };
342
343 debug!("Destination address for packet: {dest_addr}");
344
345 let connection_queue = match connection_queues.get(&dest_addr) {
346 Some(connection_queue) => connection_queue,
347 None => continue,
348 };
349
350 debug!("Found connection for IP {dest_addr}");
351
352 match connection_queue.try_send(packet.into()) {
353 Ok(()) => {}
354 Err(TrySendError::Full(_)) => {
355 debug!("Dropping outbound packet for {dest_addr}: per-client queue full");
356 }
357 Err(TrySendError::Closed(_)) => {
358 debug!("Dropping outbound packet for {dest_addr}: connection closed");
359 }
360 }
361 }
362 }
363
364 async fn process_inbound_traffic(
372 connection_queues: ConnectionQueues,
373 interface: Arc<ActiveInterface<impl InterfaceIO>>,
374 ingress_queue: Receiver<Packet>,
375 isolate_clients: bool,
376 ) -> Result<()> {
377 debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");
378
379 if isolate_clients {
380 relay_isolated(connection_queues, interface, ingress_queue).await
381 } else {
382 relay_unisolated(connection_queues, interface, ingress_queue).await
383 }
384 }
385}
386
387#[inline]
388async fn relay_isolated(
389 connection_queues: ConnectionQueues,
390 interface: Arc<ActiveInterface<impl InterfaceIO>>,
391 mut ingress_queue: Receiver<Packet>,
392) -> Result<()> {
393 loop {
394 let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
395 let count = ingress_queue
396 .recv_many(&mut packets, PACKET_BUFFER_SIZE)
397 .await;
398
399 if count == 0 {
401 return Ok(());
402 }
403
404 let filtered_packets = packets
405 .into_iter()
406 .filter(|packet| {
407 let dest_addr = match packet.destination() {
408 Ok(addr) => addr,
409 Err(e) => {
410 warn!("Received packet with malformed header structure: {e}");
411 return false;
412 }
413 };
414 !connection_queues.contains_key(&dest_addr)
415 })
416 .collect::<Vec<_>>();
417
418 interface.write_packets(filtered_packets).await?;
419 }
420}
421
422#[inline]
423async fn relay_unisolated(
424 connection_queues: ConnectionQueues,
425 interface: Arc<ActiveInterface<impl InterfaceIO>>,
426 mut ingress_queue: Receiver<Packet>,
427) -> Result<()> {
428 loop {
429 let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
430
431 let count = ingress_queue
432 .recv_many(&mut packets, PACKET_BUFFER_SIZE)
433 .await;
434
435 if count == 0 {
437 return Ok(());
438 }
439
440 for packet in packets {
441 let dest_addr = match packet.destination() {
442 Ok(addr) => addr,
443 Err(e) => {
444 warn!("Received packet with malformed header structure: {e}");
445 continue;
446 }
447 };
448
449 match connection_queues.get(&dest_addr) {
450 Some(connection_queue) => match connection_queue.try_send(packet.into()) {
452 Ok(()) => {}
453 Err(TrySendError::Full(_)) => {
454 debug!("Dropping client-to-client packet for {dest_addr}: queue full");
455 }
456 Err(TrySendError::Closed(_)) => {
457 debug!(
458 "Dropping client-to-client packet for {dest_addr}: connection closed"
459 );
460 }
461 },
462 None => interface.write_packet(packet).await?,
464 }
465 }
466 }
467}