1use std::collections::HashMap;
2use std::net::SocketAddr;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU32;
5
6use log::{debug, info};
7use rand::Rng;
8use rand::seq::SliceRandom;
9
10use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
11use crate::cache::SharedValue;
12use crate::certificate::{CertificateError, ClientCertificate};
13use crate::crypto::{ClientCryptoTool, KEY_LENGTH, PAYLOAD_CRYPTO_OVERHEAD};
14use crate::flow::client::ClientFlowManager;
15use crate::flow::decoy::{DecoyFactory, random_decoy_factory};
16use crate::flow::probe::ProbeFactory;
17use crate::flow::{FlowConfig, FlowControllerError};
18use crate::session::{ClientSessionManager, SessionManager};
19use crate::settings::{Settings, keys};
20use crate::socket::error::ClientSocketError;
21use crate::tailer::{ClientConnectionHandler, IdentityType, Tailer};
22use crate::utils::random::{SupportRng, get_rng, jittered_chunk_size};
23use crate::utils::socket::Socket;
24use crate::utils::sync::{AsyncExecutor, Mutex, NotifyQueueReceiver, assert_runtime, create_notify_queue};
25
26pub struct ClientSocketBuilder<T: IdentityType + Clone, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler> {
28 settings: Option<Arc<Settings<AE>>>,
29 flow_overrides: HashMap<SocketAddr, FlowConfig>,
31 certificate: ClientCertificate,
32 initial_data_generator: CC,
33 decoy_factory: DecoyFactory<T, AE>,
34 probe_factory: Option<ProbeFactory<AE>>,
35}
36
37impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler + 'static> ClientSocketBuilder<T, AE, CC> {
38 pub fn new(certificate: ClientCertificate, initial_data_generator: CC) -> Self {
49 Self {
50 settings: None,
51 flow_overrides: HashMap::new(),
52 certificate,
53 initial_data_generator,
54 decoy_factory: random_decoy_factory(),
55 probe_factory: None,
56 }
57 }
58
59 pub fn with_settings(mut self, settings: Arc<Settings<AE>>) -> Self {
61 self.settings = Some(settings);
62 self
63 }
64
65 pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
67 self.decoy_factory = factory;
68 self
69 }
70
71 pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(mut self) -> Self {
73 self.decoy_factory = crate::flow::decoy::decoy_factory::<T, AE, DP>();
74 self
75 }
76
77 pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
79 self.probe_factory = Some(factory);
80 self
81 }
82
83 pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(mut self) -> Self {
85 self.probe_factory = Some(crate::flow::probe::probe_factory::<AE, PM>());
86 self
87 }
88
89 pub fn with_flow_config(mut self, addr: SocketAddr, config: FlowConfig) -> Self {
96 self.flow_overrides.insert(addr, config);
97 self
98 }
99
100 pub async fn build(mut self) -> Result<ClientSocket<T, AE, CC>, ClientSocketError> {
108 assert_runtime().map_err(ClientSocketError::UnsupportedRuntime)?;
109 let cert_addrs = self.certificate.addresses();
110 if cert_addrs.is_empty() {
111 return Err(ClientSocketError::CertificateError(CertificateError::NoAddresses));
112 }
113
114 let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
115
116 for addr in self.flow_overrides.keys() {
117 if !cert_addrs.contains(addr) {
118 return Err(ClientSocketError::AddressNotInCertificate(*addr));
119 }
120 }
121
122 let addr_configs: Vec<(SocketAddr, FlowConfig)> = if self.flow_overrides.is_empty() {
123 let mut rng = get_rng();
124 let n = rng.gen_range(1..=cert_addrs.len());
125 cert_addrs.choose_multiple(&mut rng, n).map(|&addr| (addr, FlowConfig::random(&settings))).collect()
126 } else {
127 self.flow_overrides.drain().collect()
128 };
129
130 let identity_bytes = T::from_bytes(self.initial_data_generator.version(T::length()).slice());
131 let static_key = get_rng().random_byte_buffer::<KEY_LENGTH>();
132 let cipher = SharedValue::new(ClientCryptoTool::new(self.certificate.clone(), identity_bytes, &static_key));
133
134 let tailer_wire_len = Tailer::<T>::encrypted_len_c2s();
135 let mut max_data_payload = usize::MAX;
136
137 let counter = Arc::new(AtomicU32::new(0));
139
140 let mut flows = Vec::with_capacity(addr_configs.len());
141 for (addr, config) in addr_configs {
142 config.assert(settings.mtu()).map_err(ClientSocketError::FlowError)?;
143
144 max_data_payload = max_data_payload.min(config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
145
146 let sock = Socket::new(addr, None).await.map_err(ClientSocketError::SocketError)?;
147 let cipher_cache = cipher.create_cache();
148 let flow = ClientFlowManager::new(config, cipher_cache, settings.clone(), sock, self.probe_factory.as_ref(), &self.decoy_factory, Arc::clone(&counter), addr).await.map_err(ClientSocketError::FlowError)?;
149 flows.push(flow);
150 }
151 let max_data_payload = if max_data_payload == usize::MAX {
152 settings.mtu()
153 } else {
154 max_data_payload
155 };
156 if max_data_payload == 0 {
157 return Err(ClientSocketError::FlowError(FlowControllerError::AssertionFailed {
158 message: "flow configuration leaves no room for user data (max_data_payload = 0); reduce fake-body constant length or increase MTU".to_string(),
159 }));
160 }
161 info!("client socket built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
162
163 let session = ClientSessionManager::new(cipher, flows, settings.clone(), counter, self.initial_data_generator).map_err(ClientSocketError::SessionError)?;
164
165 let (incoming_tx, incoming_rx) = create_notify_queue::<DynamicByteBuffer>();
166
167 let receive_session = session.clone();
168 settings.executor().spawn(async move {
169 loop {
170 match receive_session.receive_packet().await {
171 Ok(buffer) => {
172 incoming_tx.push(buffer);
173 }
174 Err(err) => {
175 debug!("client bg-recv: terminated: {err}");
176 break;
177 }
178 }
179 }
180 });
181
182 session.start().await.map_err(ClientSocketError::SessionError)?;
183
184 Ok(ClientSocket {
185 session,
186 incoming_rx: Mutex::new(incoming_rx),
187 max_data_payload,
188 settings,
189 })
190 }
191}
192
193pub struct ClientSocket<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler + 'static> {
195 session: Arc<ClientSessionManager<T, AE, Arc<ClientFlowManager<T, AE>>, CC>>,
196 incoming_rx: Mutex<NotifyQueueReceiver<DynamicByteBuffer>>,
197 max_data_payload: usize,
199 settings: Arc<Settings<AE>>,
200}
201
202impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static, CC: ClientConnectionHandler + 'static> ClientSocket<T, AE, CC> {
203 pub async fn send(&self, packet: DynamicByteBuffer) -> Result<(), ClientSocketError> {
205 self.session.send_packet(packet, false).await.map_err(ClientSocketError::SessionError)
206 }
207
208 pub async fn send_bytes(&self, data: &[u8]) -> Result<(), ClientSocketError> {
217 let jitter = self.settings.get(&keys::SEND_BYTES_JITTER);
218 let chunk = self.settings.get(&keys::SEND_BYTES_CHUNK) as usize;
219 let mut offset = 0;
220 while offset < data.len() {
221 let remaining = data.len() - offset;
222 let chunk_size = if remaining <= self.max_data_payload {
223 remaining
224 } else {
225 jittered_chunk_size(self.max_data_payload, chunk, jitter)
226 };
227 let buffer = self.settings.pool().allocate(Some(chunk_size));
228 buffer.slice_mut().copy_from_slice(&data[offset..offset + chunk_size]);
229 self.send(buffer).await?;
230 offset += chunk_size;
231 }
232 Ok(())
233 }
234
235 pub fn max_data_payload(&self) -> usize {
237 self.max_data_payload
238 }
239
240 pub async fn receive(&self) -> Result<DynamicByteBuffer, ClientSocketError> {
242 let buf = self.incoming_rx.lock().await.recv().await.ok_or(ClientSocketError::ChannelClosed)?;
243 Ok(buf)
244 }
245
246 pub async fn receive_bytes(&self) -> Result<Vec<u8>, ClientSocketError> {
248 let buffer = self.receive().await?;
249 Ok(buffer.slice().to_vec())
250 }
251}