Skip to main content

quincy_server/server/
mod.rs

1pub mod address_pool;
2mod connection;
3pub mod session;
4
5#[cfg(feature = "metrics")]
6mod metrics;
7
8use std::collections::HashMap;
9use std::net::{IpAddr, SocketAddr};
10use std::sync::Arc;
11#[cfg(feature = "metrics")]
12use std::time::Duration;
13use std::time::Instant;
14
15use bytes::Bytes;
16use dashmap::DashMap;
17use futures::StreamExt;
18use futures::stream::FuturesUnordered;
19use quinn::{Endpoint, VarInt};
20use tokio::signal;
21use tokio::sync::mpsc::error::TrySendError;
22use tokio::sync::mpsc::{Receiver, Sender, channel};
23use tracing::{debug, info, warn};
24
25use crate::server::address_pool::AddressPoolManager;
26use crate::server::connection::{Assigned, QuincyConnection};
27use crate::server::session::{ConnectionSession, UserSessionRegistry};
28use crate::users::UsersFile;
29use quincy::Result;
30use quincy::config::{
31    AddressRange, AllowedNoiseKeys, NoiseKeyExchange, ServerConfig, ServerProtocolConfig,
32};
33use quincy::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
34use quincy::network::interface::{ActiveInterface, Interface, InterfaceIO};
35use quincy::network::packet::Packet;
36use quincy::network::socket::bind_socket;
37use quincy::utils::tasks::abort_all;
38
39/// Map of connection addresses to their TX channel.
40type ConnectionQueues = Arc<DashMap<IpAddr, Sender<Bytes>>>;
41
42/// Result of an IP assignment task, carrying the context needed for cleanup on failure.
43struct AssignmentResult {
44    result: Result<QuincyConnection<Assigned>>,
45    quic_connection: quinn::Connection,
46}
47
48/// Represents a Quincy server encapsulating Quincy connections and TUN interface IO.
49pub struct QuincyServer {
50    config: ServerConfig,
51    connection_queues: ConnectionQueues,
52    address_pool: Arc<AddressPoolManager>,
53    users: Arc<UsersFile>,
54    session_registry: Arc<UserSessionRegistry>,
55}
56
57impl QuincyServer {
58    /// Creates a new instance of the Quincy tunnel.
59    ///
60    /// Loads the users file and initializes the address pool from the tunnel network.
61    ///
62    /// ### Arguments
63    /// - `config` - the server configuration
64    pub fn new(config: ServerConfig) -> Result<Self> {
65        let users = UsersFile::load(&config.users_file)?;
66
67        let user_pools: HashMap<String, Vec<AddressRange>> = users
68            .users
69            .iter()
70            .filter(|(_, entry)| !entry.address_pool.is_empty())
71            .map(|(name, entry)| (name.clone(), entry.address_pool.clone()))
72            .collect();
73
74        let address_pool = AddressPoolManager::new(config.tunnel_network, user_pools)?;
75
76        Ok(Self {
77            config,
78            connection_queues: Arc::new(DashMap::new()),
79            address_pool: Arc::new(address_pool),
80            users: Arc::new(users),
81            session_registry: Arc::new(UserSessionRegistry::new()),
82        })
83    }
84
85    /// Starts the tasks for this instance of Quincy tunnel and listens for incoming connections.
86    pub async fn run<I: InterfaceIO>(&self) -> Result<()> {
87        let interface: Interface<I> = Interface::create(
88            self.config.tunnel_network,
89            self.config.connection.mtu,
90            Some(self.config.tunnel_network.network()),
91            self.config.interface_name.clone(),
92            None,
93            None,
94            None,
95        )?;
96        let interface = Arc::new(interface.configure()?);
97
98        #[cfg(feature = "metrics")]
99        if self.config.metrics.enabled {
100            use crate::server::metrics::init_metrics;
101
102            init_metrics(&self.config.metrics)?;
103        }
104
105        let (sender, receiver) = channel(PACKET_CHANNEL_SIZE);
106
107        let mut tasks = FuturesUnordered::new();
108
109        tasks.extend([
110            tokio::spawn(Self::process_outbound_traffic(
111                interface.clone(),
112                self.connection_queues.clone(),
113            )),
114            tokio::spawn(Self::process_inbound_traffic(
115                self.connection_queues.clone(),
116                interface,
117                receiver,
118                self.config.isolate_clients,
119            )),
120        ]);
121
122        let handler_task = self.handle_connections(sender);
123
124        let result = tokio::select! {
125            handler_task_result = handler_task => handler_task_result,
126            Some(task_result) = tasks.next() => task_result?,
127        };
128
129        let _ = abort_all(tasks).await;
130
131        result
132    }
133
134    /// Handles incoming connections by spawning a new QuincyConnection instance for them.
135    ///
136    /// ### Arguments
137    /// - `ingress_queue` - the queue for sending data to the TUN interface
138    async fn handle_connections(&self, ingress_queue: Sender<Packet>) -> Result<()> {
139        let endpoint = self.create_quinn_endpoint()?;
140
141        info!(
142            "Starting connection handler: {}",
143            endpoint.local_addr().expect("Endpoint has a local address")
144        );
145
146        let protocol = Arc::new(self.config.protocol.clone());
147        let server_address = self.config.tunnel_network;
148        let users = self.users.clone();
149        let address_pool = self.address_pool.clone();
150        let session_registry = self.session_registry.clone();
151
152        let mut assignment_tasks = FuturesUnordered::new();
153        let mut connection_tasks = FuturesUnordered::new();
154
155        loop {
156            tokio::select! {
157                // New connections
158                Some(handshake) = endpoint.accept() => {
159                    let client_ip = handshake.remote_address().ip();
160
161                    debug!(
162                        "Received incoming connection from '{}'",
163                        client_ip
164                    );
165
166                    let quic_connection = match handshake.await {
167                        Ok(connection) => connection,
168                        Err(e) => {
169                            warn!("Connection handshake with client '{client_ip}' failed: {e}");
170                            continue;
171                        }
172                    };
173
174                    let quic_connection_clone = quic_connection.clone();
175                    let connection = QuincyConnection::new(
176                        quic_connection,
177                        ingress_queue.clone(),
178                    );
179
180                    // Identify synchronously (reads peer_identity + HashMap lookup)
181                    let connection = match connection.identify(&protocol, &users) {
182                        Ok(conn) => conn,
183                        Err(e) => {
184                            warn!("Failed to identify client: {e}");
185                            quic_connection_clone.close(VarInt::from_u32(0x02), "Session establishment failed".as_bytes());
186                            continue;
187                        }
188                    };
189
190                    let address_pool = address_pool.clone();
191                    let server_addr = server_address;
192
193                    assignment_tasks.push(async move {
194                        let result = connection.assign_ip(&address_pool, server_addr).await;
195                        AssignmentResult {
196                            result,
197                            quic_connection: quic_connection_clone,
198                        }
199                    });
200                }
201
202                // Assignment tasks
203                Some(assignment) = assignment_tasks.next() => {
204                    let connection = match assignment.result {
205                        Ok(connection) => connection,
206                        Err(e) => {
207                            warn!("Failed to assign IP to client: {e}");
208                            assignment.quic_connection.close(
209                                VarInt::from_u32(0x02),
210                                "Session establishment failed".as_bytes(),
211                            );
212                            continue;
213                        }
214                    };
215
216                    let client_address = connection.client_address();
217                    let username = connection.username().to_string();
218
219                    // Resolve effective bandwidth limit:
220                    // per-user override > server default > None (unlimited)
221                    let bandwidth_limit = self
222                        .users
223                        .users
224                        .get(&username)
225                        .and_then(|entry| entry.bandwidth_limit)
226                        .or(self.config.default_bandwidth_limit);
227
228                    // Register session and obtain the shared rate limiter
229                    let rate_limiter = session_registry.add_connection(
230                        &username,
231                        ConnectionSession {
232                            client_address,
233                            connected_at: Instant::now(),
234                        },
235                        bandwidth_limit,
236                    );
237
238                    let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE);
239
240                    connection_tasks.push(tokio::spawn(connection.run(
241                        connection_receiver,
242                        rate_limiter,
243                        #[cfg(feature = "metrics")]
244                        Duration::from_secs(self.config.metrics.reporting_interval_s),
245                    )));
246                    self.connection_queues
247                        .insert(client_address.addr(), connection_sender);
248                }
249
250                // Connection tasks
251                Some(connection) = connection_tasks.next() => {
252                    let (connection, err) = connection?;
253                    let username = connection.username();
254                    let client_address = connection.client_address();
255
256                    self.connection_queues.remove(&client_address.addr());
257                    self.address_pool.release_address(username, &client_address.addr());
258                    session_registry.remove_connection(username, &client_address);
259
260                    warn!(
261                        "Connection with client {} (user '{username}') has encountered an error: {err}",
262                        client_address.addr()
263                    );
264                }
265
266                // Shutdown
267                _ = signal::ctrl_c() => {
268                    info!("Received shutdown signal, shutting down");
269                    let _ = abort_all(connection_tasks).await;
270
271                    endpoint.close(VarInt::from_u32(0x01), "Server shutdown".as_bytes());
272
273                    return Ok(());
274                }
275            }
276        }
277    }
278
279    /// Creates a Quinn QUIC endpoint that clients can connect to.
280    fn create_quinn_endpoint(&self) -> Result<Endpoint> {
281        // Build allowed keys/fingerprints from the users file
282        let (allowed_keys, allowed_fingerprints) = match &self.config.protocol {
283            ServerProtocolConfig::Noise(noise) => {
284                let keys = match noise.key_exchange {
285                    NoiseKeyExchange::Standard => Some(AllowedNoiseKeys::Standard(
286                        self.users.collect_noise_public_keys(),
287                    )),
288                    NoiseKeyExchange::Hybrid => Some(AllowedNoiseKeys::Hybrid(
289                        self.users.collect_noise_pq_public_keys(),
290                    )),
291                };
292                (keys, None)
293            }
294            ServerProtocolConfig::Tls(_) => (None, Some(self.users.collect_cert_fingerprints())),
295        };
296
297        let quinn_config = self
298            .config
299            .as_quinn_server_config(allowed_keys, allowed_fingerprints)?;
300
301        let socket = bind_socket(
302            SocketAddr::new(self.config.bind_address, self.config.bind_port),
303            self.config.connection.send_buffer_size as usize,
304            self.config.connection.recv_buffer_size as usize,
305            self.config.reuse_socket,
306        )?;
307
308        let endpoint_config = self
309            .config
310            .connection
311            .as_endpoint_config(self.config.noise_key_exchange())?;
312        let endpoint = Endpoint::new(
313            endpoint_config,
314            Some(quinn_config),
315            socket,
316            QUINN_RUNTIME.clone(),
317        )?;
318
319        Ok(endpoint)
320    }
321
322    /// Reads data from the TUN interface and sends it to the appropriate client.
323    ///
324    /// ### Arguments
325    /// - `tun_read` - the read half of the TUN interface
326    /// - `connection_queues` - the queues for sending data to the QUIC connections
327    async fn process_outbound_traffic(
328        interface: Arc<ActiveInterface<impl InterfaceIO>>,
329        connection_queues: ConnectionQueues,
330    ) -> Result<()> {
331        debug!("Started tunnel outbound traffic task (interface -> connection queue)");
332
333        loop {
334            let packet = interface.read_packet().await?;
335            let dest_addr = match packet.destination() {
336                Ok(addr) => addr,
337                Err(e) => {
338                    warn!("Received packet with malformed header structure: {e}");
339                    continue;
340                }
341            };
342
343            debug!("Destination address for packet: {dest_addr}");
344
345            let connection_queue = match connection_queues.get(&dest_addr) {
346                Some(connection_queue) => connection_queue,
347                None => continue,
348            };
349
350            debug!("Found connection for IP {dest_addr}");
351
352            match connection_queue.try_send(packet.into()) {
353                Ok(()) => {}
354                Err(TrySendError::Full(_)) => {
355                    debug!("Dropping outbound packet for {dest_addr}: per-client queue full");
356                }
357                Err(TrySendError::Closed(_)) => {
358                    debug!("Dropping outbound packet for {dest_addr}: connection closed");
359                }
360            }
361        }
362    }
363
364    /// Reads data from the QUIC connection and sends it to the TUN interface worker.
365    ///
366    /// ### Arguments
367    /// - `connection_queues` - the queues for sending data to the QUIC connections
368    /// - `tun_write` - the write half of the TUN interface
369    /// - `ingress_queue` - the queue for sending data to the TUN interface
370    /// - `isolate_clients` - whether to isolate clients from each other
371    async fn process_inbound_traffic(
372        connection_queues: ConnectionQueues,
373        interface: Arc<ActiveInterface<impl InterfaceIO>>,
374        ingress_queue: Receiver<Packet>,
375        isolate_clients: bool,
376    ) -> Result<()> {
377        debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");
378
379        if isolate_clients {
380            relay_isolated(connection_queues, interface, ingress_queue).await
381        } else {
382            relay_unisolated(connection_queues, interface, ingress_queue).await
383        }
384    }
385}
386
387#[inline]
388async fn relay_isolated(
389    connection_queues: ConnectionQueues,
390    interface: Arc<ActiveInterface<impl InterfaceIO>>,
391    mut ingress_queue: Receiver<Packet>,
392) -> Result<()> {
393    loop {
394        let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
395        let count = ingress_queue
396            .recv_many(&mut packets, PACKET_BUFFER_SIZE)
397            .await;
398
399        // ingress_queue closed
400        if count == 0 {
401            return Ok(());
402        }
403
404        let filtered_packets = packets
405            .into_iter()
406            .filter(|packet| {
407                let dest_addr = match packet.destination() {
408                    Ok(addr) => addr,
409                    Err(e) => {
410                        warn!("Received packet with malformed header structure: {e}");
411                        return false;
412                    }
413                };
414                !connection_queues.contains_key(&dest_addr)
415            })
416            .collect::<Vec<_>>();
417
418        interface.write_packets(filtered_packets).await?;
419    }
420}
421
422#[inline]
423async fn relay_unisolated(
424    connection_queues: ConnectionQueues,
425    interface: Arc<ActiveInterface<impl InterfaceIO>>,
426    mut ingress_queue: Receiver<Packet>,
427) -> Result<()> {
428    loop {
429        let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
430
431        let count = ingress_queue
432            .recv_many(&mut packets, PACKET_BUFFER_SIZE)
433            .await;
434
435        // ingress_queue closed
436        if count == 0 {
437            return Ok(());
438        }
439
440        for packet in packets {
441            let dest_addr = match packet.destination() {
442                Ok(addr) => addr,
443                Err(e) => {
444                    warn!("Received packet with malformed header structure: {e}");
445                    continue;
446                }
447            };
448
449            match connection_queues.get(&dest_addr) {
450                // Send the packet to the appropriate QUIC connection
451                Some(connection_queue) => match connection_queue.try_send(packet.into()) {
452                    Ok(()) => {}
453                    Err(TrySendError::Full(_)) => {
454                        debug!("Dropping client-to-client packet for {dest_addr}: queue full");
455                    }
456                    Err(TrySendError::Closed(_)) => {
457                        debug!(
458                            "Dropping client-to-client packet for {dest_addr}: connection closed"
459                        );
460                    }
461                },
462                // Send the packet to the TUN interface
463                None => interface.write_packet(packet).await?,
464            }
465        }
466    }
467}