1use std::collections::HashMap;
2use std::future::Future;
3use std::hash::Hash;
4use std::net::SocketAddr;
5use std::sync::{Arc, Weak};
6
7use async_trait::async_trait;
8use log::{debug, info, warn};
9
10use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
11use crate::cache::SharedMap;
12#[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
13use crate::certificate::ObfuscationBufferContainer;
14use crate::certificate::{ServerKeyPair, ServerSecret};
15use crate::crypto::{PAYLOAD_CRYPTO_OVERHEAD, ServerCryptoTool, UserCryptoState, UserServerState, verify_transcript_with_key};
16use crate::flow::decoy::{DecoyFactory, random_decoy_factory};
17use crate::flow::probe::ProbeFactory;
18use crate::flow::server::{RawReceivedPacket, ServerFlowManager};
19use crate::flow::{FlowConfig, FlowControllerError};
20use crate::session::SessionControllerError;
21use crate::session::server::{IncomingPacket, OutgoingRouter, ServerSessionManager};
22use crate::settings::{Settings, keys};
23use crate::socket::error::ServerSocketError;
24use crate::tailer::{IdentityType, PacketFlags, ReturnCode, ServerConnectionHandler, Tailer};
25use crate::utils::random::jittered_chunk_size;
26use crate::utils::socket::Socket;
27use crate::utils::sync::{AsyncExecutor, Mutex, NotifyQueueReceiver, NotifyQueueSender, RwLock, assert_runtime, create_bounded_notify_queue, create_notify_queue};
28use crate::utils::unix_timestamp_ms;
29
30pub struct ServerFlowConfiguration<T: IdentityType + Clone, AE: AsyncExecutor> {
32 socket: Option<Socket>,
33 address: Option<SocketAddr>,
34 config: FlowConfig,
35 reader_count: usize,
38 decoy_factory: Option<DecoyFactory<T, AE>>,
40 probe_factory: Option<ProbeFactory<AE>>,
42}
43
44impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> ServerFlowConfiguration<T, AE> {
45 pub fn new(config: FlowConfig, socket: Socket) -> Self {
47 Self {
48 socket: Some(socket),
49 address: None,
50 config,
51 reader_count: 1,
52 decoy_factory: None,
53 probe_factory: None,
54 }
55 }
56
57 pub fn with_address(config: FlowConfig, address: SocketAddr) -> Self {
59 Self {
60 socket: None,
61 address: Some(address),
62 config,
63 reader_count: 1,
64 decoy_factory: None,
65 probe_factory: None,
66 }
67 }
68
69 pub fn with_reader_count(mut self, count: usize) -> Self {
74 self.reader_count = count.max(1);
75 self
76 }
77
78 pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
81 self.decoy_factory = Some(factory);
82 self
83 }
84
85 pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(mut self) -> Self {
87 self.decoy_factory = Some(crate::flow::decoy::decoy_factory::<T, AE, DP>());
88 self
89 }
90
91 pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
93 self.probe_factory = Some(factory);
94 self
95 }
96
97 pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(mut self) -> Self {
99 self.probe_factory = Some(crate::flow::probe::probe_factory::<AE, PM>());
100 self
101 }
102}
103
104pub struct ListenerBuilder<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T>> {
106 settings: Option<Arc<Settings<AE>>>,
107 flow_configs: Vec<ServerFlowConfiguration<T, AE>>,
108 secret: ServerSecret<'static>,
109 identity_generator: IG,
110 default_decoy_factory: DecoyFactory<T, AE>,
111 default_probe_factory: Option<ProbeFactory<AE>>,
112}
113
114impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> ListenerBuilder<T, AE, IG> {
115 pub fn new(key_pair: ServerKeyPair, identity_generator: IG) -> Self {
118 Self {
119 settings: None,
120 flow_configs: Vec::new(),
121 secret: key_pair.into_server_secret(),
122 identity_generator,
123 default_decoy_factory: random_decoy_factory(),
124 default_probe_factory: None,
125 }
126 }
127
128 pub fn with_settings(mut self, settings: Arc<Settings<AE>>) -> Self {
130 self.settings = Some(settings);
131 self
132 }
133
134 pub fn with_decoy_factory(mut self, factory: DecoyFactory<T, AE>) -> Self {
136 self.default_decoy_factory = factory;
137 self
138 }
139
140 pub fn with_decoy<DP: crate::flow::decoy::DecoyCommunicationMode<T, AE> + 'static>(mut self) -> Self {
142 self.default_decoy_factory = crate::flow::decoy::decoy_factory::<T, AE, DP>();
143 self
144 }
145
146 pub fn with_probe_factory(mut self, factory: ProbeFactory<AE>) -> Self {
148 self.default_probe_factory = Some(factory);
149 self
150 }
151
152 pub fn with_probe<PM: crate::flow::probe::ActiveProbeHandler<AE> + Default + 'static>(mut self) -> Self {
154 self.default_probe_factory = Some(crate::flow::probe::probe_factory::<AE, PM>());
155 self
156 }
157
158 pub fn add_flow(mut self, config: ServerFlowConfiguration<T, AE>) -> Self {
160 self.flow_configs.push(config);
161 self
162 }
163
164 pub fn with_flows(mut self, configs: Vec<ServerFlowConfiguration<T, AE>>) -> Self {
166 self.flow_configs = configs;
167 self
168 }
169
170 #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
172 pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
173 assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
174 if self.flow_configs.is_empty() {
175 return Err(ServerSocketError::NoFlows);
176 }
177
178 let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
179 let users: SharedMap<T, UserServerState> = SharedMap::new();
180 let mut flows = Vec::with_capacity(self.flow_configs.len());
181
182 let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
183 let mut max_data_payload = usize::MAX;
184
185 let obfs_buffer = self.secret.obfuscation_buffer();
186
187 for flow_config in self.flow_configs.drain(..) {
188 flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;
189
190 max_data_payload = max_data_payload.min(flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
191
192 let socks: Vec<Arc<Socket>> = if let Some(socket) = flow_config.socket {
193 vec![Arc::new(socket)]
194 } else {
195 let address = flow_config.address.expect("ServerFlowConfiguration must have either socket or address");
196 cfg_if::cfg_if! {
197 if #[cfg(target_os = "linux")] {
198 if flow_config.reader_count > 1 {
199 Socket::bind_reuse_port(address, flow_config.reader_count)
200 .map_err(ServerSocketError::SocketError)?
201 .into_iter().map(Arc::new).collect()
202 } else {
203 vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
204 }
205 } else {
206 vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
207 }
208 }
209 };
210
211 let decoy_factory = flow_config.decoy_factory.unwrap_or_else(|| Arc::clone(&self.default_decoy_factory));
212 let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
213 let crypto_send = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
214 let crypto_recv = ServerCryptoTool::new(users.create_cache(), obfs_buffer);
215 let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
216 flows.push(flow);
217 }
218 let max_data_payload = if max_data_payload == usize::MAX {
219 settings.mtu()
220 } else {
221 max_data_payload
222 };
223 info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
224
225 let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();
226
227 let router = Arc::new(Router {
228 flows,
229 sessions: RwLock::new(HashMap::new()),
230 users: Mutex::new(users),
231 });
232
233 Ok(Listener {
234 router,
235 secret: self.secret,
236 identity_generator: self.identity_generator,
237 accept_tx,
238 accept_rx: Mutex::new(accept_rx),
239 max_data_payload,
240 settings,
241 })
242 }
243
244 #[cfg(any(feature = "full_software", feature = "full_hardware"))]
246 pub async fn build(mut self) -> Result<Listener<T, AE, IG>, ServerSocketError> {
247 assert_runtime().map_err(ServerSocketError::UnsupportedRuntime)?;
248 if self.flow_configs.is_empty() {
249 return Err(ServerSocketError::NoFlows);
250 }
251
252 let settings = self.settings.take().unwrap_or_else(|| Arc::new(Settings::default()));
253 let users: SharedMap<T, UserServerState> = SharedMap::new();
254 let mut flows = Vec::with_capacity(self.flow_configs.len());
255
256 let tailer_wire_len = Tailer::<T>::encrypted_len_s2c();
257 let mut max_data_payload = usize::MAX;
258
259 let secret_arc = Arc::new(self.secret);
260
261 for flow_config in self.flow_configs.drain(..) {
262 flow_config.config.assert(settings.mtu()).map_err(ServerSocketError::FlowError)?;
263
264 max_data_payload = max_data_payload.min(flow_config.config.max_user_payload(settings.mtu(), PAYLOAD_CRYPTO_OVERHEAD, tailer_wire_len));
265
266 let socks: Vec<Arc<Socket>> = match flow_config.socket {
267 Some(socket) => vec![Arc::new(socket)],
268 None => {
269 let address = flow_config.address.expect("ServerFlowConfiguration must have either socket or address");
270 cfg_if::cfg_if! {
271 if #[cfg(target_os = "linux")] {
272 if flow_config.reader_count > 1 {
273 Socket::bind_reuse_port(address, flow_config.reader_count)
274 .map_err(ServerSocketError::SocketError)?
275 .into_iter().map(Arc::new).collect()
276 } else {
277 vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
278 }
279 } else {
280 vec![Arc::new(Socket::bind(address).await.map_err(ServerSocketError::SocketError)?)]
281 }
282 }
283 }
284 };
285
286 let decoy_factory = flow_config.decoy_factory.unwrap_or_else(|| Arc::clone(&self.default_decoy_factory));
287 let probe_factory = flow_config.probe_factory.as_ref().or(self.default_probe_factory.as_ref());
288 let crypto_send = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
289 let crypto_recv = ServerCryptoTool::new(users.create_cache(), Arc::clone(&secret_arc));
290 let flow = ServerFlowManager::new(flow_config.config, probe_factory, crypto_send, crypto_recv, settings.clone(), socks, decoy_factory).await;
291 flows.push(flow);
292 }
293 let max_data_payload = if max_data_payload == usize::MAX {
294 settings.mtu()
295 } else {
296 max_data_payload
297 };
298 info!("listener built: max_data_payload={}B (mtu={}B, {} flow(s))", max_data_payload, settings.mtu(), flows.len());
299
300 let (accept_tx, accept_rx) = create_notify_queue::<ClientHandle<T, AE>>();
301
302 let router = Arc::new(Router {
303 flows,
304 sessions: RwLock::new(HashMap::new()),
305 users: Mutex::new(users),
306 });
307
308 Ok(Listener {
309 router,
310 secret: secret_arc,
311 identity_generator: self.identity_generator,
312 accept_tx,
313 accept_rx: Mutex::new(accept_rx),
314 max_data_payload,
315 settings,
316 })
317 }
318}
319
320pub(crate) struct Router<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> {
322 flows: Vec<Arc<ServerFlowManager<T, AE>>>,
323 sessions: RwLock<HashMap<T, Arc<ServerSessionManager<T, AE>>>>,
324 users: Mutex<SharedMap<T, UserServerState>>,
325}
326
327impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> Router<T, AE> {
328 #[inline]
330 pub(crate) fn flow_count(&self) -> usize {
331 self.flows.len()
332 }
333}
334
335pub struct Listener<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> {
338 router: Arc<Router<T, AE>>,
339 #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
340 secret: ServerSecret<'static>,
341 #[cfg(any(feature = "full_software", feature = "full_hardware"))]
342 secret: Arc<ServerSecret<'static>>,
343 identity_generator: IG,
344 accept_tx: NotifyQueueSender<ClientHandle<T, AE>>,
345 accept_rx: Mutex<NotifyQueueReceiver<ClientHandle<T, AE>>>,
346 max_data_payload: usize,
348 settings: Arc<Settings<AE>>,
349}
350
351impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static, IG: ServerConnectionHandler<T> + 'static> Listener<T, AE, IG> {
352 #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
354 #[inline]
355 fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
356 UserCryptoState::new(initial_key, self.secret.obfuscation_buffer())
357 }
358
359 #[cfg(any(feature = "full_software", feature = "full_hardware"))]
361 #[inline]
362 fn make_initial_crypto_state(&self, initial_key: &impl ByteBuffer) -> UserCryptoState {
363 UserCryptoState::new(initial_key)
364 }
365
366 #[cfg(any(feature = "fast_software", feature = "fast_hardware"))]
368 #[inline]
369 fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
370 user_state.upgrade_crypto(session_key, self.secret.obfuscation_buffer());
371 }
372
373 #[cfg(any(feature = "full_software", feature = "full_hardware"))]
375 #[inline]
376 fn upgrade_user_crypto(&self, user_state: &mut UserServerState, session_key: &impl ByteBuffer) {
377 user_state.upgrade_crypto(session_key);
378 }
379
380 pub fn start(self: &Arc<Self>) -> impl Future<Output = ()> {
391 let drain_capacity = self.settings.get(&keys::DRAIN_CHANNEL_CAPACITY) as usize;
392
393 for (index, flow) in self.router.flows.iter().enumerate() {
394 let (drain_tx, mut drain_rx) = create_bounded_notify_queue(drain_capacity);
395 let drain_tx = Arc::new(drain_tx);
396
397 for (sock_index, sock) in flow.recv_socks().iter().enumerate() {
398 let drain_tx = Arc::clone(&drain_tx);
399 let sock = Arc::clone(sock);
400 let flow_drain = Arc::clone(flow);
401 let settings_drain = Arc::clone(&self.settings);
402 self.settings.executor().spawn(async move {
403 loop {
404 let recv_buf = settings_drain.pool().allocate_for_recv();
405 match flow_drain.receive_raw(recv_buf, &sock).await {
406 Ok(raw_packet) => drain_tx.push(raw_packet),
407 Err(err) => {
408 warn!("flow manager {index} socket {sock_index}: receive error: {err}");
409 break;
410 }
411 }
412 }
413 });
414 }
415 drop(drain_tx);
416
417 let listener = Arc::clone(self);
418 self.settings.executor().spawn(async move {
419 while let Some(raw_packet) = drain_rx.recv().await {
420 listener.route_incoming(raw_packet, index).await;
421 }
422 });
423 }
424
425 async {}
426 }
427
428 async fn route_incoming(self: &Arc<Self>, raw_packet: RawReceivedPacket<T>, flow_index: usize) {
430 let identity = raw_packet.tailer.identity();
431
432 if raw_packet.tailer.flags().contains(PacketFlags::HANDSHAKE) {
433 self.handle_new_client(raw_packet, flow_index).await;
434 return;
435 }
436
437 let session = {
438 let sessions = self.router.sessions.read().await;
439 sessions.get(&identity).cloned()
440 };
441
442 if let Some(session) = session {
443 self.router.flows[flow_index].ensure_user(identity.clone(), session.counter()).await;
444 session.note_active_flow(flow_index);
445
446 let incoming = IncomingPacket {
447 body: raw_packet.body,
448 tailer: raw_packet.tailer,
449 };
450 if let Err(err) = session.process_incoming(incoming).await {
451 debug!("session processing error for {}: {}", identity.to_string(), err);
452 if matches!(err, SessionControllerError::ConnectionTerminated(_)) {
453 self.router.remove_session(&identity).await;
454 }
455 }
456 } else {
457 debug!("packet from unknown identity {}, dropping", identity.to_string());
458 }
459 }
460
461 async fn handle_new_client(self: &Arc<Self>, mut raw_packet: RawReceivedPacket<T>, flow_index: usize) {
463 let handshake_transcript = raw_packet.handshake_transcript.take();
464 let original_wire_packet = raw_packet.original_wire_packet.take();
465 let source_addr = raw_packet.source_addr;
466 let Some((server_data, initial_key, client_initial_data)) = self.secret.decapsulate_handshake_server(raw_packet.body, self.settings.pool()) else {
467 if let Some(packet) = original_wire_packet {
468 debug!("handshake decapsulation failed from {source_addr} (body too short for crypto header), forwarding to probe handler");
469 self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
470 } else {
471 debug!("handshake decapsulation failed from {source_addr} and original wire packet unavailable, dropping");
472 }
473 return;
474 };
475
476 let verified = matches!((&handshake_transcript, &original_wire_packet), (Some(transcript), Some(_)) if verify_transcript_with_key(&initial_key, transcript).is_ok());
478 if !verified {
479 if let Some(packet) = original_wire_packet {
480 debug!("handshake tailer verification failed from {source_addr}, forwarding to probe handler");
481 self.router.flows[flow_index].forward_to_probe(packet, source_addr).await;
482 } else {
483 debug!("handshake packet from {source_addr} missing deferred transcript or wire packet, dropping");
484 }
485 return;
486 }
487
488 let client_version_identity = raw_packet.tailer.identity();
489 let handshake_pn = raw_packet.tailer.packet_number();
490 if !self.identity_generator.verify_version(client_version_identity.to_bytes()) {
491 {
492 let mut users = self.router.users.lock().await;
493 let crypto_state = self.make_initial_crypto_state(&initial_key);
494 users.insert(client_version_identity.clone(), UserServerState::new(crypto_state)).await;
495 }
496 self.router.flows[flow_index].register_user_binding(client_version_identity.clone(), raw_packet.source_addr, handshake_pn).await;
497 let pn = ((unix_timestamp_ms() / 1000) as u64) << 32;
498 let buf = self.settings.pool().allocate(Some(T::length()));
499 let tailer = Tailer::termination(buf, &client_version_identity, ReturnCode::VersionMismatch, pn);
500 if let Err(err) = self.router.flows[flow_index].send_packet(tailer.into_buffer(), false, false).await {
501 warn!("failed to send version mismatch rejection: {err}");
502 }
503 {
504 let mut users = self.router.users.lock().await;
505 users.remove(&client_version_identity).await;
506 }
507 self.router.flows[flow_index].remove_user(&client_version_identity).await;
508 return;
509 }
510
511 let identity = self.identity_generator.generate(client_initial_data.slice());
512 let server_initial_data = self.identity_generator.initial_data(&identity);
513
514 let (incoming_tx, incoming_rx) = create_notify_queue::<DynamicByteBuffer>();
515 let router_weak: Weak<dyn OutgoingRouter<T>> = Arc::downgrade(&self.router) as Weak<dyn OutgoingRouter<T>>;
516
517 let (response_body, session_key) = self.secret.encapsulate_handshake_server(server_data, self.settings.pool(), server_initial_data.slice(), &initial_key);
518
519 let (session, response_packet, replacing) = {
520 let mut users = self.router.users.lock().await;
521 let replacing = users.contains_key(&identity);
522 if replacing {
523 debug!("re-handshake for {}: replacing existing session (last wins)", identity.to_string());
524 users.remove(&identity).await;
525 }
526 let initial_crypto_state = self.make_initial_crypto_state(&initial_key);
527 let result = ServerSessionManager::assemble_session(initial_crypto_state, response_body, raw_packet.tailer, identity.clone(), &mut users, incoming_tx, router_weak, self.router.flow_count(), self.settings.clone()).await;
528 match result {
529 Ok((session, response_packet)) => (session, response_packet, replacing),
530 Err(err) => {
531 warn!("handshake failed: {err}");
532 return;
533 }
534 }
535 };
536
537 if replacing {
538 self.router.sessions.write().await.remove(&identity);
539 for flow in &self.router.flows {
540 flow.remove_user(&identity).await;
541 }
542 }
543
544 self.router.flows[flow_index].register_user_binding(identity.clone(), raw_packet.source_addr, handshake_pn).await;
545 self.router.flows[flow_index].register_user(identity.clone(), session.counter()).await;
546
547 if let Err(err) = self.router.flows[flow_index].send_packet(response_packet, false, false).await {
548 warn!("failed to send handshake response: {err}");
549 self.router.users.lock().await.remove(&identity).await;
550 for flow in &self.router.flows {
551 flow.remove_user(&identity).await;
552 }
553 return;
554 }
555
556 {
557 let mut users = self.router.users.lock().await;
558 users
559 .modify(&identity, |user_state| {
560 self.upgrade_user_crypto(user_state, &session_key);
561 })
562 .await;
563 }
564
565 session.note_active_flow(flow_index);
566
567 {
568 let mut sessions = self.router.sessions.write().await;
569 if sessions.contains_key(&identity) {
570 debug!("concurrent handshake for {}: last wins, displacing earlier session", identity.to_string());
571 }
572 sessions.insert(identity.clone(), Arc::clone(&session));
573 }
574
575 let client_handle = ClientHandle {
576 session,
577 identity: identity.clone(),
578 incoming_rx: Mutex::new(incoming_rx),
579 max_data_payload: self.max_data_payload,
580 settings: self.settings.clone(),
581 router: Arc::clone(&self.router),
582 };
583 self.accept_tx.push(client_handle);
584
585 info!("new client connected: {}", identity.to_string());
586 }
587
588 pub async fn accept(&self) -> Result<ClientHandle<T, AE>, ServerSocketError> {
590 self.accept_rx.lock().await.recv().await.ok_or(ServerSocketError::ListenerStopped)
591 }
592}
593
594#[async_trait]
596impl<T: IdentityType + Clone + Eq + Hash + Send + Sync + ToString + 'static, AE: AsyncExecutor + 'static> OutgoingRouter<T> for Router<T, AE> {
597 async fn route_packet(&self, packet: DynamicByteBuffer, identity: &T) -> bool {
598 let session = {
599 let sessions = self.sessions.read().await;
600 sessions.get(identity).cloned()
601 };
602 let Some(session) = session else {
603 return false;
604 };
605 let flow_idx = session.select_active_flow(self.flows.len());
606 if flow_idx < self.flows.len() {
607 self.flows[flow_idx].send_packet(packet, false, false).await.is_ok()
608 } else {
609 false
610 }
611 }
612
613 async fn remove_session(&self, identity: &T) {
614 if self.sessions.write().await.remove(identity).is_none() {
615 return;
616 }
617 self.users.lock().await.remove(identity).await;
618 for flow in &self.flows {
619 flow.remove_user(identity).await;
620 }
621 info!("client session removed: {}", identity.to_string());
622 }
623}
624
625pub struct ClientHandle<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> {
628 session: Arc<ServerSessionManager<T, AE>>,
629 identity: T,
630 incoming_rx: Mutex<NotifyQueueReceiver<DynamicByteBuffer>>,
631 max_data_payload: usize,
633 settings: Arc<Settings<AE>>,
634 router: Arc<Router<T, AE>>,
635}
636
637impl<T: IdentityType + Clone + Eq + Hash + Send + ToString, AE: AsyncExecutor> ClientHandle<T, AE> {
638 pub async fn send(&self, packet: DynamicByteBuffer) -> Result<(), ServerSocketError> {
640 let wire = self.session.prepare_outgoing(packet, false).await.map_err(ServerSocketError::SessionError)?;
641 if !self.router.route_packet(wire, &self.identity).await {
642 return Err(ServerSocketError::SessionError(SessionControllerError::FlowError(FlowControllerError::UserNotFound {
643 identity: self.identity.to_string(),
644 })));
645 }
646 Ok(())
647 }
648
649 pub async fn send_bytes(&self, data: &[u8]) -> Result<(), ServerSocketError> {
655 let jitter = self.settings.get(&keys::SEND_BYTES_JITTER);
656 let chunk = self.settings.get(&keys::SEND_BYTES_CHUNK) as usize;
657 let mut offset = 0;
658 while offset < data.len() {
659 let remaining = data.len() - offset;
660 let chunk_size = if remaining <= self.max_data_payload {
661 remaining
662 } else {
663 jittered_chunk_size(self.max_data_payload, chunk, jitter)
664 };
665 let buffer = self.settings.pool().allocate(Some(chunk_size));
666 buffer.slice_mut().copy_from_slice(&data[offset..offset + chunk_size]);
667 self.send(buffer).await?;
668 offset += chunk_size;
669 }
670 Ok(())
671 }
672
673 pub fn max_data_payload(&self) -> usize {
675 self.max_data_payload
676 }
677
678 pub async fn receive(&self) -> Result<DynamicByteBuffer, ServerSocketError> {
680 let buf = self.incoming_rx.lock().await.recv().await.ok_or(ServerSocketError::ChannelClosed)?;
681 Ok(buf)
682 }
683
684 pub async fn receive_bytes(&self) -> Result<Vec<u8>, ServerSocketError> {
686 let buffer = self.receive().await?;
687 Ok(buffer.slice().to_vec())
688 }
689}
690
691impl<T: IdentityType + Clone + Eq + Hash + Send + ToString + 'static, AE: AsyncExecutor + 'static> Drop for ClientHandle<T, AE> {
692 fn drop(&mut self) {
694 let executor = self.settings.executor().clone();
695 let pn = (unix_timestamp_ms() / 1000) as u64 * (1u64 << 32);
696 let buf = self.settings.pool().allocate(Some(Tailer::<T>::len()));
697 let termination = Tailer::termination(buf, &self.identity, ReturnCode::Success, pn).into_buffer();
698 executor.block_on(async {
699 self.router.route_packet(termination, &self.identity).await;
700 self.router.remove_session(&self.identity).await;
701 });
702 }
703}