remoc/chmux/
port_allocator.rs

1use std::{
2    borrow::Borrow,
3    collections::HashSet,
4    fmt,
5    hash::Hash,
6    mem,
7    ops::Deref,
8    sync::{Arc, Mutex},
9};
10use tokio::sync::oneshot;
11
/// Allocator state shared by a `PortAllocator`, its clones and every
/// `PortNumber` handed out by it.
struct PortAllocatorInner {
    /// Port numbers that are currently allocated.
    used: HashSet<u32>,
    /// Bound that the number of entries in `used` is checked against before
    /// another port number is handed out.
    limit: u32,
    /// One sender per task waiting for a free port; all of them are signalled
    /// (and the list is emptied) whenever a `PortNumber` is dropped.
    notify_tx: Vec<oneshot::Sender<()>>,
}
17
18impl PortAllocatorInner {
19    fn is_available(&self) -> bool {
20        self.used.len() <= self.limit as usize
21    }
22
23    fn try_allocate(&mut self, this: Arc<Mutex<PortAllocatorInner>>) -> Option<PortNumber> {
24        if self.is_available() {
25            let number = loop {
26                let cand = rand::random();
27                if !self.used.contains(&cand) {
28                    break cand;
29                }
30            };
31
32            self.used.insert(number);
33            Some(PortNumber { number, allocator: this })
34        } else {
35            None
36        }
37    }
38}
39
/// Local port number allocator.
///
/// State is shared between clones of this type: every clone holds a handle
/// to the same reference-counted, mutex-protected allocator state.
#[derive(Clone)]
pub struct PortAllocator(Arc<Mutex<PortAllocatorInner>>);
45
46impl fmt::Debug for PortAllocator {
47    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48        let inner = self.0.lock().unwrap();
49        f.debug_struct("PortAllocator").field("used", &inner.used.len()).field("limit", &inner.limit).finish()
50    }
51}
52
53impl PortAllocator {
54    /// Creates a new port number allocator.
55    pub(crate) fn new(limit: u32) -> PortAllocator {
56        let inner = PortAllocatorInner { used: HashSet::new(), limit, notify_tx: Vec::new() };
57        PortAllocator(Arc::new(Mutex::new(inner)))
58    }
59
60    /// Allocates a local port number.
61    ///
62    /// Port numbers are allocated randomly.
63    /// If all ports are currently in use, this waits for a port number to become available.
64    pub async fn allocate(&self) -> PortNumber {
65        loop {
66            let rx = {
67                let mut inner = self.0.lock().unwrap();
68                match inner.try_allocate(self.0.clone()) {
69                    Some(number) => return number,
70                    None => {
71                        let (tx, rx) = oneshot::channel();
72                        inner.notify_tx.push(tx);
73                        rx
74                    }
75                }
76            };
77
78            let _ = rx.await;
79        }
80    }
81
82    /// Tries to allocate a local port number.
83    ///
84    /// If all port are currently in use, this returns [None].
85    pub fn try_allocate(&self) -> Option<PortNumber> {
86        let mut inner = self.0.lock().unwrap();
87        inner.try_allocate(self.0.clone())
88    }
89}
90
/// An allocated local port number.
///
/// When this is dropped, the allocated port number is automatically released.
pub struct PortNumber {
    /// The allocated port number.
    number: u32,
    /// Shared state of the allocator this number was obtained from;
    /// used to release the number on drop.
    allocator: Arc<Mutex<PortAllocatorInner>>,
}
98
99impl fmt::Debug for PortNumber {
100    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
101        write!(f, "{:?}", self.number)
102    }
103}
104
105impl fmt::Display for PortNumber {
106    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
107        write!(f, "{}", self.number)
108    }
109}
110
111impl Deref for PortNumber {
112    type Target = u32;
113
114    fn deref(&self) -> &Self::Target {
115        &self.number
116    }
117}
118
119impl PartialEq for PortNumber {
120    fn eq(&self, other: &Self) -> bool {
121        **self == **other
122    }
123}
124
125impl Eq for PortNumber {}
126
127impl PartialOrd for PortNumber {
128    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
129        Some(self.cmp(other))
130    }
131}
132
133impl Ord for PortNumber {
134    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
135        self.number.cmp(&other.number)
136    }
137}
138
139impl Hash for PortNumber {
140    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
141        (**self).hash(state)
142    }
143}
144
145impl Borrow<u32> for PortNumber {
146    fn borrow(&self) -> &u32 {
147        &self.number
148    }
149}
150
151impl Drop for PortNumber {
152    fn drop(&mut self) {
153        let notify_tx = {
154            let mut inner = self.allocator.lock().unwrap();
155            inner.used.remove(&self.number);
156            mem::take(&mut inner.notify_tx)
157        };
158
159        for tx in notify_tx {
160            let _ = tx.send(());
161        }
162    }
163}
164
/// A port connection request by the local endpoint.
///
/// The id can be set freely by the user.
/// It is initialized to the [port number](Self::port).
///
/// The derived comparisons consider [`port`](Self::port) first and
/// [`id`](Self::id) second.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PortReq {
    /// The allocated, local port number.
    pub port: PortNumber,
    /// A user-specified id.
    pub id: u32,
}
176
177impl From<PortNumber> for PortReq {
178    /// Create a new port connection request with [`id`](Self::id) set to
179    /// the [port number](Self::port).
180    fn from(port: PortNumber) -> Self {
181        Self { id: port.number, port }
182    }
183}
184
185impl From<PortReq> for PortNumber {
186    fn from(req: PortReq) -> Self {
187        req.port
188    }
189}
190
191impl PortReq {
192    /// Create a new port connection request with [`id`](Self::id) set to
193    /// the [port number](Self::port).
194    pub fn new(port: PortNumber) -> Self {
195        Self::from(port)
196    }
197
198    /// Sets the id to the specified value.
199    pub fn with_id(mut self, id: u32) -> Self {
200        self.id = id;
201        self
202    }
203}