1use std::sync::atomic::{AtomicI64, Ordering};
4use std::time::Duration;
5
6use reqwest::{Client, Method, RequestBuilder, Response};
7use serde::de::DeserializeOwned;
8use serde::Serialize;
9use tracing::{debug, trace, warn};
10
11use crate::auth::{current_timestamp_ms, headers, sign_rest_request};
12use crate::config::ClientConfig;
13use crate::error::{ApiResponse, BybitError};
14
15#[derive(Debug)]
17pub struct HttpClient {
18 client: Client,
19 config: ClientConfig,
20 time_offset: AtomicI64,
23}
24
25impl HttpClient {
26 pub fn new(config: ClientConfig) -> Result<Self, BybitError> {
28 let timeout = Duration::from_millis(config.timeout_ms);
29
30 let client = Client::builder()
31 .timeout(timeout)
32 .pool_max_idle_per_host(10)
33 .build()
34 .map_err(BybitError::Http)?;
35
36 Ok(Self {
37 client,
38 config,
39 time_offset: AtomicI64::new(0),
40 })
41 }
42
43 pub fn config(&self) -> &ClientConfig {
45 &self.config
46 }
47
48 pub fn time_offset(&self) -> i64 {
50 self.time_offset.load(Ordering::Relaxed)
51 }
52
53 pub fn set_time_offset(&self, offset_ms: i64) {
55 self.time_offset.store(offset_ms, Ordering::Relaxed);
56 }
57
58 fn get_timestamp(&self) -> u64 {
60 let local = current_timestamp_ms() as i64;
61 let offset = self.time_offset.load(Ordering::Relaxed);
62 (local + offset) as u64
63 }
64
65 fn build_url(&self, endpoint: &str) -> String {
67 format!("{}{}", self.config.get_rest_url(), endpoint)
68 }
69
70 pub async fn get<T, P>(&self, endpoint: &str, params: Option<&P>) -> Result<T, BybitError>
72 where
73 T: DeserializeOwned,
74 P: Serialize + ?Sized,
75 {
76 self.request(Method::GET, endpoint, params, None::<&()>, false)
77 .await
78 }
79
80 pub async fn get_signed<T, P>(
82 &self,
83 endpoint: &str,
84 params: Option<&P>,
85 ) -> Result<T, BybitError>
86 where
87 T: DeserializeOwned,
88 P: Serialize + ?Sized,
89 {
90 self.request(Method::GET, endpoint, params, None::<&()>, true)
91 .await
92 }
93
94 pub async fn post_signed<T, B>(
96 &self,
97 endpoint: &str,
98 body: Option<&B>,
99 ) -> Result<T, BybitError>
100 where
101 T: DeserializeOwned,
102 B: Serialize + ?Sized,
103 {
104 self.request(Method::POST, endpoint, None::<&()>, body, true)
105 .await
106 }
107
108 async fn request<T, P, B>(
110 &self,
111 method: Method,
112 endpoint: &str,
113 params: Option<&P>,
114 body: Option<&B>,
115 signed: bool,
116 ) -> Result<T, BybitError>
117 where
118 T: DeserializeOwned,
119 P: Serialize + ?Sized,
120 B: Serialize + ?Sized,
121 {
122 let url = self.build_url(endpoint);
123
124 let query_string = if let Some(p) = params {
125 serde_urlencoded::to_string(p).map_err(|e| {
126 BybitError::Serialization(serde_json::Error::io(std::io::Error::new(
127 std::io::ErrorKind::InvalidData,
128 e.to_string(),
129 )))
130 })?
131 } else {
132 String::new()
133 };
134
135 let body_string = if let Some(b) = body {
136 serde_json::to_string(b)?
137 } else {
138 String::new()
139 };
140
141 let full_url = if !query_string.is_empty() {
142 format!("{}?{}", url, query_string)
143 } else {
144 url.clone()
145 };
146
147 let mut request = self.client.request(method.clone(), &full_url);
148
149 if !body_string.is_empty() {
150 request = request
151 .header("Content-Type", "application/json")
152 .body(body_string.clone());
153 }
154
155 if signed {
156 request = self.add_auth_headers(request, &query_string, &body_string)?;
157 }
158
159 if let Some(ref referer) = self.config.referer {
160 request = request.header("Referer", referer);
161 }
162
163 if self.config.debug {
164 debug!(
165 method = %method,
166 url = %full_url,
167 signed = signed,
168 "Sending request"
169 );
170 if !body_string.is_empty() {
171 trace!(body = %body_string, "Request body");
172 }
173 }
174
175 let response = request.send().await?;
176
177 self.handle_response(response).await
178 }
179
180 fn add_auth_headers(
182 &self,
183 request: RequestBuilder,
184 query_string: &str,
185 body_string: &str,
186 ) -> Result<RequestBuilder, BybitError> {
187 let api_key = self
188 .config
189 .api_key
190 .as_ref()
191 .ok_or_else(|| BybitError::Auth("API key not configured".to_string()))?;
192
193 let api_secret = self
194 .config
195 .get_secret()
196 .ok_or_else(|| BybitError::Auth("API secret not configured".to_string()))?;
197
198 let timestamp = self.get_timestamp();
199 let recv_window = self.config.recv_window;
200
201 let payload = if body_string.is_empty() {
202 query_string
203 } else {
204 body_string
205 };
206
207 let signature = sign_rest_request(timestamp, api_key, recv_window, payload, api_secret);
208
209 Ok(request
210 .header(headers::API_KEY, api_key)
211 .header(headers::TIMESTAMP, timestamp.to_string())
212 .header(headers::SIGN, signature)
213 .header(headers::RECV_WINDOW, recv_window.to_string())
214 .header(headers::SIGN_TYPE, "2"))
215 }
216
217 async fn handle_response<T: DeserializeOwned>(
219 &self,
220 response: Response,
221 ) -> Result<T, BybitError> {
222 let status = response.status();
223
224 if self.config.debug {
225 debug!(status = %status, "Received response");
226 }
227
228 let body = response.text().await?;
229
230 if !status.is_success() {
231 warn!(status = %status, body = %body, "HTTP error");
232
233 if let Ok(api_response) = serde_json::from_str::<ApiResponse<serde_json::Value>>(&body)
234 {
235 return Err(BybitError::api_error(
236 api_response.ret_code,
237 api_response.ret_msg,
238 ));
239 }
240
241 return Err(BybitError::api_error(
242 status.as_u16() as i32,
243 format!("HTTP error {}: {}", status, body),
244 ));
245 }
246
247 if self.config.debug {
248 trace!(body = %body, "Response body");
249 }
250
251 let api_response: ApiResponse<T> = serde_json::from_str(&body)?;
252
253 api_response.into_result()
254 }
255
256 pub async fn sync_time(&self) -> Result<i64, BybitError> {
262 use crate::types::ServerTime;
263
264 let start = current_timestamp_ms();
265 let server_time: ServerTime = self.get("/v5/market/time", None::<&()>).await?;
266 let end = current_timestamp_ms();
267
268 let latency = (end - start) / 2;
269 let server_ms = server_time.as_millis();
270 let offset = server_ms as i64 - (end as i64) + (latency as i64);
271
272 self.time_offset.store(offset, Ordering::Relaxed);
273
274 debug!(
275 server_time = server_ms,
276 local_time = end,
277 latency = latency,
278 offset = offset,
279 "Time synchronized"
280 );
281
282 Ok(offset)
283 }
284}
285
286impl Clone for HttpClient {
287 fn clone(&self) -> Self {
288 Self {
289 client: self.client.clone(),
290 config: self.config.clone(),
291 time_offset: AtomicI64::new(self.time_offset.load(Ordering::Relaxed)),
292 }
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299
300 #[test]
301 fn test_build_url() {
302 let config = ClientConfig::default();
303 let client = match HttpClient::new(config) {
304 Ok(client) => client,
305 Err(err) => panic!("Failed to build HTTP client: {}", err),
306 };
307 let url = client.build_url("/v5/market/time");
308 assert_eq!(url, "https://api.bybit.com/v5/market/time");
309 }
310
311 #[test]
312 fn test_testnet_url() {
313 let config = ClientConfig::default().testnet();
314 let client = match HttpClient::new(config) {
315 Ok(client) => client,
316 Err(err) => panic!("Failed to build HTTP client: {}", err),
317 };
318 let url = client.build_url("/v5/market/time");
319 assert_eq!(url, "https://api-testnet.bybit.com/v5/market/time");
320 }
321
322 #[test]
323 fn test_time_offset() {
324 let config = ClientConfig::default();
325 let client = match HttpClient::new(config) {
326 Ok(client) => client,
327 Err(err) => panic!("Failed to build HTTP client: {}", err),
328 };
329
330 assert_eq!(client.time_offset(), 0);
331 client.set_time_offset(1000);
332 assert_eq!(client.time_offset(), 1000);
333 }
334}