Skip to main content

dingtalk_sdk/transport/
mod.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, RwLock},
4    time::{Duration, Instant, SystemTime},
5};
6
7use serde::de::DeserializeOwned;
8use url::Url;
9
10use crate::{
11    auth::AppCredentials,
12    error::{Error, HttpError, Result, TransportError},
13    types::{
14        enterprise::ApprovalProcessInstance,
15        internal::{
16            ApprovalCreateProcessInstanceResponse, ApprovalGetProcessInstanceResponse,
17            GetTokenResponse, StandardApiResponse, TopApiResultResponse, TopApiSimpleResponse,
18        },
19    },
20    util::{
21        redact::{redact_text, truncate_snippet},
22        url::endpoint_url,
23    },
24};
25
/// Default base URL for webhook (robot) endpoints.
pub(crate) const DEFAULT_WEBHOOK_BASE_URL: &str = "https://oapi.dingtalk.com";
/// Default base URL for enterprise API endpoints.
pub(crate) const DEFAULT_ENTERPRISE_BASE_URL: &str = "https://api.dingtalk.com";
/// Default message key.
// NOTE(review): presumably the message template key used when the caller gives none —
// confirm at the use sites, which are outside this part of the file.
pub(crate) const DEFAULT_MSG_KEY: &str = "sampleMarkdown";
/// Token lifetime assumed when the server reports no positive `expires_in` (two hours).
const DEFAULT_ACCESS_TOKEN_TTL: Duration = Duration::from_secs(2 * 60 * 60);
/// Lower bound applied to any server-reported token lifetime.
const MIN_ACCESS_TOKEN_TTL: Duration = Duration::from_secs(30);
31
/// Controls whether, and how much of, a redacted response body is attached to API errors.
#[derive(Debug, Clone, Copy)]
pub struct BodySnippetConfig {
    /// When `false`, no body snippet is captured at all.
    pub enabled: bool,
    /// Upper bound, in bytes, on the captured snippet.
    pub max_bytes: usize,
}

impl Default for BodySnippetConfig {
    /// Snippet capture is on by default and limited to 4 KiB.
    fn default() -> Self {
        let (enabled, max_bytes) = (true, 4 * 1024);
        Self { enabled, max_bytes }
    }
}
49
50pub(crate) fn api_error(
51    code: i64,
52    message: impl Into<String>,
53    request_id: Option<String>,
54    body_snippet: Option<String>,
55) -> Error {
56    Error::Api {
57        code,
58        message: message.into(),
59        request_id,
60        body_snippet,
61    }
62}
63
64pub(crate) fn response_api_error(
65    code: i64,
66    message: impl Into<String>,
67    request_id: Option<String>,
68    body: &str,
69    body_snippet: BodySnippetConfig,
70) -> Error {
71    api_error(
72        code,
73        message,
74        request_id,
75        body_snippet_for_error(body, body_snippet),
76    )
77}
78
79pub(crate) fn build_webhook_url(base_url: &Url, token: &str, secret: Option<&str>) -> Result<Url> {
80    let mut url = endpoint_url(base_url, &["robot", "send"])?;
81    {
82        let mut query = url.query_pairs_mut();
83        query.append_pair("access_token", token);
84
85        if let Some(secret) = secret {
86            let timestamp = crate::signature::current_timestamp_millis()?;
87            let sign = crate::signature::create_signature(&timestamp, secret)?;
88            query.append_pair("timestamp", &timestamp);
89            query.append_pair("sign", &sign);
90        }
91    }
92    Ok(url)
93}
94
95#[derive(Debug, Clone)]
96pub(crate) struct AccessTokenCache {
97    inner: Arc<RwLock<HashMap<AppCredentials, CachedAccessToken>>>,
98}
99
100#[derive(Debug, Clone)]
101struct CachedAccessToken {
102    token: String,
103    expires_at: Instant,
104}
105
106impl AccessTokenCache {
107    #[must_use]
108    pub(crate) fn new() -> Self {
109        Self {
110            inner: Arc::new(RwLock::new(HashMap::new())),
111        }
112    }
113
114    pub(crate) fn get(
115        &self,
116        credentials: &AppCredentials,
117        refresh_margin: Duration,
118    ) -> Option<String> {
119        let now = Instant::now();
120        let guard = self.inner.read().ok()?;
121        let cached = guard.get(credentials)?;
122        let refresh_at = now.checked_add(refresh_margin)?;
123        if refresh_at < cached.expires_at {
124            Some(cached.token.clone())
125        } else {
126            None
127        }
128    }
129
130    pub(crate) fn store(
131        &self,
132        credentials: AppCredentials,
133        token: String,
134        expires_in_seconds: Option<i64>,
135    ) {
136        let ttl = normalize_token_ttl(expires_in_seconds);
137        let expires_at = Instant::now().checked_add(ttl).unwrap_or_else(Instant::now);
138
139        if let Ok(mut guard) = self.inner.write() {
140            guard.insert(credentials, CachedAccessToken { token, expires_at });
141        }
142    }
143}
144
145fn normalize_token_ttl(expires_in_seconds: Option<i64>) -> Duration {
146    match expires_in_seconds {
147        Some(value) if value > 0 => Duration::from_secs(value as u64).max(MIN_ACCESS_TOKEN_TTL),
148        _ => DEFAULT_ACCESS_TOKEN_TTL,
149    }
150}
151
/// Result of a successful token request, as produced by `parse_get_token_response`.
#[derive(Debug)]
pub(crate) struct AccessTokenPayload {
    /// The access token returned by the server.
    pub(crate) token: String,
    /// The `expires_in` value from the response, if it contained one.
    pub(crate) expires_in: Option<i64>,
}
157
/// Text of a response whose HTTP status was in the 2xx range.
struct SuccessfulResponseBody {
    /// Response body as text.
    body: String,
    /// Request id taken from the response headers, when one was present.
    header_request_id: Option<String>,
}
162
/// A successfully decoded JSON response together with the raw text it was decoded from.
struct DecodedResponse<T> {
    /// The deserialized value.
    value: T,
    /// Raw body text, kept so errors raised later can still attach a snippet.
    body: String,
    /// Request id taken from the response headers, when one was present.
    header_request_id: Option<String>,
}
168
/// Test-only shortcut: validates `body` as a standard API response with no header request id.
#[cfg(test)]
pub(crate) fn validate_standard_api_response(
    body: &str,
    body_snippet: BodySnippetConfig,
) -> Result<()> {
    let header_request_id: Option<String> = None;
    validate_standard_api_response_with_request_id(body, header_request_id, body_snippet)
}
176
177pub(crate) fn parse_standard_api_text_response(
178    response: reqx::Response,
179    body_snippet: BodySnippetConfig,
180) -> Result<String> {
181    let response = successful_response_body(response, body_snippet)?;
182    validate_standard_api_response_with_request_id(
183        &response.body,
184        response.header_request_id,
185        body_snippet,
186    )?;
187    Ok(response.body)
188}
189
190pub(crate) fn parse_get_token_response(
191    response: reqx::Response,
192    body_snippet: BodySnippetConfig,
193) -> Result<AccessTokenPayload> {
194    let DecodedResponse {
195        value,
196        body,
197        header_request_id,
198    } = decode_json_response::<GetTokenResponse>(response, body_snippet)?;
199    let GetTokenResponse {
200        errcode,
201        errmsg,
202        access_token,
203        expires_in,
204        request_id,
205    } = value;
206    let request_id = request_id.or(header_request_id);
207
208    if errcode != 0 {
209        return Err(response_api_error(
210            errcode,
211            errmsg,
212            request_id,
213            &body,
214            body_snippet,
215        ));
216    }
217
218    let token = access_token.ok_or_else(|| {
219        response_api_error(
220            -1,
221            "No access token returned",
222            request_id.clone(),
223            &body,
224            body_snippet,
225        )
226    })?;
227
228    Ok(AccessTokenPayload { token, expires_in })
229}
230
231pub(crate) fn parse_topapi_result_response<T>(
232    response: reqx::Response,
233    body_snippet: BodySnippetConfig,
234) -> Result<T>
235where
236    T: DeserializeOwned,
237{
238    let DecodedResponse {
239        value,
240        body,
241        header_request_id,
242    } = decode_json_response::<TopApiResultResponse<T>>(response, body_snippet)?;
243    let TopApiResultResponse {
244        errcode,
245        errmsg,
246        result,
247        request_id,
248    } = value;
249    let request_id = request_id.or(header_request_id);
250
251    if errcode != 0 {
252        return Err(response_api_error(
253            errcode,
254            errmsg,
255            request_id,
256            &body,
257            body_snippet,
258        ));
259    }
260
261    result.ok_or_else(|| {
262        response_api_error(
263            -1,
264            "Missing result field in topapi response",
265            request_id,
266            &body,
267            body_snippet,
268        )
269    })
270}
271
272pub(crate) fn parse_topapi_unit_response(
273    response: reqx::Response,
274    body_snippet: BodySnippetConfig,
275) -> Result<()> {
276    let DecodedResponse {
277        value,
278        body,
279        header_request_id,
280    } = decode_json_response::<TopApiSimpleResponse>(response, body_snippet)?;
281    let TopApiSimpleResponse {
282        errcode,
283        errmsg,
284        request_id,
285    } = value;
286    let request_id = request_id.or(header_request_id);
287
288    if errcode != 0 {
289        return Err(response_api_error(
290            errcode,
291            errmsg,
292            request_id,
293            &body,
294            body_snippet,
295        ));
296    }
297
298    Ok(())
299}
300
301pub(crate) fn parse_approval_create_response(
302    response: reqx::Response,
303    body_snippet: BodySnippetConfig,
304) -> Result<String> {
305    let DecodedResponse {
306        value,
307        body,
308        header_request_id,
309    } = decode_json_response::<ApprovalCreateProcessInstanceResponse>(response, body_snippet)?;
310    let ApprovalCreateProcessInstanceResponse {
311        errcode,
312        errmsg,
313        process_instance_id,
314        request_id,
315    } = value;
316    let request_id = request_id.or(header_request_id);
317
318    if errcode != 0 {
319        return Err(response_api_error(
320            errcode,
321            errmsg,
322            request_id,
323            &body,
324            body_snippet,
325        ));
326    }
327
328    process_instance_id.ok_or_else(|| {
329        response_api_error(
330            -1,
331            "Missing process_instance_id in response",
332            request_id,
333            &body,
334            body_snippet,
335        )
336    })
337}
338
339pub(crate) fn parse_approval_get_response(
340    response: reqx::Response,
341    body_snippet: BodySnippetConfig,
342) -> Result<ApprovalProcessInstance> {
343    let DecodedResponse {
344        value,
345        body,
346        header_request_id,
347    } = decode_json_response::<ApprovalGetProcessInstanceResponse>(response, body_snippet)?;
348    let ApprovalGetProcessInstanceResponse {
349        errcode,
350        errmsg,
351        process_instance,
352        request_id,
353    } = value;
354    let request_id = request_id.or(header_request_id);
355
356    if errcode != 0 {
357        return Err(response_api_error(
358            errcode,
359            errmsg,
360            request_id,
361            &body,
362            body_snippet,
363        ));
364    }
365
366    process_instance.ok_or_else(|| {
367        response_api_error(
368            -1,
369            "Missing process_instance field in response",
370            request_id,
371            &body,
372            body_snippet,
373        )
374    })
375}
376
377fn validate_standard_api_response_with_request_id(
378    body: &str,
379    header_request_id: Option<String>,
380    body_snippet: BodySnippetConfig,
381) -> Result<()> {
382    if let Some(response) = parse_standard_api_response_body(body)
383        && let Some(errcode) = response.errcode
384        && errcode != 0
385    {
386        let message = response
387            .errmsg
388            .unwrap_or_else(|| "unknown dingtalk api error".to_string());
389        let request_id = response.request_id.or(header_request_id);
390        let snippet = body_snippet_for_error(body, body_snippet);
391        return Err(api_error(errcode, message, request_id, snippet));
392    }
393    Ok(())
394}
395
396fn successful_response_body(
397    response: reqx::Response,
398    body_snippet: BodySnippetConfig,
399) -> Result<SuccessfulResponseBody> {
400    let status = response.status().as_u16();
401    let header_request_id = response_request_id(&response);
402    let retry_after = response_retry_after(&response);
403    let body = response.text_lossy();
404
405    if !(200..=299).contains(&status) {
406        return Err(http_error_from_response(
407            status,
408            &body,
409            header_request_id,
410            retry_after,
411            body_snippet,
412        ));
413    }
414
415    Ok(SuccessfulResponseBody {
416        body,
417        header_request_id,
418    })
419}
420
421fn decode_json_response<T>(
422    response: reqx::Response,
423    body_snippet: BodySnippetConfig,
424) -> Result<DecodedResponse<T>>
425where
426    T: DeserializeOwned,
427{
428    let SuccessfulResponseBody {
429        body,
430        header_request_id,
431    } = successful_response_body(response, body_snippet)?;
432    let value = serde_json::from_str(&body)?;
433
434    Ok(DecodedResponse {
435        value,
436        body,
437        header_request_id,
438    })
439}
440
441fn parse_standard_api_response_body(body: &str) -> Option<StandardApiResponse> {
442    serde_json::from_str(body).ok()
443}
444
445fn http_error_from_response(
446    status: u16,
447    body: &str,
448    header_request_id: Option<String>,
449    retry_after: Option<Duration>,
450    body_snippet: BodySnippetConfig,
451) -> Error {
452    let parsed = parse_standard_api_response_body(body);
453    let message = parsed.as_ref().and_then(|response| response.errmsg.clone());
454    let request_id = parsed
455        .and_then(|response| response.request_id)
456        .or(header_request_id);
457    let snippet = body_snippet_for_error(body, body_snippet);
458    let error = HttpError {
459        status,
460        message: message.clone(),
461        request_id: request_id.clone(),
462        body_snippet: snippet.clone(),
463    };
464
465    match status {
466        401 | 403 => Error::Auth(error),
467        404 => Error::NotFound(error),
468        409 | 412 => Error::Conflict(error),
469        429 => Error::RateLimited { retry_after, error },
470        _ => Error::Transport(Box::new(TransportError {
471            status: Some(status),
472            message,
473            request_id,
474            body_snippet: snippet,
475            retry_after,
476            retryable: matches!(status, 429 | 500..=599),
477            code: "http_status",
478            method: None,
479            uri: None,
480            timeout_phase: None,
481            transport_kind: None,
482        })),
483    }
484}
485
486fn response_request_id(response: &reqx::Response) -> Option<String> {
487    response
488        .headers()
489        .get("x-request-id")
490        .or_else(|| response.headers().get("x-acs-request-id"))
491        .or_else(|| response.headers().get("x-amz-request-id"))
492        .or_else(|| response.headers().get("x-amz-id-2"))
493        .and_then(|value| value.to_str().ok())
494        .map(ToOwned::to_owned)
495}
496
497fn response_retry_after(response: &reqx::Response) -> Option<Duration> {
498    let header = response.headers().get("retry-after")?;
499    let value = header.to_str().ok()?;
500    parse_retry_after(value, SystemTime::now())
501}
502
503fn parse_retry_after(value: &str, now: SystemTime) -> Option<Duration> {
504    if let Ok(seconds) = value.trim().parse::<u64>() {
505        return Some(Duration::from_secs(seconds));
506    }
507
508    let when = httpdate::parse_http_date(value).ok()?;
509    when.duration_since(now).ok()
510}
511
512pub(crate) fn body_snippet_for_error(
513    body: &str,
514    body_snippet: BodySnippetConfig,
515) -> Option<String> {
516    if !body_snippet.enabled {
517        return None;
518    }
519
520    let snippet = truncate_snippet(body, body_snippet.max_bytes);
521    Some(redact_text(&snippet))
522}
523
#[cfg(test)]
mod tests {
    use super::*;
    use crate::util::url::normalize_base_url;

    /// Error envelope shared by the API-error tests.
    const INVALID_BODY: &str = r#"{"errcode":310000,"errmsg":"invalid"}"#;

    /// Unwraps an `Error::Api` into `(code, message, body_snippet)`; panics on other variants.
    fn expect_api_error(error: Error) -> (i64, String, Option<String>) {
        match error {
            Error::Api {
                code,
                message,
                body_snippet,
                ..
            } => (code, message, body_snippet),
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[test]
    fn build_webhook_url_without_secret_contains_token_only() {
        let base_url = normalize_base_url(DEFAULT_WEBHOOK_BASE_URL).expect("base");
        let url = build_webhook_url(&base_url, "token-123", None).expect("url");
        let expected = "https://oapi.dingtalk.com/robot/send?access_token=token-123";
        assert_eq!(url.as_str(), expected);
    }

    #[test]
    fn api_error_response_is_detected() {
        let error = validate_standard_api_response(INVALID_BODY, BodySnippetConfig::default())
            .expect_err("should fail");
        let (code, message, body_snippet) = expect_api_error(error);
        assert_eq!(code, 310000);
        assert_eq!(message, "invalid");
        assert_eq!(body_snippet.as_deref(), Some(INVALID_BODY));
    }

    #[test]
    fn api_success_response_passes() {
        let ok_body = r#"{"errcode":0,"errmsg":"ok"}"#;
        validate_standard_api_response(ok_body, BodySnippetConfig::default()).expect("ok");
    }

    #[test]
    fn api_error_response_can_disable_body_snippet() {
        let disabled = BodySnippetConfig {
            enabled: false,
            max_bytes: 64,
        };
        let error =
            validate_standard_api_response(INVALID_BODY, disabled).expect_err("should fail");
        let (code, message, body_snippet) = expect_api_error(error);
        assert_eq!(code, 310000);
        assert_eq!(message, "invalid");
        assert_eq!(body_snippet, None);
    }

    #[test]
    fn access_token_cache_honors_refresh_margin_per_credentials() {
        let cache = AccessTokenCache::new();
        let credentials = AppCredentials::new("app-key", "app-secret");

        // A 1-second TTL is raised to the 30-second minimum, which is still inside a
        // 60-second refresh margin, so the lookup must miss.
        cache.store(credentials.clone(), "token".to_string(), Some(1));
        let within_margin = cache.get(&credentials, Duration::from_secs(60));
        assert!(within_margin.is_none());

        // With no margin, a 60-second token is served from the cache.
        cache.store(credentials.clone(), "token".to_string(), Some(60));
        let fresh = cache.get(&credentials, Duration::from_secs(0));
        assert_eq!(fresh.as_deref(), Some("token"));
    }
}