tencentcloud_cls_sdk_rust/
lib.rs1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::str::FromStr;
4
5use chrono::Utc;
6use reqwest::{Client, Method, RequestBuilder, Url};
7use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, USER_AGENT};
8
9use crate::cls_log::{Log, LogGroup, LogGroupList};
10use crate::cls_log::mod_Log::Content;
11use crate::cls_log_json::Logs;
12use crate::consts::headers::{LOG_COMPRESS_TYPE, USER_AGENT_VALUE};
13use crate::error::LogProducerError;
14use crate::sign::signature;
15
16pub mod cls_log;
17mod cls_log_json;
18mod consts;
19pub mod error;
20pub mod sign;
21
22pub struct LogProducer {
23 access_key: String,
24 access_secret: String,
25 host: String,
26 client: Client,
27}
28
29impl LogProducer {
30 pub fn new(
31 access_key: String,
32 access_secret: String,
33 host: String,
34 ) -> Result<Self, LogProducerError> {
35 if access_key.is_empty() {
36 Err(LogProducerError::InvalidParameter {
37 error_message: "access_key is empty".to_string(),
38 })?;
39 }
40 if access_secret.is_empty() {
41 Err(LogProducerError::InvalidParameter {
42 error_message: "access_secret is empty".to_string(),
43 })?;
44 }
45 if host.is_empty() {
46 Err(LogProducerError::InvalidParameter {
47 error_message: "host is empty".to_string(),
48 })?;
49 }
50 Ok(Self {
51 access_key,
52 access_secret,
53 host,
54 client: reqwest::ClientBuilder::new().build()?,
55 })
56 }
57
58 pub async fn put_logs_json(
59 &self,
60 topic_id: String,
61 data: String,
62 ) -> Result<reqwest::Response, LogProducerError> {
63 let logs: Logs = serde_json::from_str(data.as_str()).unwrap();
64 let mut log_group_list = LogGroupList::default();
65 let mut log_group: LogGroup = LogGroup::default();
66 if logs.source.is_some() {
67 log_group.source = Option::Some(Cow::from(logs.source.unwrap()));
68 }
69
70 if logs.filename.is_some() {
71 log_group.filename = Option::Some(Cow::from(logs.filename.unwrap()));
72 }
73
74 if logs.hostname.is_some() {
75 log_group.hostname = Option::Some(Cow::from(logs.hostname.unwrap()));
76 }
77
78 logs.logs.iter().for_each(|item| {
79 let mut log: Log = Log::default();
80 log.time = item.time;
81 item.contents.iter().for_each(|content| {
82 log.contents
83 .push(Content::new(content.key.as_str(), content.value.as_str()));
84 });
85 log_group.logs.push(log);
86 });
87
88 log_group_list.logGroupList.push(log_group);
89
90 Ok(self.put_logs(topic_id, &log_group_list).await?)
91 }
92
93 pub async fn put_logs(
94 &self,
95 topic_id: String,
96 log_group: &LogGroupList<'_>,
97 ) -> Result<reqwest::Response, LogProducerError> {
98 if topic_id.is_empty() {
99 Err(LogProducerError::InvalidParameter {
100 error_message: "topic_id is empty".to_string(),
101 })?;
102 }
103
104 let buf = log_group.encode()?;
105 let compressed = zstd::encode_all(buf.as_ref(), 3)?;
106 let request = self
107 .new_request(Method::POST, "/structuredlog".to_string())?
108 .query(&[("topic_id", topic_id)])
109 .header(CONTENT_LENGTH, compressed.len())
110 .header(CONTENT_TYPE, "application/x-protobuf")
111 .header(LOG_COMPRESS_TYPE, "zstd")
112 .body(compressed);
113
114 Ok(self.send(request).await?)
115 }
116
117 fn new_request(
118 &self,
119 method: Method,
120 path: String,
121 ) -> Result<RequestBuilder, LogProducerError> {
122 let url = Url::from_str(&*format!("https://{}{}", self.host, path))?;
123 let date = Utc::now().format("%a,%d%b%Y %H:%M:%S GMT").to_string();
124 let request = self
125 .client
126 .request(method, url)
127 .header(USER_AGENT, USER_AGENT_VALUE)
128 .header(DATE, date)
129 .header(HOST, &*self.host);
130 Ok(request)
131 }
132
133 async fn send(&self, request: RequestBuilder) -> Result<reqwest::Response, LogProducerError> {
134 let mut request = request.build()?;
135 let mut headers: HashMap<String, String> = HashMap::new();
136 let mut params: HashMap<String, String> = HashMap::new();
137 let _ = request.headers().iter().map(|(key, value)| {
138 headers.insert(key.to_string(), value.to_str().unwrap().to_string())
139 });
140 let pairs = request.url().query_pairs();
141 let _ = pairs.map(|(key, value)| params.insert(key.to_string(), value.to_string()));
142
143 let sign_str = signature(
144 self.access_key.clone().as_str(),
145 self.access_secret.clone().as_str(),
146 request.method().as_str(),
147 request.url().path(),
148 ¶ms,
149 &headers,
150 300,
151 );
152 request
153 .headers_mut()
154 .insert("Authorization", sign_str.parse().unwrap());
155
156 Ok(self.client.execute(request).await?)
157 }
158}
159
#[cfg(test)]
mod tests {
    use std::borrow::Cow;

    use crate::cls_log::mod_Log::Content;
    use crate::cls_log::{Log, LogGroup};

    use super::*;

    /// Endpoint used when `CLS_HOST` is not set.
    const TEST_HOST: &str = "ap-guangzhou-open.cls.tencentcs.com";

    /// Builds a producer and topic id for the live tests from the environment:
    /// `CLS_ACCESS_KEY`, `CLS_ACCESS_SECRET`, `CLS_TOPIC_ID` and, optionally,
    /// `CLS_HOST`. Credentials are never hard-coded in the test source.
    fn live_config() -> (LogProducer, String) {
        let var = |name: &str| {
            std::env::var(name)
                .unwrap_or_else(|_| panic!("set {} to run the live CLS tests", name))
        };
        let host = std::env::var("CLS_HOST").unwrap_or_else(|_| TEST_HOST.to_string());
        let producer = LogProducer::new(var("CLS_ACCESS_KEY"), var("CLS_ACCESS_SECRET"), host)
            .expect("init producer failed");
        (producer, var("CLS_TOPIC_ID"))
    }

    /// `new` must reject each empty argument without touching the network.
    #[test]
    fn new_rejects_empty_parameters() {
        let filled = || "x".to_string();

        assert!(matches!(
            LogProducer::new(String::new(), filled(), TEST_HOST.to_string()),
            Err(LogProducerError::InvalidParameter { .. })
        ));
        assert!(matches!(
            LogProducer::new(filled(), String::new(), TEST_HOST.to_string()),
            Err(LogProducerError::InvalidParameter { .. })
        ));
        assert!(matches!(
            LogProducer::new(filled(), filled(), String::new()),
            Err(LogProducerError::InvalidParameter { .. })
        ));
    }

    /// `put_logs` validates the topic id before encoding or sending anything.
    #[test]
    fn put_logs_rejects_empty_topic_id() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let producer = LogProducer::new(
            "key".to_string(),
            "secret".to_string(),
            TEST_HOST.to_string(),
        )
        .unwrap();
        let log_group_list = LogGroupList::default();

        let result = rt.block_on(producer.put_logs(String::new(), &log_group_list));

        assert!(matches!(
            result,
            Err(LogProducerError::InvalidParameter { .. })
        ));
    }

    #[test]
    #[ignore = "requires network access and real CLS credentials"]
    fn send_logs() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let (producer, topic_id) = live_config();

        let mut log: Log = Log::default();
        log.time = chrono::Local::now().timestamp_millis();
        log.contents.push(Content::new("level", "INFO"));
        log.contents.push(Content::new("message", "startup"));
        let mut log_group: LogGroup = LogGroup::default();
        log_group.source = Option::Some(Cow::from("127.0.0.1"));
        log_group.logs.push(log);
        let mut log_group_list = LogGroupList::default();
        log_group_list.logGroupList.push(log_group);

        let result = rt
            .block_on(producer.put_logs(topic_id, &log_group_list))
            .unwrap();
        let text = rt.block_on(result.text()).unwrap();
        println!("{}", text);
    }

    #[test]
    #[ignore = "requires network access and real CLS credentials"]
    fn send_logs_json() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let (producer, topic_id) = live_config();

        let logs = r#"{"filename":"","source":"127.0.0.2","hostname":"","logs":[{"time":1718247083,"contents":[{"value":"hello","key":"world"}]},{"time":1718247083,"contents":[{"value":"hi","key":"hey"}]}]}"#;
        // `put_logs_json` takes an owned `String`, so the literal must be converted.
        let result = rt
            .block_on(producer.put_logs_json(topic_id, logs.to_string()))
            .unwrap();
        let text = rt.block_on(result.text()).unwrap();
        println!("{}", text);
    }
}