tencentcloud_cls_sdk_rust/
lib.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::str::FromStr;
4
5use chrono::Utc;
6use reqwest::{Client, Method, RequestBuilder, Url};
7use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE, DATE, HOST, USER_AGENT};
8
9use crate::cls_log::{Log, LogGroup, LogGroupList};
10use crate::cls_log::mod_Log::Content;
11use crate::cls_log_json::Logs;
12use crate::consts::headers::{LOG_COMPRESS_TYPE, USER_AGENT_VALUE};
13use crate::error::LogProducerError;
14use crate::sign::signature;
15
16pub mod cls_log;
17mod cls_log_json;
18mod consts;
19pub mod error;
20pub mod sign;
21
22pub struct LogProducer {
23    access_key: String,
24    access_secret: String,
25    host: String,
26    client: Client,
27}
28
29impl LogProducer {
30    pub fn new(
31        access_key: String,
32        access_secret: String,
33        host: String,
34    ) -> Result<Self, LogProducerError> {
35        if access_key.is_empty() {
36            Err(LogProducerError::InvalidParameter {
37                error_message: "access_key is empty".to_string(),
38            })?;
39        }
40        if access_secret.is_empty() {
41            Err(LogProducerError::InvalidParameter {
42                error_message: "access_secret is empty".to_string(),
43            })?;
44        }
45        if host.is_empty() {
46            Err(LogProducerError::InvalidParameter {
47                error_message: "host is empty".to_string(),
48            })?;
49        }
50        Ok(Self {
51            access_key,
52            access_secret,
53            host,
54            client: reqwest::ClientBuilder::new().build()?,
55        })
56    }
57
58    pub async fn put_logs_json(
59        &self,
60        topic_id: String,
61        data: String,
62    ) -> Result<reqwest::Response, LogProducerError> {
63        let logs: Logs = serde_json::from_str(data.as_str()).unwrap();
64        let mut log_group_list = LogGroupList::default();
65        let mut log_group: LogGroup = LogGroup::default();
66        if logs.source.is_some() {
67            log_group.source = Option::Some(Cow::from(logs.source.unwrap()));
68        }
69
70        if logs.filename.is_some() {
71            log_group.filename = Option::Some(Cow::from(logs.filename.unwrap()));
72        }
73
74        if logs.hostname.is_some() {
75            log_group.hostname = Option::Some(Cow::from(logs.hostname.unwrap()));
76        }
77
78        logs.logs.iter().for_each(|item| {
79            let mut log: Log = Log::default();
80            log.time = item.time;
81            item.contents.iter().for_each(|content| {
82                log.contents
83                    .push(Content::new(content.key.as_str(), content.value.as_str()));
84            });
85            log_group.logs.push(log);
86        });
87
88        log_group_list.logGroupList.push(log_group);
89
90        Ok(self.put_logs(topic_id, &log_group_list).await?)
91    }
92
93    pub async fn put_logs(
94        &self,
95        topic_id: String,
96        log_group: &LogGroupList<'_>,
97    ) -> Result<reqwest::Response, LogProducerError> {
98        if topic_id.is_empty() {
99            Err(LogProducerError::InvalidParameter {
100                error_message: "topic_id is empty".to_string(),
101            })?;
102        }
103
104        let buf = log_group.encode()?;
105        let compressed = zstd::encode_all(buf.as_ref(), 3)?;
106        let request = self
107            .new_request(Method::POST, "/structuredlog".to_string())?
108            .query(&[("topic_id", topic_id)])
109            .header(CONTENT_LENGTH, compressed.len())
110            .header(CONTENT_TYPE, "application/x-protobuf")
111            .header(LOG_COMPRESS_TYPE, "zstd")
112            .body(compressed);
113
114        Ok(self.send(request).await?)
115    }
116
117    fn new_request(
118        &self,
119        method: Method,
120        path: String,
121    ) -> Result<RequestBuilder, LogProducerError> {
122        let url = Url::from_str(&*format!("https://{}{}", self.host, path))?;
123        let date = Utc::now().format("%a,%d%b%Y %H:%M:%S GMT").to_string();
124        let request = self
125            .client
126            .request(method, url)
127            .header(USER_AGENT, USER_AGENT_VALUE)
128            .header(DATE, date)
129            .header(HOST, &*self.host);
130        Ok(request)
131    }
132
133    async fn send(&self, request: RequestBuilder) -> Result<reqwest::Response, LogProducerError> {
134        let mut request = request.build()?;
135        let mut headers: HashMap<String, String> = HashMap::new();
136        let mut params: HashMap<String, String> = HashMap::new();
137        let _ = request.headers().iter().map(|(key, value)| {
138            headers.insert(key.to_string(), value.to_str().unwrap().to_string())
139        });
140        let pairs = request.url().query_pairs();
141        let _ = pairs.map(|(key, value)| params.insert(key.to_string(), value.to_string()));
142
143        let sign_str = signature(
144            self.access_key.clone().as_str(),
145            self.access_secret.clone().as_str(),
146            request.method().as_str(),
147            request.url().path(),
148            &params,
149            &headers,
150            300,
151        );
152        request
153            .headers_mut()
154            .insert("Authorization", sign_str.parse().unwrap());
155
156        Ok(self.client.execute(request).await?)
157    }
158}
159
160#[cfg(test)]
161mod tests {
162    use std::borrow::Cow;
163    
164    use crate::cls_log::{Log, LogGroup};
165    use crate::cls_log::mod_Log::Content;
166    
167    use super::*;
168    
169    #[test]
170    fn send_logs() {
171        // create a async runtime
172        let rt = tokio::runtime::Runtime::new().unwrap();
173        let producer = LogProducer::new(
174            "".to_string(),
175            "".to_string(),
176            "ap-guangzhou-open.cls.tencentcs.com".to_string(),
177        )
178        .unwrap();
179
180        // Create a new Log with default timestamp (now)
181        let mut log: Log = Log::default();
182        log.time = chrono::Local::now().timestamp_millis();
183        // Push K-V pairs to Log
184        log.contents.push(Content::new("level", "INFO"));
185        log.contents.push(Content::new("message", "startup"));
186        // Create LogGroup
187        let mut log_group: LogGroup = LogGroup::default();
188        log_group.source = Option::Some(Cow::from("127.0.0.1"));
189        log_group.logs.push(log);
190        let mut log_group_list = LogGroupList::default();
191        log_group_list.logGroupList.push(log_group);
192
193        let result = rt
194            .block_on(producer.put_logs("".to_string(), &log_group_list))
195            .unwrap();
196        let text = rt.block_on(result.text()).unwrap();
197        println!("{}", text);
198    }
199
200    #[test]
201    fn send_logs_json() {
202        // create a async runtime
203        let rt = tokio::runtime::Runtime::new().unwrap();
204        let producer = LogProducer::new(
205            "".to_string(),
206            "".to_string(),
207            "ap-guangzhou-open.cls.tencentcs.com".to_string(),
208        );
209
210        if let Ok(producer) = producer {
211            let logs = "{\"filename\":\"\",\"source\":\"127.0.0.2\",\"hostname\":\"\",\"logs\":[{\"time\":1718247083,\"contents\":[{\"value\":\"hello\",\"key\":\"world\"}]},{\"time\":1718247083,\"contents\":[{\"value\":\"hi\",\"key\":\"hey\"}]}]}";
212            let result = rt
213                .block_on(
214                    producer
215                        .put_logs_json("23eaa499-b7a9-4a60-a628-49a4239ddbba".to_string(), logs),
216                )
217                .unwrap();
218            let text = rt.block_on(result.text()).unwrap();
219            println!("{}", text);
220        } else {
221            println!("{}", "init producer failed");
222        }
223    }
224}