Skip to main content

typhoon/socket/
server.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::hash::Hash;
4use std::net::SocketAddr;
5use std::sync::{Arc, Weak};
6
7use async_trait::async_trait;
8use log::{debug, info, warn};
9
10use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
11use crate::cache::SharedMap;
12#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
13use crate::certificate::ObfuscationBufferContainer;
14use crate::certificate::{ServerKeyPair, ServerSecret};
15use crate::crypto::{PAYLOAD_CRYPTO_OVERHEAD, ServerCryptoTool, UserCryptoState, UserServerState, verify_transcript_with_key};
16use crate::flow::decoy::{DecoyFactory, random_decoy_factory};
17use crate::flow::probe::ProbeFactory;
18use crate::flow::server::{RawReceivedPacket, ServerFlowManager};
19use crate::flow::{FlowConfig, FlowControllerError};
20use crate::session::SessionControllerError;
21use crate::session::server::{IncomingPacket, OutgoingRouter, ServerSessionManager};
22use crate::settings::{Settings, keys};
23use crate::socket::error::ServerSocketError;
24use crate::tailer::{IdentityType, PacketFlags, ReturnCode, ServerConnectionHandler, Tailer};
25use crate::utils::random::jittered_chunk_size;
26use crate::utils::socket::Socket;
27use crate::utils::sync::{AsyncExecutor, Mutex, NotifyQueueReceiver, NotifyQueueSender, RwLock, assert_runtime, create_bounded_notify_queue, create_notify_queue};
28use crate::utils::unix_timestamp_ms;
29
30/// Configuration for a single server flow manager.
31pub struct ServerFlowConfiguration<T: IdentityType + Clone, AE: AsyncExecutor> {
32    socket: Option<Socket>,
33    address: Option<SocketAddr>,
34    config: FlowConfig,
35    /// Number of SO_REUSEPORT reader sockets to create (Linux only; default 1).
36    /// Values > 1 are silently clamped to 1 on non-Linux platforms.
37    reader_count: usize,
38    /// Optional per-flow decoy factory. Falls back to the listener's default when `None`.
39    decoy_factory: Option<DecoyFactory<T, AE>>,
40    /// Optional per-flow probe factory. Falls back to the listener's default when `None`.
41    probe_factory: Option<ProbeFactory<AE>>,
42}
43
44impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> ServerFlowConfiguration<T, AE> {
45    /// Create a configuration with a pre-built socket.
46    pub fn new(config: FlowConfig, socket: Socket) -> Self {
47        Self {
48            socket: Some(socket),
49            address: None,
50            config,
51            reader_count: 1,
52            decoy_factory: None,
53            probe_factory: None,
54        }
55    }
56
57    /// Create a configuration that will bind a socket to the given address.
58    pub fn with_address(config: FlowConfig, address: SocketAddr) -> Self {
59        Self {
60            socket: None,
61            address: Some(address),
62            config,
63            reader_count: 1,
64            decoy_factory: None,
65            probe_factory: None,
66        }
67    }
68
69    /// Set the number of SO_REUSEPORT reader sockets (Linux only).
70    /// The kernel distributes incoming datagrams across all sockets by 4-tuple hash,
71    /// enabling N concurrent `recv_from` drain tasks with no per-packet locking.
72    /// Has no effect (silently clamped to 1) on non-Linux platforms.
73    pub fn with_reader_count(mut self, count: usize) -> Self {
74        self.reader_count = count.max(1);
75        self
76    }
77
78    /// Override the decoy provider factory for this flow.
79    /// When not set, the listener's default factory (random selection) is used.
80    pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
81        self.decoy_factory = Some(factory);
82        self
83    }
84
85    /// Override the decoy provider for this flow using a concrete type.
86    pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(mut self) -> Self {
87        self.decoy_factory = Some(crate::flow::decoy::decoy_factory::<T, AE, DP>());
88        self
89    }
90
91    /// Override the active probe handler factory for this flow.
92    pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
93        self.probe_factory = Some(factory);
94        self
95    }
96
97    /// Override the active probe handler type for this flow.
98    pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(mut self) -> Self {
99        self.probe_factory = Some(crate::flow::probe::probe_factory::<AE, PM>());
100        self
101    }
102}
103
104/// Builder for constructing a `Listener`.
105pub struct ListenerBuilder<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T>> {
106    settings: Option<Arc<Settings<AE>>>,
107    flow_configs: Vec<ServerFlowConfiguration<T, AE>>,
108    secret: ServerSecret<'static>,
109    identity_generator: IG,
110    default_decoy_factory: DecoyFactory<T, AE>,
111    default_probe_factory: Option<ProbeFactory<AE>>,
112}
113
114impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> ListenerBuilder<T, AE, IG> {
115    /// Create a new builder with the given server key pair and identity generator.
116    /// Decoy providers are randomly selected per-user by default.
117    pub fn new(key_pair: ServerKeyPair, identity_generator: IG) -> Self {
118        Self {
119            settings: None,
120            flow_configs: Vec::new(),
121            secret: key_pair.into_server_secret(),
122            identity_generator,
123            default_decoy_factory: random_decoy_factory(),
124            default_probe_factory: None,
125        }
126    }
127
128    /// Set custom settings.
129    pub fn with_settings(mut self, settings: Arc<Settings<AE>>) -> Self {
130        self.settings = Some(settings);
131        self
132    }
133
134    /// Override the default decoy factory used for flows that have no per-flow override.
135    pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
136        self.default_decoy_factory = factory;
137        self
138    }
139
140    /// Override the default decoy provider type for all flows without a per-flow override.
141    pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(mut self) -> Self {
142        self.default_decoy_factory = crate::flow::decoy::decoy_factory::<T, AE, DP>();
143        self
144    }
145
146    /// Set the default active probe handler factory for flows that have no per-flow override.
147    pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
148        self.default_probe_factory = Some(factory);
149        self
150    }
151
152    /// Set the default active probe handler type for flows without a per-flow override.
153    pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(mut self) -> Self {
154        self.default_probe_factory = Some(crate::flow::probe::probe_factory::<AE, PM>());
155        self
156    }
157
158    /// Append a single flow manager configuration.
159    pub fn add_flow(mut self, config: ServerFlowConfiguration<T, AE>) -> Self {
160        self.flow_configs.push(config);
161        self
162    }
163
164    /// Set all flow manager configurations at once.
165    pub fn with_flows(mut self, configs: Vec<ServerFlowConfiguration<T, AE>>) -> Self {
166        self.flow_configs = configs;
167        self
168    }
169
170    /// Build the listener, creating all flow managers.
171    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
172    pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
173        assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
174        if self.flow_configs.is_empty() {
175            return Err(ServerSocketError::NoFlows);
176        }
177
178        let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
179        let users: SharedMap<T, UserServerState> = SharedMap::new();
180        let mut flows = Vec::with_capacity(self.flow_configs.len());
181
182        let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
183        let mut max_data_payload = usize::MAX;
184
185        let obfs_buffer = self.secret.obfuscation_buffer();
186
187        for flow_config in self.flow_configs.drain(..) {
188            flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;
189
190            max_data_payload = max_data_payload.min(flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
191
192            let socks: Vec<Arc<Socket>> = if let Some(socket) = flow_config.socket {
193                vec![Arc::new(socket)]
194            } else {
195                let address = flow_config.address.expect("ServerFlowConfiguration must have either socket or address");
196                cfg_if::cfg_if! {
197                    if #[cfg(target_os = "linux")] {
198                        if flow_config.reader_count > 1 {
199                            Socket::bind_reuse_port(address, flow_config.reader_count)
200                                .map_err(ServerSocketError::SocketError)?
201                                .into_iter().map(Arc::new).collect()
202                        } else {
203                            vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
204                        }
205                    } else {
206                        vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
207                    }
208                }
209            };
210
211            let decoy_factory = flow_config.decoy_factory.unwrap_or_else(|| Arc::clone(&self.default_decoy_factory));
212            let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
213            let crypto_send = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
214            let crypto_recv = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
215            let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
216            flows.push(flow);
217        }
218        let max_data_payload = if max_data_payload == usize::MAX {
219            settings.mtu()
220        } else {
221            max_data_payload
222        };
223        info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
224
225        let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();
226
227        let router = Arc::new(Router {
228            flows,
229            sessions: RwLock::new(HashMap::new()),
230            users: Mutex::new(users),
231        });
232
233        Ok(Listener {
234            router,
235            secret: self.secret,
236            identity_generator: self.identity_generator,
237            accept_tx,
238            accept_rx: Mutex::new(accept_rx),
239            max_data_payload,
240            settings,
241        })
242    }
243
244    /// Build the listener, creating all flow managers (full mode).
245    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
246    pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
247        assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
248        if self.flow_configs.is_empty() {
249            return Err(ServerSocketError::NoFlows);
250        }
251
252        let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
253        let users: SharedMap<T, UserServerState> = SharedMap::new();
254        let mut flows = Vec::with_capacity(self.flow_configs.len());
255
256        let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
257        let mut max_data_payload = usize::MAX;
258
259        let secret_arc = Arc::new(self.secret);
260
261        for flow_config in self.flow_configs.drain(..) {
262            flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;
263
264            max_data_payload = max_data_payload.min(flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
265
266            let socks: Vec<Arc<Socket>> = match flow_config.socket {
267                Some(socket) => vec![Arc::new(socket)],
268                None => {
269                    let address = flow_config.address.expect("ServerFlowConfiguration must have either socket or address");
270                    cfg_if::cfg_if! {
271                        if #[cfg(target_os = "linux")] {
272                            if flow_config.reader_count > 1 {
273                                Socket::bind_reuse_port(address, flow_config.reader_count)
274                                    .map_err(ServerSocketError::SocketError)?
275                                    .into_iter().map(Arc::new).collect()
276                            } else {
277                                vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
278                            }
279                        } else {
280                            vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
281                        }
282                    }
283                }
284            };
285
286            let decoy_factory = flow_config.decoy_factory.unwrap_or_else(|| Arc::clone(&self.default_decoy_factory));
287            let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
288            let crypto_send = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
289            let crypto_recv = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
290            let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
291            flows.push(flow);
292        }
293        let max_data_payload = if max_data_payload == usize::MAX {
294            settings.mtu()
295        } else {
296            max_data_payload
297        };
298        info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
299
300        let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();
301
302        let router = Arc::new(Router {
303            flows,
304            sessions: RwLock::new(HashMap::new()),
305            users: Mutex::new(users),
306        });
307
308        Ok(Listener {
309            router,
310            secret: secret_arc,
311            identity_generator: self.identity_generator,
312            accept_tx,
313            accept_rx: Mutex::new(accept_rx),
314            max_data_payload,
315            settings,
316        })
317    }
318}
319
320/// Routing and session-lifecycle surface, shared by the `Listener` and every `ClientHandle` it produces.
321pub(crate) struct Router<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> {
322    flows: Vec<Arc<ServerFlowManager<T, AE>>>,
323    sessions: RwLock<HashMap<T, Arc<ServerSessionManager<T, AE>>>>,
324    users: Mutex<SharedMap<T, UserServerState>>,
325}
326
327impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> Router<T, AE> {
328    /// Number of flow managers the router routes through.
329    #[inline]
330    pub(crate) fn flow_count(&self) -> usize {
331        self.flows.len()
332    }
333}
334
335/// Server-side listener that drives the handshake path and produces `ClientHandle`s.
336/// All routing and session lifecycle state lives in the shared [`Router`].
337pub struct Listener<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> {
338    router: Arc<Router<T, AE>>,
339    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
340    secret: ServerSecret<'static>,
341    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
342    secret: Arc<ServerSecret<'static>>,
343    identity_generator: IG,
344    accept_tx: NotifyQueueSender<ClientHandle<T, AE>>,
345    accept_rx: Mutex<NotifyQueueReceiver<ClientHandle<T, AE>>>,
346    /// Maximum user-data bytes per packet so the wire packet fits within MTU.
347    max_data_payload: usize,
348    settings: Arc<Settings<AE>>,
349}
350
351impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> Listener<T, AE, IG> {
352    /// Create initial user crypto state from a handshake key (fast mode).
353    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
354    #[inline]
355    fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
356        UserCryptoState::new(initial_key, self.secret.obfuscation_buffer())
357    }
358
359    /// Create initial user crypto state from a handshake key (full mode).
360    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
361    #[inline]
362    fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
363        UserCryptoState::new(initial_key)
364    }
365
366    /// Upgrade a user's crypto state from initial key to session key (fast mode).
367    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
368    #[inline]
369    fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
370        user_state.upgrade_crypto(session_key, self.secret.obfuscation_buffer());
371    }
372
373    /// Upgrade a user's crypto state from initial key to session key (full mode).
374    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
375    #[inline]
376    fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
377        user_state.upgrade_crypto(session_key);
378    }
379
380    /// Start the listener's background receive loops.
381    /// Must be called after build() to begin processing incoming packets.
382    ///
383    /// Each flow gets N+1 tasks (where N = number of SO_REUSEPORT sockets, normally 1):
384    /// - N **drain tasks**, one per socket, each calling `receive_raw` and immediately pushing
385    ///   raw packets into a shared bounded channel. If the route task is slow and the channel is
386    ///   full the packet is dropped, keeping the OS socket buffer empty at all times.
387    ///   When all N drain tasks exit the `Arc<BoundedNotifyQueueSender>` refcount reaches 0,
388    ///   dropping the sender and closing the channel so the route task terminates.
389    /// - A **route task** that pulls from the shared channel and calls `route_incoming`.
390    pub fn start(self: &Arc<Self>) -> impl Future<Output = ()> {
391        let drain_capacity = self.settings.get(&keys::DRAIN_CHANNEL_CAPACITY) as usize;
392
393        for (index, flow) in self.router.flows.iter().enumerate() {
394            let (drain_tx, mut drain_rx) = create_bounded_notify_queue(drain_capacity);
395            let drain_tx = Arc::new(drain_tx);
396
397            for (sock_index, sock) in flow.recv_socks().iter().enumerate() {
398                let drain_tx = Arc::clone(&drain_tx);
399                let sock = Arc::clone(sock);
400                let flow_drain = Arc::clone(flow);
401                let settings_drain = Arc::clone(&self.settings);
402                self.settings.executor().spawn(async move {
403                    loop {
404                        let recv_buf = settings_drain.pool().allocate_for_recv();
405                        match flow_drain.receive_raw(recv_buf, &sock).await {
406                            Ok(raw_packet) => drain_tx.push(raw_packet),
407                            Err(err) => {
408                                warn!("flow manager {index} socket {sock_index}: receive error: {err}");
409                                break;
410                            }
411                        }
412                    }
413                });
414            }
415            drop(drain_tx);
416
417            let listener = Arc::clone(self);
418            self.settings.executor().spawn(async move {
419                while let Some(raw_packet) = drain_rx.recv().await {
420                    listener.route_incoming(raw_packet, index).await;
421                }
422            });
423        }
424
425        async {}
426    }
427
428    /// Route an incoming packet to the appropriate session or create a new one.
429    async fn route_incoming(self: &Arc<Self>, raw_packet: RawReceivedPacket<T>, flow_index: usize) {
430        let identity = raw_packet.tailer.identity();
431
432        if raw_packet.tailer.flags().contains(PacketFlags::HANDSHAKE) {
433            self.handle_new_client(raw_packet, flow_index).await;
434            return;
435        }
436
437        let session = {
438            let sessions = self.router.sessions.read().await;
439            sessions.get(&identity).cloned()
440        };
441
442        if let Some(session) = session {
443            self.router.flows[flow_index].ensure_user(identity.clone(), session.counter()).await;
444            session.note_active_flow(flow_index);
445
446            let incoming = IncomingPacket {
447                body: raw_packet.body,
448                tailer: raw_packet.tailer,
449            };
450            if let Err(err) = session.process_incoming(incoming).await {
451                debug!("session processing error for {}: {}", identity.to_string(), err);
452                if matches!(err, SessionControllerError::ConnectionTerminated(_)) {
453                    self.router.remove_session(&identity).await;
454                }
455            }
456        } else {
457            debug!("packet from unknown identity {}, dropping", identity.to_string());
458        }
459    }
460
461    /// Handle a handshake from a new client: create session, send response, publish ClientHandle.
462    async fn handle_new_client(self: &Arc<Self>, mut raw_packet: RawReceivedPacket<T>, flow_index: usize) {
463        let handshake_transcript = raw_packet.handshake_transcript.take();
464        let original_wire_packet = raw_packet.original_wire_packet.take();
465        let source_addr = raw_packet.source_addr;
466        let Some((server_data, initial_key, client_initial_data)) = self.secret.decapsulate_handshake_server(raw_packet.body, self.settings.pool()) else {
467            if let Some(packet) = original_wire_packet {
468                debug!("handshake decapsulation failed from {source_addr} (body too short for crypto header), forwarding to probe handler");
469                self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
470            } else {
471                debug!("handshake decapsulation failed from {source_addr} and original wire packet unavailable, dropping");
472            }
473            return;
474        };
475
476        // Verify the handshake tailer with the initial-data encryption key just produced by the KEM decapsulation.
477        let verified = matches!((&handshake_transcript, &original_wire_packet), (Some(transcript), Some(_)) if verify_transcript_with_key(&initial_key, transcript).is_ok());
478        if !verified {
479            if let Some(packet) = original_wire_packet {
480                debug!("handshake tailer verification failed from {source_addr}, forwarding to probe handler");
481                self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
482            } else {
483                debug!("handshake packet from {source_addr} missing deferred transcript or wire packet, dropping");
484            }
485            return;
486        }
487
488        let client_version_identity = raw_packet.tailer.identity();
489        let handshake_pn = raw_packet.tailer.packet_number();
490        if !self.identity_generator.verify_version(client_version_identity.to_bytes()) {
491            {
492                let mut users = self.router.users.lock().await;
493                let crypto_state = self.make_initial_crypto_state(&initial_key);
494                users.insert(client_version_identity.clone(), UserServerState::new(crypto_state)).await;
495            }
496            self.router.flows[flow_index].register_user_binding(client_version_identity.clone(), raw_packet.source_addr, handshake_pn).await;
497            let pn = ((unix_timestamp_ms() / 1000) as u64) << 32;
498            let buf = self.settings.pool().allocate(Some(T::length()));
499            let tailer = Tailer::termination(buf, &client_version_identity, ReturnCode::VersionMismatch, pn);
500            if let Err(err) = self.router.flows[flow_index].send_packet(tailer.into_buffer(), false, false).await {
501                warn!("failed to send version mismatch rejection: {err}");
502            }
503            {
504                let mut users = self.router.users.lock().await;
505                users.remove(&client_version_identity).await;
506            }
507            self.router.flows[flow_index].remove_user(&client_version_identity).await;
508            return;
509        }
510
511        let identity = self.identity_generator.generate(client_initial_data.slice());
512        let server_initial_data = self.identity_generator.initial_data(&identity);
513
514        let (incoming_tx, incoming_rx) = create_notify_queue::<DynamicByteBuffer>();
515        let router_weak: Weak<dyn OutgoingRouter<T>> = Arc::downgrade(&self.router) as Weak<dyn OutgoingRouter<T>>;
516
517        let (response_body, session_key) = self.secret.encapsulate_handshake_server(server_data, self.settings.pool(), server_initial_data.slice(), &initial_key);
518
519        let (session, response_packet, replacing) = {
520            let mut users = self.router.users.lock().await;
521            let replacing = users.contains_key(&identity);
522            if replacing {
523                debug!("re-handshake for {}: replacing existing session (last wins)", identity.to_string());
524                users.remove(&identity).await;
525            }
526            let initial_crypto_state = self.make_initial_crypto_state(&initial_key);
527            let result = ServerSessionManager::assemble_session(initial_crypto_state, response_body, raw_packet.tailer, identity.clone(), &mut users, incoming_tx, router_weak, self.router.flow_count(), self.settings.clone()).await;
528            match result {
529                Ok((session, response_packet)) => (session, response_packet, replacing),
530                Err(err) => {
531                    warn!("handshake failed: {err}");
532                    return;
533                }
534            }
535        };
536
537        if replacing {
538            self.router.sessions.write().await.remove(&identity);
539            for flow in &self.router.flows {
540                flow.remove_user(&identity).await;
541            }
542        }
543
544        self.router.flows[flow_index].register_user_binding(identity.clone(), raw_packet.source_addr, handshake_pn).await;
545        self.router.flows[flow_index].register_user(identity.clone(), session.counter()).await;
546
547        if let Err(err) = self.router.flows[flow_index].send_packet(response_packet, false, false).await {
548            warn!("failed to send handshake response: {err}");
549            self.router.users.lock().await.remove(&identity).await;
550            for flow in &self.router.flows {
551                flow.remove_user(&identity).await;
552            }
553            return;
554        }
555
556        {
557            let mut users = self.router.users.lock().await;
558            users
559                .modify(&identity, |user_state| {
560                    self.upgrade_user_crypto(user_state, &session_key);
561                })
562                .await;
563        }
564
565        session.note_active_flow(flow_index);
566
567        {
568            let mut sessions = self.router.sessions.write().await;
569            if sessions.contains_key(&identity) {
570                debug!("concurrent handshake for {}: last wins, displacing earlier session", identity.to_string());
571            }
572            sessions.insert(identity.clone(), Arc::clone(&session));
573        }
574
575        let client_handle = ClientHandle {
576            session,
577            identity: identity.clone(),
578            incoming_rx: Mutex::new(incoming_rx),
579            max_data_payload: self.max_data_payload,
580            settings: self.settings.clone(),
581            router: Arc::clone(&self.router),
582        };
583        self.accept_tx.push(client_handle);
584
585        info!("new client connected: {}", identity.to_string());
586    }
587
588    /// Wait for the next client connection and return a handle to it.
589    pub async fn accept(&self) -> Result<ClientHandle<T, AE>, ServerSocketError> {
590        self.accept_rx.lock().await.recv().await.ok_or(ServerSocketError::ListenerStopped)
591    }
592}
593
594/// OutgoingRouter implementation: selects an active flow via the per-session bitmask and sends the packet.
595#[async_trait]
596impl<T: IdentityType + Clone + Eq + Hash + Send + Sync + ToString + 'static, AE: AsyncExecutor + 'static> OutgoingRouter<T> for Router<T, AE> {
597    async fn route_packet(&self, packet: DynamicByteBuffer, identity: &T) -> bool {
598        let session = {
599            let sessions = self.sessions.read().await;
600            sessions.get(identity).cloned()
601        };
602        let Some(session) = session else {
603            return false;
604        };
605        let flow_idx = session.select_active_flow(self.flows.len());
606        if flow_idx < self.flows.len() {
607            self.flows[flow_idx].send_packet(packet, false, false).await.is_ok()
608        } else {
609            false
610        }
611    }
612
613    async fn remove_session(&self, identity: &T) {
614        if self.sessions.write().await.remove(identity).is_none() {
615            return;
616        }
617        self.users.lock().await.remove(identity).await;
618        for flow in &self.flows {
619            flow.remove_user(identity).await;
620        }
621        info!("client session removed: {}", identity.to_string());
622    }
623}
624
625/// Handle to a connected client, providing send/receive operations.
626/// Not cloneable — only one handle per connection.
627pub struct ClientHandle<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> {
628    session: Arc<ServerSessionManager<T, AE>>,
629    identity: T,
630    incoming_rx: Mutex<NotifyQueueReceiver<DynamicByteBuffer>>,
631    /// Maximum user-data bytes per packet so the wire packet fits within MTU.
632    max_data_payload: usize,
633    settings: Arc<Settings<AE>>,
634    router: Arc<Router<T, AE>>,
635}
636
637impl<T: IdentityType + Clone + Eq + Hash + Send + ToString, AE: AsyncExecutor> ClientHandle<T, AE> {
638    /// Send a packet using a pre-allocated buffer.
639    pub async fn send(&self, packet: DynamicByteBuffer) -> Result<(), ServerSocketError> {
640        let wire = self.session.prepare_outgoing(packet, false).await.map_err(ServerSocketError::SessionError)?;
641        if !self.router.route_packet(wire, &self.identity).await {
642            return Err(ServerSocketError::SessionError(SessionControllerError::FlowError(FlowControllerError::UserNotFound {
643                identity: self.identity.to_string(),
644            })));
645        }
646        Ok(())
647    }
648
649    /// Send a byte slice, splitting into payload-sized chunks so each wire packet fits within MTU.
650    ///
651    /// See `ClientSocket::send_bytes` — same fragmentation-only-when-needed +
652    /// `TYPHOON_SEND_BYTES_JITTER`-driven per-chunk length sampling applies
653    /// here for s2c traffic.
654    pub async fn send_bytes(&self, data: &[u8]) -> Result<(), ServerSocketError> {
655        let jitter = self.settings.get(&keys::SEND_BYTES_JITTER);
656        let chunk = self.settings.get(&keys::SEND_BYTES_CHUNK) as usize;
657        let mut offset = 0;
658        while offset < data.len() {
659            let remaining = data.len() - offset;
660            let chunk_size = if remaining <= self.max_data_payload {
661                remaining
662            } else {
663                jittered_chunk_size(self.max_data_payload, chunk, jitter)
664            };
665            let buffer = self.settings.pool().allocate(Some(chunk_size));
666            buffer.slice_mut().copy_from_slice(&data[offset..offset + chunk_size]);
667            self.send(buffer).await?;
668            offset += chunk_size;
669        }
670        Ok(())
671    }
672
673    /// Maximum user-data bytes per `send` call so the wire packet fits within MTU.
674    pub fn max_data_payload(&self) -> usize {
675        self.max_data_payload
676    }
677
678    /// Receive a packet, returning the decrypted payload as a buffer.
679    pub async fn receive(&self) -> Result<DynamicByteBuffer, ServerSocketError> {
680        let buf = self.incoming_rx.lock().await.recv().await.ok_or(ServerSocketError::ChannelClosed)?;
681        Ok(buf)
682    }
683
684    /// Receive a packet, returning the decrypted payload as a byte vector.
685    pub async fn receive_bytes(&self) -> Result<Vec<u8>, ServerSocketError> {
686        let buffer = self.receive().await?;
687        Ok(buffer.slice().to_vec())
688    }
689}
690
691impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> Drop for ClientHandle<T, AE> {
692    /// Emit a TERMINATION packet and remove the session from the shared router before the handle is released.
693    fn drop(&mut self) {
694        let executor = self.settings.executor().clone();
695        let pn = (unix_timestamp_ms() / 1000) as u64 * (1u64 << 32);
696        let buf = self.settings.pool().allocate(Some(Tailer::<T>::len()));
697        let termination = Tailer::termination(buf, &self.identity, ReturnCode::Success, pn).into_buffer();
698        executor.block_on(async {
699            self.router.route_packet(termination, &self.identity).await;
700            self.router.remove_session(&self.identity).await;
701        });
702    }
703}