1use std::collections::HashMap;
3use std::future::Future;
4use std::hash::Hash;
5use std::net::SocketAddr;
6use std::pin::Pin;
7use std::sync::atomic::AtomicU32;
8use std::sync::{Arc, Weak};
9
10use log::{debug, warn};
11use rand::Rng;
12
13use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
14use crate::cache::DerivedValue;
15use crate::capture::{record_flow_config, record_server_send};
16use crate::crypto::{ObfuscationTranscript, ServerCryptoTool};
17use crate::defaults::NoopProbeHandler;
18use crate::flow::config::{FakeBodyMode, FakeHeaderConfig, FlowConfig};
19use crate::flow::decoy::{DecoyFactory, DecoyFlowSender, DecoyProvider};
20use crate::flow::error::FlowControllerError;
21use crate::flow::probe::{ActiveProbeHandler, ProbeFactory, ProbeFlowSender};
22use crate::settings::Settings;
23use crate::tailer::{IdentityType, PacketFlags, Tailer};
24use crate::utils::random::get_rng;
25use crate::utils::socket::{Socket, SocketError};
26use crate::utils::sync::{AsyncExecutor, Mutex, RwLock};
27
28pub(crate) struct RawReceivedPacket<T: IdentityType> {
30 pub(crate) body: DynamicByteBuffer,
31 pub(crate) tailer: Tailer<T>,
32 pub(crate) source_addr: SocketAddr,
33 pub(crate) handshake_transcript: Option<ObfuscationTranscript>,
34 pub(crate) original_wire_packet: Option<DynamicByteBuffer>,
35}
36
37struct PathBinding {
46 addr: SocketAddr,
47 latest_pn: u64,
48}
49
50pub struct ServerFlowManager<T: IdentityType + Clone + Eq + Hash + Send + ToString, AE: AsyncExecutor> {
57 user_bindings: RwLock<HashMap<T, RwLock<PathBinding>>>,
58 decoy_providers: RwLock<HashMap<T, Arc<dyn DecoyProvider>>>,
59 decoy_factory: DecoyFactory<T, AE>,
60 crypto_send: Mutex<ServerCryptoTool<T>>,
61 crypto_recv: Mutex<ServerCryptoTool<T>>,
62 fake_body_mode: FakeBodyMode,
63 fake_header_mode: Mutex<FakeHeaderConfig>,
64 max_overhead: usize,
65 socks: Vec<Arc<Socket>>,
66 mtu: usize,
67 settings: Arc<Settings<AE>>,
68 probe_handler: Mutex<Box<dyn ActiveProbeHandler<AE>>>,
69}
70
71impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> ServerFlowManager<T, AE> {
72 pub(crate) async fn new(config: FlowConfig, probe_factory: Option<&ProbeFactory<AE>>, crypto_send: ServerCryptoTool<T>, crypto_recv: ServerCryptoTool<T>, settings: Arc<Settings<AE>>, socks: Vec<Arc<Socket>>, decoy_factory: DecoyFactory<T, AE>) -> Arc<Self> {
78 let max_overhead = config.max_overhead();
79 let handler_factory = probe_factory.cloned();
80 let settings_for_start = Arc::clone(&settings);
81
82 let manager = Arc::new_cyclic(|_: &Weak<ServerFlowManager<T, AE>>| {
83 let handler: Box<dyn ActiveProbeHandler<AE>> = match &handler_factory {
84 Some(f) => f(),
85 None => Box::new(NoopProbeHandler),
86 };
87 let mtu = settings.mtu();
88 ServerFlowManager {
89 user_bindings: RwLock::new(HashMap::new()),
90 decoy_providers: RwLock::new(HashMap::new()),
91 decoy_factory,
92 crypto_send: Mutex::new(crypto_send),
93 crypto_recv: Mutex::new(crypto_recv),
94 fake_body_mode: config.fake_body_mode,
95 fake_header_mode: Mutex::new(config.fake_header_mode),
96 max_overhead,
97 socks,
98 mtu,
99 settings,
100 probe_handler: Mutex::new(handler),
101 }
102 });
103 let weak: Weak<dyn ProbeFlowSender> = Arc::downgrade(&manager) as Weak<dyn ProbeFlowSender>;
104 manager.probe_handler.lock().await.start(weak, settings_for_start).await;
105 manager
106 }
107
108 pub(crate) fn recv_socks(&self) -> &[Arc<Socket>] {
111 &self.socks
112 }
113
114 pub(crate) async fn forward_to_probe(&self, packet: DynamicByteBuffer, source_addr: SocketAddr) {
117 self.probe_handler.lock().await.process(packet, Some(source_addr)).await;
118 }
119
120 pub async fn register_user_binding(&self, id: T, addr: SocketAddr, latest_pn: u64) {
122 self.user_bindings.write().await.insert(
123 id,
124 RwLock::new(PathBinding {
125 addr,
126 latest_pn,
127 }),
128 );
129 }
130
131 pub async fn register_user(self: &Arc<Self>, id: T, counter: Arc<AtomicU32>) {
134 let weak: Weak<Self> = Arc::downgrade(self);
135 let mgr: Weak<dyn DecoyFlowSender> = weak;
136 let dp = (self.decoy_factory)(mgr, self.settings.clone(), DerivedValue::constant(id.clone()), counter);
137 dp.start().await;
138 let decoy_name = dp.name();
139 self.decoy_providers.write().await.insert(id.clone(), Arc::from(dp));
140 if let Some(binding) = self.user_bindings.read().await.get(&id) {
141 let addr = binding.read().await.addr;
142 let header_len = self.max_overhead - self.fake_body_mode.max_len();
143 record_flow_config(addr, "s2c", || (self.fake_body_mode.description(), header_len, decoy_name));
144 }
145 }
146
147 pub async fn ensure_user(self: &Arc<Self>, id: T, counter: Arc<AtomicU32>) {
153 if !self.decoy_providers.read().await.contains_key(&id) {
154 self.register_user(id, counter).await;
155 }
156 }
157
158 pub async fn remove_user(&self, id: &T) {
160 self.decoy_providers.write().await.remove(id);
161 self.user_bindings.write().await.remove(id);
162 }
163
164 pub(crate) async fn receive_raw(&self, packet: DynamicByteBuffer, sock: &Socket) -> Result<RawReceivedPacket<T>, FlowControllerError> {
169 let encrypted_tailer_len = Tailer::<T>::encrypted_len_c2s();
170
171 loop {
172 let (packet, source_addr) = sock.recv_from(packet.clone()).await.map_err(FlowControllerError::SocketError)?;
173
174 if packet.len() < encrypted_tailer_len {
176 warn!("server flow: undersized wire packet from {source_addr} ({} < {})", packet.len(), encrypted_tailer_len);
177 self.probe_handler.lock().await.process(packet, Some(source_addr)).await;
178 continue;
179 }
180
181 let (encrypted_packet, encrypted_tailer) = packet.split_buf_end(encrypted_tailer_len);
182
183 let (tailer, handshake_transcript) = {
185 let mut crypto = self.crypto_recv.lock().await;
186 let (tailer_buf, transcript) = match crypto.deobfuscate_tailer(encrypted_tailer, self.settings.pool()) {
187 Ok(result) => result,
188 Err(err) => {
189 warn!("server flow: tailer decryption failed from {source_addr}: {err}");
190 self.probe_handler.lock().await.process(encrypted_packet.expand_end(encrypted_tailer_len), Some(source_addr)).await;
191 continue;
192 }
193 };
194 let Some(tailer) = Tailer::<T>::validated(tailer_buf, encrypted_packet.len()) else {
195 warn!("server flow: malformed tailer from {source_addr} (size, flags or payload_length out of range)");
196 self.probe_handler.lock().await.process(encrypted_packet.expand_end(encrypted_tailer_len), Some(source_addr)).await;
197 continue;
198 };
199 if tailer.flags().contains(PacketFlags::HANDSHAKE) {
200 (tailer, Some(transcript))
201 } else {
202 let identity = tailer.identity();
203 if let Err(err) = crypto.verify_tailer(&identity, transcript).await {
204 debug!("error verifying packet tailer: {err}");
205 self.probe_handler.lock().await.process(encrypted_packet.expand_end(encrypted_tailer_len), Some(source_addr)).await;
206 continue;
207 }
208 (tailer, None)
209 }
210 };
211
212 let packet_flags = tailer.flags();
213 let identity = tailer.identity();
214
215 if !packet_flags.contains(PacketFlags::HANDSHAKE) {
219 let pn = tailer.packet_number();
220 let bindings = self.user_bindings.read().await;
221 if let Some(binding_rw) = bindings.get(&identity) {
222 let latest = binding_rw.read().await.latest_pn;
223 if pn > latest {
224 let mut binding = binding_rw.write().await;
225 if pn > binding.latest_pn {
226 binding.latest_pn = pn;
227 if binding.addr != source_addr {
228 binding.addr = source_addr;
229 }
230 }
231 }
232 } else {
233 drop(bindings);
234 self.user_bindings.write().await.entry(identity.clone()).or_insert_with(|| {
235 RwLock::new(PathBinding {
236 addr: source_addr,
237 latest_pn: pn,
238 })
239 });
240 }
241
242 let dp = self.decoy_providers.read().await.get(&identity).cloned();
243 if let Some(dp) = dp {
244 let notified = dp.feed_input(encrypted_packet.clone(), tailer.buffer().clone()).await;
245 if notified.is_none() {
246 continue;
247 }
248 }
249 }
250
251 if packet_flags.is_discardable() {
253 continue;
254 }
255
256 let original_wire_packet = packet_flags.contains(PacketFlags::HANDSHAKE).then(|| encrypted_packet.expand_end(encrypted_tailer_len));
258
259 let body = if packet_flags.contains(PacketFlags::HANDSHAKE) {
262 let payload_len = tailer.payload_length() as usize;
263 encrypted_packet.rebuffer_start(encrypted_packet.len().saturating_sub(payload_len))
264 } else {
265 encrypted_packet
266 };
267
268 debug!("server flow: received {packet_flags:?} packet from {source_addr}");
269 return Ok(RawReceivedPacket {
270 body,
271 tailer,
272 source_addr,
273 handshake_transcript,
274 original_wire_packet,
275 });
276 }
277 }
278}
279
280impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> ProbeFlowSender for ServerFlowManager<T, AE> {
281 fn send_raw<'a>(&'a self, packet: DynamicByteBuffer, target: SocketAddr) -> Pin<Box<dyn Future<Output = Result<(), SocketError>> + Send + 'a>> {
282 Box::pin(async move { self.socks[0].send_to(packet, target).await.map(|_| ()) })
283 }
284}
285
286impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> DecoyFlowSender for ServerFlowManager<T, AE> {
287 fn send_decoy_packet<'a>(&'a self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Pin<Box<dyn Future<Output = Result<(), FlowControllerError>> + Send + 'a>> {
288 Box::pin(self.send_packet(packet, fallthrough, is_maintenance))
289 }
290}
291
292impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> ServerFlowManager<T, AE> {
293 pub(crate) async fn send_packet(&self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Result<(), FlowControllerError> {
295 let tailer_len = Tailer::<T>::len();
296 let (body, tailer_buf) = packet.split_buf_end(tailer_len);
297 let identity = ServerCryptoTool::<T>::extract_identity(&tailer_buf);
298
299 let notified_packet = {
301 let dp = self.decoy_providers.read().await.get(&identity).cloned();
302 if let Some(dp) = dp {
303 let notified = dp.feed_output(body, tailer_buf.clone()).await;
304 match notified {
305 None => return Ok(()),
306 Some(b) => b.expand_end(tailer_len),
307 }
308 } else {
309 body.expand_end(tailer_len)
310 }
311 };
312
313 let addr = {
314 let bindings = self.user_bindings.read().await;
315 let binding = bindings.get(&identity).ok_or_else(|| FlowControllerError::UserNotFound {
316 identity: identity.to_string(),
317 })?;
318 binding.read().await.addr
319 };
320
321 let (encrypted_packet, packet_flags, data_len, tailer_overhead) = if fallthrough {
323 let body_only = notified_packet.rebuffer_end(notified_packet.len() - tailer_len);
324 let body_len = body_only.len();
325 (body_only, PacketFlags::DECOY, body_len, 0_usize)
326 } else {
327 let (packet_data, packet_tailer) = notified_packet.split_buf_end(tailer_len);
328 let flags = PacketFlags::from_bits_truncate(*packet_tailer.get(0));
329 let data_len = packet_data.len();
330 let encrypted_tailer = {
331 let mut crypto = self.crypto_send.lock().await;
332 crypto.obfuscate_tailer(packet_tailer, self.settings.pool()).await.map_err(FlowControllerError::TailerEncryption)?
333 };
334 let tailer_overhead = crate::crypto::TAILER_S2C_OVERHEAD;
335 let encrypted = packet_data.expand_end(encrypted_tailer.len());
336 (encrypted, flags, data_len, tailer_overhead)
337 };
338
339 let (full_packet, cap_header, cap_body) = {
341 let mut mode = self.fake_header_mode.lock().await;
342 let fake_header_len = mode.len();
343 let body_len = self.fake_body_mode.get_length(self.mtu, fake_header_len + encrypted_packet.len(), is_maintenance);
344 let full_packet_len = fake_header_len + body_len;
345 let full_packet = encrypted_packet.expand_start(full_packet_len);
346 mode.fill(full_packet.rebuffer_end(fake_header_len));
347 get_rng().fill(&mut full_packet.rebuffer_both(fake_header_len, full_packet_len));
348 (full_packet, fake_header_len, body_len)
349 };
350
351 if full_packet.len() == 0 {
352 return Ok(());
353 }
354 debug!("server flow: sending {packet_flags:?} packet to {addr}");
355 self.socks[0].send_to(full_packet, addr).await.map_err(FlowControllerError::SocketError)?;
356 record_server_send(addr, || {
357 let kind = if fallthrough {
358 "DecoyFallthrough"
359 } else if is_maintenance {
360 "DecoyMaintenance"
361 } else if packet_flags.is_discardable() {
362 "Decoy"
363 } else {
364 "Data"
365 };
366 let tailer_len = if fallthrough {
367 0
368 } else {
369 Tailer::<T>::len()
370 };
371 (kind, tailer_len, tailer_overhead, cap_header, data_len, cap_body)
372 });
373 Ok(())
374 }
375}