// webrtc_sctp/queue/payload_queue.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use portable_atomic::AtomicUsize;
6
7use crate::chunk::chunk_payload_data::ChunkPayloadData;
8use crate::chunk::chunk_selective_ack::GapAckBlock;
9use crate::util::*;
10
11#[derive(Default, Debug)]
12pub(crate) struct PayloadQueue {
13    pub(crate) length: Arc<AtomicUsize>,
14    pub(crate) chunk_map: HashMap<u32, ChunkPayloadData>,
15    pub(crate) sorted: VecDeque<u32>,
16    pub(crate) dup_tsn: Vec<u32>,
17    pub(crate) n_bytes: usize,
18}
19
20impl PayloadQueue {
21    pub(crate) fn new(length: Arc<AtomicUsize>) -> Self {
22        length.store(0, Ordering::SeqCst);
23        PayloadQueue {
24            length,
25            ..Default::default()
26        }
27    }
28
29    pub(crate) fn can_push(&self, p: &ChunkPayloadData, cumulative_tsn: u32) -> bool {
30        !(self.chunk_map.contains_key(&p.tsn) || sna32lte(p.tsn, cumulative_tsn))
31    }
32
33    pub(crate) fn push_no_check(&mut self, p: ChunkPayloadData) {
34        let tsn = p.tsn;
35        self.n_bytes += p.user_data.len();
36        self.chunk_map.insert(tsn, p);
37        self.length.fetch_add(1, Ordering::SeqCst);
38
39        if self.sorted.is_empty() || sna32gt(tsn, *self.sorted.back().unwrap()) {
40            self.sorted.push_back(tsn);
41        } else if sna32lt(tsn, *self.sorted.front().unwrap()) {
42            self.sorted.push_front(tsn);
43        } else {
44            fn compare_tsn(a: u32, b: u32) -> std::cmp::Ordering {
45                if sna32lt(a, b) {
46                    std::cmp::Ordering::Less
47                } else {
48                    std::cmp::Ordering::Greater
49                }
50            }
51            let pos = match self
52                .sorted
53                .binary_search_by(|element| compare_tsn(*element, tsn))
54            {
55                Ok(pos) => pos,
56                Err(pos) => pos,
57            };
58            self.sorted.insert(pos, tsn);
59        }
60    }
61
62    /// push pushes a payload data. If the payload data is already in our queue or
63    /// older than our cumulative_tsn marker, it will be recorded as duplications,
64    /// which can later be retrieved using popDuplicates.
65    pub(crate) fn push(&mut self, p: ChunkPayloadData, cumulative_tsn: u32) -> bool {
66        let ok = self.chunk_map.contains_key(&p.tsn);
67        if ok || sna32lte(p.tsn, cumulative_tsn) {
68            // Found the packet, log in dups
69            self.dup_tsn.push(p.tsn);
70            return false;
71        }
72
73        self.push_no_check(p);
74        true
75    }
76
77    /// pop pops only if the oldest chunk's TSN matches the given TSN.
78    pub(crate) fn pop(&mut self, tsn: u32) -> Option<ChunkPayloadData> {
79        if Some(&tsn) == self.sorted.front() {
80            self.sorted.pop_front();
81            if let Some(c) = self.chunk_map.remove(&tsn) {
82                self.length.fetch_sub(1, Ordering::SeqCst);
83                self.n_bytes -= c.user_data.len();
84                return Some(c);
85            }
86        }
87
88        None
89    }
90
91    /// get returns reference to chunkPayloadData with the given TSN value.
92    pub(crate) fn get(&self, tsn: u32) -> Option<&ChunkPayloadData> {
93        self.chunk_map.get(&tsn)
94    }
95    pub(crate) fn get_mut(&mut self, tsn: u32) -> Option<&mut ChunkPayloadData> {
96        self.chunk_map.get_mut(&tsn)
97    }
98
99    /// popDuplicates returns an array of TSN values that were found duplicate.
100    pub(crate) fn pop_duplicates(&mut self) -> Vec<u32> {
101        self.dup_tsn.drain(..).collect()
102    }
103
104    pub(crate) fn get_gap_ack_blocks(&self, cumulative_tsn: u32) -> Vec<GapAckBlock> {
105        if self.chunk_map.is_empty() {
106            return vec![];
107        }
108
109        let mut b = GapAckBlock::default();
110        let mut gap_ack_blocks = vec![];
111        for (i, tsn) in self.sorted.iter().enumerate() {
112            let diff = if *tsn >= cumulative_tsn {
113                (*tsn - cumulative_tsn) as u16
114            } else {
115                0
116            };
117
118            if i == 0 {
119                b.start = diff;
120                b.end = b.start;
121            } else if b.end + 1 == diff {
122                b.end += 1;
123            } else {
124                gap_ack_blocks.push(b);
125
126                b.start = diff;
127                b.end = diff;
128            }
129        }
130
131        gap_ack_blocks.push(b);
132
133        gap_ack_blocks
134    }
135
136    pub(crate) fn get_gap_ack_blocks_string(&self, cumulative_tsn: u32) -> String {
137        let mut s = format!("cumTSN={cumulative_tsn}");
138        for b in self.get_gap_ack_blocks(cumulative_tsn) {
139            s += format!(",{}-{}", b.start, b.end).as_str();
140        }
141        s
142    }
143
144    pub(crate) fn mark_as_acked(&mut self, tsn: u32) -> usize {
145        let n_bytes_acked = if let Some(c) = self.chunk_map.get_mut(&tsn) {
146            c.acked = true;
147            c.retransmit = false;
148            let n = c.user_data.len();
149            self.n_bytes -= n;
150            c.user_data.clear();
151            n
152        } else {
153            0
154        };
155
156        n_bytes_acked
157    }
158
159    pub(crate) fn get_last_tsn_received(&self) -> Option<&u32> {
160        self.sorted.back()
161    }
162
163    pub(crate) fn mark_all_to_retrasmit(&mut self) {
164        for c in self.chunk_map.values_mut() {
165            if c.acked || c.abandoned() {
166                continue;
167            }
168            c.retransmit = true;
169        }
170    }
171
172    pub(crate) fn get_num_bytes(&self) -> usize {
173        self.n_bytes
174    }
175
176    pub(crate) fn len(&self) -> usize {
177        assert_eq!(self.chunk_map.len(), self.length.load(Ordering::SeqCst));
178        self.chunk_map.len()
179    }
180
181    pub(crate) fn is_empty(&self) -> bool {
182        self.len() == 0
183    }
184}