playit_agent_core/network/udp/
packets.rs

1use std::{sync::{atomic::Ordering, Arc}, task::{Poll, Waker}};
2
3use crossbeam::queue::ArrayQueue;
4
5pub const PACKET_LEN: usize = 2048;
6
7#[derive(Clone)]
8pub struct Packets {
9    inner: Arc<PacketsInner>,
10}
11
12struct PacketsInner {
13    _buffer: Vec<u8>,
14    packet_count: usize,
15    free_packets: ArrayQueue<*mut u8>,
16    waiting: ArrayQueue<Waker>,
17}
18
19unsafe impl Send for PacketsInner {}
20unsafe impl Sync for PacketsInner {}
21
22pub struct Packet {
23    ptr: *mut u8,
24    len: usize,
25    inner: Arc<PacketsInner>,
26}
27
28unsafe impl Send for Packet {}
29unsafe impl Sync for Packet {}
30
31impl Packets {
32    pub fn new(mut packet_count: usize) -> Self {
33        packet_count = packet_count.next_power_of_two();
34        let bytes = packet_count.next_power_of_two() * PACKET_LEN;
35
36        let mut buffer = vec![0u8; bytes];
37
38        let free_packets = ArrayQueue::new(packet_count);
39        let ptr = buffer.as_mut_ptr();
40
41        for i in 0..packet_count {
42            free_packets.push(unsafe { ptr.add(i * PACKET_LEN) }).expect("free packet queue too small");
43        }
44
45        Packets {
46            inner: Arc::new(PacketsInner {
47                _buffer: buffer,
48                packet_count,
49                free_packets,
50                waiting: ArrayQueue::new(1024),
51            })
52        }
53    }
54
55    pub fn packet_count(&self) -> usize {
56        self.inner.packet_count
57    }
58
59    pub fn allocate(&self) -> Option<Packet> {
60        let ptr = self.inner.free_packets.pop()?;
61        Some(Packet { ptr, len: PACKET_LEN, inner: self.inner.clone() })
62    }
63
64    pub async fn allocate_wait(&self) -> Packet {
65        std::future::poll_fn(|cx| {
66            std::sync::atomic::fence(Ordering::Acquire);
67
68            if let Some(ptr) = self.inner.free_packets.pop() {
69                return Poll::Ready(Packet { ptr, len: PACKET_LEN, inner: self.inner.clone() });
70            }
71
72            if let Err(error) = self.inner.waiting.push(cx.waker().clone()) {
73                error.wake();
74            }
75
76            Poll::Pending
77        }).await
78    }
79}
80
81impl Drop for Packet {
82    fn drop(&mut self) {
83        self.inner.free_packets.push(self.ptr)
84            .expect("free packet queue full");
85
86        if let Some(wake) = self.inner.waiting.pop() {
87            std::sync::atomic::fence(Ordering::Release);
88            wake.wake();
89        }
90    }
91}
92
93impl AsMut<[u8]> for Packet {
94    fn as_mut(&mut self) -> &mut [u8] {
95        unsafe {
96            std::slice::from_raw_parts_mut(self.ptr, self.len)
97        }
98    }
99}
100
101impl AsRef<[u8]> for Packet {
102    fn as_ref(&self) -> &[u8] {
103        unsafe {
104            std::slice::from_raw_parts(self.ptr, self.len)
105        }
106    }
107}
108
109impl Packet {
110    pub fn full_slice_mut(&mut self) -> &mut [u8] {
111        unsafe {
112            std::slice::from_raw_parts_mut(self.ptr, PACKET_LEN)
113        }
114    }
115
116    pub fn full_slice(&self) -> &[u8] {
117        unsafe {
118            std::slice::from_raw_parts(self.ptr, PACKET_LEN)
119        }
120    }
121
122    #[allow(clippy::len_without_is_empty)]
123    pub fn len(&self) -> usize {
124        self.len
125    }
126
127    pub fn set_len(&mut self, len: usize) -> std::io::Result<()> {
128        if PACKET_LEN < len {
129            return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "packet len too large"));
130        }
131
132        self.len = len;
133        Ok(())
134    }
135}