1use std::future::Future;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::atomic::AtomicU32;
6use std::sync::{Arc, Weak};
7
8use crate::bytes::{ByteBuffer, ByteBufferMut, DynamicByteBuffer};
9use crate::cache::CachedValue;
10use crate::capture::{CaptureContext, record_flow_config};
11use crate::crypto::ClientCryptoTool;
12use crate::defaults::NoopProbeHandler;
13use crate::flow::common::{FlowManager, FlowReceiveInternal, FlowSendInternal, ProcessIncomingResult};
14use crate::flow::config::FlowConfig;
15use crate::flow::decoy::{DecoyFactory, DecoyFlowSender, DecoyProvider};
16use crate::flow::error::FlowControllerError;
17use crate::flow::probe::{ActiveProbeHandler, ProbeFactory, ProbeFlowSender};
18use crate::settings::Settings;
19use crate::tailer::{IdentityType, Tailer};
20use crate::utils::socket::{Socket, SocketError};
21use crate::utils::sync::{AsyncExecutor, Mutex};
22
23pub struct ClientFlowManager<T: IdentityType + Clone, AE: AsyncExecutor> {
25 decoy_provider: Box<dyn DecoyProvider>,
26 send_internal: Mutex<FlowSendInternal<T>>,
27 receive_internal: Mutex<FlowReceiveInternal<T>>,
28 sock: Socket,
29 mtu: usize,
30 settings: Arc<Settings<AE>>,
31 probe_handler: Mutex<Box<dyn ActiveProbeHandler<AE>>>,
32}
33
34impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> ClientFlowManager<T, AE> {
35 pub(crate) async fn new(config: FlowConfig, cipher: CachedValue<ClientCryptoTool<T>>, settings: Arc<Settings<AE>>, sock: Socket, probe_factory: Option<&ProbeFactory<AE>>, decoy_factory: &DecoyFactory<T, AE>, counter: Arc<AtomicU32>, addr: SocketAddr) -> Result<Arc<Self>, FlowControllerError> {
37 let identity = cipher.derive(ClientCryptoTool::<T>::identity).map_err(FlowControllerError::MissingCache)?;
38 let send_provider = cipher.create_sibling().map_err(FlowControllerError::MissingCache)?;
39 let receive_provider = cipher.create_sibling().map_err(FlowControllerError::MissingCache)?;
40 let handler_factory = probe_factory.cloned();
41 let settings_for_start = Arc::clone(&settings);
42
43 let manager_ref = Arc::new_cyclic(|m: &Weak<ClientFlowManager<T, AE>>| {
44 let mgr: Weak<dyn DecoyFlowSender> = m.clone();
45 let decoy = decoy_factory(mgr, settings.clone(), identity, counter);
46 let probe_handler: Box<dyn ActiveProbeHandler<AE>> = match &handler_factory {
47 Some(f) => f(),
48 None => Box::new(NoopProbeHandler),
49 };
50 let mtu = settings.mtu();
51 record_flow_config(addr, "c2s", || (config.fake_body_mode.description(), config.fake_header_mode.len(), decoy.name()));
52 ClientFlowManager {
53 decoy_provider: decoy,
54 send_internal: Mutex::new(FlowSendInternal {
55 provider: send_provider,
56 config,
57 capture: CaptureContext::new(addr),
58 }),
59 receive_internal: Mutex::new(FlowReceiveInternal {
60 provider: receive_provider,
61 }),
62 sock,
63 mtu,
64 settings,
65 probe_handler: Mutex::new(probe_handler),
66 }
67 });
68 manager_ref.decoy_provider.start().await;
69 let weak: Weak<dyn ProbeFlowSender> = Arc::downgrade(&manager_ref) as Weak<dyn ProbeFlowSender>;
70 manager_ref.probe_handler.lock().await.start(weak, settings_for_start).await;
71 Ok(manager_ref)
72 }
73}
74
75impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> ProbeFlowSender for ClientFlowManager<T, AE> {
76 fn send_raw<'a>(&'a self, packet: DynamicByteBuffer, _target: SocketAddr) -> Pin<Box<dyn Future<Output = Result<(), SocketError>> + Send + 'a>> {
77 Box::pin(async move { self.sock.send(packet).await.map(|_| ()) })
78 }
79}
80
81impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> DecoyFlowSender for ClientFlowManager<T, AE> {
82 fn send_decoy_packet<'a>(&'a self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Pin<Box<dyn Future<Output = Result<(), FlowControllerError>> + Send + 'a>> {
83 Box::pin(<Self as FlowManager>::send_packet(self, packet, fallthrough, is_maintenance))
84 }
85}
86
87impl<T: IdentityType + Clone + 'static, AE: AsyncExecutor + 'static> FlowManager for ClientFlowManager<T, AE> {
88 async fn send_packet(&self, packet: DynamicByteBuffer, fallthrough: bool, is_maintenance: bool) -> Result<(), FlowControllerError> {
89 let tailer_len = Tailer::<T>::len();
90 let (body, tailer_buf) = packet.split_buf_end(tailer_len);
91
92 let Some(notified_body) = self.decoy_provider.feed_output(body, tailer_buf.clone()).await else {
93 return Ok(());
94 };
95
96 let mut lock = self.send_internal.lock().await;
97 let full_packet = lock.prepare_outgoing(notified_body.expand_end(tailer_buf.len()), self.mtu, self.settings.pool(), fallthrough, is_maintenance)?;
98 if full_packet.len() > 0 {
99 self.sock.send(full_packet).await.map_err(FlowControllerError::SocketError)?;
100 }
101 Ok(())
102 }
103
104 async fn receive_packet(&self, packet: DynamicByteBuffer) -> Result<DynamicByteBuffer, FlowControllerError> {
105 loop {
106 let wire_packet = self.sock.recv(packet.clone()).await.map_err(FlowControllerError::SocketError)?;
107
108 let (body, tailer_buf) = {
109 let mut lock = self.receive_internal.lock().await;
110 match lock.deobfuscate_incoming(wire_packet.clone(), self.settings.pool())? {
111 None => {
112 self.probe_handler.lock().await.process(wire_packet, None).await;
113 continue;
114 }
115 Some(pair) => pair,
116 }
117 };
118
119 let Some(notified_body) = self.decoy_provider.feed_input(body.clone(), tailer_buf.clone()).await else {
120 continue;
121 };
122
123 let incoming_packet = {
124 let lock = self.receive_internal.lock().await;
125 lock.process_with_tailer(notified_body, tailer_buf)
126 };
127 match incoming_packet {
128 ProcessIncomingResult::Decoy => {}
129 ProcessIncomingResult::Valid(result) => return Ok(result),
130 }
131 }
132 }
133}