Skip to main content

quincy_server/server/
mod.rs

1pub mod address_pool;
2mod connection;
3
4use std::net::{IpAddr, SocketAddr};
5use std::sync::Arc;
6use std::time::Duration;
7
8use crate::auth::AuthServer;
9use crate::server::connection::QuincyConnection;
10use crate::users_file::UsersFileServerAuthenticator;
11use bytes::Bytes;
12use dashmap::DashMap;
13use futures::stream::FuturesUnordered;
14use futures::StreamExt;
15use quincy::config::ServerConfig;
16use quincy::network::socket::bind_socket;
17use quincy::Result;
18use quinn::{Endpoint, VarInt};
19use tokio::signal;
20use tokio::sync::mpsc::{channel, Receiver, Sender};
21
22use self::address_pool::AddressPool;
23use quincy::constants::{PACKET_BUFFER_SIZE, PACKET_CHANNEL_SIZE, QUINN_RUNTIME};
24use quincy::network::interface::{Interface, InterfaceIO};
25use quincy::network::packet::Packet;
26use quincy::utils::tasks::abort_all;
27use tracing::{debug, info, warn};
28
29type ConnectionQueues = Arc<DashMap<IpAddr, Sender<Bytes>>>;
30
31/// Represents a Quincy server encapsulating Quincy connections and TUN interface IO.
32pub struct QuincyServer {
33    config: ServerConfig,
34    connection_queues: ConnectionQueues,
35    address_pool: Arc<AddressPool>,
36}
37
38impl QuincyServer {
39    /// Creates a new instance of the Quincy tunnel.
40    ///
41    /// ### Arguments
42    /// - `config` - the server configuration
43    pub fn new(config: ServerConfig) -> Result<Self> {
44        let address_pool = AddressPool::new(config.tunnel_network);
45
46        Ok(Self {
47            config,
48            connection_queues: Arc::new(DashMap::new()),
49            address_pool: Arc::new(address_pool),
50        })
51    }
52
53    /// Starts the tasks for this instance of Quincy tunnel and listens for incoming connections.
54    pub async fn run<I: InterfaceIO>(&self) -> Result<()> {
55        let interface: Interface<I> = Interface::create(
56            self.config.tunnel_network,
57            self.config.connection.mtu,
58            Some(self.config.tunnel_network.network()),
59            self.config.interface_name.clone(),
60            None,
61            None,
62        )?;
63        let interface = Arc::new(interface);
64
65        let authenticator = Box::new(UsersFileServerAuthenticator::new(
66            &self.config.authentication,
67            self.address_pool.clone(),
68        )?);
69        let auth_server = AuthServer::new(
70            authenticator,
71            self.config.tunnel_network,
72            Duration::from_secs(self.config.connection.connection_timeout_s),
73        );
74
75        let (sender, receiver) = channel(PACKET_CHANNEL_SIZE);
76
77        let mut tasks = FuturesUnordered::new();
78
79        tasks.extend([
80            tokio::spawn(Self::process_outbound_traffic(
81                interface.clone(),
82                self.connection_queues.clone(),
83            )),
84            tokio::spawn(Self::process_inbound_traffic(
85                self.connection_queues.clone(),
86                interface,
87                receiver,
88                self.config.isolate_clients,
89            )),
90        ]);
91
92        let handler_task = self.handle_connections(auth_server, sender);
93
94        let result = tokio::select! {
95            handler_task_result = handler_task => handler_task_result,
96            Some(task_result) = tasks.next() => task_result?,
97        };
98
99        let _ = abort_all(tasks).await;
100
101        result
102    }
103
104    /// Handles incoming connections by spawning a new QuincyConnection instance for them.
105    ///
106    /// ### Arguments
107    /// - `auth_server` - the authentication server to use for authenticating clients
108    /// - `ingress_queue` - the queue for sending data to the TUN interface
109    async fn handle_connections(
110        &self,
111        auth_server: AuthServer,
112        ingress_queue: Sender<Packet>,
113    ) -> Result<()> {
114        let endpoint = self.create_quinn_endpoint()?;
115
116        info!(
117            "Starting connection handler: {}",
118            endpoint.local_addr().expect("Endpoint has a local address")
119        );
120
121        let mut authentication_tasks = FuturesUnordered::new();
122        let mut connection_tasks = FuturesUnordered::new();
123
124        loop {
125            tokio::select! {
126                // New connections
127                Some(handshake) = endpoint.accept() => {
128                    let client_ip = handshake.remote_address().ip();
129
130                    debug!(
131                        "Received incoming connection from '{}'",
132                        client_ip
133                    );
134
135                    let quic_connection = match handshake.await {
136                        Ok(connection) => connection,
137                        Err(e) => {
138                            warn!("Connection handshake with client '{client_ip}' failed: {e}");
139                            continue;
140                        }
141                    };
142
143                    let connection = QuincyConnection::new(
144                        quic_connection,
145                        ingress_queue.clone(),
146                    );
147
148                    authentication_tasks.push(
149                        connection.authenticate(&auth_server)
150                    );
151                }
152
153                // Authentication tasks
154                Some(connection) = authentication_tasks.next() => {
155                    let connection = match connection {
156                        Ok(connection) => connection,
157                        Err(e) => {
158                            warn!("Failed to authenticate client: {e}");
159                            continue;
160                        }
161                    };
162
163                    let client_address = connection.client_address()?.addr();
164                    let (connection_sender, connection_receiver) = channel(PACKET_CHANNEL_SIZE);
165
166                    connection_tasks.push(tokio::spawn(connection.run(connection_receiver)));
167                    self.connection_queues.insert(client_address, connection_sender);
168                }
169
170                // Connection tasks
171                Some(connection) = connection_tasks.next() => {
172                    let (connection, err) = connection?;
173                    let client_address = &connection.client_address()?.addr();
174
175                    self.connection_queues.remove(client_address);
176                    self.address_pool.release_address(client_address);
177                    warn!("Connection with client {client_address} has encountered an error: {err}");
178                }
179
180                // Shutdown
181                _ = signal::ctrl_c() => {
182                    info!("Received shutdown signal, shutting down");
183                    let _ = abort_all(connection_tasks).await;
184
185                    endpoint.close(VarInt::from_u32(0x01), "Server shutdown".as_bytes());
186
187                    return Ok(());
188                }
189            }
190        }
191    }
192
193    /// Creates a Quinn QUIC endpoint that clients can connect to.
194    fn create_quinn_endpoint(&self) -> Result<Endpoint> {
195        let quinn_config = self.config.as_quinn_server_config()?;
196
197        let socket = bind_socket(
198            SocketAddr::new(self.config.bind_address, self.config.bind_port),
199            self.config.connection.send_buffer_size as usize,
200            self.config.connection.recv_buffer_size as usize,
201            self.config.reuse_socket,
202        )?;
203
204        let endpoint_config = self.config.connection.as_endpoint_config()?;
205        let endpoint = Endpoint::new(
206            endpoint_config,
207            Some(quinn_config),
208            socket,
209            QUINN_RUNTIME.clone(),
210        )?;
211
212        Ok(endpoint)
213    }
214
215    /// Reads data from the TUN interface and sends it to the appropriate client.
216    ///
217    /// ### Arguments
218    /// - `tun_read` - the read half of the TUN interface
219    /// - `connection_queues` - the queues for sending data to the QUIC connections
220    /// - `buffer_size` - the size of the buffer to use when reading from the TUN interface
221    async fn process_outbound_traffic(
222        interface: Arc<Interface<impl InterfaceIO>>,
223        connection_queues: ConnectionQueues,
224    ) -> Result<()> {
225        debug!("Started tunnel outbound traffic task (interface -> connection queue)");
226
227        loop {
228            let packet = interface.read_packet().await?;
229            let dest_addr = match packet.destination() {
230                Ok(addr) => addr,
231                Err(e) => {
232                    warn!("Received packet with malformed header structure: {e}");
233                    continue;
234                }
235            };
236
237            debug!("Destination address for packet: {dest_addr}");
238
239            let connection_queue = match connection_queues.get(&dest_addr) {
240                Some(connection_queue) => connection_queue,
241                None => continue,
242            };
243
244            debug!("Found connection for IP {dest_addr}");
245
246            connection_queue.send(packet.into()).await?;
247        }
248    }
249
250    /// Reads data from the QUIC connection and sends it to the TUN interface worker.
251    ///
252    /// ### Arguments
253    /// - `connection_queues` - the queues for sending data to the QUIC connections
254    /// - `tun_write` - the write half of the TUN interface
255    /// - `ingress_queue` - the queue for sending data to the TUN interface
256    /// - `isolate_clients` - whether to isolate clients from each other
257    async fn process_inbound_traffic(
258        connection_queues: ConnectionQueues,
259        interface: Arc<Interface<impl InterfaceIO>>,
260        ingress_queue: Receiver<Packet>,
261        isolate_clients: bool,
262    ) -> Result<()> {
263        debug!("Started tunnel inbound traffic task (tunnel queue -> interface)");
264
265        if isolate_clients {
266            relay_isolated(connection_queues, interface, ingress_queue).await
267        } else {
268            relay_unisolated(connection_queues, interface, ingress_queue).await
269        }
270    }
271}
272
273#[inline]
274async fn relay_isolated(
275    connection_queues: ConnectionQueues,
276    interface: Arc<Interface<impl InterfaceIO>>,
277    mut ingress_queue: Receiver<Packet>,
278) -> Result<()> {
279    loop {
280        let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
281        ingress_queue
282            .recv_many(&mut packets, PACKET_BUFFER_SIZE)
283            .await;
284
285        let filtered_packets = packets
286            .into_iter()
287            .filter(|packet| {
288                let dest_addr = match packet.destination() {
289                    Ok(addr) => addr,
290                    Err(e) => {
291                        warn!("Received packet with malformed header structure: {e}");
292                        return false;
293                    }
294                };
295                !connection_queues.contains_key(&dest_addr)
296            })
297            .collect::<Vec<_>>();
298
299        interface.write_packets(filtered_packets).await?;
300    }
301}
302
303#[inline]
304async fn relay_unisolated(
305    connection_queues: ConnectionQueues,
306    interface: Arc<Interface<impl InterfaceIO>>,
307    mut ingress_queue: Receiver<Packet>,
308) -> Result<()> {
309    loop {
310        let mut packets = Vec::with_capacity(PACKET_BUFFER_SIZE);
311
312        ingress_queue
313            .recv_many(&mut packets, PACKET_BUFFER_SIZE)
314            .await;
315
316        for packet in packets {
317            let dest_addr = match packet.destination() {
318                Ok(addr) => addr,
319                Err(e) => {
320                    warn!("Received packet with malformed header structure: {e}");
321                    continue;
322                }
323            };
324
325            match connection_queues.get(&dest_addr) {
326                // Send the packet to the appropriate QUIC connection
327                Some(connection_queue) => connection_queue.send(packet.into()).await?,
328                // Send the packet to the TUN interface
329                None => interface.write_packet(packet).await?,
330            }
331        }
332    }
333}