playit_agent_core/network/udp/
packets.rs1use std::{sync::{atomic::Ordering, Arc}, task::{Poll, Waker}};
2
3use crossbeam::queue::ArrayQueue;
4
5pub const PACKET_LEN: usize = 2048;
6
7#[derive(Clone)]
8pub struct Packets {
9 inner: Arc<PacketsInner>,
10}
11
12struct PacketsInner {
13 _buffer: Vec<u8>,
14 packet_count: usize,
15 free_packets: ArrayQueue<*mut u8>,
16 waiting: ArrayQueue<Waker>,
17}
18
19unsafe impl Send for PacketsInner {}
20unsafe impl Sync for PacketsInner {}
21
22pub struct Packet {
23 ptr: *mut u8,
24 len: usize,
25 inner: Arc<PacketsInner>,
26}
27
28unsafe impl Send for Packet {}
29unsafe impl Sync for Packet {}
30
31impl Packets {
32 pub fn new(mut packet_count: usize) -> Self {
33 packet_count = packet_count.next_power_of_two();
34 let bytes = packet_count.next_power_of_two() * PACKET_LEN;
35
36 let mut buffer = vec![0u8; bytes];
37
38 let free_packets = ArrayQueue::new(packet_count);
39 let ptr = buffer.as_mut_ptr();
40
41 for i in 0..packet_count {
42 free_packets.push(unsafe { ptr.add(i * PACKET_LEN) }).expect("free packet queue too small");
43 }
44
45 Packets {
46 inner: Arc::new(PacketsInner {
47 _buffer: buffer,
48 packet_count,
49 free_packets,
50 waiting: ArrayQueue::new(1024),
51 })
52 }
53 }
54
55 pub fn packet_count(&self) -> usize {
56 self.inner.packet_count
57 }
58
59 pub fn allocate(&self) -> Option<Packet> {
60 let ptr = self.inner.free_packets.pop()?;
61 Some(Packet { ptr, len: PACKET_LEN, inner: self.inner.clone() })
62 }
63
64 pub async fn allocate_wait(&self) -> Packet {
65 std::future::poll_fn(|cx| {
66 std::sync::atomic::fence(Ordering::Acquire);
67
68 if let Some(ptr) = self.inner.free_packets.pop() {
69 return Poll::Ready(Packet { ptr, len: PACKET_LEN, inner: self.inner.clone() });
70 }
71
72 if let Err(error) = self.inner.waiting.push(cx.waker().clone()) {
73 error.wake();
74 }
75
76 Poll::Pending
77 }).await
78 }
79}
80
81impl Drop for Packet {
82 fn drop(&mut self) {
83 self.inner.free_packets.push(self.ptr)
84 .expect("free packet queue full");
85
86 if let Some(wake) = self.inner.waiting.pop() {
87 std::sync::atomic::fence(Ordering::Release);
88 wake.wake();
89 }
90 }
91}
92
93impl AsMut<[u8]> for Packet {
94 fn as_mut(&mut self) -> &mut [u8] {
95 unsafe {
96 std::slice::from_raw_parts_mut(self.ptr, self.len)
97 }
98 }
99}
100
101impl AsRef<[u8]> for Packet {
102 fn as_ref(&self) -> &[u8] {
103 unsafe {
104 std::slice::from_raw_parts(self.ptr, self.len)
105 }
106 }
107}
108
109impl Packet {
110 pub fn full_slice_mut(&mut self) -> &mut [u8] {
111 unsafe {
112 std::slice::from_raw_parts_mut(self.ptr, PACKET_LEN)
113 }
114 }
115
116 pub fn full_slice(&self) -> &[u8] {
117 unsafe {
118 std::slice::from_raw_parts(self.ptr, PACKET_LEN)
119 }
120 }
121
122 #[allow(clippy::len_without_is_empty)]
123 pub fn len(&self) -> usize {
124 self.len
125 }
126
127 pub fn set_len(&mut self, len: usize) -> std::io::Result<()> {
128 if PACKET_LEN < len {
129 return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "packet len too large"));
130 }
131
132 self.len = len;
133 Ok(())
134 }
135}