Skip to main content

quincy_server/server/
mod.rs

1pub mod address_pool;
2mod connection;
3pub mod session;
4
5#[cfg(feature = "metrics")]
6mod metrics;
7
8use std::collections::HashMap;
9use std::net::{IpAddr, SocketAddr};
10use std::sync::Arc;
11#[cfg(feature = "metrics")]
12use std::time::Duration;
13use std::time::Instant;
14
15use bytes::Bytes;
16use dashmap::DashMap;
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use quinn::{Endpoint, VarInt};
20use tokio::signal;
21use tokio::sync::mpsc::error::TrySendError;
22use tokio::sync::mpsc::{Receiver, Sender, channel};
23use tracing::{debug, info, warn};
24
25use crate::server::address_pool::AddressPoolManager;
26use crate::server::connection::{Assigned, QuincyConnection};
27use crate::server::session::{ConnectionSession, UserSessionRegistry};
28use crate::users::UsersFile;
29use quincy::Result;
30use quincy::config::{
31    AddressRange, AllowedNoiseKeys, NoiseKeyExchange, ServerConfig, ServerProtocolConfig,
32};
33use quincy::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
34use quincy::network::interface::{ActiveInterface, Interface, InterfaceIO};
35use quincy::network::packet::Packet;
36use quincy::network::socket::bind_socket;
37use quincy::utils::tasks::abort_all;
38
39/// Map of connection addresses to their TX channel.
40type ConnectionQueues = Arc<DashMap<IpAddr, Sender<Bytes>>>;
41
42/// Result of an IP assignment task, carrying the context needed for cleanup on failure.
43struct AssignmentResult {
44    result: Result<QuincyConnection<Assigned>>,
45    quic_connection: quinn::Connection,
46}
47
48/// Represents a Quincy server encapsulating Quincy connections and TUN interface IO.
49pub struct QuincyServer {
50    config: ServerConfig,
51    connection_queues: ConnectionQueues,
52    address_pool: Arc<AddressPoolManager>,
53    users: Arc<UsersFile>,
54    session_registry: Arc<UserSessionRegistry>,
55}
56
57impl QuincyServer {
58    /// Creates a new instance of the Quincy tunnel.
59    ///
60    /// Loads the users file and initializes the address pool from the tunnel network.
61    ///
62    /// ### Arguments
63    /// - `config` - the server configuration
64    pub fn new(config: ServerConfig) -> Result<Self> {
65        let users = UsersFile::load(&config.users_file)?;
66
67        let user_pools: HashMap<String, Vec<AddressRange>> = users
68            .users
69            .iter()
70            .filter(|(_, entry)| !entry.address_pool.is_empty())
71            .map(|(name, entry)| (name.clone(), entry.address_pool.clone()))
72            .collect();
73
74        let address_pool = AddressPoolManager::new(config.tunnel_network, user_pools)?;
75
76        Ok(Self {
77            config,
78            connection_queues: Arc::new(DashMap::new()),
79            address_pool: Arc::new(address_pool),
80            users: Arc::new(users),
81            session_registry: Arc::new(UserSessionRegistry::new()),
82        })
83    }
84
85    /// Starts the tasks for this instance of Quincy tunnel and listens for incoming connections.
86    pub async fn run<I: InterfaceIO>(&self) -> Result<()> {
87        let interface: Interface<I> = Interface::create(
88            self.config.tunnel_network,
89            self.config.connection.mtu,
90            Some(self.config.tunnel_network.network()),
91            self.config.interface_name.clone(),
92            None,
93            None,
94            None,
95        )?;
96        let interface = Arc::new(interface.configure()?);
97
98        #[cfg(feature = "metrics")]
99        if self.config.metrics.enabled {
100            use crate::server::metrics::init_metrics;
101
102            init_metrics(&self.config.metrics)?;
103        }
104
105        let (sender, receiver) = channel(PACKET_CHANNEL_SIZE);
106
107        let mut tasks = FuturesUnordered::new();
108
109        tasks.extend([
110            tokio::spawn(Self::process_outbound_traffic(
111                interface.clone(),
112                self.connection_queues.clone(),
113            )),
114            tokio::spawn(Self::process_inbound_traffic(
115                self.connection_queues.clone(),
116                interface,
117                receiver,
118                self.config.isolate_clients,
119            )),
120        ]);
121
122        let handler_task = self.handle_connections(sender);
123
124        let result = tokio::select! {
125            handler_task_result = handler_task => handler_task_result,
126            Some(task_result) = tasks.next() => task_result?,
127        };
128
129        let _ = abort_all(tasks).await;
130
131        result
132    }
133
134    /// Handles incoming connections by spawning a new QuincyConnection instance for them.
135    ///
136    /// ### Arguments
137    /// - `ingress_queue` - the queue for sending data to the TUN interface
138    async fn handle_connections(&self, ingress_queue: Sender<Packet>) -> Result<()> {
139        let endpoint = self.create_quinn_endpoint()?;
140
141        info!(
142            "Starting connection handler: {}",
143            endpoint.local_addr().expect("Endpoint has a local address")
144        );
145
146        let protocol = Arc::new(self.config.protocol.clone());
147        let server_address = self.config.tunnel_network;
148        let users = self.users.clone();
149        let address_pool = self.address_pool.clone();
150        let session_registry = self.session_registry.clone();
151
152        let mut assignment_tasks = FuturesUnordered::new();
153        let mut connection_tasks = FuturesUnordered::new();
154
155        let shutdown = shutdown_signal();
156        tokio::pin!(shutdown);
157
158        loop {
159            tokio::select! {
160                // New connections
161                Some(handshake) = endpoint.accept() => {
162                    let client_ip = handshake.remote_address().ip();
163
164                    debug!(
165                        "Received incoming connection from '{}'",
166                        client_ip
167                    );
168
169                    let quic_connection = match handshake.await {
170                        Ok(connection) => connection,
171                        Err(e) => {
172                            warn!("Connection handshake with client '{client_ip}' failed: {e}");
173                            continue;
174                        }
175                    };
176
177                    let quic_connection_clone = quic_connection.clone();
178                    let connection = QuincyConnection::new(
179                        quic_connection,
180                        ingress_queue.clone(),
181                    );
182
183                    // Identify synchronously (reads peer_identity + HashMap lookup)
184                    let connection = match connection.identify(&protocol, &users) {
185                        Ok(conn) => conn,
186                        Err(e) => {
187                            warn!("Failed to identify client: {e}");
188                            quic_connection_clone.close(VarInt::from_u32(0x02), "Session establishment failed".as_bytes());
189                            continue;
190                        }
191                    };
192
193                    let address_pool = address_pool.clone();
194                    let server_addr = server_address;
195
196                    assignment_tasks.push(async move {
197                        let result = connection.assign_ip(&address_pool, server_addr).await;
198                        AssignmentResult {
199                            result,
200                            quic_connection: quic_connection_clone,
201                        }
202                    });
203                }
204
205                // Assignment tasks
206                Some(assignment) = assignment_tasks.next() => {
207                    let connection = match assignment.result {
208                        Ok(connection) => connection,
209                        Err(e) => {
210                            warn!("Failed to assign IP to client: {e}");
211                            assignment.quic_connection.close(
212                                VarInt::from_u32(0x02),
213                                "Session establishment failed".as_bytes(),
214                            );
215                            continue;
216                        }
217                    };
218
219                    let client_address = connection.client_address();
220                    let username = connection.username().to_string();
221
222                    // Resolve effective bandwidth limit:
223                    // per-user override > server default > None (unlimited)
224                    let bandwidth_limit = self
225                        .users
226                        .users
227                        .get(&username)
228                        .and_then(|entry| entry.bandwidth_limit)
229                        .or(self.config.default_bandwidth_limit);
230
231                    // Register session and obtain the shared rate limiter
232                    let rate_limiter = session_registry.add_connection(
233                        &username,
234                        ConnectionSession {
235                            client_address,
236                            connected_at: Instant::now(),
237                        },
238                        bandwidth_limit,
239                    );
240
241                    let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE);
242
243                    connection_tasks.push(tokio::spawn(connection.run(
244                        connection_receiver,
245                        rate_limiter,
246                        #[cfg(feature = "metrics")]
247                        Duration::from_secs(self.config.metrics.reporting_interval_s),
248                    )));
249                    self.connection_queues
250                        .insert(client_address.addr(), connection_sender);
251                }
252
253                // Connection tasks
254                Some(connection) = connection_tasks.next() => {
255                    let (connection, err) = connection?;
256                    let username = connection.username();
257                    let client_address = connection.client_address();
258
259                    self.connection_queues.remove(&client_address.addr());
260                    self.address_pool.release_address(username, &client_address.addr());
261                    session_registry.remove_connection(username, &client_address);
262
263                    warn!(
264                        "Connection with client {} (user '{username}') has encountered an error: {err}",
265                        client_address.addr()
266                    );
267                }
268
269                // Shutdown
270                shutdown_result = &mut shutdown => {
271                    shutdown_result?;
272
273                    info!("Received shutdown signal, shutting down");
274                    let _ = abort_all(connection_tasks).await;
275
276                    endpoint.close(VarInt::from_u32(0x01), "Server shutdown".as_bytes());
277
278                    return Ok(());
279                }
280            }
281        }
282    }
283
284    /// Creates a Quinn QUIC endpoint that clients can connect to.
285    fn create_quinn_endpoint(&self) -> Result<Endpoint> {
286        // Build allowed keys/fingerprints from the users file
287        let (allowed_keys, allowed_fingerprints) = match &self.config.protocol {
288            ServerProtocolConfig::Noise(noise) => {
289                let keys = match noise.key_exchange {
290                    NoiseKeyExchange::Standard => Some(AllowedNoiseKeys::Standard(
291                        self.users.collect_noise_public_keys(),
292                    )),
293                    NoiseKeyExchange::Hybrid => Some(AllowedNoiseKeys::Hybrid(
294                        self.users.collect_noise_pq_public_keys(),
295                    )),
296                };
297                (keys, None)
298            }
299            ServerProtocolConfig::Tls(_) => (None, Some(self.users.collect_cert_fingerprints())),
300        };
301
302        let quinn_config = self
303            .config
304            .as_quinn_server_config(allowed_keys, allowed_fingerprints)?;
305
306        let socket = bind_socket(
307            SocketAddr::new(self.config.bind_address, self.config.bind_port),
308            self.config.connection.send_buffer_size as usize,
309            self.config.connection.recv_buffer_size as usize,
310            self.config.reuse_socket,
311        )?;
312
313        let endpoint_config = self
314            .config
315            .connection
316            .as_endpoint_config(self.config.noise_key_exchange())?;
317        let endpoint = Endpoint::new(
318            endpoint_config,
319            Some(quinn_config),
320            socket,
321            QUINN_RUNTIME.clone(),
322        )?;
323
324        Ok(endpoint)
325    }
326
327    /// Reads data from the TUN interface and sends it to the appropriate client.
328    ///
329    /// ### Arguments
330    /// - `tun_read` - the read half of the TUN interface
331    /// - `connection_queues` - the queues for sending data to the QUIC connections
332    async fn process_outbound_traffic(
333        interface: Arc<ActiveInterface<impl InterfaceIO>>,
334        connection_queues: ConnectionQueues,
335    ) -> Result<()> {
336        debug!("Started tunnel outbound traffic task (interface -> connection queue)");
337
338        loop {
339            let packet = interface.read_packet().await?;
340            let dest_addr = match packet.destination() {
341                Ok(addr) => addr,
342                Err(e) => {
343                    warn!("Received packet with malformed header structure: {e}");
344                    continue;
345                }
346            };
347
348            debug!("Destination address for packet: {dest_addr}");
349
350            let connection_queue = match connection_queues.get(&dest_addr) {
351                Some(connection_queue) => connection_queue,
352                None => continue,
353            };
354
355            debug!("Found connection for IP {dest_addr}");
356
357            match connection_queue.try_send(packet.into()) {
358                Ok(()) => {}
359                Err(TrySendError::Full(_)) => {
360                    debug!("Dropping outbound packet for {dest_addr}: per-client queue full");
361                }
362                Err(TrySendError::Closed(_)) => {
363                    debug!("Dropping outbound packet for {dest_addr}: connection closed");
364                }
365            }
366        }
367    }
368
369    /// Reads data from the QUIC connection and sends it to the TUN interface worker.
370    ///
371    /// ### Arguments
372    /// - `connection_queues` - the queues for sending data to the QUIC connections
373    /// - `tun_write` - the write half of the TUN interface
374    /// - `ingress_queue` - the queue for sending data to the TUN interface
375    /// - `isolate_clients` - whether to isolate clients from each other
376    async fn process_inbound_traffic(
377        connection_queues: ConnectionQueues,
378        interface: Arc<ActiveInterface<impl InterfaceIO>>,
379        ingress_queue: Receiver<Packet>,
380        isolate_clients: bool,
381    ) -> Result<()> {
382        debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");
383
384        if isolate_clients {
385            relay_isolated(connection_queues, interface, ingress_queue).await
386        } else {
387            relay_unisolated(connection_queues, interface, ingress_queue).await
388        }
389    }
390}
391
392#[cfg(unix)]
393async fn shutdown_signal() -> Result<()> {
394    let mut interrupt = signal::unix::signal(signal::unix::SignalKind::interrupt())?;
395    let mut terminate = signal::unix::signal(signal::unix::SignalKind::terminate())?;
396
397    tokio::select! {
398        _ = interrupt.recv() => {}
399        _ = terminate.recv() => {}
400    }
401
402    Ok(())
403}
404
405#[cfg(not(unix))]
406async fn shutdown_signal() -> Result<()> {
407    signal::ctrl_c().await?;
408    Ok(())
409}
410
411#[inline]
412async fn relay_isolated(
413    connection_queues: ConnectionQueues,
414    interface: Arc<ActiveInterface<impl InterfaceIO>>,
415    mut ingress_queue: Receiver<Packet>,
416) -> Result<()> {
417    loop {
418        let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
419        let count = ingress_queue
420            .recv_many(&mut packets, PACKET_BUFFER_SIZE)
421            .await;
422
423        // ingress_queue closed
424        if count == 0 {
425            return Ok(());
426        }
427
428        let filtered_packets = packets
429            .into_iter()
430            .filter(|packet| {
431                let dest_addr = match packet.destination() {
432                    Ok(addr) => addr,
433                    Err(e) => {
434                        warn!("Received packet with malformed header structure: {e}");
435                        return false;
436                    }
437                };
438                !connection_queues.contains_key(&dest_addr)
439            })
440            .collect::<Vec<_>>();
441
442        interface.write_packets(filtered_packets).await?;
443    }
444}
445
446#[inline]
447async fn relay_unisolated(
448    connection_queues: ConnectionQueues,
449    interface: Arc<ActiveInterface<impl InterfaceIO>>,
450    mut ingress_queue: Receiver<Packet>,
451) -> Result<()> {
452    loop {
453        let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
454
455        let count = ingress_queue
456            .recv_many(&mut packets, PACKET_BUFFER_SIZE)
457            .await;
458
459        // ingress_queue closed
460        if count == 0 {
461            return Ok(());
462        }
463
464        for packet in packets {
465            let dest_addr = match packet.destination() {
466                Ok(addr) => addr,
467                Err(e) => {
468                    warn!("Received packet with malformed header structure: {e}");
469                    continue;
470                }
471            };
472
473            match connection_queues.get(&dest_addr) {
474                // Send the packet to the appropriate QUIC connection
475                Some(connection_queue) => match connection_queue.try_send(packet.into()) {
476                    Ok(()) => {}
477                    Err(TrySendError::Full(_)) => {
478                        debug!("Dropping client-to-client packet for {dest_addr}: queue full");
479                    }
480                    Err(TrySendError::Closed(_)) => {
481                        debug!(
482                            "Dropping client-to-client packet for {dest_addr}: connection closed"
483                        );
484                    }
485                },
486                // Send the packet to the TUN interface
487                None => interface.write_packet(packet).await?,
488            }
489        }
490    }
491}