1pub mod address_pool;
2mod connection;
3pub mod session;
4
5#[cfg(feature = "metrics")]
6mod metrics;
7
8use std::collections::HashMap;
9use std::net::{IpAddr, SocketAddr};
10use std::sync::Arc;
11#[cfg(feature = "metrics")]
12use std::time::Duration;
13use std::time::Instant;
14
15use bytes::Bytes;
16use dashmap::DashMap;
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use quinn::{Endpoint, VarInt};
20use tokio::signal;
21use tokio::sync::mpsc::error::TrySendError;
22use tokio::sync::mpsc::{Receiver, Sender, channel};
23use tracing::{debug, info, warn};
24
25use crate::server::address_pool::AddressPoolManager;
26use crate::server::connection::{Assigned, QuincyConnection};
27use crate::server::session::{ConnectionSession, UserSessionRegistry};
28use crate::users::UsersFile;
29use quincy::Result;
30use quincy::config::{
31 AddressRange, AllowedNoiseKeys, NoiseKeyExchange, ServerConfig, ServerProtocolConfig,
32};
33use quincy::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
34use quincy::network::interface::{ActiveInterface, Interface, InterfaceIO};
35use quincy::network::packet::Packet;
36use quincy::network::socket::bind_socket;
37use quincy::utils::tasks::abort_all;
38
/// Shared concurrent map from a client's address to the sending half of that
/// client's packet queue.
///
/// Entries are inserted once a connection has been assigned an address and are
/// removed again when the connection's task finishes.
type ConnectionQueues = Arc<DashMap<IpAddr, Sender<Bytes>>>;
41
/// Outcome of an IP-assignment attempt, paired with the QUIC connection it
/// belongs to.
struct AssignmentResult {
    /// The connection in its `Assigned` state, or the error that prevented the
    /// assignment.
    result: Result<QuincyConnection<Assigned>>,
    /// Handle to the underlying QUIC connection, kept so the connection can be
    /// closed when `result` is an error.
    quic_connection: quinn::Connection,
}
47
/// Server state shared between the connection handler and the traffic relay
/// tasks.
pub struct QuincyServer {
    /// Server configuration supplied to [`QuincyServer::new`].
    config: ServerConfig,
    /// Per-client packet queues, keyed by the client's address.
    connection_queues: ConnectionQueues,
    /// Address pool manager built from the tunnel network and the per-user pools.
    address_pool: Arc<AddressPoolManager>,
    /// Users loaded from the configured users file.
    users: Arc<UsersFile>,
    /// Registry of active sessions; a connection is added after it has been
    /// assigned an address and removed when its task ends.
    session_registry: Arc<UserSessionRegistry>,
}
56
57impl QuincyServer {
58 pub fn new(config: ServerConfig) -> Result<Self> {
65 let users = UsersFile::load(&config.users_file)?;
66
67 let user_pools: HashMap<String, Vec<AddressRange>> = users
68 .users
69 .iter()
70 .filter(|(_, entry)| !entry.address_pool.is_empty())
71 .map(|(name, entry)| (name.clone(), entry.address_pool.clone()))
72 .collect();
73
74 let address_pool = AddressPoolManager::new(config.tunnel_network, user_pools)?;
75
76 Ok(Self {
77 config,
78 connection_queues: Arc::new(DashMap::new()),
79 address_pool: Arc::new(address_pool),
80 users: Arc::new(users),
81 session_registry: Arc::new(UserSessionRegistry::new()),
82 })
83 }
84
85 pub async fn run<I: InterfaceIO>(&self) -> Result<()> {
87 let interface: Interface<I> = Interface::create(
88 self.config.tunnel_network,
89 self.config.connection.mtu,
90 Some(self.config.tunnel_network.network()),
91 self.config.interface_name.clone(),
92 None,
93 None,
94 None,
95 )?;
96 let interface = Arc::new(interface.configure()?);
97
98 #[cfg(feature = "metrics")]
99 if self.config.metrics.enabled {
100 use crate::server::metrics::init_metrics;
101
102 init_metrics(&self.config.metrics)?;
103 }
104
105 let (sender, receiver) = channel(PACKET_CHANNEL_SIZE);
106
107 let mut tasks = FuturesUnordered::new();
108
109 tasks.extend([
110 tokio::spawn(Self::process_outbound_traffic(
111 interface.clone(),
112 self.connection_queues.clone(),
113 )),
114 tokio::spawn(Self::process_inbound_traffic(
115 self.connection_queues.clone(),
116 interface,
117 receiver,
118 self.config.isolate_clients,
119 )),
120 ]);
121
122 let handler_task = self.handle_connections(sender);
123
124 let result = tokio::select! {
125 handler_task_result = handler_task => handler_task_result,
126 Some(task_result) = tasks.next() => task_result?,
127 };
128
129 let _ = abort_all(tasks).await;
130
131 result
132 }
133
134 async fn handle_connections(&self, ingress_queue: Sender<Packet>) -> Result<()> {
139 let endpoint = self.create_quinn_endpoint()?;
140
141 info!(
142 "Starting connection handler: {}",
143 endpoint.local_addr().expect("Endpoint has a local address")
144 );
145
146 let protocol = Arc::new(self.config.protocol.clone());
147 let server_address = self.config.tunnel_network;
148 let users = self.users.clone();
149 let address_pool = self.address_pool.clone();
150 let session_registry = self.session_registry.clone();
151
152 let mut assignment_tasks = FuturesUnordered::new();
153 let mut connection_tasks = FuturesUnordered::new();
154
155 let shutdown = shutdown_signal();
156 tokio::pin!(shutdown);
157
158 loop {
159 tokio::select! {
160 Some(handshake) = endpoint.accept() => {
162 let client_ip = handshake.remote_address().ip();
163
164 debug!(
165 "Received incoming connection from '{}'",
166 client_ip
167 );
168
169 let quic_connection = match handshake.await {
170 Ok(connection) => connection,
171 Err(e) => {
172 warn!("Connection handshake with client '{client_ip}' failed: {e}");
173 continue;
174 }
175 };
176
177 let quic_connection_clone = quic_connection.clone();
178 let connection = QuincyConnection::new(
179 quic_connection,
180 ingress_queue.clone(),
181 );
182
183 let connection = match connection.identify(&protocol, &users) {
185 Ok(conn) => conn,
186 Err(e) => {
187 warn!("Failed to identify client: {e}");
188 quic_connection_clone.close(VarInt::from_u32(0x02), "Session establishment failed".as_bytes());
189 continue;
190 }
191 };
192
193 let address_pool = address_pool.clone();
194 let server_addr = server_address;
195
196 assignment_tasks.push(async move {
197 let result = connection.assign_ip(&address_pool, server_addr).await;
198 AssignmentResult {
199 result,
200 quic_connection: quic_connection_clone,
201 }
202 });
203 }
204
205 Some(assignment) = assignment_tasks.next() => {
207 let connection = match assignment.result {
208 Ok(connection) => connection,
209 Err(e) => {
210 warn!("Failed to assign IP to client: {e}");
211 assignment.quic_connection.close(
212 VarInt::from_u32(0x02),
213 "Session establishment failed".as_bytes(),
214 );
215 continue;
216 }
217 };
218
219 let client_address = connection.client_address();
220 let username = connection.username().to_string();
221
222 let bandwidth_limit = self
225 .users
226 .users
227 .get(&username)
228 .and_then(|entry| entry.bandwidth_limit)
229 .or(self.config.default_bandwidth_limit);
230
231 let rate_limiter = session_registry.add_connection(
233 &username,
234 ConnectionSession {
235 client_address,
236 connected_at: Instant::now(),
237 },
238 bandwidth_limit,
239 );
240
241 let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE);
242
243 connection_tasks.push(tokio::spawn(connection.run(
244 connection_receiver,
245 rate_limiter,
246 #[cfg(feature = "metrics")]
247 Duration::from_secs(self.config.metrics.reporting_interval_s),
248 )));
249 self.connection_queues
250 .insert(client_address.addr(), connection_sender);
251 }
252
253 Some(connection) = connection_tasks.next() => {
255 let (connection, err) = connection?;
256 let username = connection.username();
257 let client_address = connection.client_address();
258
259 self.connection_queues.remove(&client_address.addr());
260 self.address_pool.release_address(username, &client_address.addr());
261 session_registry.remove_connection(username, &client_address);
262
263 warn!(
264 "Connection with client {} (user '{username}') has encountered an error: {err}",
265 client_address.addr()
266 );
267 }
268
269 shutdown_result = &mut shutdown => {
271 shutdown_result?;
272
273 info!("Received shutdown signal, shutting down");
274 let _ = abort_all(connection_tasks).await;
275
276 endpoint.close(VarInt::from_u32(0x01), "Server shutdown".as_bytes());
277
278 return Ok(());
279 }
280 }
281 }
282 }
283
284 fn create_quinn_endpoint(&self) -> Result<Endpoint> {
286 let (allowed_keys, allowed_fingerprints) = match &self.config.protocol {
288 ServerProtocolConfig::Noise(noise) => {
289 let keys = match noise.key_exchange {
290 NoiseKeyExchange::Standard => Some(AllowedNoiseKeys::Standard(
291 self.users.collect_noise_public_keys(),
292 )),
293 NoiseKeyExchange::Hybrid => Some(AllowedNoiseKeys::Hybrid(
294 self.users.collect_noise_pq_public_keys(),
295 )),
296 };
297 (keys, None)
298 }
299 ServerProtocolConfig::Tls(_) => (None, Some(self.users.collect_cert_fingerprints())),
300 };
301
302 let quinn_config = self
303 .config
304 .as_quinn_server_config(allowed_keys, allowed_fingerprints)?;
305
306 let socket = bind_socket(
307 SocketAddr::new(self.config.bind_address, self.config.bind_port),
308 self.config.connection.send_buffer_size as usize,
309 self.config.connection.recv_buffer_size as usize,
310 self.config.reuse_socket,
311 )?;
312
313 let endpoint_config = self
314 .config
315 .connection
316 .as_endpoint_config(self.config.noise_key_exchange())?;
317 let endpoint = Endpoint::new(
318 endpoint_config,
319 Some(quinn_config),
320 socket,
321 QUINN_RUNTIME.clone(),
322 )?;
323
324 Ok(endpoint)
325 }
326
327 async fn process_outbound_traffic(
333 interface: Arc<ActiveInterface<impl InterfaceIO>>,
334 connection_queues: ConnectionQueues,
335 ) -> Result<()> {
336 debug!("Started tunnel outbound traffic task (interface -> connection queue)");
337
338 loop {
339 let packet = interface.read_packet().await?;
340 let dest_addr = match packet.destination() {
341 Ok(addr) => addr,
342 Err(e) => {
343 warn!("Received packet with malformed header structure: {e}");
344 continue;
345 }
346 };
347
348 debug!("Destination address for packet: {dest_addr}");
349
350 let connection_queue = match connection_queues.get(&dest_addr) {
351 Some(connection_queue) => connection_queue,
352 None => continue,
353 };
354
355 debug!("Found connection for IP {dest_addr}");
356
357 match connection_queue.try_send(packet.into()) {
358 Ok(()) => {}
359 Err(TrySendError::Full(_)) => {
360 debug!("Dropping outbound packet for {dest_addr}: per-client queue full");
361 }
362 Err(TrySendError::Closed(_)) => {
363 debug!("Dropping outbound packet for {dest_addr}: connection closed");
364 }
365 }
366 }
367 }
368
369 async fn process_inbound_traffic(
377 connection_queues: ConnectionQueues,
378 interface: Arc<ActiveInterface<impl InterfaceIO>>,
379 ingress_queue: Receiver<Packet>,
380 isolate_clients: bool,
381 ) -> Result<()> {
382 debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");
383
384 if isolate_clients {
385 relay_isolated(connection_queues, interface, ingress_queue).await
386 } else {
387 relay_unisolated(connection_queues, interface, ingress_queue).await
388 }
389 }
390}
391
392#[cfg(unix)]
393async fn shutdown_signal() -> Result<()> {
394 let mut interrupt = signal::unix::signal(signal::unix::SignalKind::interrupt())?;
395 let mut terminate = signal::unix::signal(signal::unix::SignalKind::terminate())?;
396
397 tokio::select! {
398 _ = interrupt.recv() => {}
399 _ = terminate.recv() => {}
400 }
401
402 Ok(())
403}
404
/// Completes once the process receives Ctrl-C.
///
/// # Errors
/// Returns an error if waiting for the Ctrl-C notification fails.
#[cfg(not(unix))]
async fn shutdown_signal() -> Result<()> {
    Ok(signal::ctrl_c().await?)
}
410
411#[inline]
412async fn relay_isolated(
413 connection_queues: ConnectionQueues,
414 interface: Arc<ActiveInterface<impl InterfaceIO>>,
415 mut ingress_queue: Receiver<Packet>,
416) -> Result<()> {
417 loop {
418 let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
419 let count = ingress_queue
420 .recv_many(&mut packets, PACKET_BUFFER_SIZE)
421 .await;
422
423 if count == 0 {
425 return Ok(());
426 }
427
428 let filtered_packets = packets
429 .into_iter()
430 .filter(|packet| {
431 let dest_addr = match packet.destination() {
432 Ok(addr) => addr,
433 Err(e) => {
434 warn!("Received packet with malformed header structure: {e}");
435 return false;
436 }
437 };
438 !connection_queues.contains_key(&dest_addr)
439 })
440 .collect::<Vec<_>>();
441
442 interface.write_packets(filtered_packets).await?;
443 }
444}
445
446#[inline]
447async fn relay_unisolated(
448 connection_queues: ConnectionQueues,
449 interface: Arc<ActiveInterface<impl InterfaceIO>>,
450 mut ingress_queue: Receiver<Packet>,
451) -> Result<()> {
452 loop {
453 let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
454
455 let count = ingress_queue
456 .recv_many(&mut packets, PACKET_BUFFER_SIZE)
457 .await;
458
459 if count == 0 {
461 return Ok(());
462 }
463
464 for packet in packets {
465 let dest_addr = match packet.destination() {
466 Ok(addr) => addr,
467 Err(e) => {
468 warn!("Received packet with malformed header structure: {e}");
469 continue;
470 }
471 };
472
473 match connection_queues.get(&dest_addr) {
474 Some(connection_queue) => match connection_queue.try_send(packet.into()) {
476 Ok(()) => {}
477 Err(TrySendError::Full(_)) => {
478 debug!("Dropping client-to-client packet for {dest_addr}: queue full");
479 }
480 Err(TrySendError::Closed(_)) => {
481 debug!(
482 "Dropping client-to-client packet for {dest_addr}: connection closed"
483 );
484 }
485 },
486 None => interface.write_packet(packet).await?,
488 }
489 }
490 }
491}