rocketmq_common/utils/
http_tiny_client.rs1use std::collections::HashMap;
18use std::io::Read;
19use std::io::{self};
20use std::str::FromStr;
21use std::time::Duration;
22
23use reqwest::blocking::Client;
24use reqwest::blocking::Response;
25use reqwest::header::HeaderMap;
26use reqwest::header::HeaderName;
27use reqwest::header::HeaderValue;
28use reqwest::header::CONTENT_TYPE;
29
30use crate::common::mq_version::RocketMqVersion;
31use crate::common::mq_version::CURRENT_VERSION;
32use crate::TimeUtils::get_current_millis;
33
/// Stateless namespace type for the blocking HTTP `GET`/`POST` helper functions.
///
/// The struct holds no data; all functionality lives in its associated functions.
#[derive(Debug, Clone, Copy, Default)]
pub struct HttpTinyClient;
35
/// Outcome of an HTTP exchange: the numeric status code plus the response body text.
///
/// Derives `PartialEq`/`Eq` so results can be compared directly (e.g. in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpResult {
    /// Numeric status code of the response (for example `200`).
    pub code: i32,
    /// Response body as text.
    pub content: String,
}
44
45impl HttpResult {
46 pub fn new(code: i32, content: String) -> Self {
48 Self { code, content }
49 }
50
51 pub fn is_success(&self) -> bool {
53 self.code >= 200 && self.code < 300
54 }
55
56 pub fn is_ok(&self) -> bool {
58 self.code == 200
59 }
60}
61
62impl std::fmt::Display for HttpResult {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 write!(
65 f,
66 "HttpResult(code: {}, content_length: {})",
67 self.code,
68 self.content.len()
69 )
70 }
71}
72
73impl HttpTinyClient {
74 pub fn http_get(
86 url: &str,
87 headers: Option<&[String]>,
88 param_values: Option<&[String]>,
89 encoding: &str,
90 read_timeout_ms: u64,
91 ) -> Result<HttpResult, io::Error> {
92 let encoded_content = Self::encoding_params(param_values, encoding)?;
93 let full_url = if let Some(params) = encoded_content {
94 format!("{url}?{params}")
95 } else {
96 url.to_string()
97 };
98
99 let client = Client::builder()
100 .timeout(Duration::from_millis(read_timeout_ms))
101 .build()
102 .map_err(io::Error::other)?;
103
104 let mut request_builder = client.get(&full_url);
105
106 request_builder = Self::set_headers(request_builder, headers, encoding);
108
109 let response = request_builder.send().map_err(io::Error::other)?;
110
111 let status_code = response.status().as_u16() as i32;
112
113 let content = if response.status().is_success() {
115 response.text()
116 } else {
117 response.text()
119 }
120 .map_err(io::Error::other)?;
121
122 Ok(HttpResult::new(status_code, content))
123 }
124
125 pub fn http_post(
137 url: &str,
138 headers: Option<&[String]>,
139 param_values: Option<&[String]>,
140 encoding: &str,
141 read_timeout_ms: u64,
142 ) -> Result<HttpResult, io::Error> {
143 let encoded_content = Self::encoding_params(param_values, encoding)?.unwrap_or_default();
144
145 let client = Client::builder()
146 .timeout(Duration::from_millis(read_timeout_ms))
147 .connect_timeout(Duration::from_millis(3000)) .build()
149 .map_err(io::Error::other)?;
150
151 let mut request_builder = client.post(url);
152
153 request_builder = Self::set_headers(request_builder, headers, encoding);
155
156 request_builder = request_builder.body(encoded_content);
158
159 let response = request_builder.send().map_err(io::Error::other)?;
160
161 let status_code = response.status().as_u16() as i32;
162
163 let content = if response.status().is_success() {
165 response.text()
166 } else {
167 response.text()
169 }
170 .map_err(io::Error::other)?;
171
172 Ok(HttpResult::new(status_code, content))
173 }
174
175 fn encoding_params(
177 param_values: Option<&[String]>,
178 _encoding: &str,
179 ) -> Result<Option<String>, io::Error> {
180 let params = match param_values {
181 Some(params) if !params.is_empty() => params,
182 _ => return Ok(None),
183 };
184
185 if params.len() % 2 != 0 {
186 return Err(io::Error::new(
187 io::ErrorKind::InvalidInput,
188 "Parameter values must be in key-value pairs",
189 ));
190 }
191
192 let mut encoder = form_urlencoded::Serializer::new(String::new());
193
194 let mut iter = params.iter();
195 while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
196 encoder.append_pair(key, value);
197 }
198
199 let encoded = encoder.finish();
200 if encoded.is_empty() {
201 Ok(None)
202 } else {
203 Ok(Some(encoded))
204 }
205 }
206
207 fn set_headers(
209 mut request_builder: reqwest::blocking::RequestBuilder,
210 headers: Option<&[String]>,
211 encoding: &str,
212 ) -> reqwest::blocking::RequestBuilder {
213 if let Some(headers) = headers {
215 if headers.len() % 2 == 0 {
216 let mut iter = headers.iter();
217 while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
218 request_builder = request_builder.header(key, value);
219 }
220 }
221 }
222
223 request_builder = request_builder
225 .header("Client-Version", CURRENT_VERSION.name())
226 .header(
227 "Content-Type",
228 format!("application/x-www-form-urlencoded;charset={encoding}"),
229 );
230
231 let timestamp = get_current_millis();
233 request_builder = request_builder.header("Metaq-Client-RequestTS", timestamp.to_string());
234
235 request_builder
236 }
237}