aliyun_log_sdk/
lib.rs

1#[macro_use]
2extern crate log;
3
4use std::str::FromStr;
5
6use chrono::Utc;
7use digest::Digest;
8use hmac::{Hmac, Mac, NewMac};
9use itertools::Itertools;
10use md5::Md5;
11use quick_protobuf::MessageWrite;
12use reqwest::header::{
13    HeaderValue, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, USER_AGENT,
14};
15use reqwest::{Client, Method, RequestBuilder, Response, Url};
16use sha1::Sha1;
17
18pub use crate::error::{Error, Result};
19use crate::model::LogGroup;
20
21mod error;
22mod model;
23#[allow(dead_code)]
24mod headers {
25    pub const LOG_API_VERSION: &str = "x-log-apiversion";
26    pub const LOG_SIGNATURE_METHOD: &str = "x-log-signaturemethod";
27    pub const LOG_BODY_RAW_SIZE: &str = "x-log-bodyrawsize";
28    pub const LOG_COMPRESS_TYPE: &str = "x-log-compresstype";
29    pub const CONTENT_MD5: &str = "content-md5";
30}
31use headers::*;
32
33#[cfg(test)]
34mod test;
35
36type HmacSha1 = Hmac<Sha1>;
37
38pub const API_VERSION: &str = "0.6.0";
39pub const SIGNATURE_METHOD: &str = "hmac-sha1";
40pub const DEFAULT_CONTENT_TYPE: &str = "application/x-protobuf";
41pub const USER_AGENT_VALUE: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
42
43pub struct LogProducer {
44    access_key: Box<str>,
45    access_secret: Box<str>,
46    endpoint: Box<str>,
47    host: Box<str>,
48    logstore: Box<str>,
49    client: Client,
50}
51
52impl LogProducer {
53    pub fn new<K, S, E, P, L>(
54        access_key: K,
55        access_secret: S,
56        endpoint: E,
57        project: P,
58        logstore: L,
59    ) -> Result<Self>
60    where
61        K: AsRef<str>,
62        S: AsRef<str>,
63        E: AsRef<str>,
64        P: AsRef<str>,
65        L: AsRef<str>,
66    {
67        Ok(Self {
68            access_key: access_key.as_ref().into(),
69            access_secret: access_secret.as_ref().into(),
70            endpoint: endpoint.as_ref().into(),
71            host: format!("{}.{}", project.as_ref(), endpoint.as_ref()).into_boxed_str(),
72            logstore: logstore.as_ref().into(),
73            client: reqwest::ClientBuilder::new().build()?,
74        })
75    }
76
77    /// POST /logstores/logstoreName/shards/lb
78    pub async fn put_logs_lb(&self, log_group: &LogGroup<'_>) -> Result<Response> {
79        let buf = log_group.encode()?;
80        let request = self
81            .new_request(
82                Method::POST,
83                format!("/logstores/{}/shards/lb", self.logstore),
84            )?
85            .header(LOG_BODY_RAW_SIZE, log_group.get_size())
86            .body(buf);
87
88        Ok(self.exec(request).await?)
89    }
90
91    fn new_request(&self, method: Method, path: String) -> Result<RequestBuilder> {
92        let url = Url::from_str(&*format!("https://{}{}", self.endpoint, path))?;
93        let date = Utc::now().format("%a,%d%b%Y %H:%M:%S GMT").to_string();
94        debug!("created request on {}", date);
95        let request = self
96            .client
97            .request(method, url)
98            .header(USER_AGENT, USER_AGENT_VALUE)
99            .header(DATE, date)
100            .header(HOST, &*self.host)
101            .header(LOG_API_VERSION, API_VERSION)
102            .header(LOG_SIGNATURE_METHOD, SIGNATURE_METHOD);
103
104        Ok(request)
105    }
106
107    async fn exec(&self, request: RequestBuilder) -> Result<reqwest::Response> {
108        let mut request = request.build()?;
109
110        let mut mac = HmacSha1::new_varkey(self.access_secret.as_bytes()).unwrap();
111        // SignString = VERB + "\n"
112        //              + CONTENT-MD5 + "\n"
113        //              + CONTENT-TYPE + "\n"
114        //              + DATE + "\n"
115        //              + CanonicalizedLOGHeaders + "\n"
116        //              + CanonicalizedResource
117        let verb = request.method().as_str();
118        debug!("-- method: {}", verb);
119        mac.update(verb.as_bytes());
120        mac.update(b"\n");
121
122        if request.body().is_some() {
123            debug!("-- body found");
124            let body = request.body().unwrap().as_bytes().unwrap();
125            let length = body.len();
126            let digest = Md5::digest(body);
127            let digest = hex::encode(digest).to_ascii_uppercase();
128            debug!("-- content-md5: {}", digest);
129            let md5 = request
130                .headers_mut()
131                .entry(CONTENT_MD5)
132                .or_insert_with(|| digest.parse().unwrap());
133            mac.update(md5.as_ref());
134            mac.update(b"\n");
135
136            // Add CONTENT_LENGTH header
137            request.headers_mut().insert(CONTENT_LENGTH, length.into());
138
139            let content_type = request
140                .headers_mut()
141                .entry(CONTENT_TYPE)
142                .or_insert_with(|| HeaderValue::from_static(DEFAULT_CONTENT_TYPE));
143            mac.update(content_type.as_ref());
144            mac.update(b"\n");
145        } else {
146            mac.update(b"\n\n");
147        }
148
149        let date = request.headers_mut().entry(DATE).or_insert_with(|| {
150            let date = Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
151            date.parse().unwrap()
152        });
153
154        mac.update(date.as_ref());
155        mac.update(b"\n");
156        // CanonicalizedLOGHeaders的构造方式如下:
157        //     将所有以x-log和x-acs为前缀的HTTP请求头的名字转换成小写字母。
158        //     将上一步得到的所有LOG自定义请求头按照字典顺序进行升序排序。
159        //     删除请求头和内容之间分隔符两端出现的任何空格。
160        //     将所有的头和内容用\n分隔符组合成最后的CanonicalizedLOGHeader。
161        request
162            .headers()
163            .iter()
164            .filter(|(key, _)| {
165                key.as_str().starts_with("x-log") || key.as_str().starts_with("x-acs")
166            })
167            .sorted_by_key(|(key, _)| key.as_str())
168            .map(|(key, value)| {
169                format!(
170                    "{}:{}",
171                    key.as_str().to_ascii_lowercase(),
172                    value.to_str().unwrap()
173                )
174            })
175            .for_each(|next| {
176                debug!("-- header: {}", next);
177                mac.update(next.as_bytes());
178                mac.update(b"\n");
179            });
180
181        // CanonicalizedResource的构造方式如下:
182        // a. 将CanonicalizedResource设置为空字符串" "。
183        // b. 放入要访问的LOG资源,如/logstores/logstorename(如果没有logstorename则可不填写)。
184        // c. 如果请求包含查询字符串QUERY_STRING,则在CanonicalizedResource字符串尾部添加?和查询字符串。
185        //
186        // QUERY_STRING是URL中请求参数按字典顺序排序后的字符串,其中参数名和值之间用=相隔组成字符串,并对参数名-值对按照字典顺序升序排序,然后以&符号连接构成字符串。其公式化描述如下:
187        // QUERY_STRING = "KEY1=VALUE1" + "&" + "KEY2=VALUE2"
188
189        let url = request.url();
190        let path = url.path();
191        debug!("-- path: {}", path);
192        debug!("-- query: {:?}", url.query());
193        mac.update(path.as_bytes());
194        if url.query().is_some() {
195            mac.update(b"?");
196            let query_string = url
197                .query_pairs()
198                .map(|(key, value)| format!("{}={}", key, value))
199                .sorted()
200                .join("&");
201            mac.update(query_string.as_bytes());
202        }
203
204        let authorization = base64::encode(mac.finalize().into_bytes());
205        let authorization = format!("LOG {}:{}", self.access_key, authorization);
206        request
207            .headers_mut()
208            .insert(AUTHORIZATION, authorization.parse().unwrap());
209
210        Ok(self.client.execute(request).await?)
211    }
212}