remoc/chmux/
port_allocator.rs1use std::{
2 borrow::Borrow,
3 collections::HashSet,
4 fmt,
5 hash::Hash,
6 mem,
7 ops::Deref,
8 sync::{Arc, Mutex},
9};
10use tokio::sync::oneshot;
11
12struct PortAllocatorInner {
13 used: HashSet<u32>,
14 limit: u32,
15 notify_tx: Vec<oneshot::Sender<()>>,
16}
17
18impl PortAllocatorInner {
19 fn is_available(&self) -> bool {
20 self.used.len() <= self.limit as usize
21 }
22
23 fn try_allocate(&mut self, this: Arc<Mutex<PortAllocatorInner>>) -> Option<PortNumber> {
24 if self.is_available() {
25 let number = loop {
26 let cand = rand::random();
27 if !self.used.contains(&cand) {
28 break cand;
29 }
30 };
31
32 self.used.insert(number);
33 Some(PortNumber { number, allocator: this })
34 } else {
35 None
36 }
37 }
38}
39
40#[derive(Clone)]
44pub struct PortAllocator(Arc<Mutex<PortAllocatorInner>>);
45
46impl fmt::Debug for PortAllocator {
47 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48 let inner = self.0.lock().unwrap();
49 f.debug_struct("PortAllocator").field("used", &inner.used.len()).field("limit", &inner.limit).finish()
50 }
51}
52
53impl PortAllocator {
54 pub(crate) fn new(limit: u32) -> PortAllocator {
56 let inner = PortAllocatorInner { used: HashSet::new(), limit, notify_tx: Vec::new() };
57 PortAllocator(Arc::new(Mutex::new(inner)))
58 }
59
60 pub async fn allocate(&self) -> PortNumber {
65 loop {
66 let rx = {
67 let mut inner = self.0.lock().unwrap();
68 match inner.try_allocate(self.0.clone()) {
69 Some(number) => return number,
70 None => {
71 let (tx, rx) = oneshot::channel();
72 inner.notify_tx.push(tx);
73 rx
74 }
75 }
76 };
77
78 let _ = rx.await;
79 }
80 }
81
82 pub fn try_allocate(&self) -> Option<PortNumber> {
86 let mut inner = self.0.lock().unwrap();
87 inner.try_allocate(self.0.clone())
88 }
89}
90
91pub struct PortNumber {
95 number: u32,
96 allocator: Arc<Mutex<PortAllocatorInner>>,
97}
98
99impl fmt::Debug for PortNumber {
100 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
101 write!(f, "{:?}", self.number)
102 }
103}
104
105impl fmt::Display for PortNumber {
106 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107 write!(f, "{}", self.number)
108 }
109}
110
111impl Deref for PortNumber {
112 type Target = u32;
113
114 fn deref(&self) -> &Self::Target {
115 &self.number
116 }
117}
118
119impl PartialEq for PortNumber {
120 fn eq(&self, other: &Self) -> bool {
121 **self == **other
122 }
123}
124
125impl Eq for PortNumber {}
126
127impl PartialOrd for PortNumber {
128 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
129 Some(self.cmp(other))
130 }
131}
132
133impl Ord for PortNumber {
134 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
135 self.number.cmp(&other.number)
136 }
137}
138
139impl Hash for PortNumber {
140 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
141 (**self).hash(state)
142 }
143}
144
145impl Borrow<u32> for PortNumber {
146 fn borrow(&self) -> &u32 {
147 &self.number
148 }
149}
150
151impl Drop for PortNumber {
152 fn drop(&mut self) {
153 let notify_tx = {
154 let mut inner = self.allocator.lock().unwrap();
155 inner.used.remove(&self.number);
156 mem::take(&mut inner.notify_tx)
157 };
158
159 for tx in notify_tx {
160 let _ = tx.send(());
161 }
162 }
163}
164
165#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
170pub struct PortReq {
171 pub port: PortNumber,
173 pub id: u32,
175}
176
177impl From<PortNumber> for PortReq {
178 fn from(port: PortNumber) -> Self {
181 Self { id: port.number, port }
182 }
183}
184
185impl From<PortReq> for PortNumber {
186 fn from(req: PortReq) -> Self {
187 req.port
188 }
189}
190
191impl PortReq {
192 pub fn new(port: PortNumber) -> Self {
195 Self::from(port)
196 }
197
198 pub fn with_id(mut self, id: u32) -> Self {
200 self.id = id;
201 self
202 }
203}