typhoon-protocol 0.1.0

A sample implementation of the TYPHOON protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};

use async_trait::async_trait;
use log::{debug, info, warn};

use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
use crate::cache::SharedMap;
#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
use crate::certificate::ObfuscationBufferContainer;
use crate::certificate::{ServerKeyPair, ServerSecret};
use crate::crypto::{PAYLOAD_CRYPTO_OVERHEAD, ServerCryptoTool, UserCryptoState, UserServerState, verify_transcript_with_key};
use crate::flow::decoy::{DecoyFactory, random_decoy_factory};
use crate::flow::probe::ProbeFactory;
use crate::flow::server::{RawReceivedPacket, ServerFlowManager};
use crate::flow::{FlowConfig, FlowControllerError};
use crate::session::SessionControllerError;
use crate::session::server::{IncomingPacket, OutgoingRouter, ServerSessionManager};
use crate::settings::{Settings, keys};
use crate::socket::error::ServerSocketError;
use crate::tailer::{IdentityType, PacketFlags, ReturnCode, ServerConnectionHandler, Tailer};
use crate::utils::random::jittered_chunk_size;
use crate::utils::socket::Socket;
use crate::utils::sync::{AsyncExecutor, Mutex, NotifyQueueReceiver, NotifyQueueSender, RwLock, assert_runtime, create_bounded_notify_queue, create_notify_queue};
use crate::utils::unix_timestamp_ms;

/// Per-flow settings from which the listener builder creates one server flow manager.
///
/// The public constructors populate exactly one of `socket` / `address`; the builder
/// uses the socket when present and otherwise binds to the address.
pub struct ServerFlowConfiguration<T, AE>
where
    T: IdentityType + Clone,
    AE: AsyncExecutor,
{
    /// Ready-made socket to receive on, when the caller supplied one.
    socket: Option<Socket>,
    /// Address to bind when no ready-made socket was supplied.
    address: Option<SocketAddr>,
    /// Flow-level protocol configuration handed to the flow manager.
    config: FlowConfig,
    /// Number of SO_REUSEPORT reader sockets to create (Linux only; default 1).
    /// Only consulted when the socket is bound from `address`; with a pre-built socket,
    /// or on non-Linux platforms, a single socket is used regardless of this value.
    reader_count: usize,
    /// Per-flow decoy factory; `None` means the listener-wide default is used.
    decoy_factory: Option<DecoyFactory<T, AE>>,
    /// Per-flow probe factory; `None` means the listener-wide default (if any) is used.
    probe_factory: Option<ProbeFactory<AE>>,
}

impl<T, AE> ServerFlowConfiguration<T, AE>
where
    T: IdentityType + Clone + 'static,
    AE: AsyncExecutor + 'static,
{
    /// Common constructor: one reader socket and no per-flow factory overrides.
    fn from_parts(config: FlowConfig, socket: Option<Socket>, address: Option<SocketAddr>) -> Self {
        Self {
            socket,
            address,
            config,
            reader_count: 1,
            decoy_factory: None,
            probe_factory: None,
        }
    }

    /// Build a configuration around an already-created socket.
    pub fn new(config: FlowConfig, socket: Socket) -> Self {
        Self::from_parts(config, Some(socket), None)
    }

    /// Build a configuration whose socket will be bound to `address` when the listener is built.
    pub fn with_address(config: FlowConfig, address: SocketAddr) -> Self {
        Self::from_parts(config, None, Some(address))
    }

    /// Request `count` SO_REUSEPORT reader sockets for this flow (Linux only).
    ///
    /// A `count` of zero is raised to one. The value only matters when the flow is bound
    /// from an address on Linux; otherwise a single socket is used.
    pub fn with_reader_count(mut self, count: usize) -> Self {
        self.reader_count = usize::max(count, 1);
        self
    }

    /// Use `factory` to create decoy providers for this flow instead of the listener default.
    pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
        self.decoy_factory = Some(factory);
        self
    }

    /// Use the concrete decoy provider type `DP` for this flow.
    pub fn with_decoy<DP>(self) -> Self
    where
        DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static,
    {
        self.with_decoy_factory(crate::flow::decoy::decoy_factory::<T, AE, DP>())
    }

    /// Use `factory` to create the active probe handler for this flow.
    pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
        self.probe_factory = Some(factory);
        self
    }

    /// Use the concrete active probe handler type `PM` for this flow.
    pub fn with_probe<PM>(self) -> Self
    where
        PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static,
    {
        self.with_probe_factory(crate::flow::probe::probe_factory::<AE, PM>())
    }
}

/// Step-by-step builder that collects everything needed to create a `Listener`.
pub struct ListenerBuilder<T, AE, IG>
where
    T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static,
    AE: AsyncExecutor + 'static,
    IG: ServerConnectionHandler<T>,
{
    /// Custom settings; when `None`, `build` falls back to `Settings::default()`.
    settings: Option<Arc<Settings<AE>>>,
    /// One entry per flow manager to create; `build` rejects an empty list.
    flow_configs: Vec<ServerFlowConfiguration<T, AE>>,
    /// Server secret derived from the key pair passed to `new`.
    secret: ServerSecret<'static>,
    /// Connection handler used by the listener to verify versions and produce identities.
    identity_generator: IG,
    /// Decoy factory applied to flows that carry no per-flow override.
    default_decoy_factory: DecoyFactory<T, AE>,
    /// Probe factory applied to flows that carry no per-flow override, if any.
    default_probe_factory: Option<ProbeFactory<AE>>,
}

impl<T, AE, IG> ListenerBuilder<T, AE, IG>
where
    T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static,
    AE: AsyncExecutor + 'static,
    IG: ServerConnectionHandler<T> + 'static,
{
    /// Start a builder from the server key pair and the identity generator.
    /// Unless overridden, decoy providers come from `random_decoy_factory()` and no
    /// default probe factory is set.
    pub fn new(key_pair: ServerKeyPair, identity_generator: IG) -> Self {
        let secret = key_pair.into_server_secret();
        Self {
            settings: None,
            flow_configs: Vec::new(),
            secret,
            identity_generator,
            default_decoy_factory: random_decoy_factory(),
            default_probe_factory: None,
        }
    }

    /// Use the supplied settings instead of `Settings::default()`.
    pub fn with_settings(mut self, settings: Arc<Settings<AE>>) -> Self {
        self.settings = Some(settings);
        self
    }

    /// Replace the decoy factory used by flows that have no per-flow override.
    pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
        self.default_decoy_factory = factory;
        self
    }

    /// Replace the default decoy provider with the concrete type `DP`.
    pub fn with_decoy<DP>(self) -> Self
    where
        DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static,
    {
        self.with_decoy_factory(crate::flow::decoy::decoy_factory::<T, AE, DP>())
    }

    /// Set the probe factory used by flows that have no per-flow override.
    pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
        self.default_probe_factory = Some(factory);
        self
    }

    /// Set the default active probe handler to the concrete type `PM`.
    pub fn with_probe<PM>(self) -> Self
    where
        PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static,
    {
        self.with_probe_factory(crate::flow::probe::probe_factory::<AE, PM>())
    }

    /// Add one more flow manager configuration to the end of the list.
    pub fn add_flow(mut self, config: ServerFlowConfiguration<T, AE>) -> Self {
        self.flow_configs.push(config);
        self
    }

    /// Replace the whole list of flow manager configurations.
    pub fn with_flows(mut self, configs: Vec<ServerFlowConfiguration<T, AE>>) -> Self {
        self.flow_configs = configs;
        self
    }

    /// Produce the receive sockets for one flow.
    ///
    /// A pre-built socket is used as-is. Otherwise the address is bound: on Linux with
    /// `reader_count > 1` through `Socket::bind_reuse_port`, in every other case through a
    /// single `Socket::bind`.
    ///
    /// # Panics
    /// Panics when both `socket` and `address` are `None`.
    #[cfg(any(feature = "fast_software", feature = "fast_hardware", feature = "full_software", feature = "full_hardware"))]
    async fn open_flow_sockets(socket: Option<Socket>, address: Option<SocketAddr>, reader_count: usize) -> Result<Vec<Arc<Socket>>, ServerSocketError> {
        if let Some(socket) = socket {
            return Ok(vec![Arc::new(socket)]);
        }
        let address = address.expect("ServerFlowConfiguration must have either socket or address");

        #[cfg(target_os = "linux")]
        {
            if reader_count > 1 {
                let bound = Socket::bind_reuse_port(address, reader_count).map_err(ServerSocketError::SocketError)?;
                return Ok(bound.into_iter().map(Arc::new).collect());
            }
        }
        // The reader count is only meaningful for the Linux SO_REUSEPORT path above.
        #[cfg(not(target_os = "linux"))]
        let _ = reader_count;

        let socket = Socket::bind(address).await.map_err(ServerSocketError::SocketError)?;
        Ok(vec![Arc::new(socket)])
    }

    /// Build the listener, creating all flow managers (fast mode).
    ///
    /// # Errors
    /// Returns `UnsupportedRuntime` when the runtime assertion fails, `NoFlows` when no flow
    /// was configured, `FlowError` when a flow configuration is rejected for the current MTU
    /// and `SocketError` when a socket cannot be bound.
    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
    pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
        assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
        if self.flow_configs.is_empty() {
            return Err(ServerSocketError::NoFlows);
        }

        let settings = match self.settings.take() {
            Some(custom) => custom,
            None => Arc::new(Settings::default()),
        };
        let users: SharedMap<T, UserServerState> = SharedMap::new();
        let mut flows = Vec::with_capacity(self.flow_configs.len());

        let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
        // Smallest user payload across all flows; stays at usize::MAX until a flow lowers it.
        let mut payload_cap = usize::MAX;

        let obfs_buffer = self.secret.obfuscation_buffer();

        for flow_config in self.flow_configs.drain(..) {
            flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;

            let flow_payload = flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len);
            payload_cap = payload_cap.min(flow_payload);

            let socks = Self::open_flow_sockets(flow_config.socket, flow_config.address, flow_config.reader_count).await?;

            let decoy_factory = match flow_config.decoy_factory {
                Some(per_flow) => per_flow,
                None => Arc::clone(&self.default_decoy_factory),
            };
            let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
            // Each direction gets its own crypto tool backed by its own cache of the user map.
            let crypto_send = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
            let crypto_recv = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
            let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
            flows.push(flow);
        }
        let max_data_payload = if payload_cap != usize::MAX {
            payload_cap
        } else {
            settings.mtu()
        };
        info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());

        let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();

        let router = Arc::new(Router {
            flows,
            sessions: RwLock::new(HashMap::new()),
            users: Mutex::new(users),
        });

        Ok(Listener {
            router,
            secret: self.secret,
            identity_generator: self.identity_generator,
            accept_tx,
            accept_rx: Mutex::new(accept_rx),
            max_data_payload,
            settings,
        })
    }

    /// Build the listener, creating all flow managers (full mode).
    ///
    /// # Errors
    /// Returns `UnsupportedRuntime` when the runtime assertion fails, `NoFlows` when no flow
    /// was configured, `FlowError` when a flow configuration is rejected for the current MTU
    /// and `SocketError` when a socket cannot be bound.
    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
    pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
        assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
        if self.flow_configs.is_empty() {
            return Err(ServerSocketError::NoFlows);
        }

        let settings = match self.settings.take() {
            Some(custom) => custom,
            None => Arc::new(Settings::default()),
        };
        let users: SharedMap<T, UserServerState> = SharedMap::new();
        let mut flows = Vec::with_capacity(self.flow_configs.len());

        let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
        // Smallest user payload across all flows; stays at usize::MAX until a flow lowers it.
        let mut payload_cap = usize::MAX;

        // The secret is shared between the listener and every crypto tool.
        let secret_arc = Arc::new(self.secret);

        for flow_config in self.flow_configs.drain(..) {
            flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;

            let flow_payload = flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len);
            payload_cap = payload_cap.min(flow_payload);

            let socks = Self::open_flow_sockets(flow_config.socket, flow_config.address, flow_config.reader_count).await?;

            let decoy_factory = match flow_config.decoy_factory {
                Some(per_flow) => per_flow,
                None => Arc::clone(&self.default_decoy_factory),
            };
            let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
            // Each direction gets its own crypto tool backed by its own cache of the user map.
            let crypto_send = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
            let crypto_recv = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
            let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
            flows.push(flow);
        }
        let max_data_payload = if payload_cap != usize::MAX {
            payload_cap
        } else {
            settings.mtu()
        };
        info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());

        let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();

        let router = Arc::new(Router {
            flows,
            sessions: RwLock::new(HashMap::new()),
            users: Mutex::new(users),
        });

        Ok(Listener {
            router,
            secret: secret_arc,
            identity_generator: self.identity_generator,
            accept_tx,
            accept_rx: Mutex::new(accept_rx),
            max_data_payload,
            settings,
        })
    }
}

/// State shared between the `Listener` and every `ClientHandle` it hands out:
/// the flow managers plus the per-identity session and user tables.
pub(crate) struct Router<T, AE>
where
    T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static,
    AE: AsyncExecutor + 'static,
{
    /// Flow managers, addressed by their index in this vector.
    flows: Vec<Arc<ServerFlowManager<T, AE>>>,
    /// Live sessions keyed by client identity.
    sessions: RwLock<HashMap<T, Arc<ServerSessionManager<T, AE>>>>,
    /// Per-user server state keyed by client identity.
    users: Mutex<SharedMap<T, UserServerState>>,
}

impl<T, AE> Router<T, AE>
where
    T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static,
    AE: AsyncExecutor + 'static,
{
    /// How many flow managers this router holds.
    #[inline]
    pub(crate) fn flow_count(&self) -> usize {
        Vec::len(&self.flows)
    }
}

/// Server endpoint that processes handshakes and hands out one `ClientHandle` per accepted client.
/// Routing tables and session lifecycle state are kept in the shared [`Router`].
pub struct Listener<T, AE, IG>
where
    T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static,
    AE: AsyncExecutor + 'static,
    IG: ServerConnectionHandler<T> + 'static,
{
    /// Router shared with every `ClientHandle` produced by this listener.
    router: Arc<Router<T, AE>>,
    /// Server secret used for the handshake (fast mode: owned directly).
    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
    secret: ServerSecret<'static>,
    /// Server secret used for the handshake (full mode: shared with the crypto tools).
    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
    secret: Arc<ServerSecret<'static>>,
    /// Connection handler that verifies client versions and generates identities.
    identity_generator: IG,
    /// Sending side of the queue of freshly accepted clients.
    accept_tx: NotifyQueueSender<ClientHandle<T, AE>>,
    /// Receiving side of the accepted-clients queue, drained by `accept`.
    accept_rx: Mutex<NotifyQueueReceiver<ClientHandle<T, AE>>>,
    /// Maximum user-data bytes per packet so the wire packet fits within MTU.
    max_data_payload: usize,
    /// Settings shared with the flows, sessions and client handles.
    settings: Arc<Settings<AE>>,
}

impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> Listener<T, AE, IG> {
    /// Derive a user's first crypto state from the handshake's initial key (fast mode).
    /// Fast-mode states are additionally given the server secret's obfuscation buffer.
    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
    #[inline]
    fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
        let obfuscation = self.secret.obfuscation_buffer();
        UserCryptoState::new(initial_key, obfuscation)
    }

    /// Create initial user crypto state from a handshake key (full mode).
    ///
    /// Unlike the fast-mode variant, only the initial key is passed to
    /// `UserCryptoState::new`; the server secret is not consulted here.
    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
    #[inline]
    fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
        UserCryptoState::new(initial_key)
    }

    /// Switch a user's crypto state from the initial key to the session key (fast mode).
    /// The server secret's obfuscation buffer is passed along with the new key.
    #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
    #[inline]
    fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
        let obfuscation = self.secret.obfuscation_buffer();
        user_state.upgrade_crypto(session_key, obfuscation);
    }

    /// Upgrade a user's crypto state from initial key to session key (full mode).
    ///
    /// Unlike the fast-mode variant, only the session key is passed to
    /// `upgrade_crypto`; the server secret is not consulted here.
    #[cfg(any(feature = "full_software", feature = "full_hardware"))]
    #[inline]
    fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
        user_state.upgrade_crypto(session_key);
    }

    /// Spawn the listener's background receive tasks on the settings' executor.
    /// Call this after `build()`; until then no incoming packet is processed.
    ///
    /// For every flow, one **drain task** is spawned per receive socket and one **route task**
    /// is spawned for the flow as a whole:
    /// - A drain task loops on `receive_raw` and pushes each raw packet into the flow's bounded
    ///   queue (sized by `DRAIN_CHANNEL_CAPACITY`). The queue is expected to discard packets
    ///   when full so the OS socket buffer keeps draining. The first receive error is logged
    ///   and ends that drain task.
    /// - The sender is shared through an `Arc`; this function drops its own reference right
    ///   after spawning, so once every drain task has exited the sender is dropped and the
    ///   route task's `recv` yields `None`.
    /// - The route task forwards every queued packet to `route_incoming` together with the
    ///   flow index.
    ///
    /// All tasks are spawned before this function returns; the returned future completes
    /// immediately and performs no work of its own.
    pub fn start(self: &Arc<Self>) -> impl Future<Output = ()> {
        let drain_capacity = self.settings.get(&keys::DRAIN_CHANNEL_CAPACITY) as usize;

        for (index, flow) in self.router.flows.iter().enumerate() {
            let (drain_tx, mut drain_rx) = create_bounded_notify_queue(drain_capacity);
            let shared_tx = Arc::new(drain_tx);

            for (sock_index, sock) in flow.recv_socks().iter().enumerate() {
                let task_tx = Arc::clone(&shared_tx);
                let task_sock = Arc::clone(sock);
                let task_flow = Arc::clone(flow);
                let task_settings = Arc::clone(&self.settings);
                self.settings.executor().spawn(async move {
                    loop {
                        let recv_buf = task_settings.pool().allocate_for_recv();
                        let raw_packet = match task_flow.receive_raw(recv_buf, &task_sock).await {
                            Ok(raw_packet) => raw_packet,
                            Err(err) => {
                                warn!("flow manager {index} socket {sock_index}: receive error: {err}");
                                break;
                            }
                        };
                        task_tx.push(raw_packet);
                    }
                });
            }
            // Only the drain tasks keep the sender alive from here on.
            drop(shared_tx);

            let listener = Arc::clone(self);
            self.settings.executor().spawn(async move {
                loop {
                    let Some(raw_packet) = drain_rx.recv().await else {
                        break;
                    };
                    listener.route_incoming(raw_packet, index).await;
                }
            });
        }

        async {}
    }

    /// Dispatch one received packet.
    ///
    /// Handshake-flagged packets go to `handle_new_client`. Anything else is looked up by the
    /// tailer's identity: packets for an unknown identity are dropped, packets for a known
    /// session are handed to that session after the receiving flow has been marked as active
    /// for it. A `ConnectionTerminated` processing error removes the session from the router.
    async fn route_incoming(self: &Arc<Self>, raw_packet: RawReceivedPacket<T>, flow_index: usize) {
        let identity = raw_packet.tailer.identity();

        let is_handshake = raw_packet.tailer.flags().contains(PacketFlags::HANDSHAKE);
        if is_handshake {
            self.handle_new_client(raw_packet, flow_index).await;
            return;
        }

        // The read guard is released at the end of this statement, before any further await.
        let known_session = self.router.sessions.read().await.get(&identity).cloned();
        let Some(session) = known_session else {
            debug!("packet from unknown identity {}, dropping", identity.to_string());
            return;
        };

        let flow = &self.router.flows[flow_index];
        flow.ensure_user(identity.clone(), session.counter()).await;
        session.note_active_flow(flow_index);

        let incoming = IncomingPacket {
            body: raw_packet.body,
            tailer: raw_packet.tailer,
        };
        let Err(err) = session.process_incoming(incoming).await else {
            return;
        };
        debug!("session processing error for {}: {}", identity.to_string(), err);
        if let SessionControllerError::ConnectionTerminated(_) = err {
            self.router.remove_session(&identity).await;
        }
    }

    /// Handle a handshake from a new client: create session, send response, publish ClientHandle.
    ///
    /// Steps, in order:
    /// 1. Decapsulate the handshake body with the server secret. On failure the original wire
    ///    packet (when available) is forwarded to the flow's probe handler.
    /// 2. Verify the deferred handshake transcript with the freshly derived initial key;
    ///    failures are likewise forwarded to the probe handler.
    /// 3. Ask the identity generator to verify the client version; on mismatch a TERMINATION
    ///    packet with `ReturnCode::VersionMismatch` is sent and all temporary state is removed.
    /// 4. Generate the identity, assemble the session (replacing any existing user state for
    ///    the same identity), register the user with the receiving flow and send the response.
    /// 5. Upgrade the user's crypto state to the session key, publish the session in the
    ///    router and push a `ClientHandle` onto the accept queue.
    ///
    /// Every failure path returns without publishing a handle; nothing is reported to the caller.
    async fn handle_new_client(self: &Arc<Self>, mut raw_packet: RawReceivedPacket<T>, flow_index: usize) {
        // Take the optional handshake extras out of the packet before its body is consumed.
        let handshake_transcript = raw_packet.handshake_transcript.take();
        let original_wire_packet = raw_packet.original_wire_packet.take();
        let source_addr = raw_packet.source_addr;
        let Some((server_data, initial_key, client_initial_data)) = self.secret.decapsulate_handshake_server(raw_packet.body, self.settings.pool()) else {
            if let Some(packet) = original_wire_packet {
                debug!("handshake decapsulation failed from {source_addr} (body too short for crypto header), forwarding to probe handler");
                self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
            } else {
                debug!("handshake decapsulation failed from {source_addr} and original wire packet unavailable, dropping");
            }
            return;
        };

        // Verify the handshake tailer with the initial-data encryption key just produced by the KEM decapsulation.
        // Verification only counts as successful when both the transcript and the original wire packet are present.
        let verified = matches!((&handshake_transcript, &original_wire_packet), (Some(transcript), Some(_)) if verify_transcript_with_key(&initial_key, transcript).is_ok());
        if !verified {
            if let Some(packet) = original_wire_packet {
                debug!("handshake tailer verification failed from {source_addr}, forwarding to probe handler");
                self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
            } else {
                debug!("handshake packet from {source_addr} missing deferred transcript or wire packet, dropping");
            }
            return;
        }

        // The identity carried by the handshake tailer is passed to `verify_version` below.
        let client_version_identity = raw_packet.tailer.identity();
        let handshake_pn = raw_packet.tailer.packet_number();
        if !self.identity_generator.verify_version(client_version_identity.to_bytes()) {
            // Version mismatch: register a temporary user and binding under the client-supplied
            // identity, send the rejection through the flow, then remove both again.
            // NOTE(review): presumably the flow needs this temporary state to encrypt and
            // address the rejection — confirm against `ServerFlowManager::send_packet`.
            // NOTE(review): if this identity equals that of an already connected user, the
            // insert/remove pair below would replace and then delete that user's state — confirm.
            {
                let mut users = self.router.users.lock().await;
                let crypto_state = self.make_initial_crypto_state(&initial_key);
                users.insert(client_version_identity.clone(), UserServerState::new(crypto_state)).await;
            }
            self.router.flows[flow_index].register_user_binding(client_version_identity.clone(), raw_packet.source_addr, handshake_pn).await;
            // Packet number: current UNIX time in seconds placed in the upper 32 bits.
            let pn = ((unix_timestamp_ms() / 1000) as u64) << 32;
            // NOTE(review): this buffer is sized with `T::length()`, whereas `ClientHandle::drop`
            // sizes its termination buffer with `Tailer::<T>::len()` — confirm which is intended.
            let buf = self.settings.pool().allocate(Some(T::length()));
            let tailer = Tailer::termination(buf, &client_version_identity, ReturnCode::VersionMismatch, pn);
            if let Err(err) = self.router.flows[flow_index].send_packet(tailer.into_buffer(), false, false).await {
                warn!("failed to send version mismatch rejection: {err}");
            }
            {
                let mut users = self.router.users.lock().await;
                users.remove(&client_version_identity).await;
            }
            self.router.flows[flow_index].remove_user(&client_version_identity).await;
            return;
        }

        let identity = self.identity_generator.generate(client_initial_data.slice());
        let server_initial_data = self.identity_generator.initial_data(&identity);

        // Queue through which the session delivers payloads to the future `ClientHandle`.
        let (incoming_tx, incoming_rx) = create_notify_queue::<DynamicByteBuffer>();
        // The session only receives a weak reference to the router.
        let router_weak: Weak<dyn OutgoingRouter<T>> = Arc::downgrade(&self.router) as Weak<dyn OutgoingRouter<T>>;

        let (response_body, session_key) = self.secret.encapsulate_handshake_server(server_data, self.settings.pool(), server_initial_data.slice(), &initial_key);

        let (session, response_packet, replacing) = {
            // The users lock is held for the whole replace-and-assemble step.
            let mut users = self.router.users.lock().await;
            let replacing = users.contains_key(&identity);
            if replacing {
                debug!("re-handshake for {}: replacing existing session (last wins)", identity.to_string());
                users.remove(&identity).await;
            }
            let initial_crypto_state = self.make_initial_crypto_state(&initial_key);
            let result = ServerSessionManager::assemble_session(initial_crypto_state, response_body, raw_packet.tailer, identity.clone(), &mut users, incoming_tx, router_weak, self.router.flow_count(), self.settings.clone()).await;
            match result {
                Ok((session, response_packet)) => (session, response_packet, replacing),
                Err(err) => {
                    // NOTE(review): when `replacing` is true the old user state has already been
                    // removed above, but the old entry in `router.sessions` and the per-flow
                    // user registrations are left in place on this path — confirm this is intended.
                    warn!("handshake failed: {err}");
                    return;
                }
            }
        };

        // Drop the displaced session and its flow registrations before registering the new one.
        if replacing {
            self.router.sessions.write().await.remove(&identity);
            for flow in &self.router.flows {
                flow.remove_user(&identity).await;
            }
        }

        self.router.flows[flow_index].register_user_binding(identity.clone(), raw_packet.source_addr, handshake_pn).await;
        self.router.flows[flow_index].register_user(identity.clone(), session.counter()).await;

        if let Err(err) = self.router.flows[flow_index].send_packet(response_packet, false, false).await {
            // The response could not be sent: undo the user and flow registrations made above.
            warn!("failed to send handshake response: {err}");
            self.router.users.lock().await.remove(&identity).await;
            for flow in &self.router.flows {
                flow.remove_user(&identity).await;
            }
            return;
        }

        // The response went out under the initial key; from here on the session key is used.
        {
            let mut users = self.router.users.lock().await;
            users
                .modify(&identity, |user_state| {
                    self.upgrade_user_crypto(user_state, &session_key);
                })
                .await;
        }

        session.note_active_flow(flow_index);

        {
            let mut sessions = self.router.sessions.write().await;
            if sessions.contains_key(&identity) {
                debug!("concurrent handshake for {}: last wins, displacing earlier session", identity.to_string());
            }
            sessions.insert(identity.clone(), Arc::clone(&session));
        }

        let client_handle = ClientHandle {
            session,
            identity: identity.clone(),
            incoming_rx: Mutex::new(incoming_rx),
            max_data_payload: self.max_data_payload,
            settings: self.settings.clone(),
            router: Arc::clone(&self.router),
        };
        // Publish the handle so a pending or future `accept` call can return it.
        self.accept_tx.push(client_handle);

        info!("new client connected: {}", identity.to_string());
    }

    /// Wait until a client has completed its handshake and return the handle for it.
    ///
    /// # Errors
    /// Returns `ServerSocketError::ListenerStopped` when the accept queue yields no further handle.
    pub async fn accept(&self) -> Result<ClientHandle<T, AE>, ServerSocketError> {
        // The receiver lock is held only for the duration of this statement.
        let next_handle = self.accept_rx.lock().await.recv().await;
        match next_handle {
            Some(handle) => Ok(handle),
            None => Err(ServerSocketError::ListenerStopped),
        }
    }
}

/// `OutgoingRouter` for the shared router: outgoing packets are sent through a flow chosen by
/// the session, and session removal clears the session, user and per-flow tables.
#[async_trait]
impl<T, AE> OutgoingRouter<T> for Router<T, AE>
where
    T: IdentityType + Clone + Eq + Hash + Send + Sync + ToString + 'static,
    AE: AsyncExecutor + 'static,
{
    /// Send `packet` through the flow the session for `identity` selects.
    /// Returns `false` when there is no such session, when the selected index is not a valid
    /// flow index, or when the flow fails to send.
    async fn route_packet(&self, packet: DynamicByteBuffer, identity: &T) -> bool {
        // The read guard is released at the end of this statement, before sending.
        let lookup = self.sessions.read().await.get(identity).cloned();
        let Some(session) = lookup else {
            return false;
        };
        let flow_idx = session.select_active_flow(self.flows.len());
        match self.flows.get(flow_idx) {
            Some(flow) => flow.send_packet(packet, false, false).await.is_ok(),
            None => false,
        }
    }

    /// Remove the session for `identity` together with its user state and flow registrations.
    /// Does nothing when no session is registered under that identity.
    async fn remove_session(&self, identity: &T) {
        let was_registered = self.sessions.write().await.remove(identity).is_some();
        if !was_registered {
            return;
        }
        self.users.lock().await.remove(identity).await;
        for flow in self.flows.iter() {
            flow.remove_user(identity).await;
        }
        info!("client session removed: {}", identity.to_string());
    }
}

/// Application-facing endpoint of one connected client, offering send and receive operations.
/// The type does not implement `Clone`; the listener publishes one handle per accepted handshake.
pub struct ClientHandle<T, AE>
where
    T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static,
    AE: AsyncExecutor + 'static,
{
    /// Session that prepares outgoing packets for this client.
    session: Arc<ServerSessionManager<T, AE>>,
    /// Identity under which the client is registered in the router.
    identity: T,
    /// Queue of decrypted payloads delivered by the session.
    incoming_rx: Mutex<NotifyQueueReceiver<DynamicByteBuffer>>,
    /// Maximum user-data bytes per packet so the wire packet fits within MTU.
    max_data_payload: usize,
    /// Settings shared with the listener (buffer pool, executor, tunables).
    settings: Arc<Settings<AE>>,
    /// Router shared with the listener, used for sending and for session removal.
    router: Arc<Router<T, AE>>,
}

impl<T: IdentityType + Clone + Eq + Hash + Send + ToString, AE: AsyncExecutor> ClientHandle<T, AE> {
    /// Send a packet using a pre-allocated buffer.
    ///
    /// The session turns the payload into a wire packet, which is then routed through the
    /// shared router.
    ///
    /// # Errors
    /// Returns `SessionError` when the session cannot prepare the packet, and
    /// `SessionError(FlowError(UserNotFound))` when the router reports that the packet was
    /// not sent.
    pub async fn send(&self, packet: DynamicByteBuffer) -> Result<(), ServerSocketError> {
        let wire = self.session.prepare_outgoing(packet, false).await.map_err(ServerSocketError::SessionError)?;
        if !self.router.route_packet(wire, &self.identity).await {
            return Err(ServerSocketError::SessionError(SessionControllerError::FlowError(FlowControllerError::UserNotFound {
                identity: self.identity.to_string(),
            })));
        }
        Ok(())
    }

    /// Send a byte slice, splitting into payload-sized chunks so each wire packet fits within MTU.
    ///
    /// See `ClientSocket::send_bytes` — same fragmentation-only-when-needed +
    /// `TYPHOON_SEND_BYTES_JITTER`-driven per-chunk length sampling applies
    /// here for s2c traffic.
    ///
    /// Data that fits into a single payload is sent as one packet. Larger data is cut into
    /// chunks whose size is sampled per chunk and then clamped to `1..=max_data_payload`, so
    /// every iteration makes progress and no chunk exceeds the payload limit. An empty slice
    /// sends nothing.
    ///
    /// # Errors
    /// Stops at, and returns, the first error reported by `send`; earlier chunks have
    /// already been sent at that point.
    pub async fn send_bytes(&self, data: &[u8]) -> Result<(), ServerSocketError> {
        let jitter = self.settings.get(&keys::SEND_BYTES_JITTER);
        let chunk = self.settings.get(&keys::SEND_BYTES_CHUNK) as usize;
        // Upper bound for a sampled chunk; at least one byte so the loop always advances.
        let chunk_limit = self.max_data_payload.max(1);
        let mut offset = 0;
        while offset < data.len() {
            let remaining = data.len() - offset;
            let chunk_size = if remaining <= self.max_data_payload {
                remaining
            } else {
                // The sampler is defined elsewhere; clamping guards against a zero-sized
                // chunk (which would never terminate) and against an oversized one.
                // In this branch `remaining` exceeds the payload limit, so the clamped
                // size never runs past the end of `data`.
                jittered_chunk_size(self.max_data_payload, chunk, jitter).clamp(1, chunk_limit)
            };
            let buffer = self.settings.pool().allocate(Some(chunk_size));
            buffer.slice_mut().copy_from_slice(&data[offset..offset + chunk_size]);
            self.send(buffer).await?;
            offset += chunk_size;
        }
        Ok(())
    }

    /// Maximum user-data bytes per `send` call so the wire packet fits within MTU.
    pub fn max_data_payload(&self) -> usize {
        self.max_data_payload
    }

    /// Receive a packet, returning the decrypted payload as a buffer.
    ///
    /// # Errors
    /// Returns `ServerSocketError::ChannelClosed` when the incoming queue yields no further payload.
    pub async fn receive(&self) -> Result<DynamicByteBuffer, ServerSocketError> {
        self.incoming_rx.lock().await.recv().await.ok_or(ServerSocketError::ChannelClosed)
    }

    /// Receive a packet, returning the decrypted payload as a byte vector.
    ///
    /// # Errors
    /// Propagates the error of `receive`.
    pub async fn receive_bytes(&self) -> Result<Vec<u8>, ServerSocketError> {
        let buffer = self.receive().await?;
        Ok(buffer.slice().to_vec())
    }
}

impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> Drop for ClientHandle<T, AE> {
    /// Emit a TERMINATION packet and remove the session from the shared router before the handle is released.
    fn drop(&mut self) {
        let executor = self.settings.executor().clone();
        let pn = (unix_timestamp_ms() / 1000) as u64 * (1u64 << 32);
        let buf = self.settings.pool().allocate(Some(Tailer::<T>::len()));
        let termination = Tailer::termination(buf, &self.identity, ReturnCode::Success, pn).into_buffer();
        executor.block_on(async {
            self.router.route_packet(termination, &self.identity).await;
            self.router.remove_session(&self.identity).await;
        });
    }
}