Skip to main content

typhoon/socket/
client.rs

1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
6use log::{debug, info};
7use rand::Rng;
8use rand::seq::SliceRandom;
9
10use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
11use crate::cache::SharedValue;
12use crate::certificate::{CertificateError, ClientCertificate};
13use crate::crypto::{ClientCryptoTool, KEY_LENGTH, PAYLOAD_CRYPTO_OVERHEAD};
14use crate::flow::client::ClientFlowManager;
15use crate::flow::decoy::{DecoyFactory, random_decoy_factory};
16use crate::flow::probe::ProbeFactory;
17use crate::flow::{FlowConfig, FlowControllerError};
18use crate::session::{ClientSessionManager, SessionManager};
19use crate::settings::{Settings, keys};
20use crate::socket::error::ClientSocketError;
21use crate::tailer::{ClientConnectionHandler, IdentityType, Tailer};
22use crate::utils::random::{SupportRng, get_rng, jittered_chunk_size};
23use crate::utils::socket::Socket;
24use crate::utils::sync::{AsyncExecutor, Mutex, NotifyQueueReceiver, assert_runtime, create_notify_queue};
25
26/// Builder for constructing a `ClientSocket`.
27pub struct ClientSocketBuilder<T: IdentityType + Clone, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler> {
28    settings: Option<Arc<Settings<AE>>>,
29    /// Per-address flow config overrides. Empty means auto-fill mode (random subset of addresses).
30    flow_overrides: HashMap<SocketAddr, FlowConfig>,
31    certificate: ClientCertificate,
32    initial_data_generator: CC,
33    decoy_factory: DecoyFactory<T, AE>,
34    probe_factory: Option<ProbeFactory<AE>>,
35}
36
37impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler + 'static> ClientSocketBuilder<T, AE, CC> {
38    /// Create a new builder with the given certificate and client connection handler.
39    /// Decoy providers are randomly selected per-flow by default.
40    ///
41    /// The certificate must contain at least one server address; otherwise `build` will return
42    /// [`CertificateError::NoAddresses`](crate::certificate::CertificateError::NoAddresses).
43    ///
44    /// By default, a random number of addresses (1 to the total in the certificate) is selected
45    /// automatically, each with a random [`FlowConfig`].  Call
46    /// [`with_flow_config`](Self::with_flow_config) one or more times to opt out of
47    /// auto-selection and configure exactly which flows to open.
48    pub fn new(certificate: ClientCertificate, initial_data_generator: CC) -> Self {
49        Self {
50            settings: None,
51            flow_overrides: HashMap::new(),
52            certificate,
53            initial_data_generator,
54            decoy_factory: random_decoy_factory(),
55            probe_factory: None,
56        }
57    }
58
59    /// Set custom settings to use for the socket.
60    pub fn with_settings(mut self, settings: Arc<Settings<AE>>) -> Self {
61        self.settings = Some(settings);
62        self
63    }
64
65    /// Set the decoy factory used for all flows.
66    pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
67        self.decoy_factory = factory;
68        self
69    }
70
71    /// Set a fixed decoy provider type for all flows.
72    pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(mut self) -> Self {
73        self.decoy_factory = crate::flow::decoy::decoy_factory::<T, AE, DP>();
74        self
75    }
76
77    /// Set the active probe handler factory for all flows.
78    pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
79        self.probe_factory = Some(factory);
80        self
81    }
82
83    /// Set a fixed active probe handler type for all flows.
84    pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(mut self) -> Self {
85        self.probe_factory = Some(crate::flow::probe::probe_factory::<AE, PM>());
86        self
87    }
88
89    /// Set an explicit [`FlowConfig`] for a specific server address.
90    ///
91    /// Calling this method at least once disables auto-flow-selection: only the addresses
92    /// configured via this method will be connected.  The address must be present in the
93    /// certificate; otherwise `build` will return
94    /// [`ClientSocketError::AddressNotInCertificate`].
95    pub fn with_flow_config(mut self, addr: SocketAddr, config: FlowConfig) -> Self {
96        self.flow_overrides.insert(addr, config);
97        self
98    }
99
100    /// Build the client socket, validating all flow configs and creating underlying managers.
101    ///
102    /// Returns [`ClientSocketError::FlowError`] wrapping [`FlowControllerError::AssertionFailed`]
103    /// if the combined flow configuration leaves zero bytes available for user data
104    /// (e.g. `constant` fake-body mode with a per-flow constant length sampled from
105    /// `[TYPHOON_FAKE_BODY_CONSTANT_LENGTH_MIN, TYPHOON_FAKE_BODY_CONSTANT_LENGTH_MAX]` larger than
106    /// the remaining packet budget after protocol overhead).
107    pub async fn build(mut self) -> Result<ClientSocket<T, AE, CC>, ClientSocketError> {
108        assert_runtime().map_err(ClientSocketError::UnsupportedRuntime)?;
109        let cert_addrs = self.certificate.addresses();
110        if cert_addrs.is_empty() {
111            return Err(ClientSocketError::CertificateError(CertificateError::NoAddresses));
112        }
113
114        let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
115
116        for addr in self.flow_overrides.keys() {
117            if !cert_addrs.contains(addr) {
118                return Err(ClientSocketError::AddressNotInCertificate(*addr));
119            }
120        }
121
122        let addr_configs: Vec<(SocketAddr, FlowConfig)> = if self.flow_overrides.is_empty() {
123            let mut rng = get_rng();
124            let n = rng.gen_range(1..=cert_addrs.len());
125            cert_addrs.choose_multiple(&mut rng, n).map(|&addr| (addr, FlowConfig::random(&settings))).collect()
126        } else {
127            self.flow_overrides.drain().collect()
128        };
129
130        let identity_bytes = T::from_bytes(self.initial_data_generator.version(T::length()).slice());
131        let static_key = get_rng().random_byte_buffer::<KEY_LENGTH>();
132        let cipher = SharedValue::new(ClientCryptoTool::new(self.certificate.clone(), identity_bytes, &static_key));
133
134        let tailer_wire_len = Tailer::<T>::encrypted_len_c2s();
135        let mut max_data_payload = usize::MAX;
136
137        // Per-session monotonic packet-number counter, created before the flow managers so it can be shared with every decoy provider, the session manager, and the health-check provider — every emitter on this session advances the same sequence.
138        let counter = Arc::new(AtomicU32::new(0));
139
140        let mut flows = Vec::with_capacity(addr_configs.len());
141        for (addr, config) in addr_configs {
142            config.assert(settings.mtu()).map_err(ClientSocketError::FlowError)?;
143
144            max_data_payload = max_data_payload.min(config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
145
146            let sock = Socket::new(addr, None).await.map_err(ClientSocketError::SocketError)?;
147            let cipher_cache = cipher.create_cache();
148            let flow = ClientFlowManager::new(config, cipher_cache, settings.clone(), sock, self.probe_factory.as_ref(), &self.decoy_factory, Arc::clone(&counter), addr).await.map_err(ClientSocketError::FlowError)?;
149            flows.push(flow);
150        }
151        let max_data_payload = if max_data_payload == usize::MAX {
152            settings.mtu()
153        } else {
154            max_data_payload
155        };
156        if max_data_payload == 0 {
157            return Err(ClientSocketError::FlowError(FlowControllerError::AssertionFailed {
158                message: "flow configuration leaves no room for user data (max_data_payload = 0); reduce fake-body constant length or increase MTU".to_string(),
159            }));
160        }
161        info!("client socket built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
162
163        let session = ClientSessionManager::new(cipher, flows, settings.clone(), counter, self.initial_data_generator).map_err(ClientSocketError::SessionError)?;
164
165        let (incoming_tx, incoming_rx) = create_notify_queue::<DynamicByteBuffer>();
166
167        let receive_session = session.clone();
168        settings.executor().spawn(async move {
169            loop {
170                match receive_session.receive_packet().await {
171                    Ok(buffer) => {
172                        incoming_tx.push(buffer);
173                    }
174                    Err(err) => {
175                        debug!("client bg-recv: terminated: {err}");
176                        break;
177                    }
178                }
179            }
180        });
181
182        session.start().await.map_err(ClientSocketError::SessionError)?;
183
184        Ok(ClientSocket {
185            session,
186            incoming_rx: Mutex::new(incoming_rx),
187            max_data_payload,
188            settings,
189        })
190    }
191}
192
193/// Client-side TYPHOON socket providing send/receive operations.
194pub struct ClientSocket<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler + 'static> {
195    session: Arc<ClientSessionManager<T, AE, Arc<ClientFlowManager<T, AE>>, CC>>,
196    incoming_rx: Mutex<NotifyQueueReceiver<DynamicByteBuffer>>,
197    /// Maximum user-data bytes per packet so the wire packet fits within MTU.
198    max_data_payload: usize,
199    settings: Arc<Settings<AE>>,
200}
201
202impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler + 'static> ClientSocket<T, AE, CC> {
203    /// Send a packet using a pre-allocated buffer.
204    pub async fn send(&self, packet: DynamicByteBuffer) -> Result<(), ClientSocketError> {
205        self.session.send_packet(packet, false).await.map_err(ClientSocketError::SessionError)
206    }
207
208    /// Send a byte slice, splitting into payload-sized chunks so each wire packet fits within MTU.
209    ///
210    /// When fragmentation is unavoidable (`remaining > max_data_payload`), each
211    /// non-final chunk is sized in `[max_data_payload * (1 - jitter), max_data_payload]`
212    /// — `jitter = TYPHOON_SEND_BYTES_JITTER` (default `0.0`, reproducing the
213    /// deterministic equal-chunk split).  The final chunk and any single-packet
214    /// send go through unfragmented to avoid synthesising a small-packet tail
215    /// that a passive observer could latch onto.
216    pub async fn send_bytes(&self, data: &[u8]) -> Result<(), ClientSocketError> {
217        let jitter = self.settings.get(&keys::SEND_BYTES_JITTER);
218        let chunk = self.settings.get(&keys::SEND_BYTES_CHUNK) as usize;
219        let mut offset = 0;
220        while offset < data.len() {
221            let remaining = data.len() - offset;
222            let chunk_size = if remaining <= self.max_data_payload {
223                remaining
224            } else {
225                jittered_chunk_size(self.max_data_payload, chunk, jitter)
226            };
227            let buffer = self.settings.pool().allocate(Some(chunk_size));
228            buffer.slice_mut().copy_from_slice(&data[offset..offset + chunk_size]);
229            self.send(buffer).await?;
230            offset += chunk_size;
231        }
232        Ok(())
233    }
234
235    /// Maximum user-data bytes per `send` call so the wire packet fits within MTU.
236    pub fn max_data_payload(&self) -> usize {
237        self.max_data_payload
238    }
239
240    /// Receive a packet, returning the decrypted payload as a buffer.
241    pub async fn receive(&self) -> Result<DynamicByteBuffer, ClientSocketError> {
242        let buf = self.incoming_rx.lock().await.recv().await.ok_or(ClientSocketError::ChannelClosed)?;
243        Ok(buf)
244    }
245
246    /// Receive a packet, returning the decrypted payload as a byte vector.
247    pub async fn receive_bytes(&self) -> Result<Vec<u8>, ClientSocketError> {
248        let buffer = self.receive().await?;
249        Ok(buffer.slice().to_vec())
250    }
251}