qiniu_http_client/client/chooser/
subnet.rs

1use super::{
2    super::super::{
3        regions::{DomainWithPort, IpAddrWithPort},
4        spawn::spawn,
5    },
6    ChooseOptions, Chooser, ChooserFeedback, ChosenResults,
7};
8use dashmap::DashMap;
9use ipnet::{Ipv4Net, Ipv6Net, PrefixLenError};
10use log::{info, warn};
11use std::{
12    collections::HashMap,
13    mem::take,
14    net::{IpAddr, Ipv4Addr, Ipv6Addr},
15    sync::{Arc, Mutex},
16    time::{Duration, Instant},
17};
18use typenum::{IsLess, Le, NonZero, Unsigned, U128, U32};
19
20#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21struct BlacklistKey {
22    ip: IpAddrWithPort,
23    domain: Option<DomainWithPort>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27struct Subnet(IpAddrWithPort);
28
29#[derive(Debug, Clone)]
30struct BlacklistValue {
31    blocked_at: Instant,
32}
33
34type Blacklist = DashMap<BlacklistKey, BlacklistValue>;
35
36#[derive(Debug, Clone)]
37struct LockedData {
38    last_shrink_at: Instant,
39}
40
41impl Default for LockedData {
42    #[inline]
43    fn default() -> Self {
44        LockedData {
45            last_shrink_at: Instant::now(),
46        }
47    }
48}
49
50const DEFAULT_BLOCK_DURATION: Duration = Duration::from_secs(30);
51const DEFAULT_SHRINK_INTERVAL: Duration = Duration::from_secs(120);
52const DEFAULT_IPV4_NETMASK_PREFIX_LENGTH: u8 = 24;
53const DEFAULT_IPV6_NETMASK_PREFIX_LENGTH: u8 = 64;
54
55/// 子网选择器
56///
57/// 包含子网黑名单,一旦被反馈 API 调用失败,则将所有相关子网内 IP 地址冻结一段时间
58#[derive(Debug, Clone)]
59pub struct SubnetChooser {
60    inner: Arc<SubnetChooserInner>,
61}
62
63#[derive(Debug, Default)]
64struct SubnetChooserInner {
65    blacklist: Blacklist,
66    lock: Mutex<LockedData>,
67    block_duration: Duration,
68    shrink_interval: Duration,
69    ipv4_netmask_prefix_length: u8,
70    ipv6_netmask_prefix_length: u8,
71}
72
73impl Default for SubnetChooser {
74    #[inline]
75    fn default() -> Self {
76        Self::builder().build()
77    }
78}
79
80impl SubnetChooser {
81    /// 创建子网选择构建器
82    #[inline]
83    pub fn builder() -> SubnetChooserBuilder {
84        Default::default()
85    }
86}
87
88impl Chooser for SubnetChooser {
89    fn choose(&self, ips: &[IpAddrWithPort], opts: ChooseOptions) -> ChosenResults {
90        let mut need_to_shrink = false;
91        let mut subnets_map: HashMap<Subnet, Vec<IpAddrWithPort>> = Default::default();
92        for &ip in ips.iter() {
93            let is_blocked = self
94                .inner
95                .blacklist
96                .get(&BlacklistKey {
97                    ip,
98                    domain: opts.domain().cloned(),
99                })
100                .map_or(false, |r| {
101                    if r.value().blocked_at.elapsed() < self.inner.block_duration {
102                        true
103                    } else {
104                        need_to_shrink = true;
105                        false
106                    }
107                });
108            if !is_blocked {
109                let subnet = self.get_network_address(ip);
110                if let Some(ips) = subnets_map.get_mut(&subnet) {
111                    ips.push(ip);
112                } else {
113                    subnets_map.insert(subnet, vec![ip]);
114                }
115            }
116        }
117        let chosen_ips = choose_group(subnets_map.into_values()).unwrap_or_default();
118        do_some_work_async(&self.inner, need_to_shrink);
119        return chosen_ips.into();
120
121        /// For production, choose any subnet by random
122        #[cfg(not(test))]
123        fn choose_group(iter: impl Iterator<Item = Vec<IpAddrWithPort>>) -> Option<Vec<IpAddrWithPort>> {
124            use rand::prelude::IteratorRandom;
125
126            iter.choose(&mut rand::thread_rng())
127        }
128
129        /// For Test cases, always choose the biggest subnet
130        #[cfg(test)]
131        fn choose_group(iter: impl Iterator<Item = Vec<IpAddrWithPort>>) -> Option<Vec<IpAddrWithPort>> {
132            iter.max_by_key(|ips| ips.len())
133        }
134    }
135
136    fn feedback(&self, feedback: ChooserFeedback) {
137        if feedback.error().is_some() {
138            for &ip in feedback.ips().iter() {
139                self.inner.blacklist.insert(
140                    BlacklistKey {
141                        ip,
142                        domain: feedback.domain().cloned(),
143                    },
144                    BlacklistValue {
145                        blocked_at: Instant::now(),
146                    },
147                );
148            }
149        } else {
150            for &ip in feedback.ips().iter() {
151                self.inner.blacklist.remove(&BlacklistKey {
152                    ip,
153                    domain: feedback.domain().cloned(),
154                });
155            }
156        }
157    }
158}
159
160impl SubnetChooser {
161    fn get_network_address(&self, addr: IpAddrWithPort) -> Subnet {
162        let subnet = match addr.ip_addr() {
163            IpAddr::V4(ipv4_addr) => {
164                let ipv4_network_addr =
165                    get_network_address_of_ipv4_addr(ipv4_addr, self.inner.ipv4_netmask_prefix_length);
166                IpAddr::V4(ipv4_network_addr)
167            }
168            IpAddr::V6(ipv6_addr) => {
169                let ipv6_network_addr =
170                    get_network_address_of_ipv6_addr(ipv6_addr, self.inner.ipv6_netmask_prefix_length);
171                IpAddr::V6(ipv6_network_addr)
172            }
173        };
174        return Subnet(IpAddrWithPort::new(subnet, addr.port()));
175
176        fn get_network_address_of_ipv4_addr(addr: Ipv4Addr, prefix: u8) -> Ipv4Addr {
177            Ipv4Net::new(addr, prefix).unwrap().network()
178        }
179
180        fn get_network_address_of_ipv6_addr(addr: Ipv6Addr, prefix: u8) -> Ipv6Addr {
181            Ipv6Net::new(addr, prefix).unwrap().network()
182        }
183    }
184
185    #[allow(dead_code)]
186    fn len(&self) -> usize {
187        self.inner.blacklist.len()
188    }
189}
190
191fn do_some_work_async(inner: &Arc<SubnetChooserInner>, need_to_shrink: bool) {
192    if need_to_shrink && is_time_to_shrink(inner) {
193        let cloned = inner.to_owned();
194        if let Err(err) = spawn("qiniu.rust-sdk.http-client.chooser.SubnetChooser".into(), move || {
195            if is_time_to_shrink_mut(&cloned) {
196                info!("Subnet Chooser spawns thread to do some housework");
197                shrink_cache(&cloned.blacklist, cloned.block_duration);
198            }
199        }) {
200            warn!(
201                "Subnet Chooser was failed to spawn thread to do some housework: {}",
202                err
203            );
204        }
205    }
206
207    return;
208
209    fn is_time_to_shrink(inner: &Arc<SubnetChooserInner>) -> bool {
210        if let Ok(locked_data) = inner.lock.try_lock() {
211            _is_time_to_shrink(inner.shrink_interval, &locked_data)
212        } else {
213            false
214        }
215    }
216
217    fn is_time_to_shrink_mut(inner: &Arc<SubnetChooserInner>) -> bool {
218        if let Ok(mut locked_data) = inner.lock.try_lock() {
219            if _is_time_to_shrink(inner.shrink_interval, &locked_data) {
220                locked_data.last_shrink_at = Instant::now();
221                return true;
222            }
223        }
224        false
225    }
226
227    fn _is_time_to_shrink(shrink_interval: Duration, locked_data: &LockedData) -> bool {
228        locked_data.last_shrink_at.elapsed() >= shrink_interval
229    }
230
231    fn shrink_cache(blacklist: &Blacklist, block_duration: Duration) {
232        let old_size = blacklist.len();
233        blacklist.retain(|_, value| value.blocked_at.elapsed() < block_duration);
234        let new_size = blacklist.len();
235        info!("Blacklist is shrunken, from {} to {} entries", old_size, new_size);
236    }
237}
238
239/// 子网选择构建器
240#[derive(Debug)]
241pub struct SubnetChooserBuilder {
242    inner: SubnetChooserInner,
243}
244
245impl Default for SubnetChooserBuilder {
246    #[inline]
247    fn default() -> Self {
248        Self {
249            inner: SubnetChooserInner {
250                blacklist: Default::default(),
251                lock: Default::default(),
252                block_duration: DEFAULT_BLOCK_DURATION,
253                shrink_interval: DEFAULT_SHRINK_INTERVAL,
254                ipv4_netmask_prefix_length: DEFAULT_IPV4_NETMASK_PREFIX_LENGTH,
255                ipv6_netmask_prefix_length: DEFAULT_IPV6_NETMASK_PREFIX_LENGTH,
256            },
257        }
258    }
259}
260
261impl SubnetChooserBuilder {
262    /// 设置屏蔽时长
263    #[inline]
264    pub fn block_duration(&mut self, block_duration: Duration) -> &mut Self {
265        self.inner.block_duration = block_duration;
266        self
267    }
268
269    /// 设置清理间隔时长
270    #[inline]
271    pub fn shrink_interval(&mut self, shrink_interval: Duration) -> &mut Self {
272        self.inner.shrink_interval = shrink_interval;
273        self
274    }
275
276    /// 用安全的方式设置 IPv4 地址子网掩码前缀长度
277    #[inline]
278    pub fn safe_ipv4_netmask_prefix_length<N>(&mut self) -> &mut Self
279    where
280        N: Unsigned + IsLess<U32>,
281        Le<N, U32>: NonZero,
282    {
283        self.inner.ipv4_netmask_prefix_length = N::to_u8();
284        self
285    }
286
287    /// 用安全的方式设置 IPv6 地址子网掩码前缀长度
288    #[inline]
289    pub fn safe_ipv6_netmask_prefix_length<N>(&mut self) -> &mut Self
290    where
291        N: Unsigned + IsLess<U128>,
292        Le<N, U128>: NonZero,
293    {
294        self.inner.ipv6_netmask_prefix_length = N::to_u8();
295        self
296    }
297
298    /// 设置 IPv4 地址子网掩码前缀长度
299    #[inline]
300    pub fn ipv4_netmask_prefix_length(&mut self, ipv4_netmask_prefix_length: u8) -> Result<&mut Self, PrefixLenError> {
301        if ipv4_netmask_prefix_length > 32 {
302            return Err(PrefixLenError);
303        }
304        self.inner.ipv4_netmask_prefix_length = ipv4_netmask_prefix_length;
305        Ok(self)
306    }
307
308    /// 设置 IPv6 地址子网掩码前缀长度
309    #[inline]
310    pub fn ipv6_netmask_prefix_length(&mut self, ipv6_netmask_prefix_length: u8) -> Result<&mut Self, PrefixLenError> {
311        if ipv6_netmask_prefix_length > 128 {
312            return Err(PrefixLenError);
313        }
314        self.inner.ipv6_netmask_prefix_length = ipv6_netmask_prefix_length;
315        Ok(self)
316    }
317
318    /// 构建子网选择器
319    #[inline]
320    pub fn build(&mut self) -> SubnetChooser {
321        SubnetChooser {
322            inner: Arc::new(take(&mut self.inner)),
323        }
324    }
325}
326
327#[cfg(test)]
328mod tests {
329    use super::{
330        super::super::{ChooseOptionsBuilder, ResponseError, ResponseErrorKind},
331        *,
332    };
333    use std::{
334        net::{IpAddr, Ipv4Addr},
335        thread::sleep,
336    };
337
338    const SUBNET_1: &[IpAddrWithPort] = &[
339        IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
340        IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
341        IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None),
342    ];
343    const SUBNET_2: &[IpAddrWithPort] = &[
344        IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None),
345        IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 2)), None),
346    ];
347
348    #[test]
349    fn test_subnet_chooser() {
350        env_logger::builder().is_test(true).try_init().ok();
351        let all_ips = [SUBNET_1, SUBNET_2].concat();
352        let domain = DomainWithPort::new("fakedomain", None);
353
354        let subnet_chooser = SubnetChooser::default();
355        assert_eq!(
356            subnet_chooser
357                .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
358                .into_ip_addrs(),
359            SUBNET_1.to_vec()
360        );
361        subnet_chooser.feedback(
362            ChooserFeedback::builder(&[
363                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
364                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
365            ])
366            .domain(&domain)
367            .error(&ResponseError::new_with_msg(
368                ResponseErrorKind::ParseResponseError,
369                "Test Error",
370            ))
371            .build(),
372        );
373        assert_eq!(
374            subnet_chooser
375                .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
376                .into_ip_addrs(),
377            vec![
378                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None),
379                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 2)), None),
380            ]
381        );
382        subnet_chooser.feedback(
383            ChooserFeedback::builder(&[
384                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None),
385                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 2)), None),
386            ])
387            .domain(&domain)
388            .error(&ResponseError::new_with_msg(
389                ResponseErrorKind::ParseResponseError,
390                "Test Error",
391            ))
392            .build(),
393        );
394        assert_eq!(
395            subnet_chooser
396                .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
397                .into_ip_addrs(),
398            vec![IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)]
399        );
400
401        subnet_chooser.feedback(
402            ChooserFeedback::builder(&[IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)])
403                .domain(&domain)
404                .error(&ResponseError::new_with_msg(
405                    ResponseErrorKind::ParseResponseError,
406                    "Test Error",
407                ))
408                .build(),
409        );
410        subnet_chooser.feedback(
411            ChooserFeedback::builder(&[IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None)])
412                .domain(&domain)
413                .build(),
414        );
415        assert_eq!(
416            subnet_chooser
417                .choose(&all_ips, ChooseOptionsBuilder::new().domain(&domain).build())
418                .into_ip_addrs(),
419            vec![IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 2, 1)), None)],
420        );
421    }
422
423    #[test]
424    fn test_subnet_chooser_expiration_and_shrink() {
425        env_logger::builder().is_test(true).try_init().ok();
426        let all_ips = [SUBNET_1, SUBNET_2].concat();
427
428        let subnet_chooser = SubnetChooser::builder()
429            .block_duration(Duration::from_secs(1))
430            .shrink_interval(Duration::from_millis(500))
431            .build();
432
433        assert_eq!(
434            subnet_chooser.choose(&all_ips, Default::default()).into_ip_addrs(),
435            SUBNET_1.to_vec()
436        );
437        subnet_chooser.feedback(
438            ChooserFeedback::builder(&[
439                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
440                IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
441            ])
442            .error(&ResponseError::new_with_msg(
443                ResponseErrorKind::ParseResponseError,
444                "Test Error",
445            ))
446            .build(),
447        );
448        assert_eq!(
449            subnet_chooser.choose(&all_ips, Default::default()).into_ip_addrs(),
450            SUBNET_2.to_vec(),
451        );
452
453        sleep(Duration::from_secs(1));
454        assert_eq!(
455            subnet_chooser.choose(&all_ips, Default::default()).into_ip_addrs(),
456            SUBNET_1.to_vec()
457        );
458
459        sleep(Duration::from_millis(500));
460        assert_eq!(subnet_chooser.len(), 0);
461    }
462}