sctp_async/queue/
pending_queue.rs1use crate::chunk::chunk_payload_data::ChunkPayloadData;
2
3use std::collections::VecDeque;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use tokio::sync::Mutex;
6
7pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
9
10#[derive(Debug, Default)]
14pub(crate) struct PendingQueue {
15 unordered_queue: Mutex<PendingBaseQueue>,
16 ordered_queue: Mutex<PendingBaseQueue>,
17 queue_len: AtomicUsize,
18 n_bytes: AtomicUsize,
19 selected: AtomicBool,
20 unordered_is_selected: AtomicBool,
21}
22
23impl PendingQueue {
24 pub(crate) fn new() -> Self {
25 PendingQueue::default()
26 }
27
28 pub(crate) async fn push(&self, c: ChunkPayloadData) {
29 self.n_bytes.fetch_add(c.user_data.len(), Ordering::SeqCst);
30 if c.unordered {
31 let mut unordered_queue = self.unordered_queue.lock().await;
32 unordered_queue.push_back(c);
33 } else {
34 let mut ordered_queue = self.ordered_queue.lock().await;
35 ordered_queue.push_back(c);
36 }
37 self.queue_len.fetch_add(1, Ordering::SeqCst);
38 }
39
40 pub(crate) async fn peek(&self) -> Option<ChunkPayloadData> {
41 if self.selected.load(Ordering::SeqCst) {
42 if self.unordered_is_selected.load(Ordering::SeqCst) {
43 let unordered_queue = self.unordered_queue.lock().await;
44 return unordered_queue.get(0).cloned();
45 } else {
46 let ordered_queue = self.ordered_queue.lock().await;
47 return ordered_queue.get(0).cloned();
48 }
49 }
50
51 let c = {
52 let unordered_queue = self.unordered_queue.lock().await;
53 unordered_queue.get(0).cloned()
54 };
55
56 if c.is_some() {
57 return c;
58 }
59
60 let ordered_queue = self.ordered_queue.lock().await;
61 ordered_queue.get(0).cloned()
62 }
63
64 pub(crate) async fn pop(
65 &self,
66 beginning_fragment: bool,
67 unordered: bool,
68 ) -> Option<ChunkPayloadData> {
69 let popped = if self.selected.load(Ordering::SeqCst) {
70 let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
71 let mut unordered_queue = self.unordered_queue.lock().await;
72 unordered_queue.pop_front()
73 } else {
74 let mut ordered_queue = self.ordered_queue.lock().await;
75 ordered_queue.pop_front()
76 };
77 if let Some(p) = &popped {
78 if p.ending_fragment {
79 self.selected.store(false, Ordering::SeqCst);
80 }
81 }
82 popped
83 } else {
84 if !beginning_fragment {
85 return None;
86 }
87 if unordered {
88 let popped = {
89 let mut unordered_queue = self.unordered_queue.lock().await;
90 unordered_queue.pop_front()
91 };
92 if let Some(p) = &popped {
93 if !p.ending_fragment {
94 self.selected.store(true, Ordering::SeqCst);
95 self.unordered_is_selected.store(true, Ordering::SeqCst);
96 }
97 }
98 popped
99 } else {
100 let popped = {
101 let mut ordered_queue = self.ordered_queue.lock().await;
102 ordered_queue.pop_front()
103 };
104 if let Some(p) = &popped {
105 if !p.ending_fragment {
106 self.selected.store(true, Ordering::SeqCst);
107 self.unordered_is_selected.store(false, Ordering::SeqCst);
108 }
109 }
110 popped
111 }
112 };
113
114 if let Some(p) = &popped {
115 self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst);
116 self.queue_len.fetch_sub(1, Ordering::SeqCst);
117 }
118
119 popped
120 }
121
122 pub(crate) fn get_num_bytes(&self) -> usize {
123 self.n_bytes.load(Ordering::SeqCst)
124 }
125
126 pub(crate) fn len(&self) -> usize {
127 self.queue_len.load(Ordering::SeqCst)
128 }
129
130 pub(crate) fn is_empty(&self) -> bool {
131 self.len() == 0
132 }
133}