qiniu_http_client/client/chooser/
ip.rs

1use super::{
2    super::super::{
3        regions::{DomainWithPort, IpAddrWithPort},
4        spawn::spawn,
5    },
6    ChooseOptions, Chooser, ChooserFeedback, ChosenResults,
7};
8use dashmap::DashMap;
9use log::{info, warn};
10use std::{
11    mem::take,
12    sync::{Arc, Mutex},
13    time::{Duration, Instant},
14};
15
16#[derive(Debug, Clone, PartialEq, Eq, Hash)]
17struct BlacklistKey {
18    ip: IpAddrWithPort,
19    domain: Option<DomainWithPort>,
20}
21
/// Payload stored for every blacklisted key.
#[derive(Debug, Clone)]
struct BlacklistValue {
    /// Moment at which the entry was (most recently) put on the blacklist.
    blocked_at: Instant,
}
26
27type Blacklist = DashMap<BlacklistKey, BlacklistValue>;
28
/// State that is only read or written while holding the chooser's mutex.
#[derive(Debug, Clone)]
struct LockedData {
    /// When the blacklist was last scheduled for a shrink pass.
    last_shrink_at: Instant,
}

impl Default for LockedData {
    /// Starts the shrink timer at the moment of construction.
    #[inline]
    fn default() -> Self {
        let last_shrink_at = Instant::now();
        Self { last_shrink_at }
    }
}
42
/// Default time a blacklisted IP address stays blocked: 30 seconds.
const DEFAULT_BLOCK_DURATION: Duration = Duration::from_secs(30);
/// Default minimum time between two blacklist shrink passes: 2 minutes.
const DEFAULT_SHRINK_INTERVAL: Duration = Duration::from_secs(2 * 60);
45
46/// IP 地址选择器
47///
48/// 包含 IP 地址黑名单,一旦被反馈 API 调用失败,则将所有相关 IP 地址冻结一段时间
49#[derive(Debug, Clone)]
50pub struct IpChooser {
51    inner: Arc<IpChooserInner>,
52}
53
54#[derive(Debug, Default)]
55struct IpChooserInner {
56    blacklist: Blacklist,
57    lock: Mutex<LockedData>,
58    block_duration: Duration,
59    shrink_interval: Duration,
60}
61
62impl Default for IpChooser {
63    #[inline]
64    fn default() -> Self {
65        Self::builder().build()
66    }
67}
68
69impl IpChooser {
70    /// 创建 IP 地址选择构建器
71    #[inline]
72    pub fn builder() -> IpChooserBuilder {
73        Default::default()
74    }
75}
76
77impl Chooser for IpChooser {
78    fn choose(&self, ips: &[IpAddrWithPort], opts: ChooseOptions) -> ChosenResults {
79        let mut need_to_shrink = false;
80        let filtered_ips: Vec<_> = ips
81            .iter()
82            .copied()
83            .filter(|&ip| {
84                self.inner
85                    .blacklist
86                    .get(&BlacklistKey {
87                        ip,
88                        domain: opts.domain().cloned(),
89                    })
90                    .map_or(true, |r| {
91                        if r.value().blocked_at.elapsed() < self.inner.block_duration {
92                            false
93                        } else {
94                            need_to_shrink = true;
95                            true
96                        }
97                    })
98            })
99            .collect();
100        do_some_work_async(&self.inner, need_to_shrink);
101        filtered_ips.into()
102    }
103
104    fn feedback(&self, feedback: ChooserFeedback) {
105        if feedback.error().is_some() {
106            for &ip in feedback.ips().iter() {
107                self.inner.blacklist.insert(
108                    BlacklistKey {
109                        ip,
110                        domain: feedback.domain().cloned(),
111                    },
112                    BlacklistValue {
113                        blocked_at: Instant::now(),
114                    },
115                );
116            }
117        } else {
118            for &ip in feedback.ips().iter() {
119                self.inner.blacklist.remove(&BlacklistKey {
120                    ip,
121                    domain: feedback.domain().cloned(),
122                });
123            }
124        }
125    }
126}
127
128impl IpChooser {
129    #[allow(dead_code)]
130    fn len(&self) -> usize {
131        self.inner.blacklist.len()
132    }
133}
134
135fn do_some_work_async(inner: &Arc<IpChooserInner>, need_to_shrink: bool) {
136    if need_to_shrink && is_time_to_shrink(inner) {
137        let cloned = inner.to_owned();
138        if let Err(err) = spawn("qiniu.rust-sdk.http-client.chooser.IpChooser".into(), move || {
139            if is_time_to_shrink_mut(&cloned) {
140                info!("Ip Chooser spawns thread to do some housework");
141                shrink_cache(&cloned.blacklist, cloned.block_duration);
142            }
143        }) {
144            warn!("Ip Chooser was failed to spawn thread to do some housework: {}", err);
145        }
146    }
147
148    return;
149
150    fn is_time_to_shrink(inner: &Arc<IpChooserInner>) -> bool {
151        if let Ok(locked_data) = inner.lock.try_lock() {
152            _is_time_to_shrink(inner.shrink_interval, &locked_data)
153        } else {
154            false
155        }
156    }
157
158    fn is_time_to_shrink_mut(inner: &Arc<IpChooserInner>) -> bool {
159        if let Ok(mut locked_data) = inner.lock.try_lock() {
160            if _is_time_to_shrink(inner.shrink_interval, &locked_data) {
161                locked_data.last_shrink_at = Instant::now();
162                return true;
163            }
164        }
165        false
166    }
167
168    fn _is_time_to_shrink(shrink_interval: Duration, locked_data: &LockedData) -> bool {
169        locked_data.last_shrink_at.elapsed() >= shrink_interval
170    }
171
172    fn shrink_cache(blacklist: &Blacklist, block_duration: Duration) {
173        let old_size = blacklist.len();
174        blacklist.retain(|_, value| value.blocked_at.elapsed() < block_duration);
175        let new_size = blacklist.len();
176        info!("Blacklist is shrunken, from {} to {} entries", old_size, new_size);
177    }
178}
179
180/// IP 地址选择构建器
181#[derive(Debug)]
182pub struct IpChooserBuilder {
183    inner: IpChooserInner,
184}
185
186impl Default for IpChooserBuilder {
187    #[inline]
188    fn default() -> Self {
189        Self {
190            inner: IpChooserInner {
191                blacklist: Default::default(),
192                lock: Default::default(),
193                block_duration: DEFAULT_BLOCK_DURATION,
194                shrink_interval: DEFAULT_SHRINK_INTERVAL,
195            },
196        }
197    }
198}
199
200impl IpChooserBuilder {
201    /// 设置屏蔽时长
202    #[inline]
203    pub fn block_duration(&mut self, block_duration: Duration) -> &mut Self {
204        self.inner.block_duration = block_duration;
205        self
206    }
207
208    /// 设置清理间隔时长
209    #[inline]
210    pub fn shrink_interval(&mut self, shrink_interval: Duration) -> &mut Self {
211        self.inner.shrink_interval = shrink_interval;
212        self
213    }
214
215    /// 构建 IP 地址选择器
216    #[inline]
217    pub fn build(&mut self) -> IpChooser {
218        IpChooser {
219            inner: Arc::new(take(&mut self.inner)),
220        }
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::{
        super::super::{ChooseOptionsBuilder, ResponseError, ResponseErrorKind},
        *,
    };
    use std::net::{IpAddr, Ipv4Addr};

    /// Builds the port-less test address `192.168.1.<last_octet>`.
    const fn ip(last_octet: u8) -> IpAddrWithPort {
        IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, last_octet)), None)
    }

    /// Builds the error attached to every failing feedback in these tests.
    fn fake_error() -> ResponseError {
        ResponseError::new_with_msg(ResponseErrorKind::ParseResponseError, "Test Error")
    }

    const IPS_WITHOUT_PORT: &[IpAddrWithPort] = &[ip(1), ip(2), ip(3)];

    /// Failing feedback blocks addresses; successful feedback unblocks them.
    #[test]
    fn test_ip_chooser() {
        env_logger::builder().is_test(true).try_init().ok();

        let ip_chooser = IpChooser::default();
        let domain = DomainWithPort::new("fakedomain", None);
        let error = fake_error();
        let first_two = [ip(1), ip(2)];
        let choose = || {
            ip_chooser
                .choose(IPS_WITHOUT_PORT, ChooseOptionsBuilder::new().domain(&domain).build())
                .into_ip_addrs()
        };

        // Nothing is blocked yet.
        assert_eq!(choose(), IPS_WITHOUT_PORT.to_vec());

        // Failure on .1 and .2 leaves only .3.
        ip_chooser.feedback(
            ChooserFeedback::builder(&first_two)
                .domain(&domain)
                .error(&error)
                .build(),
        );
        assert_eq!(choose(), vec![ip(3)]);

        // Failure on all three leaves nothing.
        ip_chooser.feedback(
            ChooserFeedback::builder(IPS_WITHOUT_PORT)
                .domain(&domain)
                .error(&error)
                .build(),
        );
        assert_eq!(choose(), vec![]);

        // Success on .1 and .2 unblocks exactly those two.
        ip_chooser.feedback(ChooserFeedback::builder(&first_two).domain(&domain).build());
        assert_eq!(choose(), vec![ip(1), ip(2)]);
    }

    /// Blocked addresses come back after `block_duration`, and the stale
    /// entries are purged in the background afterwards.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_ip_chooser_expiration_and_shrink() {
        use futures_timer::Delay as AsyncDelay;

        env_logger::builder().is_test(true).try_init().ok();

        let ip_chooser = IpChooser::builder()
            .block_duration(Duration::from_secs(1))
            .shrink_interval(Duration::from_millis(500))
            .build();
        let error = fake_error();
        let first_two = [ip(1), ip(2)];

        let chosen = ip_chooser
            .async_choose(IPS_WITHOUT_PORT, Default::default())
            .await
            .into_ip_addrs();
        assert_eq!(chosen, IPS_WITHOUT_PORT.to_vec());

        ip_chooser
            .async_feedback(ChooserFeedback::builder(&first_two).error(&error).build())
            .await;
        let chosen = ip_chooser
            .async_choose(IPS_WITHOUT_PORT, Default::default())
            .await
            .into_ip_addrs();
        assert_eq!(chosen, vec![ip(3)]);

        // Wait out the block duration: everything is selectable again.
        AsyncDelay::new(Duration::from_secs(1)).await;
        let chosen = ip_chooser
            .async_choose(IPS_WITHOUT_PORT, Default::default())
            .await
            .into_ip_addrs();
        assert_eq!(chosen, IPS_WITHOUT_PORT.to_vec());

        // Give the background shrink time to purge the expired entries.
        AsyncDelay::new(Duration::from_millis(500)).await;
        assert_eq!(ip_chooser.len(), 0);
    }
}