Skip to main content

typhoon/flow/
client.rs

1/// Client-side flow manager implementation.
2use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::atomic::AtomicU32;
6use std::sync::{Arc, Weak};
7
8use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
9use crate::cache::CachedValue;
10use crate::capture::{CaptureContext, record_flow_config};
11use crate::crypto::ClientCryptoTool;
12use crate::defaults::NoopProbeHandler;
13use crate::flow::common::{FlowManager, FlowReceiveInternal, FlowSendInternal, ProcessIncomingResult};
14use crate::flow::config::FlowConfig;
15use crate::flow::decoy::{DecoyFactory, DecoyFlowSender, DecoyProvider};
16use crate::flow::error::FlowControllerError;
17use crate::flow::probe::{ActiveProbeHandler, ProbeFactory, ProbeFlowSender};
18use crate::settings::Settings;
19use crate::tailer::{IdentityType, Tailer};
20use crate::utils::socket::{Socket, SocketError};
21use crate::utils::sync::{AsyncExecutor, Mutex};
22
23/// Client-side flow manager that handles packet encryption, decoy traffic, and socket I/O.
24pub struct ClientFlowManager<T: IdentityType + Clone, AE: AsyncExecutor> {
25    decoy_provider: Box<dyn DecoyProvider>,
26    send_internal: Mutex<FlowSendInternal<T>>,
27    receive_internal: Mutex<FlowReceiveInternal<T>>,
28    sock: Socket,
29    mtu: usize,
30    settings: Arc<Settings<AE>>,
31    probe_handler: Mutex<Box<dyn ActiveProbeHandler<AE>>>,
32}
33
34impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> ClientFlowManager<T, AE> {
35    /// Create a new client flow manager.
36    pub(crate) async fn new(config: FlowConfig, cipher: CachedValue<ClientCryptoTool<T>>, settings: Arc<Settings<AE>>, sock: Socket, probe_factory: Option<&ProbeFactory<AE>>, decoy_factory: &DecoyFactory<T, AE>, counter: Arc<AtomicU32>, addr: SocketAddr) -> Result<Arc<Self>, FlowControllerError> {
37        let identity = cipher.derive(ClientCryptoTool::<T>::identity).map_err(FlowControllerError::MissingCache)?;
38        let send_provider = cipher.create_sibling().map_err(FlowControllerError::MissingCache)?;
39        let receive_provider = cipher.create_sibling().map_err(FlowControllerError::MissingCache)?;
40        let handler_factory = probe_factory.cloned();
41        let settings_for_start = Arc::clone(&settings);
42
43        let manager_ref = Arc::new_cyclic(|m: &Weak<ClientFlowManager<T, AE>>| {
44            let mgr: Weak<dyn DecoyFlowSender> = m.clone();
45            let decoy = decoy_factory(mgr, settings.clone(), identity, counter);
46            let probe_handler: Box<dyn ActiveProbeHandler<AE>> = match &handler_factory {
47                Some(f) => f(),
48                None => Box::new(NoopProbeHandler),
49            };
50            let mtu = settings.mtu();
51            record_flow_config(addr, "c2s", || (config.fake_body_mode.description(), config.fake_header_mode.len(), decoy.name()));
52            ClientFlowManager {
53                decoy_provider: decoy,
54                send_internal: Mutex::new(FlowSendInternal {
55                    provider: send_provider,
56                    config,
57                    capture: CaptureContext::new(addr),
58                }),
59                receive_internal: Mutex::new(FlowReceiveInternal {
60                    provider: receive_provider,
61                }),
62                sock,
63                mtu,
64                settings,
65                probe_handler: Mutex::new(probe_handler),
66            }
67        });
68        manager_ref.decoy_provider.start().await;
69        let weak: Weak<dyn ProbeFlowSender> = Arc::downgrade(&manager_ref) as Weak<dyn ProbeFlowSender>;
70        manager_ref.probe_handler.lock().await.start(weak, settings_for_start).await;
71        Ok(manager_ref)
72    }
73}
74
75impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> ProbeFlowSender for ClientFlowManager<T, AE> {
76    fn send_raw<'a>(&'a self, packet: DynamicByteBuffer, _target: SocketAddr) -> Pin<Box<dyn Future<Output = Result<(), SocketError>> + Send + 'a>> {
77        Box::pin(async move { self.sock.send(packet).await.map(|_| ()) })
78    }
79}
80
81impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> DecoyFlowSender for ClientFlowManager<T, AE> {
82    fn send_decoy_packet<'a>(&'a self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Pin<Box<dyn Future<Output = Result<(), FlowControllerError>> + Send + 'a>> {
83        Box::pin(<Self as FlowManager>::send_packet(self, packet, fallthrough, is_maintenance))
84    }
85}
86
87impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> FlowManager for ClientFlowManager<T, AE> {
88    async fn send_packet(&self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Result<(), FlowControllerError> {
89        let tailer_len = Tailer::<T>::len();
90        let (body, tailer_buf) = packet.split_buf_end(tailer_len);
91
92        let Some(notified_body) = self.decoy_provider.feed_output(body, tailer_buf.clone()).await else {
93            return Ok(());
94        };
95
96        let mut lock = self.send_internal.lock().await;
97        let full_packet = lock.prepare_outgoing(notified_body.expand_end(tailer_buf.len()), self.mtu, self.settings.pool(), fallthrough, is_maintenance)?;
98        if full_packet.len() > 0 {
99            self.sock.send(full_packet).await.map_err(FlowControllerError::SocketError)?;
100        }
101        Ok(())
102    }
103
104    async fn receive_packet(&self, packet: DynamicByteBuffer) -> Result<DynamicByteBuffer, FlowControllerError> {
105        loop {
106            let wire_packet = self.sock.recv(packet.clone()).await.map_err(FlowControllerError::SocketError)?;
107
108            let (body, tailer_buf) = {
109                let mut lock = self.receive_internal.lock().await;
110                match lock.deobfuscate_incoming(wire_packet.clone(), self.settings.pool())? {
111                    None => {
112                        self.probe_handler.lock().await.process(wire_packet, None).await;
113                        continue;
114                    }
115                    Some(pair) => pair,
116                }
117            };
118
119            let Some(notified_body) = self.decoy_provider.feed_input(body.clone(), tailer_buf.clone()).await else {
120                continue;
121            };
122
123            let incoming_packet = {
124                let lock = self.receive_internal.lock().await;
125                lock.process_with_tailer(notified_body, tailer_buf)
126            };
127            match incoming_packet {
128                ProcessIncomingResult::Decoy => {}
129                ProcessIncomingResult::Valid(result) => return Ok(result),
130            }
131        }
132    }
133}