tencent_sdk/client/
async_client.rs

1use crate::{
2    Error, Result,
3    auth::Auth,
4    client::{
5        common::{tencent_error_from_value, tencent_request_id_from_value},
6        config::{
7            DEFAULT_BODY_SNIPPET_MAX_BYTES, DEFAULT_CONNECT_TIMEOUT, DEFAULT_RETRY_BASE_DELAY,
8            DEFAULT_TIMEOUT, DEFAULT_USER_AGENT, EndpointConfig, EndpointMode, RequestDefaults,
9            RequestOptions, RetryConfig,
10        },
11        endpoint::Endpoint,
12    },
13    error::request_id_from_headers,
14    signing::{SigningInput, build_tc3_headers},
15    transport::{TransportConfig, async_transport::ReqwestAsyncTransport},
16    types::Region,
17    util::{body_snippet, build_url, canonical_query_string, retry_after_delay, retry_delay},
18};
19use chrono::Utc;
20use http::{HeaderValue, Method, StatusCode};
21use serde_json::Value;
22use std::{sync::Arc, time::Duration};
23
24#[derive(Clone)]
25pub struct Client {
26    inner: Arc<Inner>,
27}
28
29struct Inner {
30    auth: Auth,
31    endpoint: EndpointConfig,
32    default_region: Option<Region>,
33    transport: ReqwestAsyncTransport,
34    defaults: RequestDefaults,
35    retry: RetryConfig,
36}
37
38pub struct ClientBuilder {
39    auth: Auth,
40    endpoint: EndpointConfig,
41    default_region: Option<Region>,
42    transport: TransportConfig,
43    defaults: RequestDefaults,
44    retry: RetryConfig,
45}
46
47impl Client {
48    pub fn builder(base_url: impl AsRef<str>) -> Result<ClientBuilder> {
49        ClientBuilder::new(base_url.as_ref())
50    }
51
52    pub fn builder_tencent_cloud() -> Result<ClientBuilder> {
53        ClientBuilder::new("https://tencentcloudapi.com")
54    }
55
56    pub fn billing(&self) -> crate::api::billing::BillingService {
57        crate::api::billing::BillingService::new(self.clone())
58    }
59
60    pub fn tag(&self) -> crate::api::tag::TagService {
61        crate::api::tag::TagService::new(self.clone())
62    }
63
64    pub fn cvm(&self) -> crate::api::cvm::CvmService {
65        crate::api::cvm::CvmService::new(self.clone())
66    }
67
68    pub fn vpc(&self) -> crate::api::vpc::VpcService {
69        crate::api::vpc::VpcService::new(self.clone())
70    }
71
72    pub fn cdn(&self) -> crate::api::cdn::CdnService {
73        crate::api::cdn::CdnService::new(self.clone())
74    }
75
76    pub fn dns(&self) -> crate::api::dns::DnsService {
77        crate::api::dns::DnsService::new(self.clone())
78    }
79
80    pub fn ssl(&self) -> crate::api::ssl::SslService {
81        crate::api::ssl::SslService::new(self.clone())
82    }
83
84    pub(crate) async fn execute<E: Endpoint>(
85        &self,
86        endpoint: &E,
87        options: Option<&RequestOptions>,
88    ) -> Result<E::Output> {
89        let method = endpoint.method();
90        let service = endpoint.service();
91        let action = endpoint.action();
92        let version = endpoint.version();
93        let path_segments = endpoint.path_segments();
94
95        let region_owned = endpoint
96            .region()
97            .cloned()
98            .or_else(|| self.inner.default_region.clone());
99        let region = region_owned.as_ref().map(Region::as_str);
100
101        let query_params = endpoint.query();
102        let canonical_query = canonical_query_string(&query_params);
103        let host = self.inner.endpoint.authority_for_service(service);
104        let url = build_url(
105            &self.inner.endpoint.scheme,
106            &host,
107            path_segments,
108            &canonical_query,
109        )?;
110        let path = url.path().to_string();
111
112        let payload_value = endpoint.payload()?;
113        let payload_string = payload_value
114            .as_ref()
115            .map(serde_json::to_string)
116            .transpose()
117            .map_err(|source| {
118                Error::invalid_request_with_source(
119                    "failed to serialize request payload",
120                    Box::new(source),
121                )
122            })?;
123
124        let body = body_for_method(&method, payload_string.clone());
125        let signing_payload = body.as_deref().unwrap_or("");
126
127        let timeout = options
128            .and_then(|o| o.timeout)
129            .unwrap_or(self.inner.defaults.timeout);
130        let capture_body_snippet = options
131            .and_then(|o| o.capture_body_snippet)
132            .unwrap_or(self.inner.defaults.capture_body_snippet);
133        let idempotency_key = options.and_then(|o| o.idempotency_key.as_ref());
134        let retryable_request = endpoint.is_idempotent() || idempotency_key.is_some();
135
136        #[cfg(any(feature = "tracing", feature = "metrics"))]
137        let started = std::time::Instant::now();
138
139        #[cfg(feature = "tracing")]
140        let span = tracing::info_span!(
141            "tencent_sdk.request",
142            service,
143            action,
144            version,
145            method = %method,
146            host = %host,
147            path = %path,
148            region = region.unwrap_or(""),
149        );
150
151        let mut attempt = 0usize;
152        loop {
153            let timestamp = Utc::now().timestamp();
154
155            #[cfg(feature = "tracing")]
156            tracing::debug!(parent: &span, attempt = attempt + 1, "sending request");
157            let mut headers = build_tc3_headers(
158                &self.inner.auth,
159                &SigningInput {
160                    method: &method,
161                    service,
162                    host: &host,
163                    path: &path,
164                    canonical_query: &canonical_query,
165                    region,
166                    action,
167                    version,
168                    payload: signing_payload,
169                    timestamp,
170                },
171            )?;
172
173            headers.extend(endpoint.extra_headers()?);
174
175            if let Some(key) = idempotency_key {
176                headers.insert(
177                    "Idempotency-Key",
178                    HeaderValue::from_str(key.as_str()).map_err(|source| {
179                        Error::invalid_request_with_source("invalid idempotency key", source)
180                    })?,
181                );
182            }
183
184            let response = self
185                .inner
186                .transport
187                .send(method.clone(), url.clone(), headers, body.clone(), timeout)
188                .await;
189
190            let response = match response {
191                Ok(value) => value,
192                Err(err) => {
193                    if attempt < self.inner.retry.max_retries
194                        && retryable_request
195                        && err.is_retryable()
196                    {
197                        attempt += 1;
198                        let delay = retry_delay(self.inner.retry.base_delay, attempt);
199
200                        #[cfg(feature = "tracing")]
201                        tracing::warn!(
202                            parent: &span,
203                            attempt,
204                            delay = ?delay,
205                            kind = ?err.kind(),
206                            "retrying after transport error"
207                        );
208
209                        if !delay.is_zero() {
210                            tokio::time::sleep(delay).await;
211                        }
212                        continue;
213                    }
214
215                    #[cfg(feature = "tracing")]
216                    tracing::error!(
217                        parent: &span,
218                        kind = ?err.kind(),
219                        "request failed with transport error"
220                    );
221
222                    #[cfg(feature = "metrics")]
223                    super::metrics::record_error(service, action, &err, attempt, started.elapsed());
224                    return Err(err);
225                }
226            };
227
228            if !response.status.is_success() {
229                if attempt < self.inner.retry.max_retries
230                    && retryable_request
231                    && is_retryable_status(response.status)
232                {
233                    attempt += 1;
234                    let delay = retry_after_delay(&response.headers)
235                        .unwrap_or_else(|| retry_delay(self.inner.retry.base_delay, attempt));
236
237                    #[cfg(feature = "tracing")]
238                    tracing::warn!(
239                        parent: &span,
240                        attempt,
241                        status = %response.status,
242                        delay = ?delay,
243                        "retrying after retryable HTTP status"
244                    );
245
246                    if !delay.is_zero() {
247                        tokio::time::sleep(delay).await;
248                    }
249                    continue;
250                }
251
252                let request_id_header = request_id_from_headers(&response.headers);
253                let retry_after = retry_after_delay(&response.headers);
254                let (code, message, request_id_body) =
255                    serde_json::from_str::<Value>(&response.body)
256                        .ok()
257                        .and_then(|json| tencent_error_from_value(&json))
258                        .map(|(code, message, request_id)| (Some(code), Some(message), request_id))
259                        .unwrap_or((None, None, None));
260
261                let request_id = request_id_body.or(request_id_header);
262                let snippet = capture_body_snippet.then(|| {
263                    body_snippet(&response.body, self.inner.defaults.body_snippet_max_bytes)
264                });
265
266                let err = Error::api(
267                    Some(response.status),
268                    method,
269                    host,
270                    path,
271                    code,
272                    message,
273                    request_id,
274                    snippet,
275                    retry_after,
276                );
277
278                #[cfg(feature = "tracing")]
279                tracing::error!(
280                    parent: &span,
281                    kind = ?err.kind(),
282                    status = ?err.status(),
283                    request_id = ?err.request_id(),
284                    "request failed with HTTP status"
285                );
286
287                #[cfg(feature = "metrics")]
288                super::metrics::record_error(service, action, &err, attempt, started.elapsed());
289
290                return Err(err);
291            }
292
293            let json: Value = serde_json::from_str(&response.body).map_err(|source| {
294                let request_id = request_id_from_headers(&response.headers);
295                let snippet = capture_body_snippet.then(|| {
296                    body_snippet(&response.body, self.inner.defaults.body_snippet_max_bytes)
297                });
298                let err = Error::decode(
299                    Some(response.status),
300                    method.clone(),
301                    host.clone(),
302                    path.clone(),
303                    request_id,
304                    snippet,
305                    Box::new(source),
306                );
307
308                #[cfg(feature = "tracing")]
309                tracing::error!(
310                    parent: &span,
311                    kind = ?err.kind(),
312                    status = ?err.status(),
313                    request_id = ?err.request_id(),
314                    "request failed to decode response JSON"
315                );
316
317                #[cfg(feature = "metrics")]
318                super::metrics::record_error(service, action, &err, attempt, started.elapsed());
319
320                err
321            })?;
322
323            if let Some((code, message, request_id_body)) = tencent_error_from_value(&json) {
324                let request_id =
325                    request_id_body.or_else(|| request_id_from_headers(&response.headers));
326                let snippet = capture_body_snippet.then(|| {
327                    body_snippet(&response.body, self.inner.defaults.body_snippet_max_bytes)
328                });
329                let err = Error::api(
330                    Some(response.status),
331                    method.clone(),
332                    host.clone(),
333                    path.clone(),
334                    Some(code),
335                    Some(message),
336                    request_id,
337                    snippet,
338                    retry_after_delay(&response.headers),
339                );
340
341                if attempt < self.inner.retry.max_retries && retryable_request && err.is_retryable()
342                {
343                    attempt += 1;
344                    let delay = err
345                        .retry_after()
346                        .unwrap_or_else(|| retry_delay(self.inner.retry.base_delay, attempt));
347
348                    #[cfg(feature = "tracing")]
349                    tracing::warn!(
350                        parent: &span,
351                        attempt,
352                        delay = ?delay,
353                        kind = ?err.kind(),
354                        code = ?err.code(),
355                        "retrying after API error"
356                    );
357
358                    if !delay.is_zero() {
359                        tokio::time::sleep(delay).await;
360                    }
361                    continue;
362                }
363
364                #[cfg(feature = "tracing")]
365                tracing::error!(
366                    parent: &span,
367                    kind = ?err.kind(),
368                    status = ?err.status(),
369                    request_id = ?err.request_id(),
370                    code = ?err.code(),
371                    "request failed with API error"
372                );
373
374                #[cfg(feature = "metrics")]
375                super::metrics::record_error(service, action, &err, attempt, started.elapsed());
376
377                return Err(err);
378            }
379
380            let request_id = tencent_request_id_from_value(&json)
381                .or_else(|| request_id_from_headers(&response.headers));
382            let snippet = capture_body_snippet
383                .then(|| body_snippet(&response.body, self.inner.defaults.body_snippet_max_bytes));
384
385            let output = serde_json::from_value(json).map_err(|source| {
386                let err = Error::decode(
387                    Some(response.status),
388                    method.clone(),
389                    host.clone(),
390                    path.clone(),
391                    request_id.clone(),
392                    snippet.clone(),
393                    Box::new(source),
394                );
395
396                #[cfg(feature = "tracing")]
397                tracing::error!(
398                    parent: &span,
399                    kind = ?err.kind(),
400                    status = ?err.status(),
401                    request_id = ?err.request_id(),
402                    "request failed to decode response body"
403                );
404
405                #[cfg(feature = "metrics")]
406                super::metrics::record_error(service, action, &err, attempt, started.elapsed());
407
408                err
409            })?;
410
411            #[cfg(feature = "tracing")]
412            tracing::info!(
413                parent: &span,
414                status = %response.status,
415                retries = attempt,
416                elapsed = ?started.elapsed(),
417                request_id = ?request_id.as_deref(),
418                "request succeeded"
419            );
420
421            #[cfg(feature = "metrics")]
422            super::metrics::record_success(
423                service,
424                action,
425                response.status,
426                attempt,
427                started.elapsed(),
428            );
429
430            return Ok(output);
431        }
432    }
433}
434
435impl ClientBuilder {
436    fn new(base_url: &str) -> Result<Self> {
437        let endpoint = EndpointConfig::from_base_url(base_url, EndpointMode::ServiceSubdomain)?;
438
439        Ok(Self {
440            auth: Auth::none(),
441            endpoint,
442            default_region: None,
443            transport: TransportConfig {
444                user_agent: DEFAULT_USER_AGENT.to_string(),
445                accept_invalid_certs: false,
446                no_proxy: false,
447                connect_timeout: DEFAULT_CONNECT_TIMEOUT,
448                read_timeout: None,
449            },
450            defaults: RequestDefaults {
451                timeout: DEFAULT_TIMEOUT,
452                capture_body_snippet: true,
453                body_snippet_max_bytes: DEFAULT_BODY_SNIPPET_MAX_BYTES,
454            },
455            retry: RetryConfig {
456                max_retries: 0,
457                base_delay: DEFAULT_RETRY_BASE_DELAY,
458            },
459        })
460    }
461
462    pub fn auth(mut self, auth: Auth) -> Self {
463        self.auth = auth;
464        self
465    }
466
467    pub fn endpoint_mode(mut self, mode: EndpointMode) -> Self {
468        self.endpoint.mode = mode;
469        self
470    }
471
472    pub fn default_region(mut self, region: impl Into<Region>) -> Self {
473        self.default_region = Some(region.into());
474        self
475    }
476
477    pub fn timeout(mut self, timeout: Duration) -> Self {
478        self.defaults.timeout = timeout;
479        self
480    }
481
482    pub fn connect_timeout(mut self, timeout: Duration) -> Self {
483        self.transport.connect_timeout = timeout;
484        self
485    }
486
487    pub fn read_timeout(mut self, timeout: Duration) -> Self {
488        self.transport.read_timeout = Some(timeout);
489        self
490    }
491
492    pub fn capture_body_snippet(mut self, enabled: bool) -> Self {
493        self.defaults.capture_body_snippet = enabled;
494        self
495    }
496
497    pub fn body_snippet_max_bytes(mut self, max_bytes: usize) -> Self {
498        self.defaults.body_snippet_max_bytes = max_bytes;
499        self
500    }
501
502    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
503        self.transport.user_agent = user_agent.into();
504        self
505    }
506
507    pub fn danger_accept_invalid_certs(mut self, enabled: bool) -> Self {
508        self.transport.accept_invalid_certs = enabled;
509        self
510    }
511
512    pub fn no_system_proxy(mut self, enabled: bool) -> Self {
513        self.transport.no_proxy = enabled;
514        self
515    }
516
517    pub fn retry(mut self, max_retries: usize, base_delay: Duration) -> Self {
518        self.retry.max_retries = max_retries;
519        self.retry.base_delay = base_delay;
520        self
521    }
522
523    pub fn build(self) -> Result<Client> {
524        let transport = ReqwestAsyncTransport::new(&self.transport)?;
525
526        Ok(Client {
527            inner: Arc::new(Inner {
528                auth: self.auth,
529                endpoint: self.endpoint,
530                default_region: self.default_region,
531                transport,
532                defaults: self.defaults,
533                retry: self.retry,
534            }),
535        })
536    }
537}
538
539fn body_for_method(method: &Method, payload: Option<String>) -> Option<String> {
540    if *method == Method::GET || *method == Method::HEAD || *method == Method::OPTIONS {
541        None
542    } else {
543        payload
544    }
545}
546
547fn is_retryable_status(status: StatusCode) -> bool {
548    matches!(
549        status,
550        StatusCode::TOO_MANY_REQUESTS
551            | StatusCode::BAD_GATEWAY
552            | StatusCode::SERVICE_UNAVAILABLE
553            | StatusCode::GATEWAY_TIMEOUT
554    )
555}