1use fnv::FnvHasher;
2use futures_util::stream::StreamExt;
3use signal_hook_tokio::v0_3::Signals;
4
5use std::{
6 collections::HashMap,
7 hash::Hasher,
8 io::Error,
9 net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
10 sync::{Arc, RwLock},
11 thread,
12 time::{Duration, Instant},
13};
14
15fn fnv1a(bytes: &[u8]) -> u64 {
16 let mut hasher = FnvHasher::default();
17 hasher.write(bytes);
18 hasher.finish()
19}
20
21#[derive(Debug)]
22pub struct PortEntry {
23 pub id: Vec<u8>,
24 pub port: u16,
25 pub start: Instant,
26}
27
28impl PortEntry {
29 pub fn new(id: &[u8], port: u16) -> Self {
30 Self {
31 id: id.to_vec(),
32 port,
33 start: Instant::now(),
34 }
35 }
36}
37
38struct Inner {
39 min: u64,
40 max: u64,
41 alloc_callback: Option<Box<dyn Fn(&[u8]) + Send + Sync + 'static>>,
42 dealloc_callback: Option<Box<dyn Fn(&[u8]) + Send + Sync + 'static>>,
43 entries: HashMap<u16, PortEntry>,
44 timeout: Duration,
45}
46
47impl Inner {
48 fn get_port(&self, id: &[u8]) -> u16 {
49 (fnv1a(id) % (self.max - self.min) + self.min) as u16
50 }
51
52 fn dealloc_timeout_ports(&mut self) {
53 let ids: Vec<Vec<u8>> = self
54 .entries
55 .values()
56 .filter(|e| e.start.elapsed() > self.timeout)
57 .map(|e| e.id.to_owned())
58 .collect();
59
60 for id in ids {
61 self.dealloc(&id);
62 }
63 }
64
65 fn dealloc_all_ports(&mut self) {
66 let ids: Vec<Vec<u8>> = self.entries.values().map(|e| e.id.to_owned()).collect();
67
68 for id in ids {
69 self.dealloc(&id);
70 }
71 }
72
73 fn is_open(port: u16) -> bool {
74 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
75 TcpStream::connect(&addr).is_ok()
76 }
77
78 fn dealloc(&mut self, id: &[u8]) -> Option<PortEntry> {
79 self.entries.remove(&self.get_port(id)).and_then(|e| {
80 self.dealloc_callback.as_ref().and_then(|cb| {
81 cb(&e.id);
82 Some(e)
83 })
84 })
85 }
86
87 fn alloc_timeout(&mut self, id: &[u8], timeout: Duration) -> Option<u16> {
88 let port = self.get_port(id);
89 self.entries.entry(port).or_insert(PortEntry::new(id, port));
90 self.entries
91 .get(&port)
92 .and_then(|e| if &e.id == id { Some(e.port) } else { None })
93 .and_then(|port| {
94 if !Inner::is_open(port) {
95 if let Some(ref cb) = self.alloc_callback {
96 cb(id);
97 }
98
99 let start = Instant::now();
100 loop {
101 if Inner::is_open(port) {
102 return Some(port);
103 } else if start.elapsed() > timeout {
104 self.entries.remove(&port);
105 return None;
106 } else {
107 thread::sleep(Duration::from_millis(250));
108 }
109 }
110 } else {
111 return Some(port);
112 }
113 })
114 }
115}
116
117#[derive(Clone)]
118pub struct PortAlloc {
119 inner: Arc<RwLock<Inner>>,
120}
121
122impl PortAlloc {
123 pub fn new(min: u16, max: u16, timeout: Duration) -> Self {
124 let inner = Inner {
125 min: min as u64,
126 max: max as u64,
127 alloc_callback: None,
128 dealloc_callback: None,
129 entries: HashMap::new(),
130 timeout,
131 };
132
133 Self {
134 inner: Arc::new(RwLock::new(inner)),
135 }
136 }
137
138 pub fn dealloc_timeout_ports(&self) {
139 self.inner.write().unwrap().dealloc_timeout_ports()
140 }
141
142 pub fn dealloc_all_ports(&self) {
143 self.inner.write().unwrap().dealloc_all_ports()
144 }
145
146 pub fn set_alloc_callback<F>(&self, f: F)
147 where
148 F: Fn(&[u8]) + Send + Sync + 'static,
149 {
150 self.inner.write().unwrap().alloc_callback = Some(Box::new(f));
151 }
152
153 pub fn set_dealloc_callback<F>(&self, f: F)
154 where
155 F: Fn(&[u8]) + Send + Sync + 'static,
156 {
157 self.inner.write().unwrap().dealloc_callback = Some(Box::new(f));
158 }
159
160 pub fn alloc_timeout<T: AsRef<[u8]>>(&self, id: T, timeout: Duration) -> Option<u16> {
161 self.inner
162 .write()
163 .unwrap()
164 .alloc_timeout(id.as_ref(), timeout)
165 }
166
167 pub fn dealloc<T: AsRef<[u8]>>(&self, id: T) -> Option<PortEntry> {
168 self.inner.write().unwrap().dealloc(id.as_ref())
169 }
170
171 pub async fn handle_signals(&self, signals: Signals) {
172 let mut signals = signals.fuse();
173 while let Some(_signal) = signals.next().await {
174 self.dealloc_all_ports();
175 }
176 }
177
178 pub async fn wait_process_exit(&self) -> Result<(), Error> {
179 let signals = Signals::new(&[
180 signal_hook::SIGHUP,
181 signal_hook::SIGTERM,
182 signal_hook::SIGINT,
183 signal_hook::SIGQUIT,
184 ])?;
185
186 let handle = signals.handle();
187
188 let cloned = self.clone();
189 let signals_task = tokio::spawn(async move { cloned.handle_signals(signals).await });
190 handle.close();
191 signals_task.await?;
192 Ok(())
193 }
194}