rocketmq_common/utils/
http_tiny_client.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::io::Read;
19use std::io::{self};
20use std::str::FromStr;
21use std::time::Duration;
22
23use reqwest::blocking::Client;
24use reqwest::blocking::Response;
25use reqwest::header::HeaderMap;
26use reqwest::header::HeaderName;
27use reqwest::header::HeaderValue;
28use reqwest::header::CONTENT_TYPE;
29
30use crate::common::mq_version::RocketMqVersion;
31use crate::common::mq_version::CURRENT_VERSION;
32use crate::TimeUtils::get_current_millis;
33
34pub struct HttpTinyClient;
35
36/// HTTP response result (mirrors Java HttpResult)
37#[derive(Debug, Clone)]
38pub struct HttpResult {
39    /// HTTP status code
40    pub code: i32,
41    /// Response content
42    pub content: String,
43}
44
45impl HttpResult {
46    /// Create a new HttpResult
47    pub fn new(code: i32, content: String) -> Self {
48        Self { code, content }
49    }
50
51    /// Check if the response is successful (2xx status codes)
52    pub fn is_success(&self) -> bool {
53        self.code >= 200 && self.code < 300
54    }
55
56    /// Check if the response is OK (200 status code)
57    pub fn is_ok(&self) -> bool {
58        self.code == 200
59    }
60}
61
62impl std::fmt::Display for HttpResult {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        write!(
65            f,
66            "HttpResult(code: {}, content_length: {})",
67            self.code,
68            self.content.len()
69        )
70    }
71}
72
73impl HttpTinyClient {
74    /// Perform HTTP GET request
75    ///
76    /// # Arguments
77    /// * `url` - The URL to request
78    /// * `headers` - Optional list of headers (key-value pairs)
79    /// * `param_values` - Optional list of query parameters (key-value pairs)
80    /// * `encoding` - Character encoding (e.g., "UTF-8")
81    /// * `read_timeout_ms` - Timeout in milliseconds
82    ///
83    /// # Returns
84    /// `HttpResult` containing response code and content
85    pub fn http_get(
86        url: &str,
87        headers: Option<&[String]>,
88        param_values: Option<&[String]>,
89        encoding: &str,
90        read_timeout_ms: u64,
91    ) -> Result<HttpResult, io::Error> {
92        let encoded_content = Self::encoding_params(param_values, encoding)?;
93        let full_url = if let Some(params) = encoded_content {
94            format!("{url}?{params}")
95        } else {
96            url.to_string()
97        };
98
99        let client = Client::builder()
100            .timeout(Duration::from_millis(read_timeout_ms))
101            .build()
102            .map_err(io::Error::other)?;
103
104        let mut request_builder = client.get(&full_url);
105
106        // Set headers
107        request_builder = Self::set_headers(request_builder, headers, encoding);
108
109        let response = request_builder.send().map_err(io::Error::other)?;
110
111        let status_code = response.status().as_u16() as i32;
112
113        // Read response content based on status code
114        let content = if response.status().is_success() {
115            response.text()
116        } else {
117            // For error responses, still try to read the body
118            response.text()
119        }
120        .map_err(io::Error::other)?;
121
122        Ok(HttpResult::new(status_code, content))
123    }
124
125    /// Perform HTTP POST request
126    ///
127    /// # Arguments
128    /// * `url` - The URL to request
129    /// * `headers` - Optional list of headers (key-value pairs)
130    /// * `param_values` - Optional list of form parameters (key-value pairs)
131    /// * `encoding` - Character encoding (e.g., "UTF-8")
132    /// * `read_timeout_ms` - Timeout in milliseconds
133    ///
134    /// # Returns
135    /// `HttpResult` containing response code and content
136    pub fn http_post(
137        url: &str,
138        headers: Option<&[String]>,
139        param_values: Option<&[String]>,
140        encoding: &str,
141        read_timeout_ms: u64,
142    ) -> Result<HttpResult, io::Error> {
143        let encoded_content = Self::encoding_params(param_values, encoding)?.unwrap_or_default();
144
145        let client = Client::builder()
146            .timeout(Duration::from_millis(read_timeout_ms))
147            .connect_timeout(Duration::from_millis(3000)) // Fixed 3 second connect timeout like Java
148            .build()
149            .map_err(io::Error::other)?;
150
151        let mut request_builder = client.post(url);
152
153        // Set headers
154        request_builder = Self::set_headers(request_builder, headers, encoding);
155
156        // Set body content
157        request_builder = request_builder.body(encoded_content);
158
159        let response = request_builder.send().map_err(io::Error::other)?;
160
161        let status_code = response.status().as_u16() as i32;
162
163        // Read response content based on status code
164        let content = if response.status().is_success() {
165            response.text()
166        } else {
167            // For error responses, still try to read the body
168            response.text()
169        }
170        .map_err(io::Error::other)?;
171
172        Ok(HttpResult::new(status_code, content))
173    }
174
175    /// Encode parameters for URL or form data using form_urlencoded
176    fn encoding_params(
177        param_values: Option<&[String]>,
178        _encoding: &str,
179    ) -> Result<Option<String>, io::Error> {
180        let params = match param_values {
181            Some(params) if !params.is_empty() => params,
182            _ => return Ok(None),
183        };
184
185        if params.len() % 2 != 0 {
186            return Err(io::Error::new(
187                io::ErrorKind::InvalidInput,
188                "Parameter values must be in key-value pairs",
189            ));
190        }
191
192        let mut encoder = form_urlencoded::Serializer::new(String::new());
193
194        let mut iter = params.iter();
195        while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
196            encoder.append_pair(key, value);
197        }
198
199        let encoded = encoder.finish();
200        if encoded.is_empty() {
201            Ok(None)
202        } else {
203            Ok(Some(encoded))
204        }
205    }
206
207    /// Set headers on the request builder
208    fn set_headers(
209        mut request_builder: reqwest::blocking::RequestBuilder,
210        headers: Option<&[String]>,
211        encoding: &str,
212    ) -> reqwest::blocking::RequestBuilder {
213        // Set custom headers (key-value pairs)
214        if let Some(headers) = headers {
215            if headers.len() % 2 == 0 {
216                let mut iter = headers.iter();
217                while let (Some(key), Some(value)) = (iter.next(), iter.next()) {
218                    request_builder = request_builder.header(key, value);
219                }
220            }
221        }
222
223        // Set standard headers (matching Java implementation)
224        request_builder = request_builder
225            .header("Client-Version", CURRENT_VERSION.name())
226            .header(
227                "Content-Type",
228                format!("application/x-www-form-urlencoded;charset={encoding}"),
229            );
230
231        // Set timestamp header
232        let timestamp = get_current_millis();
233        request_builder = request_builder.header("Metaq-Client-RequestTS", timestamp.to_string());
234
235        request_builder
236    }
237}