sosistab2_obfsudp/
lib.rs

1mod batchtimer;
2mod connect;
3mod crypt;
4mod defrag;
5mod fec;
6mod frame;
7mod listener;
8mod listener_table;
9mod recfilter;
10mod stats;
11
12use async_trait::async_trait;
13use batchtimer::BatchTimer;
14use bytes::Bytes;
15pub use listener::ObfsUdpListener;
16use parking_lot::Mutex;
17use priority_queue::PriorityQueue;
18use rand::rngs::OsRng;
19
20use serde::{Deserialize, Serialize};
21use smol::{
22    channel::{Receiver, Sender},
23    future::FutureExt,
24};
25
26use std::{
27    cmp::Reverse,
28    convert::Infallible,
29    net::SocketAddr,
30    sync::Arc,
31    time::{Duration, Instant},
32};
33
34/// Represents an unreliable datagram connection. Generally, this is not to be used directly, but fed into [crate::Multiplex] instances to be used as the underlying transport.
35pub struct ObfsUdpPipe {
36    send_upraw: Sender<Bytes>,
37    recv_downraw: Receiver<Bytes>,
38
39    _task: smol::Task<Infallible>,
40
41    remote_addr: SocketAddr,
42
43    peer_metadata: String,
44}
45
/// Duration, in milliseconds, handed to the FEC encoder as its timeout when the pipe loop starts.
const FEC_TIMEOUT_MS: u64 = 10;
47use self::{
48    defrag::Defragmenter,
49    fec::{FecDecoder, FecEncoder, ParitySpaceKey},
50    frame::{fragment, ObfsUdpFrame},
51    stats::StatsCalculator,
52};
53
54use sosistab2::Pipe;
55
/// Burst size handed to the FEC encoder when the pipe loop starts.
const BURST_SIZE: usize = 20;
57
58/// A server public key for the obfuscated UDP pipe.
59#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
60pub struct ObfsUdpPublic(pub(crate) x25519_dalek::PublicKey);
61
62impl ObfsUdpPublic {
63    /// Returns the bytes representation.
64    pub fn as_bytes(&self) -> &[u8; 32] {
65        self.0.as_bytes()
66    }
67
68    /// Convert from bytes.
69    pub fn from_bytes(b: [u8; 32]) -> Self {
70        Self(x25519_dalek::PublicKey::from(b))
71    }
72}
73
74/// A server secret key for the obfuscated UDP pipe.
75#[derive(Clone, Serialize, Deserialize)]
76pub struct ObfsUdpSecret(pub(crate) x25519_dalek::StaticSecret);
77
78impl ObfsUdpSecret {
79    /// Returns the bytes representation.
80    pub fn to_bytes(&self) -> [u8; 32] {
81        self.0.to_bytes()
82    }
83
84    /// Convert from bytes.
85    pub fn from_bytes(b: [u8; 32]) -> Self {
86        Self(x25519_dalek::StaticSecret::from(b))
87    }
88
89    /// Generate.
90    pub fn generate() -> Self {
91        Self(x25519_dalek::StaticSecret::new(OsRng {}))
92    }
93
94    /// Convert to a public key.
95    pub fn to_public(&self) -> ObfsUdpPublic {
96        ObfsUdpPublic((&self.0).into())
97    }
98}
99
100impl ObfsUdpPipe {
101    /// Creates a new Pipe that receives messages from `recv_downcoded` and send messages to `send_upcoded`. This should only be used if you are creating your own underlying, UDP-like transport; otherwise use the functions provided in this crate to create Pipes backed by an obfuscated, packet loss-resistant UDP transport.
102    ///
103    /// The caller must arrange to drain the other end of `send_upcoded` promptly; otherwise the Pipe itself will get stuck.
104    pub fn with_custom_transport(
105        recv_downcoded: Receiver<ObfsUdpFrame>,
106        send_upcoded: Sender<ObfsUdpFrame>,
107        remote_addr: SocketAddr,
108        peer_metadata: &str,
109    ) -> Self {
110        let (send_upraw, recv_upraw) = smol::channel::bounded(1000);
111        let (send_downraw, recv_downraw) = smol::channel::bounded(1000);
112        let stats_calculator = Arc::new(Mutex::new(StatsCalculator::new()));
113
114        let pipe_loop_future = pipe_loop(
115            recv_upraw,
116            send_upcoded,
117            recv_downcoded,
118            send_downraw,
119            stats_calculator,
120        );
121
122        Self {
123            send_upraw,
124            recv_downraw,
125
126            _task: smolscale::spawn(pipe_loop_future),
127            remote_addr,
128            peer_metadata: peer_metadata.into(),
129        }
130    }
131
132    /// Establishes a pipe to the server_addr, using the obfuscated UDP transport.
133    pub async fn connect(
134        server_addr: SocketAddr,
135        server_pk: ObfsUdpPublic,
136        metadata: &str,
137    ) -> anyhow::Result<ObfsUdpPipe> {
138        connect::client_connect(server_addr, server_pk, metadata).await
139    }
140}
141
142#[async_trait]
143impl Pipe for ObfsUdpPipe {
144    fn send(&self, to_send: Bytes) {
145        let _ = self.send_upraw.try_send(to_send);
146    }
147
148    async fn recv(&self) -> std::io::Result<Bytes> {
149        self.recv_downraw.recv().await.map_err(|_| {
150            std::io::Error::new(
151                std::io::ErrorKind::BrokenPipe,
152                "obfsudp task somehow failed",
153            )
154        })
155    }
156
157    fn protocol(&self) -> &str {
158        "obfsudp-1"
159    }
160
161    fn peer_addr(&self) -> String {
162        self.remote_addr.to_string()
163    }
164
165    fn peer_metadata(&self) -> &str {
166        &self.peer_metadata
167    }
168}
169
170/// Main processing loop for the Pipe
171async fn pipe_loop(
172    recv_upraw: Receiver<Bytes>,
173    send_upcoded: Sender<ObfsUdpFrame>,
174    recv_downcoded: Receiver<ObfsUdpFrame>,
175    send_downraw: Sender<Bytes>,
176    stats_calculator: Arc<Mutex<StatsCalculator>>,
177) -> Infallible {
178    let mut next_seqno = 0;
179
180    let mut fec_encoder = FecEncoder::new(Duration::from_millis(FEC_TIMEOUT_MS), BURST_SIZE);
181    let mut fec_decoder = FecDecoder::new(100); // arbitrary size
182    let mut defrag = Defragmenter::default();
183    let mut out_frag_buff = Vec::new();
184    let mut ack_timer = BatchTimer::new(Duration::from_millis(200), 100);
185    let mut probably_lost_incoming = PriorityQueue::new();
186    let mut unacked_incoming = Vec::new();
187    let mut last_incoming_seqno = 0;
188
189    let mut loss = 0.0;
190    let mut loss_time: Option<Instant> = None;
191
192    loop {
193        let loss = if loss_time.map(|t| t.elapsed().as_secs() > 5).unwrap_or(true) {
194            loss = stats_calculator.lock().get_stats().loss;
195            loss_time = Some(Instant::now());
196            loss
197        } else {
198            loss
199        };
200
201        let event = Event::fec_timeout(&mut fec_encoder, loss)
202            .or(Event::ack_timeout(&mut ack_timer))
203            .or(Event::new_in_packet(&recv_downcoded))
204            .or(Event::new_out_payload(&recv_upraw))
205            .await;
206
207        if let Ok(event) = event {
208            match event {
209                Event::NewOutPayload(bts) => {
210                    out_frag_buff.clear();
211                    fragment(bts, &mut out_frag_buff);
212                    for bts in out_frag_buff.drain(..) {
213                        let seqno = next_seqno;
214
215                        next_seqno += 1;
216                        fec_encoder.add_unfecked(seqno, bts.clone());
217
218                        stats_calculator.lock().add_sent(seqno);
219
220                        let msg = ObfsUdpFrame::Data { seqno, body: bts };
221                        let _ = send_upcoded.try_send(msg);
222                    }
223                }
224                Event::NewInPacket(pipe_frame) => match pipe_frame {
225                    ObfsUdpFrame::Data { seqno, body } => {
226                        stats_calculator.lock().set_dead(false);
227                        fec_decoder.insert_data(seqno, body.clone());
228                        if let Some(whole) = defrag.insert(seqno, body) {
229                            let _ = send_downraw.try_send(whole); // TODO why??
230                        }
231                        if seqno > last_incoming_seqno + 1 {
232                            log::trace!("gap in sequence numbers: {}", seqno);
233                            for gap_seqno in (last_incoming_seqno + 1)..seqno {
234                                probably_lost_incoming.push(
235                                    gap_seqno,
236                                    Reverse(Instant::now() + Duration::from_millis(500)),
237                                );
238                            }
239                        }
240                        last_incoming_seqno = seqno;
241                        ack_timer.increment();
242                        unacked_incoming.push((seqno, Instant::now()));
243                        probably_lost_incoming.remove(&seqno);
244                    }
245                    ObfsUdpFrame::Parity {
246                        data_frame_first,
247                        data_count,
248                        parity_count,
249                        parity_index,
250                        pad_size,
251                        body,
252                    } => {
253                        log::debug!("got parity; data_frame_first = {data_frame_first}, data_count = {data_count}, parity_count = {parity_count}, parity_index = {parity_index}");
254                        let parity_info = ParitySpaceKey {
255                            data_frame_first,
256                            data_count,
257                            parity_count,
258                            pad_size,
259                        };
260                        let reconstructed =
261                            fec_decoder.insert_parity(parity_info, parity_index, body);
262                        if !reconstructed.is_empty() {
263                            for (seqno, p) in reconstructed {
264                                log::debug!("reconstructed outer {seqno}");
265                                if let Some(p) = defrag.insert(seqno, p) {
266                                    let _ = send_downraw.try_send(p);
267                                }
268                            }
269                        }
270                    }
271                    ObfsUdpFrame::Acks { acks, naks } => {
272                        let mut stats = stats_calculator.lock();
273                        for (seqno, offset) in acks {
274                            stats.add_ack(seqno, Duration::from_millis(offset as _));
275                        }
276                        for seqno in naks {
277                            stats.add_nak(seqno);
278                        }
279                    }
280                },
281
282                Event::AckTimeout => {
283                    ack_timer.reset();
284                    log::trace!(
285                        "ack timer fired, must send back {} acks",
286                        unacked_incoming.len()
287                    );
288                    let naks = {
289                        let mut vv = Vec::new();
290                        let now = Instant::now();
291                        while let Some((seqno, lost_date)) = probably_lost_incoming.pop() {
292                            if lost_date.0 < now {
293                                vv.push(seqno);
294                            } else {
295                                probably_lost_incoming.push(seqno, lost_date);
296                                break;
297                            }
298                        }
299                        vv
300                    };
301                    let _ = send_upcoded
302                        .send(ObfsUdpFrame::Acks {
303                            acks: unacked_incoming
304                                .drain(..)
305                                .map(|(k, v)| (k, v.elapsed().as_millis() as _))
306                                .collect(),
307                            naks,
308                        })
309                        .await;
310                }
311
312                Event::FecTimeout(parity_frames) => {
313                    if !parity_frames.is_empty() {
314                        log::debug!("FecTimeout; sending {} parities", parity_frames.len());
315                    }
316                    for parity_frame in parity_frames {
317                        let _ = send_upcoded.try_send(parity_frame);
318                    }
319                }
320            }
321        } else {
322            // stop the pipe
323            return smol::future::pending().await;
324        }
325    }
326}
327
328#[derive(Debug)]
329enum Event {
330    NewOutPayload(Bytes),
331    NewInPacket(ObfsUdpFrame), // either data or parity or ack request packet or acks
332    FecTimeout(Vec<ObfsUdpFrame>),
333    AckTimeout,
334}
335
336impl Event {
337    /// Waits for a new payload to send out
338    pub async fn new_out_payload(recv: &Receiver<Bytes>) -> anyhow::Result<Self> {
339        Ok(Event::NewOutPayload(recv.recv().await?))
340    }
341
342    pub async fn new_in_packet(recv: &Receiver<ObfsUdpFrame>) -> anyhow::Result<Self> {
343        let in_pkt = recv.recv().await?;
344        Ok(Event::NewInPacket(in_pkt))
345    }
346    pub async fn fec_timeout(fec_machine: &mut FecEncoder, loss: f64) -> anyhow::Result<Self> {
347        let parity = fec_machine.wait_parity(loss).await;
348
349        Ok(Event::FecTimeout(parity))
350    }
351
352    pub async fn ack_timeout(ack_timer: &mut BatchTimer) -> anyhow::Result<Self> {
353        ack_timer.wait().await;
354        Ok(Event::AckTimeout)
355    }
356}