qiniu_http_client/client/resolver/
timeout.rs

1use super::{super::ResponseError, ResolveOptions, ResolveResult, Resolver};
2use qiniu_http::ResponseErrorKind as HttpResponseErrorKind;
3use std::time::Duration;
4
5#[cfg(feature = "async")]
6use {
7    futures::future::{BoxFuture, FutureExt},
8    futures_timer::Delay as AsyncDelay,
9};
10
11const DEFAULT_RESOLVE_TIMEOUT: Duration = Duration::from_secs(5);
12
13/// 超时域名解析器
14///
15/// 为一个域名解析器实例提供超时功能
16///
17/// 默认超时时间为 5 秒
18#[derive(Debug, Clone)]
19pub struct TimeoutResolver<R: ?Sized> {
20    timeout: Duration,
21    resolver: R,
22}
23
24impl<R> TimeoutResolver<R> {
25    /// 创建超时解析器
26    #[inline]
27    pub fn new(resolver: R, timeout: Duration) -> Self {
28        Self { timeout, resolver }
29    }
30}
31
32impl<R: Default> Default for TimeoutResolver<R> {
33    #[inline]
34    fn default() -> Self {
35        Self::new(Default::default(), DEFAULT_RESOLVE_TIMEOUT)
36    }
37}
38
39impl<R: Resolver + Clone + 'static> Resolver for TimeoutResolver<R> {
40    #[inline]
41    fn resolve(&self, domain: &str, opts: ResolveOptions) -> ResolveResult {
42        return _resolve(self, domain, opts);
43
44        #[cfg(feature = "async")]
45        fn _resolve<R: Resolver + Clone + 'static>(
46            resolver: &TimeoutResolver<R>,
47            domain: &str,
48            opts: ResolveOptions,
49        ) -> ResolveResult {
50            async_std::task::block_on(async move { resolver.async_resolve(domain, opts).await })
51        }
52
53        #[cfg(not(feature = "async"))]
54        fn _resolve<R: Resolver + Clone + 'static>(
55            resolver: &TimeoutResolver<R>,
56            domain: &str,
57            opts: ResolveOptions,
58        ) -> ResolveResult {
59            use super::{super::super::spawn::spawn, owned_resolver_options::OwnedResolveOptions};
60            use crossbeam_channel::{bounded, Select, SelectTimeoutError};
61            use log::warn;
62
63            let (sender, receiver) = bounded(0);
64            {
65                let inner = resolver.to_owned();
66                let domain = domain.to_owned();
67                let opts = OwnedResolveOptions::from(opts);
68                if let Err(err) = spawn(
69                    "qiniu.rust-sdk.http-client.resolver.TimeoutResolver.resolve".into(),
70                    move || {
71                        let opts = ResolveOptions::from(&opts);
72                        sender.send(inner.resolver.resolve(&domain, opts)).ok();
73                    },
74                ) {
75                    warn!("Timeout Resolver was failed to spawn thread to resolve domain: {}", err);
76                }
77            }
78            let mut sel = Select::new();
79            let op1 = sel.recv(&receiver);
80            let oper = sel.select_timeout(resolver.timeout);
81            return match oper {
82                Ok(op) => match op.index() {
83                    i if i == op1 => op.recv(&receiver).unwrap(),
84                    _ => unreachable!(),
85                },
86                Err(err) => Err(make_timeout_error(err, opts)),
87            };
88
89            fn make_timeout_error(err: SelectTimeoutError, opts: ResolveOptions) -> ResponseError {
90                let mut err = ResponseError::new(HttpResponseErrorKind::TimeoutError.into(), err);
91                if let Some(retried) = opts.retried() {
92                    err = err.retried(retried);
93                }
94                err
95            }
96        }
97    }
98
99    #[cfg(feature = "async")]
100    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
101    fn async_resolve<'a>(&'a self, domain: &'a str, opts: ResolveOptions<'a>) -> BoxFuture<'a, ResolveResult> {
102        use futures::{pin_mut, select};
103
104        return Box::pin(async move {
105            let resolve_task = self.resolver.async_resolve(domain, opts).fuse();
106            let timeout_task = AsyncDelay::new(self.timeout).fuse();
107            pin_mut!(resolve_task);
108            pin_mut!(timeout_task);
109            select! {
110                resolve_result = resolve_task => resolve_result,
111                _ = timeout_task => Err(make_timeout_error(self.timeout, opts)),
112            }
113        });
114
115        fn make_timeout_error(timeout: Duration, opts: ResolveOptions) -> ResponseError {
116            let mut err = ResponseError::new_with_msg(
117                HttpResponseErrorKind::TimeoutError.into(),
118                format!("Failed to resolve domain in {timeout:?}"),
119            );
120            if let Some(retried) = opts.retried() {
121                err = err.retried(retried);
122            }
123            err
124        }
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use std::{
132        error::Error,
133        net::{IpAddr, Ipv4Addr},
134        thread::sleep,
135    };
136
137    const IPS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))];
138
139    #[derive(Clone, Copy, Debug)]
140    struct WaitResolver(Duration);
141
142    impl Resolver for WaitResolver {
143        #[inline]
144        fn resolve(&self, _domain: &str, _opts: ResolveOptions) -> ResolveResult {
145            sleep(self.0);
146            Ok(IPS.to_owned().into_boxed_slice().into())
147        }
148
149        #[inline]
150        #[cfg(feature = "async")]
151        fn async_resolve<'a>(&'a self, _domain: &'a str, _opts: ResolveOptions) -> BoxFuture<'a, ResolveResult> {
152            Box::pin(async move {
153                AsyncDelay::new(self.0).await;
154                Ok(IPS.to_owned().into_boxed_slice().into())
155            })
156        }
157    }
158
159    #[test]
160    fn test_timeout_resolver() -> Result<(), Box<dyn Error>> {
161        let resolver = TimeoutResolver::new(WaitResolver(Duration::from_secs(1)), Duration::from_secs(2));
162
163        let answers = resolver.resolve("fake.domain", Default::default())?;
164        assert_eq!(answers.ip_addrs(), IPS);
165
166        let resolver = TimeoutResolver::new(WaitResolver(Duration::from_secs(2)), Duration::from_secs(1));
167        resolver.resolve("fake.domain", Default::default()).unwrap_err();
168
169        Ok(())
170    }
171
172    #[cfg(feature = "async")]
173    #[tokio::test]
174    async fn test_async_timeout_resolver() -> Result<(), Box<dyn Error>> {
175        let resolver = TimeoutResolver::new(WaitResolver(Duration::from_millis(100)), Duration::from_millis(200));
176
177        let answers = resolver.async_resolve("fake.domain", Default::default()).await?;
178        assert_eq!(answers.ip_addrs(), IPS);
179
180        let resolver = TimeoutResolver::new(WaitResolver(Duration::from_millis(200)), Duration::from_millis(100));
181        resolver
182            .async_resolve("fake.domain", Default::default())
183            .await
184            .unwrap_err();
185
186        Ok(())
187    }
188}