Skip to main content

typhoon/flow/
server.rs

1/// Server-side flow manager implementation.
2use std::collections::HashMap;
3use std::future::Future;
4use std::hash::Hash;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::AtomicU32;
8use std::sync::{Arc, Weak};
9
10use log::{debug, warn};
11use rand::Rng;
12
13use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
14use crate::cache::DerivedValue;
15use crate::capture::{record_flow_config, record_server_send};
16use crate::crypto::{ObfuscationTranscript, ServerCryptoTool};
17use crate::defaults::NoopProbeHandler;
18use crate::flow::config::{FakeBodyMode, FakeHeaderConfig, FlowConfig};
19use crate::flow::decoy::{DecoyFactory, DecoyFlowSender, DecoyProvider};
20use crate::flow::error::FlowControllerError;
21use crate::flow::probe::{ActiveProbeHandler, ProbeFactory, ProbeFlowSender};
22use crate::settings::Settings;
23use crate::tailer::{IdentityType, PacketFlags, Tailer};
24use crate::utils::random::get_rng;
25use crate::utils::socket::{Socket, SocketError};
26use crate::utils::sync::{AsyncExecutor, Mutex, RwLock};
27
28/// Raw received packet from the server flow manager before session-level processing.
29pub(crate) struct RawReceivedPacket<T: IdentityType> {
30    pub(crate) body: DynamicByteBuffer,
31    pub(crate) tailer: Tailer<T>,
32    pub(crate) source_addr: SocketAddr,
33    pub(crate) handshake_transcript: Option<ObfuscationTranscript>,
34    pub(crate) original_wire_packet: Option<DynamicByteBuffer>,
35}
36
/// Path-binding state kept per (flow, identity): where replies for this client
/// currently go on this flow, and the highest packet number observed so far.
///
/// Rebinding gate: `addr` only moves when an authenticated non-handshake packet
/// arrives on this flow with `pn > latest_pn`.  Out-of-order data packets are
/// still passed on to session processing; they simply leave the binding alone.
/// Out-of-order decoys are dropped at the usual discardable exit point.
/// See PROTOCOL.md §Identification and rebinding.
struct PathBinding {
    /// Current return address for this identity on this flow.
    addr: SocketAddr,
    /// Highest packet number seen so far for this identity on this flow.
    latest_pn: u64,
}
49
50/// Server-side flow manager that handles per-user packet encryption, decoy traffic, and socket I/O.
51/// Per-user crypto state is in the global SharedMap (accessed via ServerCryptoTool).
52/// Send and receive crypto are split into independent instances so their locks never contend.
53/// Per-user source addresses and decoy providers are local to each flow manager instance.
54/// When built with multiple sockets (SO_REUSEPORT on Linux), each socket is polled by its own
55/// drain task in the listener; the kernel distributes incoming datagrams across all sockets.
56pub struct ServerFlowManager<T: IdentityType + Clone + Eq + Hash + Send + ToString, AE: AsyncExecutor> {
57    user_bindings: RwLock<HashMap<T, RwLock<PathBinding>>>,
58    decoy_providers: RwLock<HashMap<T, Arc<dyn DecoyProvider>>>,
59    decoy_factory: DecoyFactory<T, AE>,
60    crypto_send: Mutex<ServerCryptoTool<T>>,
61    crypto_recv: Mutex<ServerCryptoTool<T>>,
62    fake_body_mode: FakeBodyMode,
63    fake_header_mode: Mutex<FakeHeaderConfig>,
64    max_overhead: usize,
65    socks: Vec<Arc<Socket>>,
66    mtu: usize,
67    settings: Arc<Settings<AE>>,
68    probe_handler: Mutex<Box<dyn ActiveProbeHandler<AE>>>,
69}
70
71impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> ServerFlowManager<T, AE> {
72    /// Create a new server flow manager.
73    /// `crypto_send` and `crypto_recv` must be independent instances (e.g. two `create_cache()` calls
74    /// on the same `SharedMap`) so their mutexes never contend between the send and receive paths.
75    /// `socks` must contain at least one socket; on Linux with SO_REUSEPORT multiple sockets may be
76    /// supplied so that the listener can spawn one drain task per socket.
77    pub(crate) async fn new(config: FlowConfig, probe_factory: Option<&ProbeFactory<AE>>, crypto_send: ServerCryptoTool<T>, crypto_recv: ServerCryptoTool<T>, settings: Arc<Settings<AE>>, socks: Vec<Arc<Socket>>, decoy_factory: DecoyFactory<T, AE>) -> Arc<Self> {
78        let max_overhead = config.max_overhead();
79        let handler_factory = probe_factory.cloned();
80        let settings_for_start = Arc::clone(&settings);
81
82        let manager = Arc::new_cyclic(|_: &Weak<ServerFlowManager<T, AE>>| {
83            let handler: Box<dyn ActiveProbeHandler<AE>> = match &handler_factory {
84                Some(f) => f(),
85                None => Box::new(NoopProbeHandler),
86            };
87            let mtu = settings.mtu();
88            ServerFlowManager {
89                user_bindings: RwLock::new(HashMap::new()),
90                decoy_providers: RwLock::new(HashMap::new()),
91                decoy_factory,
92                crypto_send: Mutex::new(crypto_send),
93                crypto_recv: Mutex::new(crypto_recv),
94                fake_body_mode: config.fake_body_mode,
95                fake_header_mode: Mutex::new(config.fake_header_mode),
96                max_overhead,
97                socks,
98                mtu,
99                settings,
100                probe_handler: Mutex::new(handler),
101            }
102        });
103        let weak: Weak<dyn ProbeFlowSender> = Arc::downgrade(&manager) as Weak<dyn ProbeFlowSender>;
104        manager.probe_handler.lock().await.start(weak, settings_for_start).await;
105        manager
106    }
107
108    /// Return the slice of sockets owned by this flow manager.
109    /// The listener spawns one drain task per socket so that all sockets are polled concurrently.
110    pub(crate) fn recv_socks(&self) -> &[Arc<Socket>] {
111        &self.socks
112    }
113
114    /// Forward a wire packet to this flow's active probe handler.
115    /// Used by the listener to route handshake-flagged packets whose deferred HMAC verification failed.
116    pub(crate) async fn forward_to_probe(&self, packet: DynamicByteBuffer, source_addr: SocketAddr) {
117        self.probe_handler.lock().await.process(packet, Some(source_addr)).await;
118    }
119
120    /// Insert (or replace) the path binding for a user on this flow.
121    pub async fn register_user_binding(&self, id: T, addr: SocketAddr, latest_pn: u64) {
122        self.user_bindings.write().await.insert(
123            id,
124            RwLock::new(PathBinding {
125                addr,
126                latest_pn,
127            }),
128        );
129    }
130
131    /// Register a per-user decoy provider and start its background timer.
132    /// The user's crypto state must already be in the global SharedMap.
133    pub async fn register_user(self: &Arc<Self>, id: T, counter: Arc<AtomicU32>) {
134        let weak: Weak<Self> = Arc::downgrade(self);
135        let mgr: Weak<dyn DecoyFlowSender> = weak;
136        let dp = (self.decoy_factory)(mgr, self.settings.clone(), DerivedValue::constant(id.clone()), counter);
137        dp.start().await;
138        let decoy_name = dp.name();
139        self.decoy_providers.write().await.insert(id.clone(), Arc::from(dp));
140        if let Some(binding) = self.user_bindings.read().await.get(&id) {
141            let addr = binding.read().await.addr;
142            let header_len = self.max_overhead - self.fake_body_mode.max_len();
143            record_flow_config(addr, "s2c", || (self.fake_body_mode.description(), header_len, decoy_name));
144        }
145    }
146
147    /// Lazily register the per-user decoy provider on this flow if not already
148    /// present.  Called from the route task when a non-handshake packet from a
149    /// known session arrives on a flow that has not yet seen this user; the
150    /// path binding has already been anchored by `receive_raw`'s first-packet
151    /// branch by the time we get here.
152    pub async fn ensure_user(self: &Arc<Self>, id: T, counter: Arc<AtomicU32>) {
153        if !self.decoy_providers.read().await.contains_key(&id) {
154            self.register_user(id, counter).await;
155        }
156    }
157
158    /// Remove a user's decoy provider and path binding from this flow manager.
159    pub async fn remove_user(&self, id: &T) {
160        self.decoy_providers.write().await.remove(id);
161        self.user_bindings.write().await.remove(id);
162    }
163
164    /// Receive a raw packet, deobfuscating the tailer but returning the full body + tailer view.
165    /// For handshake packets, per-user verification is skipped (user not registered yet).
166    /// Decoy packets are filtered. Non-handshake packets are verified per-user and fed to decoy providers.
167    /// `sock` is the specific socket to read from; the caller (drain task) owns one socket per task.
168    pub(crate) async fn receive_raw(&self, packet: DynamicByteBuffer, sock: &Socket) -> Result<RawReceivedPacket<T>, FlowControllerError> {
169        let encrypted_tailer_len = Tailer::<T>::encrypted_len_c2s();
170
171        loop {
172            let (packet, source_addr) = sock.recv_from(packet.clone()).await.map_err(FlowControllerError::SocketError)?;
173
174            // Undersized wire packets (shorter than the encrypted tailer) can't be valid Typhoon; forward to the probe handler and keep draining.
175            if packet.len() < encrypted_tailer_len {
176                warn!("server flow: undersized wire packet from {source_addr} ({} < {})", packet.len(), encrypted_tailer_len);
177                self.probe_handler.lock().await.process(packet, Some(source_addr)).await;
178                continue;
179            }
180
181            let (encrypted_packet, encrypted_tailer) = packet.split_buf_end(encrypted_tailer_len);
182
183            // Deobfuscate tailer; for non-handshake packets verify immediately, for handshake packets defer the HMAC check until the listener has decapsulated the body and can recompute the initial-data encryption key.
184            let (tailer, handshake_transcript) = {
185                let mut crypto = self.crypto_recv.lock().await;
186                let (tailer_buf, transcript) = match crypto.deobfuscate_tailer(encrypted_tailer, self.settings.pool()) {
187                    Ok(result) => result,
188                    Err(err) => {
189                        warn!("server flow: tailer decryption failed from {source_addr}: {err}");
190                        self.probe_handler.lock().await.process(encrypted_packet.expand_end(encrypted_tailer_len), Some(source_addr)).await;
191                        continue;
192                    }
193                };
194                let Some(tailer) = Tailer::<T>::validated(tailer_buf, encrypted_packet.len()) else {
195                    warn!("server flow: malformed tailer from {source_addr} (size, flags or payload_length out of range)");
196                    self.probe_handler.lock().await.process(encrypted_packet.expand_end(encrypted_tailer_len), Some(source_addr)).await;
197                    continue;
198                };
199                if tailer.flags().contains(PacketFlags::HANDSHAKE) {
200                    (tailer, Some(transcript))
201                } else {
202                    let identity = tailer.identity();
203                    if let Err(err) = crypto.verify_tailer(&identity, transcript).await {
204                        debug!("error verifying packet tailer: {err}");
205                        self.probe_handler.lock().await.process(encrypted_packet.expand_end(encrypted_tailer_len), Some(source_addr)).await;
206                        continue;
207                    }
208                    (tailer, None)
209                }
210            };
211
212            let packet_flags = tailer.flags();
213            let identity = tailer.identity();
214
215            // For non-handshake packets, refresh the path binding for this
216            // identity on this flow and feed the decoy provider if one has
217            // already been instantiated locally.
218            if !packet_flags.contains(PacketFlags::HANDSHAKE) {
219                let pn = tailer.packet_number();
220                let bindings = self.user_bindings.read().await;
221                if let Some(binding_rw) = bindings.get(&identity) {
222                    let latest = binding_rw.read().await.latest_pn;
223                    if pn > latest {
224                        let mut binding = binding_rw.write().await;
225                        if pn > binding.latest_pn {
226                            binding.latest_pn = pn;
227                            if binding.addr != source_addr {
228                                binding.addr = source_addr;
229                            }
230                        }
231                    }
232                } else {
233                    drop(bindings);
234                    self.user_bindings.write().await.entry(identity.clone()).or_insert_with(|| {
235                        RwLock::new(PathBinding {
236                            addr: source_addr,
237                            latest_pn: pn,
238                        })
239                    });
240                }
241
242                let dp = self.decoy_providers.read().await.get(&identity).cloned();
243                if let Some(dp) = dp {
244                    let notified = dp.feed_input(encrypted_packet.clone(), tailer.buffer().clone()).await;
245                    if notified.is_none() {
246                        continue;
247                    }
248                }
249            }
250
251            // Decoy packets are always discarded at flow level.
252            if packet_flags.is_discardable() {
253                continue;
254            }
255
256            // Preserve a view over the original wire bytes so a failed deferred handshake verification can route the packet to the flow's probe handler.
257            let original_wire_packet = packet_flags.contains(PacketFlags::HANDSHAKE).then(|| encrypted_packet.expand_end(encrypted_tailer_len));
258
259            // For handshake packets, strip fake header/body using payload_length so the
260            // session layer receives only the raw handshake data.
261            let body = if packet_flags.contains(PacketFlags::HANDSHAKE) {
262                let payload_len = tailer.payload_length() as usize;
263                encrypted_packet.rebuffer_start(encrypted_packet.len().saturating_sub(payload_len))
264            } else {
265                encrypted_packet
266            };
267
268            debug!("server flow: received {packet_flags:?} packet from {source_addr}");
269            return Ok(RawReceivedPacket {
270                body,
271                tailer,
272                source_addr,
273                handshake_transcript,
274                original_wire_packet,
275            });
276        }
277    }
278}
279
280impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> ProbeFlowSender for ServerFlowManager<T, AE> {
281    fn send_raw<'a>(&'a self, packet: DynamicByteBuffer, target: SocketAddr) -> Pin<Box<dyn Future<Output = Result<(), SocketError>> + Send + 'a>> {
282        Box::pin(async move { self.socks[0].send_to(packet, target).await.map(|_| ()) })
283    }
284}
285
286impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> DecoyFlowSender for ServerFlowManager<T, AE> {
287    fn send_decoy_packet<'a>(&'a self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Pin<Box<dyn Future<Output = Result<(), FlowControllerError>> + Send + 'a>> {
288        Box::pin(self.send_packet(packet, fallthrough, is_maintenance))
289    }
290}
291
292impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> ServerFlowManager<T, AE> {
293    /// Send a packet through this flow.
294    pub(crate) async fn send_packet(&self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Result<(), FlowControllerError> {
295        let tailer_len = Tailer::<T>::len();
296        let (body, tailer_buf) = packet.split_buf_end(tailer_len);
297        let identity = ServerCryptoTool::<T>::extract_identity(&tailer_buf);
298
299        // Feed decoy provider for rate tracking.
300        let notified_packet = {
301            let dp = self.decoy_providers.read().await.get(&identity).cloned();
302            if let Some(dp) = dp {
303                let notified = dp.feed_output(body, tailer_buf.clone()).await;
304                match notified {
305                    None => return Ok(()),
306                    Some(b) => b.expand_end(tailer_len),
307                }
308            } else {
309                body.expand_end(tailer_len)
310            }
311        };
312
313        let addr = {
314            let bindings = self.user_bindings.read().await;
315            let binding = bindings.get(&identity).ok_or_else(|| FlowControllerError::UserNotFound {
316                identity: identity.to_string(),
317            })?;
318            binding.read().await.addr
319        };
320
321        // Fallthrough decoys: drop the plaintext tailer, skip encryption, treat the remaining body as opaque random bytes.  Non-fallthrough path is unchanged.
322        let (encrypted_packet, packet_flags, data_len, tailer_overhead) = if fallthrough {
323            let body_only = notified_packet.rebuffer_end(notified_packet.len() - tailer_len);
324            let body_len = body_only.len();
325            (body_only, PacketFlags::DECOY, body_len, 0_usize)
326        } else {
327            let (packet_data, packet_tailer) = notified_packet.split_buf_end(tailer_len);
328            let flags = PacketFlags::from_bits_truncate(*packet_tailer.get(0));
329            let data_len = packet_data.len();
330            let encrypted_tailer = {
331                let mut crypto = self.crypto_send.lock().await;
332                crypto.obfuscate_tailer(packet_tailer, self.settings.pool()).await.map_err(FlowControllerError::TailerEncryption)?
333            };
334            let tailer_overhead = crate::crypto::TAILER_S2C_OVERHEAD;
335            let encrypted = packet_data.expand_end(encrypted_tailer.len());
336            (encrypted, flags, data_len, tailer_overhead)
337        };
338
339        // Add fake header and body (single lock scope: len + fill must be consistent).
340        let (full_packet, cap_header, cap_body) = {
341            let mut mode = self.fake_header_mode.lock().await;
342            let fake_header_len = mode.len();
343            let body_len = self.fake_body_mode.get_length(self.mtu, fake_header_len + encrypted_packet.len(), is_maintenance);
344            let full_packet_len = fake_header_len + body_len;
345            let full_packet = encrypted_packet.expand_start(full_packet_len);
346            mode.fill(full_packet.rebuffer_end(fake_header_len));
347            get_rng().fill(&mut full_packet.rebuffer_both(fake_header_len, full_packet_len));
348            (full_packet, fake_header_len, body_len)
349        };
350
351        if full_packet.len() == 0 {
352            return Ok(());
353        }
354        debug!("server flow: sending {packet_flags:?} packet to {addr}");
355        self.socks[0].send_to(full_packet, addr).await.map_err(FlowControllerError::SocketError)?;
356        record_server_send(addr, || {
357            let kind = if fallthrough {
358                "DecoyFallthrough"
359            } else if is_maintenance {
360                "DecoyMaintenance"
361            } else if packet_flags.is_discardable() {
362                "Decoy"
363            } else {
364                "Data"
365            };
366            let tailer_len = if fallthrough {
367                0
368            } else {
369                Tailer::<T>::len()
370            };
371            (kind, tailer_len, tailer_overhead, cap_header, data_len, cap_body)
372        });
373        Ok(())
374    }
375}