webrtc_sctp/queue/
payload_queue.rs1use std::collections::{HashMap, VecDeque};
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use portable_atomic::AtomicUsize;
6
7use crate::chunk::chunk_payload_data::ChunkPayloadData;
8use crate::chunk::chunk_selective_ack::GapAckBlock;
9use crate::util::*;
10
11#[derive(Default, Debug)]
12pub(crate) struct PayloadQueue {
13 pub(crate) length: Arc<AtomicUsize>,
14 pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
15 pub(crate) sorted: VecDeque<u32>,
16 pub(crate) dup_tsn: Vec<u32>,
17 pub(crate) n_bytes: usize,
18}
19
20impl PayloadQueue {
21 pub(crate) fn new(length: Arc<AtomicUsize>) -> Self {
22 length.store(0, Ordering::SeqCst);
23 PayloadQueue {
24 length,
25 ..Default::default()
26 }
27 }
28
29 pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
30 !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
31 }
32
33 pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
34 let tsn = p.tsn;
35 self.n_bytes += p.user_data.len();
36 self.chunk_map.insert(tsn, p);
37 self.length.fetch_add(1, Ordering::SeqCst);
38
39 if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) {
40 self.sorted.push_back(tsn);
41 } else if sna32lt(tsn, *self.sorted.front().unwrap()) {
42 self.sorted.push_front(tsn);
43 } else {
44 fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering {
45 if sna32lt(a, b) {
46 std::cmp::Ordering::Less
47 } else {
48 std::cmp::Ordering::Greater
49 }
50 }
51 let pos = match self
52 .sorted
53 .binary_search_by(|element| compare_tsn(*element, tsn))
54 {
55 Ok(pos) => pos,
56 Err(pos) => pos,
57 };
58 self.sorted.insert(pos, tsn);
59 }
60 }
61
62 pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool {
66 let ok = self.chunk_map.contains_key(&p.tsn);
67 if ok || sna32lte(p.tsn, cumulative_tsn) {
68 self.dup_tsn.push(p.tsn);
70 return false;
71 }
72
73 self.push_no_check(p);
74 true
75 }
76
77 pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
79 if Some(&tsn) == self.sorted.front() {
80 self.sorted.pop_front();
81 if let Some(c) = self.chunk_map.remove(&tsn) {
82 self.length.fetch_sub(1, Ordering::SeqCst);
83 self.n_bytes -= c.user_data.len();
84 return Some(c);
85 }
86 }
87
88 None
89 }
90
91 pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> {
93 self.chunk_map.get(&tsn)
94 }
95 pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> {
96 self.chunk_map.get_mut(&tsn)
97 }
98
99 pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> {
101 self.dup_tsn.drain(..).collect()
102 }
103
104 pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> {
105 if self.chunk_map.is_empty() {
106 return vec![];
107 }
108
109 let mut b = GapAckBlock::default();
110 let mut gap_ack_blocks = vec![];
111 for (i, tsn) in self.sorted.iter().enumerate() {
112 let diff = if *tsn >= cumulative_tsn {
113 (*tsn - cumulative_tsn) as u16
114 } else {
115 0
116 };
117
118 if i == 0 {
119 b.start = diff;
120 b.end = b.start;
121 } else if b.end + 1 == diff {
122 b.end += 1;
123 } else {
124 gap_ack_blocks.push(b);
125
126 b.start = diff;
127 b.end = diff;
128 }
129 }
130
131 gap_ack_blocks.push(b);
132
133 gap_ack_blocks
134 }
135
136 pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String {
137 let mut s = format!("cumTSN={cumulative_tsn}");
138 for b in self.get_gap_ack_blocks(cumulative_tsn) {
139 s += format!(",{}-{}", b.start, b.end).as_str();
140 }
141 s
142 }
143
144 pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize {
145 let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) {
146 c.acked = true;
147 c.retransmit = false;
148 let n = c.user_data.len();
149 self.n_bytes -= n;
150 c.user_data.clear();
151 n
152 } else {
153 0
154 };
155
156 n_bytes_acked
157 }
158
159 pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
160 self.sorted.back()
161 }
162
163 pub(crate) fn mark_all_to_retrasmit(&mut self) {
164 for c in self.chunk_map.values_mut() {
165 if c.acked || c.abandoned() {
166 continue;
167 }
168 c.retransmit = true;
169 }
170 }
171
172 pub(crate) fn get_num_bytes(&self) -> usize {
173 self.n_bytes
174 }
175
176 pub(crate) fn len(&self) -> usize {
177 assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst));
178 self.chunk_map.len()
179 }
180
181 pub(crate) fn is_empty(&self) -> bool {
182 self.len() == 0
183 }
184}