1use super::{
2 super::super::{
3 regions::{DomainWithPort, IpAddrWithPort},
4 spawn::spawn,
5 },
6 ChooseOptions, Chooser, ChooserFeedback, ChosenResults,
7};
8use dashmap::DashMap;
9use ipnet::{Ipv4Net, Ipv6Net, PrefixLenError};
10use log::{info, warn};
11use std::{
12 collections::HashMap,
13 mem::take,
14 net::{IpAddr, Ipv4Addr, Ipv6Addr},
15 sync::{Arc, Mutex},
16 time::{Duration, Instant},
17};
18use typenum::{IsLess, Le, NonZero, Unsigned, U128, U32};
19
20#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21struct BlacklistKey {
22 ip: IpAddrWithPort,
23 domain: Option<DomainWithPort>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27struct Subnet(IpAddrWithPort);
28
29#[derive(Debug, Clone)]
30struct BlacklistValue {
31 blocked_at: Instant,
32}
33
34type Blacklist = DashMap<BlacklistKey, BlacklistValue>;
35
36#[derive(Debug, Clone)]
37struct LockedData {
38 last_shrink_at: Instant,
39}
40
41impl Default for LockedData {
42 #[inline]
43 fn default() -> Self {
44 LockedData {
45 last_shrink_at: Instant::now(),
46 }
47 }
48}
49
50const DEFAULT_BLOCK_DURATION: Duration = Duration::from_secs(30);
51const DEFAULT_SHRINK_INTERVAL: Duration = Duration::from_secs(120);
52const DEFAULT_IPV4_NETMASK_PREFIX_LENGTH: u8 = 24;
53const DEFAULT_IPV6_NETMASK_PREFIX_LENGTH: u8 = 64;
54
55#[derive(Debug, Clone)]
59pub struct SubnetChooser {
60 inner: Arc<SubnetChooserInner>,
61}
62
63#[derive(Debug, Default)]
64struct SubnetChooserInner {
65 blacklist: Blacklist,
66 lock: Mutex<LockedData>,
67 block_duration: Duration,
68 shrink_interval: Duration,
69 ipv4_netmask_prefix_length: u8,
70 ipv6_netmask_prefix_length: u8,
71}
72
73impl Default for SubnetChooser {
74 #[inline]
75 fn default() -> Self {
76 Self::builder().build()
77 }
78}
79
80impl SubnetChooser {
81 #[inline]
83 pub fn builder() -> SubnetChooserBuilder {
84 Default::default()
85 }
86}
87
88impl Chooser for SubnetChooser {
89 fn choose(&self, ips: &[IpAddrWithPort], opts: ChooseOptions) -> ChosenResults {
90 let mut need_to_shrink = false;
91 let mut subnets_map: HashMap<Subnet, Vec<IpAddrWithPort>> = Default::default();
92 for &ip in ips.iter() {
93 let is_blocked = self
94 .inner
95 .blacklist
96 .get(&BlacklistKey {
97 ip,
98 domain: opts.domain().cloned(),
99 })
100 .map_or(false, |r| {
101 if r.value().blocked_at.elapsed() < self.inner.block_duration {
102 true
103 } else {
104 need_to_shrink = true;
105 false
106 }
107 });
108 if !is_blocked {
109 let subnet = self.get_network_address(ip);
110 if let Some(ips) = subnets_map.get_mut(&subnet) {
111 ips.push(ip);
112 } else {
113 subnets_map.insert(subnet, vec![ip]);
114 }
115 }
116 }
117 let chosen_ips = choose_group(subnets_map.into_values()).unwrap_or_default();
118 do_some_work_async(&self.inner, need_to_shrink);
119 return chosen_ips.into();
120
121 #[cfg(not(test))]
123 fn choose_group(iter: impl Iterator<Item = Vec<IpAddrWithPort>>) -> Option<Vec<IpAddrWithPort>> {
124 use rand::prelude::IteratorRandom;
125
126 iter.choose(&mut rand::thread_rng())
127 }
128
129 #[cfg(test)]
131 fn choose_group(iter: impl Iterator<Item = Vec<IpAddrWithPort>>) -> Option<Vec<IpAddrWithPort>> {
132 iter.max_by_key(|ips| ips.len())
133 }
134 }
135
136 fn feedback(&self, feedback: ChooserFeedback) {
137 if feedback.error().is_some() {
138 for &ip in feedback.ips().iter() {
139 self.inner.blacklist.insert(
140 BlacklistKey {
141 ip,
142 domain: feedback.domain().cloned(),
143 },
144 BlacklistValue {
145 blocked_at: Instant::now(),
146 },
147 );
148 }
149 } else {
150 for &ip in feedback.ips().iter() {
151 self.inner.blacklist.remove(&BlacklistKey {
152 ip,
153 domain: feedback.domain().cloned(),
154 });
155 }
156 }
157 }
158}
159
160impl SubnetChooser {
161 fn get_network_address(&self, addr: IpAddrWithPort) -> Subnet {
162 let subnet = match addr.ip_addr() {
163 IpAddr::V4(ipv4_addr) => {
164 let ipv4_network_addr =
165 get_network_address_of_ipv4_addr(ipv4_addr, self.inner.ipv4_netmask_prefix_length);
166 IpAddr::V4(ipv4_network_addr)
167 }
168 IpAddr::V6(ipv6_addr) => {
169 let ipv6_network_addr =
170 get_network_address_of_ipv6_addr(ipv6_addr, self.inner.ipv6_netmask_prefix_length);
171 IpAddr::V6(ipv6_network_addr)
172 }
173 };
174 return Subnet(IpAddrWithPort::new(subnet, addr.port()));
175
176 fn get_network_address_of_ipv4_addr(addr: Ipv4Addr, prefix: u8) -> Ipv4Addr {
177 Ipv4Net::new(addr, prefix).unwrap().network()
178 }
179
180 fn get_network_address_of_ipv6_addr(addr: Ipv6Addr, prefix: u8) -> Ipv6Addr {
181 Ipv6Net::new(addr, prefix).unwrap().network()
182 }
183 }
184
185 #[allow(dead_code)]
186 fn len(&self) -> usize {
187 self.inner.blacklist.len()
188 }
189}
190
191fn do_some_work_async(inner: &Arc<SubnetChooserInner>, need_to_shrink: bool) {
192 if need_to_shrink && is_time_to_shrink(inner) {
193 let cloned = inner.to_owned();
194 if let Err(err) = spawn("qiniu.rust-sdk.http-client.chooser.SubnetChooser".into(), move || {
195 if is_time_to_shrink_mut(&cloned) {
196 info!("Subnet Chooser spawns thread to do some housework");
197 shrink_cache(&cloned.blacklist, cloned.block_duration);
198 }
199 }) {
200 warn!(
201 "Subnet Chooser was failed to spawn thread to do some housework: {}",
202 err
203 );
204 }
205 }
206
207 return;
208
209 fn is_time_to_shrink(inner: &Arc<SubnetChooserInner>) -> bool {
210 if let Ok(locked_data) = inner.lock.try_lock() {
211 _is_time_to_shrink(inner.shrink_interval, &locked_data)
212 } else {
213 false
214 }
215 }
216
217 fn is_time_to_shrink_mut(inner: &Arc<SubnetChooserInner>) -> bool {
218 if let Ok(mut locked_data) = inner.lock.try_lock() {
219 if _is_time_to_shrink(inner.shrink_interval, &locked_data) {
220 locked_data.last_shrink_at = Instant::now();
221 return true;
222 }
223 }
224 false
225 }
226
227 fn _is_time_to_shrink(shrink_interval: Duration, locked_data: &LockedData) -> bool {
228 locked_data.last_shrink_at.elapsed() >= shrink_interval
229 }
230
231 fn shrink_cache(blacklist: &Blacklist, block_duration: Duration) {
232 let old_size = blacklist.len();
233 blacklist.retain(|_, value| value.blocked_at.elapsed() < block_duration);
234 let new_size = blacklist.len();
235 info!("Blacklist is shrunken, from {} to {} entries", old_size, new_size);
236 }
237}
238
239#[derive(Debug)]
241pub struct SubnetChooserBuilder {
242 inner: SubnetChooserInner,
243}
244
245impl Default for SubnetChooserBuilder {
246 #[inline]
247 fn default() -> Self {
248 Self {
249 inner: SubnetChooserInner {
250 blacklist: Default::default(),
251 lock: Default::default(),
252 block_duration: DEFAULT_BLOCK_DURATION,
253 shrink_interval: DEFAULT_SHRINK_INTERVAL,
254 ipv4_netmask_prefix_length: DEFAULT_IPV4_NETMASK_PREFIX_LENGTH,
255 ipv6_netmask_prefix_length: DEFAULT_IPV6_NETMASK_PREFIX_LENGTH,
256 },
257 }
258 }
259}
260
261impl SubnetChooserBuilder {
262 #[inline]
264 pub fn block_duration(&mut self, block_duration: Duration) -> &mut Self {
265 self.inner.block_duration = block_duration;
266 self
267 }
268
269 #[inline]
271 pub fn shrink_interval(&mut self, shrink_interval: Duration) -> &mut Self {
272 self.inner.shrink_interval = shrink_interval;
273 self
274 }
275
276 #[inline]
278 pub fn safe_ipv4_netmask_prefix_length<N>(&mut self) -> &mut Self
279 where
280 N: Unsigned + IsLess<U32>,
281 Le<N, U32>: NonZero,
282 {
283 self.inner.ipv4_netmask_prefix_length = N::to_u8();
284 self
285 }
286
287 #[inline]
289 pub fn safe_ipv6_netmask_prefix_length<N>(&mut self) -> &mut Self
290 where
291 N: Unsigned + IsLess<U128>,
292 Le<N, U128>: NonZero,
293 {
294 self.inner.ipv6_netmask_prefix_length = N::to_u8();
295 self
296 }
297
298 #[inline]
300 pub fn ipv4_netmask_prefix_length(&mut self, ipv4_netmask_prefix_length: u8) -> Result<&mut Self, PrefixLenError> {
301 if ipv4_netmask_prefix_length > 32 {
302 return Err(PrefixLenError);
303 }
304 self.inner.ipv4_netmask_prefix_length = ipv4_netmask_prefix_length;
305 Ok(self)
306 }
307
308 #[inline]
310 pub fn ipv6_netmask_prefix_length(&mut self, ipv6_netmask_prefix_length: u8) -> Result<&mut Self, PrefixLenError> {
311 if ipv6_netmask_prefix_length > 128 {
312 return Err(PrefixLenError);
313 }
314 self.inner.ipv6_netmask_prefix_length = ipv6_netmask_prefix_length;
315 Ok(self)
316 }
317
318 #[inline]
320 pub fn build(&mut self) -> SubnetChooser {
321 SubnetChooser {
322 inner: Arc::new(take(&mut self.inner)),
323 }
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::{
330 super::super::{ChooseOptionsBuilder, ResponseError, ResponseErrorKind},
331 *,
332 };
333 use std::{
334 net::{IpAddr, Ipv4Addr},
335 thread::sleep,
336 };
337
338 const SUBNET_1: &[IpAddrWithPort] = &[
339 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
340 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
341 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None),
342 ];
343 const SUBNET_2: &[IpAddrWithPort] = &[
344 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None),
345 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 2)), None),
346 ];
347
348 #[test]
349 fn test_subnet_chooser() {
350 env_logger::builder().is_test(true).try_init().ok();
351 let all_ips = [SUBNET_1, SUBNET_2].concat();
352 let domain = DomainWithPort::new("fakedomain", None);
353
354 let subnet_chooser = SubnetChooser::default();
355 assert_eq!(
356 subnet_chooser
357 .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
358 .into_ip_addrs(),
359 SUBNET_1.to_vec()
360 );
361 subnet_chooser.feedback(
362 ChooserFeedback::builder(&[
363 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
364 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
365 ])
366 .domain(&domain)
367 .error(&ResponseError::new_with_msg(
368 ResponseErrorKind::ParseResponseError,
369 "Test Error",
370 ))
371 .build(),
372 );
373 assert_eq!(
374 subnet_chooser
375 .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
376 .into_ip_addrs(),
377 vec![
378 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None),
379 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 2)), None),
380 ]
381 );
382 subnet_chooser.feedback(
383 ChooserFeedback::builder(&[
384 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None),
385 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 2)), None),
386 ])
387 .domain(&domain)
388 .error(&ResponseError::new_with_msg(
389 ResponseErrorKind::ParseResponseError,
390 "Test Error",
391 ))
392 .build(),
393 );
394 assert_eq!(
395 subnet_chooser
396 .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
397 .into_ip_addrs(),
398 vec![IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)]
399 );
400
401 subnet_chooser.feedback(
402 ChooserFeedback::builder(&[IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)])
403 .domain(&domain)
404 .error(&ResponseError::new_with_msg(
405 ResponseErrorKind::ParseResponseError,
406 "Test Error",
407 ))
408 .build(),
409 );
410 subnet_chooser.feedback(
411 ChooserFeedback::builder(&[IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None)])
412 .domain(&domain)
413 .build(),
414 );
415 assert_eq!(
416 subnet_chooser
417 .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
418 .into_ip_addrs(),
419 vec![IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None)],
420 );
421 }
422
423 #[test]
424 fn test_subnet_chooser_expiration_and_shrink() {
425 env_logger::builder().is_test(true).try_init().ok();
426 let all_ips = [SUBNET_1, SUBNET_2].concat();
427
428 let subnet_chooser = SubnetChooser::builder()
429 .block_duration(Duration::from_secs(1))
430 .shrink_interval(Duration::from_millis(500))
431 .build();
432
433 assert_eq!(
434 subnet_chooser.choose(&all_ips, Default::default()).into_ip_addrs(),
435 SUBNET_1.to_vec()
436 );
437 subnet_chooser.feedback(
438 ChooserFeedback::builder(&[
439 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
440 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
441 ])
442 .error(&ResponseError::new_with_msg(
443 ResponseErrorKind::ParseResponseError,
444 "Test Error",
445 ))
446 .build(),
447 );
448 assert_eq!(
449 subnet_chooser.choose(&all_ips, Default::default()).into_ip_addrs(),
450 SUBNET_2.to_vec(),
451 );
452
453 sleep(Duration::from_secs(1));
454 assert_eq!(
455 subnet_chooser.choose(&all_ips, Default::default()).into_ip_addrs(),
456 SUBNET_1.to_vec()
457 );
458
459 sleep(Duration::from_millis(500));
460 assert_eq!(subnet_chooser.len(), 0);
461 }
462}