sctp_async/queue/
pending_queue.rs

1use crate::chunk::chunk_payload_data::ChunkPayloadData;
2
3use std::collections::VecDeque;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use tokio::sync::Mutex;
6
/// Double-ended queue of payload chunks; the storage type behind each of the
/// two internal queues of [`PendingQueue`].
pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
9
10// TODO: benchmark performance between multiple Atomic+Mutex vs one Mutex<PendingQueueInternal>
11
12/// pendingQueue
13#[derive(Debug, Default)]
14pub(crate) struct PendingQueue {
15    unordered_queue: Mutex<PendingBaseQueue>,
16    ordered_queue: Mutex<PendingBaseQueue>,
17    queue_len: AtomicUsize,
18    n_bytes: AtomicUsize,
19    selected: AtomicBool,
20    unordered_is_selected: AtomicBool,
21}
22
23impl PendingQueue {
24    pub(crate) fn new() -> Self {
25        PendingQueue::default()
26    }
27
28    pub(crate) async fn push(&self, c: ChunkPayloadData) {
29        self.n_bytes.fetch_add(c.user_data.len(), Ordering::SeqCst);
30        if c.unordered {
31            let mut unordered_queue = self.unordered_queue.lock().await;
32            unordered_queue.push_back(c);
33        } else {
34            let mut ordered_queue = self.ordered_queue.lock().await;
35            ordered_queue.push_back(c);
36        }
37        self.queue_len.fetch_add(1, Ordering::SeqCst);
38    }
39
40    pub(crate) async fn peek(&self) -> Option<ChunkPayloadData> {
41        if self.selected.load(Ordering::SeqCst) {
42            if self.unordered_is_selected.load(Ordering::SeqCst) {
43                let unordered_queue = self.unordered_queue.lock().await;
44                return unordered_queue.get(0).cloned();
45            } else {
46                let ordered_queue = self.ordered_queue.lock().await;
47                return ordered_queue.get(0).cloned();
48            }
49        }
50
51        let c = {
52            let unordered_queue = self.unordered_queue.lock().await;
53            unordered_queue.get(0).cloned()
54        };
55
56        if c.is_some() {
57            return c;
58        }
59
60        let ordered_queue = self.ordered_queue.lock().await;
61        ordered_queue.get(0).cloned()
62    }
63
64    pub(crate) async fn pop(
65        &self,
66        beginning_fragment: bool,
67        unordered: bool,
68    ) -> Option<ChunkPayloadData> {
69        let popped = if self.selected.load(Ordering::SeqCst) {
70            let popped = if self.unordered_is_selected.load(Ordering::SeqCst) {
71                let mut unordered_queue = self.unordered_queue.lock().await;
72                unordered_queue.pop_front()
73            } else {
74                let mut ordered_queue = self.ordered_queue.lock().await;
75                ordered_queue.pop_front()
76            };
77            if let Some(p) = &popped {
78                if p.ending_fragment {
79                    self.selected.store(false, Ordering::SeqCst);
80                }
81            }
82            popped
83        } else {
84            if !beginning_fragment {
85                return None;
86            }
87            if unordered {
88                let popped = {
89                    let mut unordered_queue = self.unordered_queue.lock().await;
90                    unordered_queue.pop_front()
91                };
92                if let Some(p) = &popped {
93                    if !p.ending_fragment {
94                        self.selected.store(true, Ordering::SeqCst);
95                        self.unordered_is_selected.store(true, Ordering::SeqCst);
96                    }
97                }
98                popped
99            } else {
100                let popped = {
101                    let mut ordered_queue = self.ordered_queue.lock().await;
102                    ordered_queue.pop_front()
103                };
104                if let Some(p) = &popped {
105                    if !p.ending_fragment {
106                        self.selected.store(true, Ordering::SeqCst);
107                        self.unordered_is_selected.store(false, Ordering::SeqCst);
108                    }
109                }
110                popped
111            }
112        };
113
114        if let Some(p) = &popped {
115            self.n_bytes.fetch_sub(p.user_data.len(), Ordering::SeqCst);
116            self.queue_len.fetch_sub(1, Ordering::SeqCst);
117        }
118
119        popped
120    }
121
122    pub(crate) fn get_num_bytes(&self) -> usize {
123        self.n_bytes.load(Ordering::SeqCst)
124    }
125
126    pub(crate) fn len(&self) -> usize {
127        self.queue_len.load(Ordering::SeqCst)
128    }
129
130    pub(crate) fn is_empty(&self) -> bool {
131        self.len() == 0
132    }
133}