1#[macro_use]
2extern crate log;
3
4use std::str::FromStr;
5
6use chrono::Utc;
7use digest::Digest;
8use hmac::{Hmac, Mac, NewMac};
9use itertools::Itertools;
10use md5::Md5;
11use quick_protobuf::MessageWrite;
12use reqwest::header::{
13 HeaderValue, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, USER_AGENT,
14};
15use reqwest::{Client, Method, RequestBuilder, Response, Url};
16use sha1::Sha1;
17
18pub use crate::error::{Error, Result};
19use crate::model::LogGroup;
20
21mod error;
22mod model;
23#[allow(dead_code)]
24mod headers {
25 pub const LOG_API_VERSION: &str = "x-log-apiversion";
26 pub const LOG_SIGNATURE_METHOD: &str = "x-log-signaturemethod";
27 pub const LOG_BODY_RAW_SIZE: &str = "x-log-bodyrawsize";
28 pub const LOG_COMPRESS_TYPE: &str = "x-log-compresstype";
29 pub const CONTENT_MD5: &str = "content-md5";
30}
31use headers::*;
32
33#[cfg(test)]
34mod test;
35
36type HmacSha1 = Hmac<Sha1>;
37
38pub const API_VERSION: &str = "0.6.0";
39pub const SIGNATURE_METHOD: &str = "hmac-sha1";
40pub const DEFAULT_CONTENT_TYPE: &str = "application/x-protobuf";
41pub const USER_AGENT_VALUE: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
42
43pub struct LogProducer {
44 access_key: Box<str>,
45 access_secret: Box<str>,
46 endpoint: Box<str>,
47 host: Box<str>,
48 logstore: Box<str>,
49 client: Client,
50}
51
52impl LogProducer {
53 pub fn new<K, S, E, P, L>(
54 access_key: K,
55 access_secret: S,
56 endpoint: E,
57 project: P,
58 logstore: L,
59 ) -> Result<Self>
60 where
61 K: AsRef<str>,
62 S: AsRef<str>,
63 E: AsRef<str>,
64 P: AsRef<str>,
65 L: AsRef<str>,
66 {
67 Ok(Self {
68 access_key: access_key.as_ref().into(),
69 access_secret: access_secret.as_ref().into(),
70 endpoint: endpoint.as_ref().into(),
71 host: format!("{}.{}", project.as_ref(), endpoint.as_ref()).into_boxed_str(),
72 logstore: logstore.as_ref().into(),
73 client: reqwest::ClientBuilder::new().build()?,
74 })
75 }
76
77 pub async fn put_logs_lb(&self, log_group: &LogGroup<'_>) -> Result<Response> {
79 let buf = log_group.encode()?;
80 let request = self
81 .new_request(
82 Method::POST,
83 format!("/logstores/{}/shards/lb", self.logstore),
84 )?
85 .header(LOG_BODY_RAW_SIZE, log_group.get_size())
86 .body(buf);
87
88 Ok(self.exec(request).await?)
89 }
90
91 fn new_request(&self, method: Method, path: String) -> Result<RequestBuilder> {
92 let url = Url::from_str(&*format!("https://{}{}", self.endpoint, path))?;
93 let date = Utc::now().format("%a,%d%b%Y %H:%M:%S GMT").to_string();
94 debug!("created request on {}", date);
95 let request = self
96 .client
97 .request(method, url)
98 .header(USER_AGENT, USER_AGENT_VALUE)
99 .header(DATE, date)
100 .header(HOST, &*self.host)
101 .header(LOG_API_VERSION, API_VERSION)
102 .header(LOG_SIGNATURE_METHOD, SIGNATURE_METHOD);
103
104 Ok(request)
105 }
106
107 async fn exec(&self, request: RequestBuilder) -> Result<reqwest::Response> {
108 let mut request = request.build()?;
109
110 let mut mac = HmacSha1::new_varkey(self.access_secret.as_bytes()).unwrap();
111 let verb = request.method().as_str();
118 debug!("-- method: {}", verb);
119 mac.update(verb.as_bytes());
120 mac.update(b"\n");
121
122 if request.body().is_some() {
123 debug!("-- body found");
124 let body = request.body().unwrap().as_bytes().unwrap();
125 let length = body.len();
126 let digest = Md5::digest(body);
127 let digest = hex::encode(digest).to_ascii_uppercase();
128 debug!("-- content-md5: {}", digest);
129 let md5 = request
130 .headers_mut()
131 .entry(CONTENT_MD5)
132 .or_insert_with(|| digest.parse().unwrap());
133 mac.update(md5.as_ref());
134 mac.update(b"\n");
135
136 request.headers_mut().insert(CONTENT_LENGTH, length.into());
138
139 let content_type = request
140 .headers_mut()
141 .entry(CONTENT_TYPE)
142 .or_insert_with(|| HeaderValue::from_static(DEFAULT_CONTENT_TYPE));
143 mac.update(content_type.as_ref());
144 mac.update(b"\n");
145 } else {
146 mac.update(b"\n\n");
147 }
148
149 let date = request.headers_mut().entry(DATE).or_insert_with(|| {
150 let date = Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
151 date.parse().unwrap()
152 });
153
154 mac.update(date.as_ref());
155 mac.update(b"\n");
156 request
162 .headers()
163 .iter()
164 .filter(|(key, _)| {
165 key.as_str().starts_with("x-log") || key.as_str().starts_with("x-acs")
166 })
167 .sorted_by_key(|(key, _)| key.as_str())
168 .map(|(key, value)| {
169 format!(
170 "{}:{}",
171 key.as_str().to_ascii_lowercase(),
172 value.to_str().unwrap()
173 )
174 })
175 .for_each(|next| {
176 debug!("-- header: {}", next);
177 mac.update(next.as_bytes());
178 mac.update(b"\n");
179 });
180
181 let url = request.url();
190 let path = url.path();
191 debug!("-- path: {}", path);
192 debug!("-- query: {:?}", url.query());
193 mac.update(path.as_bytes());
194 if url.query().is_some() {
195 mac.update(b"?");
196 let query_string = url
197 .query_pairs()
198 .map(|(key, value)| format!("{}={}", key, value))
199 .sorted()
200 .join("&");
201 mac.update(query_string.as_bytes());
202 }
203
204 let authorization = base64::encode(mac.finalize().into_bytes());
205 let authorization = format!("LOG {}:{}", self.access_key, authorization);
206 request
207 .headers_mut()
208 .insert(AUTHORIZATION, authorization.parse().unwrap());
209
210 Ok(self.client.execute(request).await?)
211 }
212}