qiniu_http_client/client/resolver/
timeout.rs1use super::{super::ResponseError, ResolveOptions, ResolveResult, Resolver};
2use qiniu_http::ResponseErrorKind as HttpResponseErrorKind;
3use std::time::Duration;
4
5#[cfg(feature = "async")]
6use {
7 futures::future::{BoxFuture, FutureExt},
8 futures_timer::Delay as AsyncDelay,
9};
10
11const DEFAULT_RESOLVE_TIMEOUT: Duration = Duration::from_secs(5);
12
13#[derive(Debug, Clone)]
19pub struct TimeoutResolver<R: ?Sized> {
20 timeout: Duration,
21 resolver: R,
22}
23
24impl<R> TimeoutResolver<R> {
25 #[inline]
27 pub fn new(resolver: R, timeout: Duration) -> Self {
28 Self { timeout, resolver }
29 }
30}
31
32impl<R: Default> Default for TimeoutResolver<R> {
33 #[inline]
34 fn default() -> Self {
35 Self::new(Default::default(), DEFAULT_RESOLVE_TIMEOUT)
36 }
37}
38
39impl<R: Resolver + Clone + 'static> Resolver for TimeoutResolver<R> {
40 #[inline]
41 fn resolve(&self, domain: &str, opts: ResolveOptions) -> ResolveResult {
42 return _resolve(self, domain, opts);
43
44 #[cfg(feature = "async")]
45 fn _resolve<R: Resolver + Clone + 'static>(
46 resolver: &TimeoutResolver<R>,
47 domain: &str,
48 opts: ResolveOptions,
49 ) -> ResolveResult {
50 async_std::task::block_on(async move { resolver.async_resolve(domain, opts).await })
51 }
52
53 #[cfg(not(feature = "async"))]
54 fn _resolve<R: Resolver + Clone + 'static>(
55 resolver: &TimeoutResolver<R>,
56 domain: &str,
57 opts: ResolveOptions,
58 ) -> ResolveResult {
59 use super::{super::super::spawn::spawn, owned_resolver_options::OwnedResolveOptions};
60 use crossbeam_channel::{bounded, Select, SelectTimeoutError};
61 use log::warn;
62
63 let (sender, receiver) = bounded(0);
64 {
65 let inner = resolver.to_owned();
66 let domain = domain.to_owned();
67 let opts = OwnedResolveOptions::from(opts);
68 if let Err(err) = spawn(
69 "qiniu.rust-sdk.http-client.resolver.TimeoutResolver.resolve".into(),
70 move || {
71 let opts = ResolveOptions::from(&opts);
72 sender.send(inner.resolver.resolve(&domain, opts)).ok();
73 },
74 ) {
75 warn!("Timeout Resolver was failed to spawn thread to resolve domain: {}", err);
76 }
77 }
78 let mut sel = Select::new();
79 let op1 = sel.recv(&receiver);
80 let oper = sel.select_timeout(resolver.timeout);
81 return match oper {
82 Ok(op) => match op.index() {
83 i if i == op1 => op.recv(&receiver).unwrap(),
84 _ => unreachable!(),
85 },
86 Err(err) => Err(make_timeout_error(err, opts)),
87 };
88
89 fn make_timeout_error(err: SelectTimeoutError, opts: ResolveOptions) -> ResponseError {
90 let mut err = ResponseError::new(HttpResponseErrorKind::TimeoutError.into(), err);
91 if let Some(retried) = opts.retried() {
92 err = err.retried(retried);
93 }
94 err
95 }
96 }
97 }
98
99 #[cfg(feature = "async")]
100 #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
101 fn async_resolve<'a>(&'a self, domain: &'a str, opts: ResolveOptions<'a>) -> BoxFuture<'a, ResolveResult> {
102 use futures::{pin_mut, select};
103
104 return Box::pin(async move {
105 let resolve_task = self.resolver.async_resolve(domain, opts).fuse();
106 let timeout_task = AsyncDelay::new(self.timeout).fuse();
107 pin_mut!(resolve_task);
108 pin_mut!(timeout_task);
109 select! {
110 resolve_result = resolve_task => resolve_result,
111 _ = timeout_task => Err(make_timeout_error(self.timeout, opts)),
112 }
113 });
114
115 fn make_timeout_error(timeout: Duration, opts: ResolveOptions) -> ResponseError {
116 let mut err = ResponseError::new_with_msg(
117 HttpResponseErrorKind::TimeoutError.into(),
118 format!("Failed to resolve domain in {timeout:?}"),
119 );
120 if let Some(retried) = opts.retried() {
121 err = err.retried(retried);
122 }
123 err
124 }
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131 use std::{
132 error::Error,
133 net::{IpAddr, Ipv4Addr},
134 thread::sleep,
135 };
136
137 const IPS: &[IpAddr] = &[IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1))];
138
139 #[derive(Clone, Copy, Debug)]
140 struct WaitResolver(Duration);
141
142 impl Resolver for WaitResolver {
143 #[inline]
144 fn resolve(&self, _domain: &str, _opts: ResolveOptions) -> ResolveResult {
145 sleep(self.0);
146 Ok(IPS.to_owned().into_boxed_slice().into())
147 }
148
149 #[inline]
150 #[cfg(feature = "async")]
151 fn async_resolve<'a>(&'a self, _domain: &'a str, _opts: ResolveOptions) -> BoxFuture<'a, ResolveResult> {
152 Box::pin(async move {
153 AsyncDelay::new(self.0).await;
154 Ok(IPS.to_owned().into_boxed_slice().into())
155 })
156 }
157 }
158
159 #[test]
160 fn test_timeout_resolver() -> Result<(), Box<dyn Error>> {
161 let resolver = TimeoutResolver::new(WaitResolver(Duration::from_secs(1)), Duration::from_secs(2));
162
163 let answers = resolver.resolve("fake.domain", Default::default())?;
164 assert_eq!(answers.ip_addrs(), IPS);
165
166 let resolver = TimeoutResolver::new(WaitResolver(Duration::from_secs(2)), Duration::from_secs(1));
167 resolver.resolve("fake.domain", Default::default()).unwrap_err();
168
169 Ok(())
170 }
171
172 #[cfg(feature = "async")]
173 #[tokio::test]
174 async fn test_async_timeout_resolver() -> Result<(), Box<dyn Error>> {
175 let resolver = TimeoutResolver::new(WaitResolver(Duration::from_millis(100)), Duration::from_millis(200));
176
177 let answers = resolver.async_resolve("fake.domain", Default::default()).await?;
178 assert_eq!(answers.ip_addrs(), IPS);
179
180 let resolver = TimeoutResolver::new(WaitResolver(Duration::from_millis(200)), Duration::from_millis(100));
181 resolver
182 .async_resolve("fake.domain", Default::default())
183 .await
184 .unwrap_err();
185
186 Ok(())
187 }
188}