qiniu_http_client/client/chooser/
ip.rs1use super::{
2 super::super::{
3 regions::{DomainWithPort, IpAddrWithPort},
4 spawn::spawn,
5 },
6 ChooseOptions, Chooser, ChooserFeedback, ChosenResults,
7};
8use dashmap::DashMap;
9use log::{info, warn};
10use std::{
11 mem::take,
12 sync::{Arc, Mutex},
13 time::{Duration, Instant},
14};
15
16#[derive(Debug, Clone, PartialEq, Eq, Hash)]
17struct BlacklistKey {
18 ip: IpAddrWithPort,
19 domain: Option<DomainWithPort>,
20}
21
22#[derive(Debug, Clone)]
23struct BlacklistValue {
24 blocked_at: Instant,
25}
26
27type Blacklist = DashMap<BlacklistKey, BlacklistValue>;
28
29#[derive(Debug, Clone)]
30struct LockedData {
31 last_shrink_at: Instant,
32}
33
34impl Default for LockedData {
35 #[inline]
36 fn default() -> Self {
37 LockedData {
38 last_shrink_at: Instant::now(),
39 }
40 }
41}
42
43const DEFAULT_BLOCK_DURATION: Duration = Duration::from_secs(30);
44const DEFAULT_SHRINK_INTERVAL: Duration = Duration::from_secs(120);
45
46#[derive(Debug, Clone)]
50pub struct IpChooser {
51 inner: Arc<IpChooserInner>,
52}
53
54#[derive(Debug, Default)]
55struct IpChooserInner {
56 blacklist: Blacklist,
57 lock: Mutex<LockedData>,
58 block_duration: Duration,
59 shrink_interval: Duration,
60}
61
62impl Default for IpChooser {
63 #[inline]
64 fn default() -> Self {
65 Self::builder().build()
66 }
67}
68
69impl IpChooser {
70 #[inline]
72 pub fn builder() -> IpChooserBuilder {
73 Default::default()
74 }
75}
76
77impl Chooser for IpChooser {
78 fn choose(&self, ips: &[IpAddrWithPort], opts: ChooseOptions) -> ChosenResults {
79 let mut need_to_shrink = false;
80 let filtered_ips: Vec<_> = ips
81 .iter()
82 .copied()
83 .filter(|&ip| {
84 self.inner
85 .blacklist
86 .get(&BlacklistKey {
87 ip,
88 domain: opts.domain().cloned(),
89 })
90 .map_or(true, |r| {
91 if r.value().blocked_at.elapsed() < self.inner.block_duration {
92 false
93 } else {
94 need_to_shrink = true;
95 true
96 }
97 })
98 })
99 .collect();
100 do_some_work_async(&self.inner, need_to_shrink);
101 filtered_ips.into()
102 }
103
104 fn feedback(&self, feedback: ChooserFeedback) {
105 if feedback.error().is_some() {
106 for &ip in feedback.ips().iter() {
107 self.inner.blacklist.insert(
108 BlacklistKey {
109 ip,
110 domain: feedback.domain().cloned(),
111 },
112 BlacklistValue {
113 blocked_at: Instant::now(),
114 },
115 );
116 }
117 } else {
118 for &ip in feedback.ips().iter() {
119 self.inner.blacklist.remove(&BlacklistKey {
120 ip,
121 domain: feedback.domain().cloned(),
122 });
123 }
124 }
125 }
126}
127
128impl IpChooser {
129 #[allow(dead_code)]
130 fn len(&self) -> usize {
131 self.inner.blacklist.len()
132 }
133}
134
135fn do_some_work_async(inner: &Arc<IpChooserInner>, need_to_shrink: bool) {
136 if need_to_shrink && is_time_to_shrink(inner) {
137 let cloned = inner.to_owned();
138 if let Err(err) = spawn("qiniu.rust-sdk.http-client.chooser.IpChooser".into(), move || {
139 if is_time_to_shrink_mut(&cloned) {
140 info!("Ip Chooser spawns thread to do some housework");
141 shrink_cache(&cloned.blacklist, cloned.block_duration);
142 }
143 }) {
144 warn!("Ip Chooser was failed to spawn thread to do some housework: {}", err);
145 }
146 }
147
148 return;
149
150 fn is_time_to_shrink(inner: &Arc<IpChooserInner>) -> bool {
151 if let Ok(locked_data) = inner.lock.try_lock() {
152 _is_time_to_shrink(inner.shrink_interval, &locked_data)
153 } else {
154 false
155 }
156 }
157
158 fn is_time_to_shrink_mut(inner: &Arc<IpChooserInner>) -> bool {
159 if let Ok(mut locked_data) = inner.lock.try_lock() {
160 if _is_time_to_shrink(inner.shrink_interval, &locked_data) {
161 locked_data.last_shrink_at = Instant::now();
162 return true;
163 }
164 }
165 false
166 }
167
168 fn _is_time_to_shrink(shrink_interval: Duration, locked_data: &LockedData) -> bool {
169 locked_data.last_shrink_at.elapsed() >= shrink_interval
170 }
171
172 fn shrink_cache(blacklist: &Blacklist, block_duration: Duration) {
173 let old_size = blacklist.len();
174 blacklist.retain(|_, value| value.blocked_at.elapsed() < block_duration);
175 let new_size = blacklist.len();
176 info!("Blacklist is shrunken, from {} to {} entries", old_size, new_size);
177 }
178}
179
180#[derive(Debug)]
182pub struct IpChooserBuilder {
183 inner: IpChooserInner,
184}
185
186impl Default for IpChooserBuilder {
187 #[inline]
188 fn default() -> Self {
189 Self {
190 inner: IpChooserInner {
191 blacklist: Default::default(),
192 lock: Default::default(),
193 block_duration: DEFAULT_BLOCK_DURATION,
194 shrink_interval: DEFAULT_SHRINK_INTERVAL,
195 },
196 }
197 }
198}
199
200impl IpChooserBuilder {
201 #[inline]
203 pub fn block_duration(&mut self, block_duration: Duration) -> &mut Self {
204 self.inner.block_duration = block_duration;
205 self
206 }
207
208 #[inline]
210 pub fn shrink_interval(&mut self, shrink_interval: Duration) -> &mut Self {
211 self.inner.shrink_interval = shrink_interval;
212 self
213 }
214
215 #[inline]
217 pub fn build(&mut self) -> IpChooser {
218 IpChooser {
219 inner: Arc::new(take(&mut self.inner)),
220 }
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::{
227 super::super::{ChooseOptionsBuilder, ResponseError, ResponseErrorKind},
228 *,
229 };
230 use std::net::{IpAddr, Ipv4Addr};
231
232 const IPS_WITHOUT_PORT: &[IpAddrWithPort] = &[
233 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
234 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
235 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None),
236 ];
237
238 #[test]
239 fn test_ip_chooser() {
240 env_logger::builder().is_test(true).try_init().ok();
241
242 let ip_chooser = IpChooser::default();
243 let domain = DomainWithPort::new("fakedomain", None);
244 assert_eq!(
245 ip_chooser
246 .choose(IPS_WITHOUT_PORT, ChooseOptionsBuilder::new().domain(&domain).build())
247 .into_ip_addrs(),
248 IPS_WITHOUT_PORT.to_vec()
249 );
250 ip_chooser.feedback(
251 ChooserFeedback::builder(&[
252 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
253 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
254 ])
255 .domain(&domain)
256 .error(&ResponseError::new_with_msg(
257 ResponseErrorKind::ParseResponseError,
258 "Test Error",
259 ))
260 .build(),
261 );
262 assert_eq!(
263 ip_chooser
264 .choose(IPS_WITHOUT_PORT, ChooseOptionsBuilder::new().domain(&domain).build())
265 .into_ip_addrs(),
266 vec![IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)],
267 );
268
269 ip_chooser.feedback(
270 ChooserFeedback::builder(IPS_WITHOUT_PORT)
271 .domain(&domain)
272 .error(&ResponseError::new_with_msg(
273 ResponseErrorKind::ParseResponseError,
274 "Test Error",
275 ))
276 .build(),
277 );
278 assert_eq!(
279 ip_chooser
280 .choose(IPS_WITHOUT_PORT, ChooseOptionsBuilder::new().domain(&domain).build())
281 .into_ip_addrs(),
282 vec![]
283 );
284
285 ip_chooser.feedback(
286 ChooserFeedback::builder(&[
287 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
288 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
289 ])
290 .domain(&domain)
291 .build(),
292 );
293 assert_eq!(
294 ip_chooser
295 .choose(IPS_WITHOUT_PORT, ChooseOptionsBuilder::new().domain(&domain).build())
296 .into_ip_addrs(),
297 vec![
298 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
299 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
300 ]
301 );
302 }
303
304 #[cfg(feature = "async")]
305 #[tokio::test]
306 async fn test_ip_chooser_expiration_and_shrink() {
307 use futures_timer::Delay as AsyncDelay;
308
309 env_logger::builder().is_test(true).try_init().ok();
310
311 let ip_chooser = IpChooser::builder()
312 .block_duration(Duration::from_secs(1))
313 .shrink_interval(Duration::from_millis(500))
314 .build();
315
316 assert_eq!(
317 ip_chooser
318 .async_choose(IPS_WITHOUT_PORT, Default::default())
319 .await
320 .into_ip_addrs(),
321 IPS_WITHOUT_PORT.to_vec()
322 );
323 ip_chooser
324 .async_feedback(
325 ChooserFeedback::builder(&[
326 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), None),
327 IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)), None),
328 ])
329 .error(&ResponseError::new_with_msg(
330 ResponseErrorKind::ParseResponseError,
331 "Test Error",
332 ))
333 .build(),
334 )
335 .await;
336 assert_eq!(
337 ip_chooser
338 .async_choose(IPS_WITHOUT_PORT, Default::default())
339 .await
340 .into_ip_addrs(),
341 vec![IpAddrWithPort::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 3)), None)],
342 );
343
344 AsyncDelay::new(Duration::from_secs(1)).await;
345 assert_eq!(
346 ip_chooser
347 .async_choose(IPS_WITHOUT_PORT, Default::default())
348 .await
349 .into_ip_addrs(),
350 IPS_WITHOUT_PORT.to_vec()
351 );
352
353 AsyncDelay::new(Duration::from_millis(500)).await;
354 assert_eq!(ip_chooser.len(), 0);
355 }
356}