port_alloc/
lib.rs

1use fnv::FnvHasher;
2use futures_util::stream::StreamExt;
3use signal_hook_tokio::v0_3::Signals;
4
5use std::{
6    collections::HashMap,
7    hash::Hasher,
8    io::Error,
9    net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
10    sync::{Arc, RwLock},
11    thread,
12    time::{Duration, Instant},
13};
14
15fn fnv1a(bytes: &[u8]) -> u64 {
16    let mut hasher = FnvHasher::default();
17    hasher.write(bytes);
18    hasher.finish()
19}
20
/// One row of the port table: which identifier holds which port, and since when.
#[derive(Debug)]
pub struct PortEntry {
    /// Identifier bytes of the owner of this entry.
    pub id: Vec<u8>,
    /// Port number recorded for `id`.
    pub port: u16,
    /// Moment at which this entry was constructed.
    pub start: Instant,
}

impl PortEntry {
    /// Builds an entry that owns a copy of `id`, records `port`, and stamps
    /// `start` with the current instant.
    pub fn new(id: &[u8], port: u16) -> Self {
        let id = Vec::from(id);
        let start = Instant::now();
        Self { id, port, start }
    }
}
37
38struct Inner {
39    min: u64,
40    max: u64,
41    alloc_callback: Option<Box<dyn Fn(&[u8]) + Send + Sync + 'static>>,
42    dealloc_callback: Option<Box<dyn Fn(&[u8]) + Send + Sync + 'static>>,
43    entries: HashMap<u16, PortEntry>,
44    timeout: Duration,
45}
46
47impl Inner {
48    fn get_port(&self, id: &[u8]) -> u16 {
49        (fnv1a(id) % (self.max - self.min) + self.min) as u16
50    }
51
52    fn dealloc_timeout_ports(&mut self) {
53        let ids: Vec<Vec<u8>> = self
54            .entries
55            .values()
56            .filter(|e| e.start.elapsed() > self.timeout)
57            .map(|e| e.id.to_owned())
58            .collect();
59
60        for id in ids {
61            self.dealloc(&id);
62        }
63    }
64
65    fn dealloc_all_ports(&mut self) {
66        let ids: Vec<Vec<u8>> = self.entries.values().map(|e| e.id.to_owned()).collect();
67
68        for id in ids {
69            self.dealloc(&id);
70        }
71    }
72
73    fn is_open(port: u16) -> bool {
74        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
75        TcpStream::connect(&addr).is_ok()
76    }
77
78    fn dealloc(&mut self, id: &[u8]) -> Option<PortEntry> {
79        self.entries.remove(&self.get_port(id)).and_then(|e| {
80            self.dealloc_callback.as_ref().and_then(|cb| {
81                cb(&e.id);
82                Some(e)
83            })
84        })
85    }
86
87    fn alloc_timeout(&mut self, id: &[u8], timeout: Duration) -> Option<u16> {
88        let port = self.get_port(id);
89        self.entries.entry(port).or_insert(PortEntry::new(id, port));
90        self.entries
91            .get(&port)
92            .and_then(|e| if &e.id == id { Some(e.port) } else { None })
93            .and_then(|port| {
94                if !Inner::is_open(port) {
95                    if let Some(ref cb) = self.alloc_callback {
96                        cb(id);
97                    }
98
99                    let start = Instant::now();
100                    loop {
101                        if Inner::is_open(port) {
102                            return Some(port);
103                        } else if start.elapsed() > timeout {
104                            self.entries.remove(&port);
105                            return None;
106                        } else {
107                            thread::sleep(Duration::from_millis(250));
108                        }
109                    }
110                } else {
111                    return Some(port);
112                }
113            })
114    }
115}
116
117#[derive(Clone)]
118pub struct PortAlloc {
119    inner: Arc<RwLock<Inner>>,
120}
121
122impl PortAlloc {
123    pub fn new(min: u16, max: u16, timeout: Duration) -> Self {
124        let inner = Inner {
125            min: min as u64,
126            max: max as u64,
127            alloc_callback: None,
128            dealloc_callback: None,
129            entries: HashMap::new(),
130            timeout,
131        };
132
133        Self {
134            inner: Arc::new(RwLock::new(inner)),
135        }
136    }
137
138    pub fn dealloc_timeout_ports(&self) {
139        self.inner.write().unwrap().dealloc_timeout_ports()
140    }
141
142    pub fn dealloc_all_ports(&self) {
143        self.inner.write().unwrap().dealloc_all_ports()
144    }
145
146    pub fn set_alloc_callback<F>(&self, f: F)
147    where
148        F: Fn(&[u8]) + Send + Sync + 'static,
149    {
150        self.inner.write().unwrap().alloc_callback = Some(Box::new(f));
151    }
152
153    pub fn set_dealloc_callback<F>(&self, f: F)
154    where
155        F: Fn(&[u8]) + Send + Sync + 'static,
156    {
157        self.inner.write().unwrap().dealloc_callback = Some(Box::new(f));
158    }
159
160    pub fn alloc_timeout<T: AsRef<[u8]>>(&self, id: T, timeout: Duration) -> Option<u16> {
161        self.inner
162            .write()
163            .unwrap()
164            .alloc_timeout(id.as_ref(), timeout)
165    }
166
167    pub fn dealloc<T: AsRef<[u8]>>(&self, id: T) -> Option<PortEntry> {
168        self.inner.write().unwrap().dealloc(id.as_ref())
169    }
170
171    pub async fn handle_signals(&self, signals: Signals) {
172        let mut signals = signals.fuse();
173        while let Some(_signal) = signals.next().await {
174            self.dealloc_all_ports();
175        }
176    }
177
178    pub async fn wait_process_exit(&self) -> Result<(), Error> {
179        let signals = Signals::new(&[
180            signal_hook::SIGHUP,
181            signal_hook::SIGTERM,
182            signal_hook::SIGINT,
183            signal_hook::SIGQUIT,
184        ])?;
185
186        let handle = signals.handle();
187
188        let cloned = self.clone();
189        let signals_task = tokio::spawn(async move { cloned.handle_signals(signals).await });
190        handle.close();
191        signals_task.await?;
192        Ok(())
193    }
194}