utp-socket 0.1.0

Socket used for uTP communication with io-uring support
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
#![allow(dead_code)]

use std::{
    cell::{Ref, RefCell, RefMut},
    net::SocketAddr,
    rc::{Rc, Weak},
    time::Duration,
};

use bytes::Bytes;
use tokio::sync::{oneshot::Receiver, Notify};
use tokio_uring::net::UdpSocket;

use crate::{
    reorder_buffer::ReorderBuffer,
    utp_packet::{get_microseconds, Packet, PacketHeader, PacketType, HEADER_SIZE},
};

/// Connection life-cycle of a single uTP stream.
#[derive(Debug)]
pub(crate) enum ConnectionState {
    /// Initial state of an outgoing stream before a SYN has been built.
    /// Also used as a temporary placeholder while an in-order packet is handled.
    Idle,
    /// Initial state of a stream created because a SYN arrived from a peer.
    SynReceived,
    /// A SYN header has been built for an outgoing connection.
    SynSent {
        /// Fired once the peer answers the SYN with a State packet.
        connect_notifier: tokio::sync::oneshot::Sender<()>,
    },
    /// The handshake has completed and data can flow.
    Connected,
    // NOTE(review): never assigned anywhere in this file, presumably reserved
    // for connection teardown; confirm before relying on it.
    FinSent,
}

/// States compare equal when they are the same variant; the payload carried by
/// `SynSent` (a oneshot sender, which cannot be compared) is ignored.
impl PartialEq for ConnectionState {
    fn eq(&self, other: &Self) -> bool {
        let lhs_variant = std::mem::discriminant(self);
        let rhs_variant = std::mem::discriminant(other);
        lhs_variant == rhs_variant
    }
}

// Variant-only comparison (see the PartialEq impl) is reflexive, symmetric and
// transitive, so the marker trait can be implemented as well.
impl Eq for ConnectionState {}

// Could be moved to separate module
/// Mutable per-connection state shared by every clone of a `UtpStream`.
#[derive(Debug)]
pub(crate) struct StreamState {
    // Current socket state
    pub(crate) connection_state: ConnectionState,
    // Sequence number bookkeeping for outgoing packets: used as-is for the SYN
    // and incremented right before every data packet header is built.
    pub(crate) seq_nr: u16,
    // All sequence numbers up until and including this one have been
    // properly received
    pub(crate) ack_nr: u16,
    // Connection id for packets I receive
    pub(crate) conn_id_recv: u16,
    // Connection id for packets I send
    pub(crate) conn_id_send: u16,
    // Current amount of bytes sent but not acked
    pub(crate) cur_window: u32,
    // Upper bound in bytes for data in flight chosen by this side.
    // Initialised to the MTU and not adjusted anywhere in this file.
    pub(crate) max_window: u32,
    // Window size in bytes most recently advertised by the other endpoint
    // (taken from the wnd_size field of incoming packets)
    pub(crate) their_advertised_window: u32,
    // Last delay measurement from other endpoint
    // whenever a packet is received this state is updated
    // by subtracting timestamp_microseconds from the host current time
    pub(crate) reply_micro: u32,
    // Last packet in sequence, taken from the FIN packet
    pub(crate) eof_pkt: Option<u16>,
    // incoming buffer, used to reorder packets
    pub(crate) incoming_buffer: ReorderBuffer,
    // outgoing buffer holding packets that have been queued but not yet acked
    // (TODO does this need to be a ReorderBuffer?)
    pub(crate) outgoing_buffer: ReorderBuffer,
    // Receive buffer, used to store packet data before read requests.
    // Its free space is what determines the window size we advertise.
    // Has the same size as the initial our_advertised_window
    // TODO: There could be a provided read buffer from the user
    // before any data is read and that should be used instead in that case
    pub(crate) receive_buf: Box<[u8]>,
    // Number of valid bytes currently stored at the start of receive_buf
    receive_buf_cursor: usize,

    // Used by the Drop impl of UtpStream to stop the background send loop.
    // Taken (set to None) once the signal has been sent.
    shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
}

impl StreamState {
    /// Moves the stream into the `SynSent` state and builds the SYN header.
    ///
    /// Returns the header together with the receiving half of the oneshot channel
    /// whose sender is stored inside the new connection state.
    fn syn_header(&mut self) -> (PacketHeader, Receiver<()>) {
        let (connect_notifier, connected) = tokio::sync::oneshot::channel();
        // TODO: move the transition into a dedicated state method
        self.connection_state = ConnectionState::SynSent { connect_notifier };

        let syn = PacketHeader {
            packet_type: PacketType::Syn,
            // The SYN is the only packet that carries our *receive* id
            conn_id: self.conn_id_recv,
            seq_nr: self.seq_nr,
            ack_nr: 0,
            timestamp_microseconds: get_microseconds() as u32,
            timestamp_difference_microseconds: self.reply_micro,
            // Mimics libtorrent behavior
            wnd_size: 0,
            extension: 0,
        };
        (syn, connected)
    }

    /// Builds a State (ack) header reflecting the current `seq_nr`, `ack_nr`
    /// and advertised window. Does not modify the stream state.
    fn ack(&self) -> PacketHeader {
        // Move this closer to send time?
        let now = get_microseconds() as u32;
        let wnd_size = self.our_advertised_window();
        PacketHeader {
            packet_type: PacketType::State,
            conn_id: self.conn_id_send,
            seq_nr: self.seq_nr,
            ack_nr: self.ack_nr,
            timestamp_microseconds: now,
            timestamp_difference_microseconds: self.reply_micro,
            wnd_size,
            extension: 0,
        }
    }

    /// Advances the sequence number and builds the header for the next data packet.
    ///
    /// The sequence number is a 16-bit counter that starts at a random value, so it
    /// is expected to wrap around; the increment therefore uses wrapping arithmetic
    /// instead of panicking (debug builds) once `u16::MAX` is reached.
    fn data(&mut self) -> PacketHeader {
        // Move this closer to send time?
        let timestamp_microseconds = get_microseconds();
        self.seq_nr = self.seq_nr.wrapping_add(1);
        PacketHeader {
            seq_nr: self.seq_nr,
            ack_nr: self.ack_nr,
            conn_id: self.conn_id_send,
            packet_type: PacketType::Data,
            timestamp_microseconds: timestamp_microseconds as u32,
            timestamp_difference_microseconds: self.reply_micro,
            wnd_size: self.our_advertised_window(),
            extension: 0,
        }
    }

    /// Appends `data` to the receive buffer if it fits into the remaining space.
    ///
    /// Returns `true` when the bytes were stored and `false` (after logging a
    /// warning) when the packet had to be dropped because the buffer is full.
    fn try_consume(&mut self, data: &[u8]) -> bool {
        let start = self.receive_buf_cursor;
        let free_space = self.receive_buf.len() - start;
        if data.len() > free_space {
            log::warn!("Receive buf full, packet dropped");
            return false;
        }

        // TODO perhaps more of a io_uring kind of approach would make sense
        // so copies can be avoided either here or in the read method
        let end = start + data.len();
        self.receive_buf[start..end].copy_from_slice(data);
        self.receive_buf_cursor = end;
        true
    }

    /// Window size to advertise to the peer: free space in the receive buffer
    /// minus what is already parked in the reorder buffer, clamped at zero.
    #[inline(always)]
    pub(crate) fn our_advertised_window(&self) -> u32 {
        let free_receive_space = (self.receive_buf.len() - self.receive_buf_cursor) as i32;
        let reorder_backlog = self.incoming_buffer.size() as i32;
        (free_receive_space - reorder_backlog).max(0) as u32
    }

    /// Effective send window: the smaller of our own limit and the window the
    /// peer advertised last.
    #[inline(always)]
    fn stream_window_size(&self) -> u32 {
        self.max_window.min(self.their_advertised_window)
    }
}

// TODO should this really be publicly derived?
/// Handle to a single uTP connection. Clones share the same underlying state.
#[derive(Clone)]
pub struct UtpStream {
    // Shared connection state; also referenced by the background send loop
    inner: Rc<RefCell<StreamState>>,
    // The address the stream is connected to
    addr: SocketAddr,
    // Socket used for sending; sending fails once the socket has been dropped
    weak_socket: Weak<UdpSocket>,
    // Used to notify pending readers that
    // there is data available to read
    // (This could be adapted to work single threaded but needs custom impl)
    data_available: Rc<Notify>,
}

// Used in UtpSocket so that dropped streams
// can be detected and everything can properly
// be destroyed.
/// Non-owning counterpart of `UtpStream`: holds only a weak reference to the
/// shared state, so it does not keep the stream alive.
pub(crate) struct WeakUtpStream {
    // Weak handle to the shared connection state
    inner: Weak<RefCell<StreamState>>,
    // The address the stream is connected to
    addr: SocketAddr,
    // Socket used for sending once the stream has been upgraded again
    weak_socket: Weak<UdpSocket>,
    // Used to notify pending readers that
    // there is data available to read
    // (This could be adapted to work single threaded but needs custom impl)
    data_available: Rc<Notify>,
}

impl WeakUtpStream {
    pub(crate) fn try_upgrade(&self) -> Option<UtpStream> {
        self.inner.upgrade().map(|inner| UtpStream {
            inner,
            addr: self.addr,
            weak_socket: self.weak_socket.clone(),
            data_available: self.data_available.clone(),
        })
    }
}

impl From<UtpStream> for WeakUtpStream {
    fn from(stream: UtpStream) -> Self {
        WeakUtpStream {
            inner: Rc::downgrade(&stream.inner),
            addr: stream.addr,
            // Can't move because of drop impl
            weak_socket: stream.weak_socket.clone(),
            data_available: stream.data_available.clone(),
        }
    }
}

/// Manual `Debug` implementation that prints the shared state, the peer
/// address and the reader notifier; the weak socket handle is left out.
impl std::fmt::Debug for UtpStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("UtpStream");
        builder.field("state", &self.inner);
        builder.field("addr", &self.addr);
        builder.field("data_available", &self.data_available);
        builder.finish()
    }
}

// Size limit in bytes used as the initial value for both max_window and
// their_advertised_window, and as the bound checked when writing data.
const MTU: u32 = 1250;

impl UtpStream {
    /// Creates the local side of an outgoing connection and spawns its send loop.
    ///
    /// `conn_id` becomes the id expected on received packets; packets sent by this
    /// stream use `conn_id + 1`. The addition wraps so that a `conn_id` of
    /// `u16::MAX` no longer overflows.
    ///
    /// The spawned task retransmits the outgoing buffer every 250ms until either
    /// flushing fails or the shutdown signal stored in the state fires.
    pub(crate) fn new(conn_id: u16, addr: SocketAddr, weak_socket: Weak<UdpSocket>) -> Self {
        let (shutdown_signal, mut shutdown_receiver) = tokio::sync::oneshot::channel();
        let stream = UtpStream {
            inner: Rc::new(RefCell::new(StreamState {
                connection_state: ConnectionState::Idle,
                // Random initial sequence number. Starting from 1 would be more
                // compatible with older clients but is not as secure.
                seq_nr: rand::random::<u16>(),
                conn_id_recv: conn_id,
                cur_window: 0,
                max_window: MTU,
                ack_nr: 0,
                conn_id_send: conn_id.wrapping_add(1),
                reply_micro: 0,
                eof_pkt: None,
                // mtu
                their_advertised_window: MTU,
                incoming_buffer: ReorderBuffer::new(256),
                outgoing_buffer: ReorderBuffer::new(256),
                receive_buf: vec![0; 1024 * 1024].into_boxed_slice(),
                receive_buf_cursor: 0,
                shutdown_signal: Some(shutdown_signal),
            })),
            weak_socket,
            data_available: Rc::new(Notify::new()),
            addr,
        };

        // The send loop owns its own clone of the stream; the Drop impl accounts
        // for this extra strong reference.
        let stream_clone = stream.clone();
        // Send loop
        tokio_uring::spawn(async move {
            let mut tick_interval = tokio::time::interval(Duration::from_millis(250));
            tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                tokio::select! {
                    _ = tick_interval.tick() => {
                        if let Err(err) = stream_clone.flush_outbuf().await {
                            log::error!("Error: {err}, shutting down stream send loop");
                            break;
                        }
                    },
                    _ = &mut shutdown_receiver =>  {
                        log::info!("Shutting down stream send loop");
                        break;
                    },
                }
            }
        });
        stream
    }

    /// Creates the local side of a connection initiated by a peer's SYN and
    /// spawns its send loop.
    ///
    /// `seq_nr` and `conn_id` are taken from the received SYN. Packets are sent
    /// with `conn_id` and expected with `conn_id + 1`. `ack_nr` starts one before
    /// the SYN's sequence number because the SYN itself has not been acked yet.
    ///
    /// Both computations wrap: a SYN with `seq_nr == 0` or `conn_id == u16::MAX`
    /// is valid input from the network and must not overflow.
    pub(crate) fn new_incoming(
        seq_nr: u16,
        conn_id: u16,
        addr: SocketAddr,
        weak_socket: Weak<UdpSocket>,
    ) -> Self {
        let (shutdown_signal, mut shutdown_receiver) = tokio::sync::oneshot::channel();
        let stream = UtpStream {
            inner: Rc::new(RefCell::new(StreamState {
                connection_state: ConnectionState::SynReceived,
                // Random initial sequence number. Starting from 1 would be more
                // compatible with older clients but is not as secure.
                seq_nr: rand::random::<u16>(),
                conn_id_recv: conn_id.wrapping_add(1),
                cur_window: 0,
                max_window: MTU,
                // We have yet to ack the SYN packet
                ack_nr: seq_nr.wrapping_sub(1),
                conn_id_send: conn_id,
                reply_micro: 0,
                eof_pkt: None,
                // mtu
                their_advertised_window: MTU,
                incoming_buffer: ReorderBuffer::new(256),
                outgoing_buffer: ReorderBuffer::new(256),
                receive_buf: vec![0; 1024 * 1024].into_boxed_slice(),
                receive_buf_cursor: 0,
                shutdown_signal: Some(shutdown_signal),
            })),
            weak_socket,
            data_available: Rc::new(Notify::new()),
            addr,
        };

        // The send loop owns its own clone of the stream; the Drop impl accounts
        // for this extra strong reference.
        let stream_clone = stream.clone();
        // Send loop
        tokio_uring::spawn(async move {
            let mut tick_interval = tokio::time::interval(Duration::from_millis(250));
            tick_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            loop {
                tokio::select! {
                    _ = tick_interval.tick() => {
                        if let Err(err) = stream_clone.flush_outbuf().await {
                            log::error!("Error: {err}, shutting down stream send loop");
                            break;
                        }
                    },
                    _ = &mut shutdown_receiver =>  {
                        log::info!("Shutting down stream send loop");
                        break;
                    },
                }
            }
        });

        stream
    }

    // Maybe take addr into this instead for a bit nicer api
    /// Sends a SYN to the peer and waits until the connection is established.
    ///
    /// # Errors
    /// Fails when the SYN can't be sent or when the connect notifier is dropped
    /// before the peer answered.
    pub async fn connect(&self) -> anyhow::Result<()> {
        // The temporary RefMut is released at the end of this statement,
        // i.e. before the first .await
        let (header, connected) = self.state_mut().syn_header();

        log::debug!("Sending SYN");
        let syn = Packet {
            header,
            data: Bytes::new(),
        };
        self.send_packet(syn, true).await?;
        connected.await?;
        Ok(())
    }

    /// Test helper: sends a SYN like `connect` but returns without waiting
    /// for the peer's answer.
    #[cfg(test)]
    pub async fn send_syn(&self) -> anyhow::Result<()> {
        // The temporary RefMut is released at the end of this statement,
        // i.e. before the first .await. The receiver is kept alive until the
        // function returns.
        let (header, _connected) = self.state_mut().syn_header();

        log::debug!("Sending SYN");
        let syn = Packet {
            header,
            data: Bytes::new(),
        };
        self.send_packet(syn, true).await?;
        Ok(())
    }

    /// Sends every packet currently stored in the outgoing buffer to the peer.
    ///
    /// Nothing is sent unless the stream is `Connected` or `SynSent`. Packets that
    /// don't fit into the current send window are skipped (and stay buffered).
    ///
    /// # Errors
    /// Fails when the underlying socket has been dropped or a send fails.
    async fn flush_outbuf(&self) -> anyhow::Result<()> {
        // TODO avoid cloning here, perhaps an extra layer like "Outgoing packet"
        // which could also help with keeping track of resends etc. The reorder buffer needs
        // to support draining operations or a normal buffer is used instead
        let pending: Vec<Packet> = {
            let state = self.state();
            // SynSent is included so the initial syn is allowed to go out
            let may_send = matches!(
                state.connection_state,
                ConnectionState::Connected | ConnectionState::SynSent { .. }
            );
            if !may_send {
                log::debug!("Not yet connected, holding on to outgoing buffer");
                return Ok(());
            }
            let pending: Vec<Packet> = state.outgoing_buffer.iter().cloned().collect();
            log::debug!("Flushing outgoing buffer len: {}", pending.len());
            // TODO: Since there is no filtering based on rtt here we will
            // spam the receiver until everything is acked
            pending
        };

        let socket = match self.weak_socket.upgrade() {
            Some(socket) => socket,
            None => anyhow::bail!("Failed to send packets, socket dropped"),
        };

        let header_len = HEADER_SIZE as usize;
        for packet in pending {
            // Scoped so the borrow of the state ends before the send is awaited
            let fits_window = {
                let state = self.state();
                state.cur_window + packet.size() <= state.stream_window_size()
            };
            if !fits_window {
                log::warn!("Window to small to send packet, skipping");
                continue;
            }

            let mut packet_bytes = vec![0; header_len + packet.data.len()];
            packet_bytes[..header_len].copy_from_slice(&packet.header.to_bytes());
            packet_bytes[header_len..].copy_from_slice(&packet.data);
            let bytes_sent = packet_bytes.len();
            log::debug!(
                "Sending {:?} bytes: {} to addr: {}",
                packet.header.packet_type,
                bytes_sent,
                self.addr,
            );
            // reuse buf?
            let (result, _buf) = socket.send_to(packet_bytes, self.addr).await;
            result?;

            let mut state = self.state_mut();
            // Note that only the data is appended here and not the entire packet size
            debug_assert!(bytes_sent < u32::MAX as usize);
            state.cur_window += packet.data.len() as u32;
        }
        Ok(())
    }

    /// Queues `packet` in the outgoing buffer and flushes the buffer right away.
    ///
    /// With `only_once` set the packet is removed from the buffer again after a
    /// successful flush, so it is never retransmitted by the send loop.
    async fn send_packet(&self, packet: Packet, only_once: bool) -> anyhow::Result<()> {
        let seq_nr = packet.header.seq_nr;
        {
            // Scoped so the state isn't borrowed while the flush is awaited
            let mut state = self.state_mut();
            state.outgoing_buffer.insert(packet);
        }
        self.flush_outbuf().await?;

        if !only_once {
            return Ok(());
        }
        // only used for sending initial syn so far
        self.state_mut().outgoing_buffer.remove(seq_nr);
        Ok(())
    }

    /// Records `seq_nr` as the latest acked sequence number and sends a State
    /// (ack) packet for it to the peer.
    ///
    /// # Errors
    /// Fails when the underlying socket has been dropped (the state is left
    /// untouched in that case) or when sending fails.
    async fn ack_packet(&self, seq_nr: u16) -> anyhow::Result<()> {
        let socket = match self.weak_socket.upgrade() {
            Some(socket) => socket,
            None => anyhow::bail!("Failed to ack packets, socket dropped"),
        };
        // TODO: potentially have an buffer of pending acks here and send
        // at once much like in the flush outbuf impl, perhaps possible
        // to then also share more code.

        // No need to wrap the header in a packet struct
        // since the body is always empty here
        let packet_bytes = {
            let mut state = self.state_mut();
            state.ack_nr = seq_nr;
            state.ack().to_bytes()
        };
        log::debug!(
            "Sending Ack bytes: {} to addr: {}",
            packet_bytes.len(),
            self.addr,
        );
        // reuse buf?
        let (result, _buf) = socket.send_to(packet_bytes, self.addr).await;
        result?;
        Ok(())
    }

    // TODO (do_ledbat)
    //fn adjust_max_window(&mut self) {}

    /// Validates a packet received for this stream and either handles it right
    /// away (in order), parks it in the reorder buffer (ahead of sequence) or
    /// ignores it (already acked).
    ///
    /// After an in-order packet has been handled, consecutive packets waiting in
    /// the reorder buffer are handled as well. Pending readers are notified when
    /// at least one data packet was handled.
    ///
    /// All sequence number arithmetic is done modulo 2^16 since sequence numbers
    /// start at a random value and are expected to wrap around.
    ///
    /// # Errors
    /// Fails when a non-SYN packet carries an unexpected connection id or when
    /// handling an in-order packet fails.
    pub(crate) async fn process_incoming(&self, packet: Packet) -> anyhow::Result<()> {
        let packet_header = packet.header;

        // Special case where the connection has not yet been fully established:
        // the initial SYN carries a connection id that is one less than the id
        // used afterwards. The addition wraps so an id of u16::MAX can't overflow.
        let conn_id = if packet_header.packet_type == PacketType::Syn {
            packet_header.conn_id.wrapping_add(1)
        } else {
            packet_header.conn_id
        };
        if self.state().conn_id_recv != conn_id && packet_header.packet_type != PacketType::Syn {
            anyhow::bail!(
                "Received invalid packet connection id: {}, expected: {}",
                packet_header.conn_id,
                self.state().conn_id_recv
            )
        }

        let dist_from_expected: i16 = {
            let mut state = self.state_mut();
            // Sequence number used to check that the ack is valid.
            // If we receive an ack for a packet past our seq_nr
            // we have received an ack for an unsent packet which is incorrect.
            // NOTE(review): this comparison is not wraparound aware. It is left
            // as is because a SYN always carries ack_nr 0, which a modular
            // comparison would reject for half of all possible seq_nr values.
            if state.seq_nr < packet_header.ack_nr {
                // Don't kill the connection based on an invalid ack. It's possible for
                // 3rd party to inject packets into the stream for DDosing purposes
                log::warn!("Incoming ack_nr was invalid, packet acked has never been sent");
                return Ok(());
            }

            // TODO: handle eof

            let their_delay = if packet_header.timestamp_microseconds == 0 {
                // I suppose this is for incoming traffic that wants to open
                // new connections?
                0
            } else {
                // Only the low 32 bits are kept below, so a wrapping subtraction
                // yields the same value as before whenever the plain subtraction
                // succeeded, and can't underflow when the peer's truncated
                // timestamp is larger than our clock value.
                get_microseconds().wrapping_sub(packet_header.timestamp_microseconds as u64)
            };
            state.reply_micro = their_delay as u32;
            state.their_advertised_window = packet_header.wnd_size;

            if packet.header.packet_type == PacketType::State {
                // If it's an ack packet we always consider it to be in order since the only
                // out of order acks are for seq_nrs that have never been sent and that's checked
                // above
                0
            } else {
                // The number of packets past the expected packet. Diff between acked
                // up until and current -1 gives 0 the meaning of this being the next
                // expected packet in the sequence. Computed modulo 2^16 and then
                // reinterpreted as signed so the result stays correct when the
                // sequence number wraps (e.g. ack_nr == u16::MAX and seq_nr == 0).
                // Note that ack_nr == 0 is a legitimate value here, so it is not
                // asserted against.
                packet_header
                    .seq_nr
                    .wrapping_sub(state.ack_nr)
                    .wrapping_sub(1) as i16
            }
        };

        match dist_from_expected.cmp(&0) {
            std::cmp::Ordering::Less => {
                log::info!("Got packet already acked: {:?}", packet.header.packet_type);
                Ok(())
            }
            std::cmp::Ordering::Equal => {
                // In order data
                // Did we receive new data?
                let mut data_available = packet.header.packet_type == PacketType::Data;
                self.handle_inorder_packet(packet).await?;

                // Continue with the packet directly *after* the one just handled.
                // Starting at the handled packet's own sequence number would never
                // find its successors and could replay a buffered duplicate.
                let mut next_seq_nr = packet_header.seq_nr.wrapping_add(1);
                // The closure makes sure the RefMut is released before the await below
                let get_next = |seq_nr: u16| self.state_mut().incoming_buffer.remove(seq_nr);
                while let Some(buffered) = get_next(next_seq_nr) {
                    data_available |= buffered.header.packet_type == PacketType::Data;
                    self.handle_inorder_packet(buffered).await?;
                    next_seq_nr = next_seq_nr.wrapping_add(1);
                }
                if data_available {
                    self.data_available.notify_waiters();
                }
                Ok(())
            }
            std::cmp::Ordering::Greater => {
                log::debug!("Got out of order packet");
                let mut state = self.state_mut();
                if packet.data.len() <= state.our_advertised_window() as usize {
                    state.incoming_buffer.insert(packet);
                } else {
                    log::warn!("Stream window not respected, packet dropped");
                }
                // Out of order packet
                Ok(())
            }
        }
    }

    // Perhaps take ownership here instead?
    // Also since this risks reading one packet at a time a read_exact
    // method or equivalent should probably also be added
    /// Copies received bytes into `buffer` and returns how many bytes were copied.
    ///
    /// Waits until the receive buffer contains at least one byte. At most
    /// `buffer.len()` bytes are copied; bytes that didn't fit stay buffered (moved
    /// to the front of the receive buffer) for the next call.
    ///
    /// NOTE(review): the wait loop only ends when data is available, so a reader
    /// is not woken up by a closed connection yet.
    pub async fn read(&self, buffer: &mut [u8]) -> usize {
        // If there exists data in the receive buffer we return it
        // otherwise this should block until either a FIN, RESET or
        // new data is received.
        loop {
            // Scoped so the state isn't borrowed while waiting for the notification
            let data_available = { self.state().receive_buf_cursor };
            // Check connection state here as well so connections can
            // be properly terminated
            if data_available == 0 {
                self.data_available.notified().await;
            } else {
                break;
            }
        }

        let mut state = self.state_mut();
        let buffered = state.receive_buf_cursor;
        let len = buffer.len().min(buffered);
        buffer[..len].copy_from_slice(&state.receive_buf[..len]);
        // Only the valid remainder is moved to the front instead of the whole
        // tail of the (large) receive buffer. The range is empty when everything
        // was read.
        state.receive_buf.copy_within(len..buffered, 0);
        state.receive_buf_cursor = buffered - len;
        len
    }

    /// Sends `data` to the peer as a single data packet.
    ///
    /// # Errors
    /// Fails when the payload plus the packet header exceeds the MTU, since
    /// fragmentation is not supported yet. Such a write used to be dropped while
    /// still reporting success. Also fails when the packet can't be sent.
    pub async fn write(&self, data: Vec<u8>) -> anyhow::Result<()> {
        // The header is sent together with the payload, so it has to be added to
        // (not subtracted from) the payload length when comparing with the MTU.
        if data.len() + HEADER_SIZE as usize > MTU as usize {
            log::warn!("Fragmentation is not supported yet");
            anyhow::bail!(
                "Fragmentation is not supported yet, payload of {} bytes doesn't fit in one packet",
                data.len()
            );
        }

        let packet = {
            // Scoped so the state isn't borrowed while the send is awaited
            let mut state = self.state_mut();
            let header = state.data();
            Packet {
                header,
                data: data.into(),
            }
        };
        self.send_packet(packet, false).await
    }

    /// Handles a packet that is next in sequence, based on its type and the
    /// current connection state.
    ///
    /// The connection state is temporarily taken out of the shared state (an
    /// `Idle` placeholder is left behind) so it can be matched by value. Every
    /// branch puts the resulting state back *before* awaiting or returning an
    /// error, so other tasks running during an `.await` never observe the
    /// placeholder.
    ///
    /// # Errors
    /// Fails when an ack can't be sent or when the first data packet of an
    /// incoming connection doesn't fit into the receive buffer.
    async fn handle_inorder_packet(&self, packet: Packet) -> anyhow::Result<()> {
        let conn_state = std::mem::replace(
            &mut self.state_mut().connection_state,
            ConnectionState::Idle,
        );
        match (packet.header.packet_type, conn_state) {
            // Outgoing connection completion
            (PacketType::State, conn_state) => {
                let mut state = self.state_mut();
                state.ack_nr = packet.header.seq_nr;

                if let ConnectionState::SynSent { connect_notifier } = conn_state {
                    state.connection_state = ConnectionState::Connected;
                    if connect_notifier.send(()).is_err() {
                        log::warn!("Connect notify receiver dropped");
                    }
                    // Syn is only sent once so not currently present in outgoing buffer
                    log::debug!("SYN_ACK");
                } else {
                    if let Some(pkt) = state.outgoing_buffer.remove(packet.header.ack_nr) {
                        // Update cur_window to reflect that the outgoing packet has been
                        // acked. Saturating so an ack for a buffered packet that was never
                        // counted as in flight can't underflow the window.
                        state.cur_window = state.cur_window.saturating_sub(pkt.data.len() as u32);
                    } else {
                        log::error!("Recevied ack for packet not inside the outgoing_buffer");
                    }
                    // Reset connection state if it wasn't modified
                    state.connection_state = conn_state;
                }
            }
            (PacketType::Data, ConnectionState::Connected) => {
                let was_consumed = {
                    let mut state = self.state_mut();
                    // Restore the connection state before awaiting the ack
                    state.connection_state = ConnectionState::Connected;
                    state.try_consume(&packet.data)
                };
                if was_consumed {
                    self.ack_packet(packet.header.seq_nr).await?;
                }
            }
            (PacketType::Fin, conn_state) => {
                let mut state = self.state_mut();
                log::trace!("Received FIN: {}", self.addr);
                state.eof_pkt = Some(packet.header.seq_nr);
                log::info!("Connection closed: {}", self.addr);

                // more stuff here
                //
                // Reset connection state if it wasn't modified
                state.connection_state = conn_state;
            }
            (PacketType::Syn, ConnectionState::SynReceived) => {
                // A bit confusing but SynReceived is the state
                // set when creating the stream because a syn was received
                // even though it has yet to be handled (acked)
                log::debug!("Acking received SYN");
                // Restore the connection state before awaiting the ack
                self.state_mut().connection_state = ConnectionState::SynReceived;
                self.ack_packet(packet.header.seq_nr).await?;
            }
            (PacketType::Data, ConnectionState::SynReceived) => {
                // At this point we have received an _inorder_ data
                // packet which means the initial SYN (current seq_nr - 1) already has been
                // acked and presumably been received by the sender so we can now
                // transition into Connected state
                let was_consumed = {
                    let mut state = self.state_mut();
                    if state.try_consume(&packet.data) {
                        // We are now connected!
                        log::info!("Incoming connection established!");
                        state.connection_state = ConnectionState::Connected;
                        true
                    } else {
                        // Nothing changed, stay in SynReceived instead of keeping
                        // the Idle placeholder
                        state.connection_state = ConnectionState::SynReceived;
                        false
                    }
                };
                if was_consumed {
                    self.ack_packet(packet.header.seq_nr).await?;
                } else {
                    anyhow::bail!(
                        "Initial data packet doesn't fit receive buffer, stream is misconfigured"
                    );
                }
            }
            (p_type, conn_state) => {
                let mut state = self.state_mut();
                log::error!("Unhandled packet type!: {:?}", p_type);
                // Reset connection state if it wasn't modified
                state.connection_state = conn_state;
            }
        }
        Ok(())
    }

    pub(crate) fn state_mut(&self) -> RefMut<'_, StreamState> {
        self.inner.borrow_mut()
    }

    pub(crate) fn state(&self) -> Ref<'_, StreamState> {
        self.inner.borrow()
    }
}

impl Drop for UtpStream {
    fn drop(&mut self) {
        // Only shutdown if this + the stream used in the send loop are the last clone
        if Rc::strong_count(&self.inner) == 2 {
            // The socket will detect that the inner state have been dropped
            // after the send loop have shutdown and remove it from the map
            self.state_mut()
                .shutdown_signal
                .take()
                .unwrap()
                .send(())
                .unwrap();
        }
    }
}