aliyun_openapi_core_rust_sdk/client/
log_service.rs

1use std::{collections::HashMap, time::Duration};
2
3use hmac::{Hmac, Mac};
4use md5::{Digest, Md5};
5use reqwest::{
6    header::{HeaderMap, HeaderValue},
7    ClientBuilder, Response,
8};
9use serde::{de::DeserializeOwned, Deserialize, Serialize};
10use sha1::Sha1;
11use time::{macros::format_description, OffsetDateTime};
12
13use crate::client::error::{Error, Result};
14
15#[derive(Debug, Deserialize, Serialize)]
16#[serde(rename_all = "camelCase")]
17pub struct LogServiceError {
18    /// error code
19    pub error_code: String,
20    /// error message
21    pub error_message: String,
22}
23
24/// Default const header.
25const AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
26const DEFAULT_HEADER: &[(&str, &str)] = &[
27    ("x-log-apiversion", "0.6.0"),
28    ("x-log-signaturemethod", "hmac-sha1"),
29    ("user-agent", AGENT),
30    ("x-sdk-client", AGENT),
31];
32
33type HamcSha1 = Hmac<Sha1>;
34
35/// Config for request.
36#[derive(Clone, Debug, Default)]
37struct Request {
38    method: String,
39    uri: String,
40    body: Option<String>,
41    query: Vec<(String, String)>,
42    headers: HeaderMap,
43    project: Option<String>,
44    timeout: Option<Duration>,
45}
46
47#[derive(Clone, Debug)]
48pub struct LogServiceClient {
49    /// The access key id of aliyun developer account.
50    access_key_id: String,
51    /// The access key secret of aliyun developer account.
52    access_key_secret: String,
53    /// The api endpoint of aliyun api service (need start with http:// or https://).
54    endpoint: String,
55    /// The config of http request.
56    request: Request,
57}
58
59impl LogServiceClient {
60    /// Create a api client.
61    pub fn new(
62        access_key_id: impl Into<String>,
63        access_key_secret: impl Into<String>,
64        endpoint: impl Into<String>,
65    ) -> Self {
66        LogServiceClient {
67            access_key_id: access_key_id.into(),
68            access_key_secret: access_key_secret.into(),
69            endpoint: endpoint.into(),
70            request: Default::default(),
71        }
72    }
73
74    /// Create a request with the `method` and `uri`.
75    ///
76    /// Returns a `Self` for send request.
77    pub fn request(mut self, method: impl Into<String>, uri: impl Into<String>) -> Self {
78        self.request.method = method.into();
79        self.request.uri = uri.into();
80
81        self
82    }
83
84    /// Create a `GET` request with the `uri`.
85    ///
86    /// Returns a `Self` for send request.
87    pub fn get(self, uri: impl Into<String>) -> Self {
88        self.request("GET".to_string(), uri.into())
89    }
90
91    /// Create a `POST` request with the `uri`.
92    ///
93    /// Returns a `Self` for send request.
94    pub fn post(self, uri: impl Into<String>) -> Self {
95        self.request("POST".to_string(), uri.into())
96    }
97
98    /// Set queries for request.
99    ///
100    /// Returns a `Self` for send request.
101    pub fn query(mut self, queries: impl Into<Vec<(String, String)>>) -> Self {
102        self.request.query = queries.into();
103
104        self
105    }
106
107    /// Set body for request.
108    ///
109    /// Returns a `Self` for send request.
110    pub fn body(mut self, body: impl Into<String>) -> Result<Self> {
111        // compute body length and md5.
112        let body = body.into();
113        let mut hasher = Md5::new();
114        hasher.update(body.as_bytes());
115        let md5_result = hasher.finalize();
116
117        // update headers.
118        self.request
119            .headers
120            .insert("content-length", body.len().to_string().parse()?);
121        self.request.headers.insert(
122            "content-md5",
123            base16ct::upper::encode_string(&md5_result).parse()?,
124        );
125
126        // store body string.
127        self.request.body = Some(body);
128
129        Ok(self)
130    }
131
132    /// Set header for request.
133    ///
134    /// Returns a `Self` for send request.
135    pub fn header(mut self, headers: impl Into<HashMap<String, String>>) -> Result<Self> {
136        self.request.headers = (&headers.into())
137            .try_into()
138            .map_err(|e| Error::InvalidRequest(format!("Cannot parse header: {}", e)))?;
139        Ok(self)
140    }
141
142    /// Set project for request.
143    ///
144    /// Returns a `Self` for send request.
145    pub fn project(mut self, project: impl Into<String>) -> Self {
146        self.request.project = Some(project.into());
147
148        self
149    }
150
151    /// Set a timeout for connect, read and write operations of a `Client`.
152    ///
153    /// Default is no timeout.
154    pub fn timeout(mut self, timeout: Duration) -> Self {
155        self.request.timeout = Some(timeout);
156
157        self
158    }
159
160    /// Send a request to service.
161    /// Try to deserialize the response body as JSON.
162    pub async fn json<T: DeserializeOwned>(self) -> Result<T> {
163        Ok(self.send().await?.json::<T>().await?)
164    }
165
166    /// Send a request to service.
167    /// Try to deserialize the response body as TEXT.
168    pub async fn text(self) -> Result<String> {
169        Ok(self.send().await?.text().await?)
170    }
171
172    /// Send a request to service.
173    /// Return client Response.
174    pub async fn send(mut self) -> Result<Response> {
175        // check special header
176        if !self.request.headers.contains_key("x-log-bodyrawsize") {
177            self.request
178                .headers
179                .insert("x-log-bodyrawsize", "0".parse()?);
180        }
181        if !self.request.headers.contains_key("accept") {
182            self.request
183                .headers
184                .insert("accept", "application/json".parse()?);
185        }
186
187        // add const header
188        for (k, v) in DEFAULT_HEADER.iter() {
189            self.request.headers.insert(*k, v.parse()?);
190        }
191
192        // add host header.
193        let mut prefix = "";
194        let mut host = self.endpoint.clone();
195        if let Some(endpoint) = self.endpoint.strip_prefix("http://") {
196            prefix = "http://";
197            host = endpoint.to_string();
198        } else if let Some(endpoint) = self.endpoint.strip_prefix("https://") {
199            prefix = "https://";
200            host = endpoint.to_string();
201        }
202        if let Some(project) = self.request.project.as_ref() {
203            host = format!("{}.{}", project, host);
204        }
205        self.request.headers.insert("host", host.parse()?);
206
207        // add date header.
208        // RFC 1123: %a, %d %b %Y %H:%M:%S GMT
209        let format = format_description!(
210            "[weekday repr:short], [day] [month repr:short] [year] [hour]:[minute]:[second] GMT"
211        );
212        let ts = OffsetDateTime::now_utc()
213            .format(&format)
214            .map_err(|e| Error::InvalidRequest(format!("Invalid RFC 1123 Date: {}", e)))?;
215        self.request.headers.insert("date", ts.parse()?);
216
217        // compute `Authorization` field.
218        // Authorization = "SLS <AccessKeyId>:<Signature>"
219        let authorization = format!("SLS {}:{}", self.access_key_id, self.signature()?);
220        self.request
221            .headers
222            .insert("Authorization", authorization.parse()?);
223
224        // build http client.
225        let final_url = format!("{}{}{}", prefix, host, self.request.uri);
226        let mut http_client_builder = ClientBuilder::new();
227        if let Some(timeout) = self.request.timeout {
228            http_client_builder = http_client_builder.timeout(timeout);
229        }
230        let mut http_client = http_client_builder.build()?.request(
231            self.request
232                .method
233                .parse()
234                .map_err(|e| Error::InvalidRequest(format!("Invalid HTTP method: {}", e)))?,
235            &final_url,
236        );
237
238        // set body.
239        if let Some(body) = self.request.body {
240            http_client = http_client.body(body);
241        }
242
243        // set query.
244        if !self.request.query.is_empty() {
245            http_client = http_client.query(&self.request.query);
246        }
247
248        // send request.
249        let response = http_client.headers(self.request.headers).send().await?;
250
251        // check HTTP StatusCode.
252        if !response.status().is_success() {
253            let result = response.json::<LogServiceError>().await?;
254            return Err(Error::InvalidResponse {
255                request_id: "".to_string(),
256                error_code: result.error_code,
257                error_message: result.error_message,
258            });
259        }
260
261        // return response.
262        Ok(response)
263    }
264
265    /// Compute canonicalized headers.
266    fn canonicalized_headers(&self) -> String {
267        let mut headers: Vec<(String, String)> = self
268            .request
269            .headers
270            .iter()
271            .filter_map(|(k, v)| {
272                let k = k.as_str().to_lowercase();
273                if k.starts_with("x-acs-") || k.starts_with("x-log-") {
274                    Some((k, v.to_str().unwrap().to_string()))
275                } else {
276                    None
277                }
278            })
279            .collect();
280        headers.sort_by(|a, b| a.0.cmp(&b.0));
281
282        let headers: Vec<String> = headers
283            .iter()
284            .map(|(k, v)| format!("{}:{}", k, v))
285            .collect();
286
287        headers.join("\n")
288    }
289
290    /// Compute canonicalized resource.
291    fn canonicalized_resource(&self) -> String {
292        if !self.request.query.is_empty() {
293            let mut params = self.request.query.clone();
294            params.sort_by_key(|item| item.0.clone());
295            let params: Vec<String> = params.iter().map(|(k, v)| format!("{}={}", k, v)).collect();
296            let sorted_query_string = params.join("&");
297            format!("{}?{}", self.request.uri, sorted_query_string)
298        } else {
299            self.request.uri.clone()
300        }
301    }
302
303    /// Compute signature for request.
304    fn signature(&self) -> Result<String> {
305        // build body.
306        let canonicalized_headers = self.canonicalized_headers();
307        let canonicalized_resource = self.canonicalized_resource();
308        let body = format!(
309            "{}\n{}\n{}\n{}\n{}\n{}",
310            self.request.method.to_uppercase(),
311            self.request
312                .headers
313                .get("content-md5")
314                .unwrap_or(&HeaderValue::from_static(""))
315                .to_str()
316                .unwrap(),
317            self.request
318                .headers
319                .get("content-type")
320                .unwrap_or(&HeaderValue::from_static(""))
321                .to_str()
322                .unwrap(),
323            self.request.headers["date"].to_str().unwrap(),
324            canonicalized_headers,
325            canonicalized_resource
326        );
327
328        // sign body.
329        let mut mac = HamcSha1::new_from_slice(self.access_key_secret.as_bytes())
330            .map_err(|e| Error::InvalidRequest(format!("Invalid HMAC-SHA1 secret key: {}", e)))?;
331        mac.update(body.as_bytes());
332        let result = mac.finalize();
333        let code = result.into_bytes();
334
335        Ok(base64::encode(code))
336    }
337}
338
#[cfg(test)]
mod tests {
    use std::env;

    use super::*;

    /// Requests a log store and expects the service to reject the call with
    /// the `Unauthorized` error code.
    ///
    /// Reads the credentials from the `ACCESS_KEY_ID` and `ACCESS_KEY_SECRET`
    /// environment variables and talks to the real endpoint.
    #[tokio::test]
    async fn get_log_store_test() -> Result<()> {
        let access_key_id = env::var("ACCESS_KEY_ID").unwrap();
        let access_key_secret = env::var("ACCESS_KEY_SECRET").unwrap();
        let client = LogServiceClient::new(
            access_key_id,
            access_key_secret,
            "https://cn-hangzhou.log.aliyuncs.com",
        );

        // Send the request; it is expected to fail.
        let outcome = client
            .get(format!("/logstores/{}", "logstore"))
            .project("project")
            .text()
            .await;
        let err = outcome.unwrap_err();

        // Only a structured service error with the expected code passes; any
        // other error is returned as the test failure.
        match err {
            Error::InvalidResponse { error_code, .. } => {
                assert_eq!(error_code, "Unauthorized");
                Ok(())
            }
            other => Err(other),
        }
    }
}