1mod batchtimer;
2mod connect;
3mod crypt;
4mod defrag;
5mod fec;
6mod frame;
7mod listener;
8mod listener_table;
9mod recfilter;
10mod stats;
11
12use async_trait::async_trait;
13use batchtimer::BatchTimer;
14use bytes::Bytes;
15pub use listener::ObfsUdpListener;
16use parking_lot::Mutex;
17use priority_queue::PriorityQueue;
18use rand::rngs::OsRng;
19
20use serde::{Deserialize, Serialize};
21use smol::{
22 channel::{Receiver, Sender},
23 future::FutureExt,
24};
25
26use std::{
27 cmp::Reverse,
28 convert::Infallible,
29 net::SocketAddr,
30 sync::Arc,
31 time::{Duration, Instant},
32};
33
/// Handle to one obfsudp connection.
///
/// The handle itself only owns two channel endpoints and the task running
/// `pipe_loop`; all sequencing, FEC, defragmentation and acknowledgement work
/// happens inside that task.
pub struct ObfsUdpPipe {
    /// Sending side of the "up, raw" queue: `Pipe::send` pushes payloads here
    /// and `pipe_loop` reads them from the other end.
    send_upraw: Sender<Bytes>,
    /// Receiving side of the "down, raw" queue: `pipe_loop` pushes payloads
    /// here and `Pipe::recv` hands them to the caller.
    recv_downraw: Receiver<Bytes>,

    /// Handle of the spawned `pipe_loop` task. It is never read; it is only
    /// stored in the struct.
    // NOTE(review): presumably kept so that dropping the pipe cancels the
    // task (smol `Task` drop semantics) — confirm.
    _task: smol::Task<Infallible>,

    /// Address of the remote peer; rendered as a string by `Pipe::peer_addr`.
    remote_addr: SocketAddr,

    /// Metadata string supplied at construction; returned by
    /// `Pipe::peer_metadata`.
    peer_metadata: String,
}
45
/// Number of milliseconds that `pipe_loop` converts into a `Duration` and
/// passes as the first argument of `FecEncoder::new`.
// NOTE(review): presumably how long the encoder waits before emitting parity
// for a burst — confirm against the `fec` module.
const FEC_TIMEOUT_MS: u64 = 10;
47use self::{
48 defrag::Defragmenter,
49 fec::{FecDecoder, FecEncoder, ParitySpaceKey},
50 frame::{fragment, ObfsUdpFrame},
51 stats::StatsCalculator,
52};
53
54use sosistab2::Pipe;
55
/// Second argument that `pipe_loop` passes to `FecEncoder::new`.
// NOTE(review): presumably the maximum number of data frames grouped into one
// FEC burst — confirm against the `fec` module.
const BURST_SIZE: usize = 20;
57
58#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
60pub struct ObfsUdpPublic(pub(crate) x25519_dalek::PublicKey);
61
62impl ObfsUdpPublic {
63 pub fn as_bytes(&self) -> &[u8; 32] {
65 self.0.as_bytes()
66 }
67
68 pub fn from_bytes(b: [u8; 32]) -> Self {
70 Self(x25519_dalek::PublicKey::from(b))
71 }
72}
73
74#[derive(Clone, Serialize, Deserialize)]
76pub struct ObfsUdpSecret(pub(crate) x25519_dalek::StaticSecret);
77
78impl ObfsUdpSecret {
79 pub fn to_bytes(&self) -> [u8; 32] {
81 self.0.to_bytes()
82 }
83
84 pub fn from_bytes(b: [u8; 32]) -> Self {
86 Self(x25519_dalek::StaticSecret::from(b))
87 }
88
89 pub fn generate() -> Self {
91 Self(x25519_dalek::StaticSecret::new(OsRng {}))
92 }
93
94 pub fn to_public(&self) -> ObfsUdpPublic {
96 ObfsUdpPublic((&self.0).into())
97 }
98}
99
100impl ObfsUdpPipe {
101 pub fn with_custom_transport(
105 recv_downcoded: Receiver<ObfsUdpFrame>,
106 send_upcoded: Sender<ObfsUdpFrame>,
107 remote_addr: SocketAddr,
108 peer_metadata: &str,
109 ) -> Self {
110 let (send_upraw, recv_upraw) = smol::channel::bounded(1000);
111 let (send_downraw, recv_downraw) = smol::channel::bounded(1000);
112 let stats_calculator = Arc::new(Mutex::new(StatsCalculator::new()));
113
114 let pipe_loop_future = pipe_loop(
115 recv_upraw,
116 send_upcoded,
117 recv_downcoded,
118 send_downraw,
119 stats_calculator,
120 );
121
122 Self {
123 send_upraw,
124 recv_downraw,
125
126 _task: smolscale::spawn(pipe_loop_future),
127 remote_addr,
128 peer_metadata: peer_metadata.into(),
129 }
130 }
131
132 pub async fn connect(
134 server_addr: SocketAddr,
135 server_pk: ObfsUdpPublic,
136 metadata: &str,
137 ) -> anyhow::Result<ObfsUdpPipe> {
138 connect::client_connect(server_addr, server_pk, metadata).await
139 }
140}
141
142#[async_trait]
143impl Pipe for ObfsUdpPipe {
144 fn send(&self, to_send: Bytes) {
145 let _ = self.send_upraw.try_send(to_send);
146 }
147
148 async fn recv(&self) -> std::io::Result<Bytes> {
149 self.recv_downraw.recv().await.map_err(|_| {
150 std::io::Error::new(
151 std::io::ErrorKind::BrokenPipe,
152 "obfsudp task somehow failed",
153 )
154 })
155 }
156
157 fn protocol(&self) -> &str {
158 "obfsudp-1"
159 }
160
161 fn peer_addr(&self) -> String {
162 self.remote_addr.to_string()
163 }
164
165 fn peer_metadata(&self) -> &str {
166 &self.peer_metadata
167 }
168}
169
170async fn pipe_loop(
172 recv_upraw: Receiver<Bytes>,
173 send_upcoded: Sender<ObfsUdpFrame>,
174 recv_downcoded: Receiver<ObfsUdpFrame>,
175 send_downraw: Sender<Bytes>,
176 stats_calculator: Arc<Mutex<StatsCalculator>>,
177) -> Infallible {
178 let mut next_seqno = 0;
179
180 let mut fec_encoder = FecEncoder::new(Duration::from_millis(FEC_TIMEOUT_MS), BURST_SIZE);
181 let mut fec_decoder = FecDecoder::new(100); let mut defrag = Defragmenter::default();
183 let mut out_frag_buff = Vec::new();
184 let mut ack_timer = BatchTimer::new(Duration::from_millis(200), 100);
185 let mut probably_lost_incoming = PriorityQueue::new();
186 let mut unacked_incoming = Vec::new();
187 let mut last_incoming_seqno = 0;
188
189 let mut loss = 0.0;
190 let mut loss_time: Option<Instant> = None;
191
192 loop {
193 let loss = if loss_time.map(|t| t.elapsed().as_secs() > 5).unwrap_or(true) {
194 loss = stats_calculator.lock().get_stats().loss;
195 loss_time = Some(Instant::now());
196 loss
197 } else {
198 loss
199 };
200
201 let event = Event::fec_timeout(&mut fec_encoder, loss)
202 .or(Event::ack_timeout(&mut ack_timer))
203 .or(Event::new_in_packet(&recv_downcoded))
204 .or(Event::new_out_payload(&recv_upraw))
205 .await;
206
207 if let Ok(event) = event {
208 match event {
209 Event::NewOutPayload(bts) => {
210 out_frag_buff.clear();
211 fragment(bts, &mut out_frag_buff);
212 for bts in out_frag_buff.drain(..) {
213 let seqno = next_seqno;
214
215 next_seqno += 1;
216 fec_encoder.add_unfecked(seqno, bts.clone());
217
218 stats_calculator.lock().add_sent(seqno);
219
220 let msg = ObfsUdpFrame::Data { seqno, body: bts };
221 let _ = send_upcoded.try_send(msg);
222 }
223 }
224 Event::NewInPacket(pipe_frame) => match pipe_frame {
225 ObfsUdpFrame::Data { seqno, body } => {
226 stats_calculator.lock().set_dead(false);
227 fec_decoder.insert_data(seqno, body.clone());
228 if let Some(whole) = defrag.insert(seqno, body) {
229 let _ = send_downraw.try_send(whole); }
231 if seqno > last_incoming_seqno + 1 {
232 log::trace!("gap in sequence numbers: {}", seqno);
233 for gap_seqno in (last_incoming_seqno + 1)..seqno {
234 probably_lost_incoming.push(
235 gap_seqno,
236 Reverse(Instant::now() + Duration::from_millis(500)),
237 );
238 }
239 }
240 last_incoming_seqno = seqno;
241 ack_timer.increment();
242 unacked_incoming.push((seqno, Instant::now()));
243 probably_lost_incoming.remove(&seqno);
244 }
245 ObfsUdpFrame::Parity {
246 data_frame_first,
247 data_count,
248 parity_count,
249 parity_index,
250 pad_size,
251 body,
252 } => {
253 log::debug!("got parity; data_frame_first = {data_frame_first}, data_count = {data_count}, parity_count = {parity_count}, parity_index = {parity_index}");
254 let parity_info = ParitySpaceKey {
255 data_frame_first,
256 data_count,
257 parity_count,
258 pad_size,
259 };
260 let reconstructed =
261 fec_decoder.insert_parity(parity_info, parity_index, body);
262 if !reconstructed.is_empty() {
263 for (seqno, p) in reconstructed {
264 log::debug!("reconstructed outer {seqno}");
265 if let Some(p) = defrag.insert(seqno, p) {
266 let _ = send_downraw.try_send(p);
267 }
268 }
269 }
270 }
271 ObfsUdpFrame::Acks { acks, naks } => {
272 let mut stats = stats_calculator.lock();
273 for (seqno, offset) in acks {
274 stats.add_ack(seqno, Duration::from_millis(offset as _));
275 }
276 for seqno in naks {
277 stats.add_nak(seqno);
278 }
279 }
280 },
281
282 Event::AckTimeout => {
283 ack_timer.reset();
284 log::trace!(
285 "ack timer fired, must send back {} acks",
286 unacked_incoming.len()
287 );
288 let naks = {
289 let mut vv = Vec::new();
290 let now = Instant::now();
291 while let Some((seqno, lost_date)) = probably_lost_incoming.pop() {
292 if lost_date.0 < now {
293 vv.push(seqno);
294 } else {
295 probably_lost_incoming.push(seqno, lost_date);
296 break;
297 }
298 }
299 vv
300 };
301 let _ = send_upcoded
302 .send(ObfsUdpFrame::Acks {
303 acks: unacked_incoming
304 .drain(..)
305 .map(|(k, v)| (k, v.elapsed().as_millis() as _))
306 .collect(),
307 naks,
308 })
309 .await;
310 }
311
312 Event::FecTimeout(parity_frames) => {
313 if !parity_frames.is_empty() {
314 log::debug!("FecTimeout; sending {} parities", parity_frames.len());
315 }
316 for parity_frame in parity_frames {
317 let _ = send_upcoded.try_send(parity_frame);
318 }
319 }
320 }
321 } else {
322 return smol::future::pending().await;
324 }
325 }
326}
327
328#[derive(Debug)]
329enum Event {
330 NewOutPayload(Bytes),
331 NewInPacket(ObfsUdpFrame), FecTimeout(Vec<ObfsUdpFrame>),
333 AckTimeout,
334}
335
336impl Event {
337 pub async fn new_out_payload(recv: &Receiver<Bytes>) -> anyhow::Result<Self> {
339 Ok(Event::NewOutPayload(recv.recv().await?))
340 }
341
342 pub async fn new_in_packet(recv: &Receiver<ObfsUdpFrame>) -> anyhow::Result<Self> {
343 let in_pkt = recv.recv().await?;
344 Ok(Event::NewInPacket(in_pkt))
345 }
346 pub async fn fec_timeout(fec_machine: &mut FecEncoder, loss: f64) -> anyhow::Result<Self> {
347 let parity = fec_machine.wait_parity(loss).await;
348
349 Ok(Event::FecTimeout(parity))
350 }
351
352 pub async fn ack_timeout(ack_timer: &mut BatchTimer) -> anyhow::Result<Self> {
353 ack_timer.wait().await;
354 Ok(Event::AckTimeout)
355 }
356}