1use crate::error::{Error, Result};
2use reqwest::{Method, header::HeaderMap};
3use serde_json::Value;
4use tracing::{debug, error, instrument, warn};
5
6use super::builder::HttpClient;
7
8impl HttpClient {
9 #[instrument(
34 name = "http_fetch",
35 skip(self, headers, body),
36 fields(method = %method, url = %url, timeout_ms = %self.config().timeout.as_millis())
37 )]
38 pub async fn fetch(
39 &self,
40 url: &str,
41 method: Method,
42 headers: Option<HeaderMap>,
43 body: Option<Value>,
44 ) -> Result<Value> {
45 if let Some(cb) = self.circuit_breaker() {
46 cb.allow_request()?;
47 }
48
49 #[allow(clippy::collapsible_if)]
50 if self.config().enable_rate_limit {
51 if let Some(limiter) = self.rate_limiter() {
52 limiter.wait().await;
53 }
54 }
55
56 let total_timeout = self.config().timeout;
57 let url_for_error = url.to_string();
58
59 let result = match tokio::time::timeout(
60 total_timeout,
61 self.execute_with_retry(|| {
62 let url = url.to_string();
63 let method = method.clone();
64 let headers = headers.clone();
65 let body = body.clone();
66 async move { self.fetch_once(&url, method, headers, body).await }
67 }),
68 )
69 .await
70 {
71 Ok(result) => result,
72 Err(_elapsed) => {
73 warn!(
74 url = %url_for_error,
75 timeout_ms = %total_timeout.as_millis(),
76 "HTTP request timed out (including retries)"
77 );
78 Err(Error::timeout(format!(
79 "Request to {} timed out after {}ms",
80 url_for_error,
81 total_timeout.as_millis()
82 )))
83 }
84 };
85
86 if let Some(cb) = self.circuit_breaker() {
87 match &result {
88 Ok(_) => cb.record_success(),
89 Err(_) => cb.record_failure(),
90 }
91 }
92
93 result
94 }
95
96 #[instrument(
97 name = "http_fetch_once",
98 skip(self, headers, body),
99 fields(method = %method, url = %url, has_body = body.is_some())
100 )]
101 async fn fetch_once(
102 &self,
103 url: &str,
104 method: Method,
105 headers: Option<HeaderMap>,
106 body: Option<Value>,
107 ) -> Result<Value> {
108 let mut request = self.client().request(method.clone(), url);
109
110 if let Some(headers) = headers {
111 request = request.headers(headers);
112 }
113
114 if let Some(ref body) = body {
115 let body_str = serde_json::to_string(body)
116 .map_err(|e| Error::invalid_request(format!("JSON serialization failed: {e}")))?;
117
118 if body_str.len() > self.config().max_request_size {
119 return Err(Error::invalid_request(format!(
120 "Request body {} bytes exceeds limit {} bytes",
121 body_str.len(),
122 self.config().max_request_size
123 )));
124 }
125
126 request = request.body(body_str);
127 }
128
129 if self.config().verbose {
130 if let Some(body) = &body {
131 debug!(
132 body = ?body,
133 "HTTP request with body"
134 );
135 } else {
136 debug!("HTTP request without body");
137 }
138 }
139
140 let response = request.send().await.map_err(|e| {
141 error!(
142 error = %e,
143 "HTTP request send failed"
144 );
145 Error::network(format!("Request failed: {e}"))
146 })?;
147
148 self.process_response_with_limit(response, url).await
149 }
150
151 #[instrument(name = "http_get", skip(self, headers), fields(url = %url))]
166 pub async fn get(&self, url: &str, headers: Option<HeaderMap>) -> Result<Value> {
167 self.fetch(url, Method::GET, headers, None).await
168 }
169
170 #[instrument(name = "http_post", skip(self, headers, body), fields(url = %url))]
186 pub async fn post(
187 &self,
188 url: &str,
189 headers: Option<HeaderMap>,
190 body: Option<Value>,
191 ) -> Result<Value> {
192 self.fetch(url, Method::POST, headers, body).await
193 }
194
195 #[instrument(name = "http_put", skip(self, headers, body), fields(url = %url))]
211 pub async fn put(
212 &self,
213 url: &str,
214 headers: Option<HeaderMap>,
215 body: Option<Value>,
216 ) -> Result<Value> {
217 self.fetch(url, Method::PUT, headers, body).await
218 }
219
220 #[instrument(name = "http_delete", skip(self, headers, body), fields(url = %url))]
236 pub async fn delete(
237 &self,
238 url: &str,
239 headers: Option<HeaderMap>,
240 body: Option<Value>,
241 ) -> Result<Value> {
242 self.fetch(url, Method::DELETE, headers, body).await
243 }
244}