1use crate::auth::Signer;
2use crate::errors::{ErrorCategory, IndodaxError};
3use reqwest::{Client, RequestBuilder, Response, StatusCode};
4use serde::de::DeserializeOwned;
5use std::collections::{HashMap, BTreeMap};
6use tokio::sync::Mutex;
7
8use std::time::{Duration, Instant};
9
10const PUBLIC_BASE_URL: &str = "https://indodax.com";
11const PRIVATE_V1_URL: &str = "https://indodax.com/tapi";
12const PRIVATE_V2_BASE: &str = "https://tapi.btcapi.net";
13const WS_TOKEN_URL: &str = "https://indodax.com/api/private_ws/v1/generate_token";
14const MAX_RETRIES: u32 = 3;
15
16#[derive(Debug)]
17struct RateLimiterState {
18 tokens: u64,
19 last_refill: Instant,
20}
21
22#[derive(Debug)]
24struct RateLimiter {
25 capacity: u64,
26 refill_per_sec: u64,
27 state: Mutex<RateLimiterState>,
28}
29
30impl RateLimiter {
31 fn new(capacity: u64, refill_per_sec: u64) -> Self {
32 Self {
33 capacity,
34 refill_per_sec,
35 state: Mutex::new(RateLimiterState {
36 tokens: capacity,
37 last_refill: Instant::now(),
38 }),
39 }
40 }
41
42 fn from_env() -> Self {
43 let rps = std::env::var("INDODAX_RATE_LIMIT")
44 .ok()
45 .and_then(|v| v.parse::<u64>().ok())
46 .unwrap_or(5)
47 .max(1);
48 Self::new(rps, rps)
49 }
50
51 async fn acquire(&self) {
52 loop {
53 let mut state = self.state.lock().await;
54 let elapsed = state.last_refill.elapsed();
55 if elapsed >= Duration::from_secs(1) {
56 let secs = elapsed.as_secs();
57 let add = self.refill_per_sec * secs;
58 state.tokens = state.tokens.saturating_add(add).min(self.capacity);
59 state.last_refill += Duration::from_secs(secs);
60 }
61 if state.tokens > 0 {
62 state.tokens -= 1;
63 return;
64 }
65 let elapsed_ms = elapsed.as_millis().min(u128::from(u64::MAX)) as u64;
66 let wait = if elapsed_ms < 1000 {
67 Duration::from_millis(1000 - elapsed_ms)
68 } else {
69 Duration::from_millis(50)
70 };
71 drop(state);
72 tokio::time::sleep(wait).await;
73 }
74 }
75}
76
77#[derive(Debug)]
78pub struct IndodaxClient {
79 http: Client,
80 signer: Option<Signer>,
81 rate_limiter: RateLimiter,
82}
83
84#[derive(Debug, serde::Deserialize)]
85pub struct IndodaxV1Response<T> {
86 pub success: i32,
87 #[serde(rename = "return")]
88 pub return_data: Option<T>,
89 pub error: Option<String>,
90 pub error_code: Option<String>,
91}
92
93#[derive(Debug, serde::Deserialize)]
94pub struct IndodaxV2Response<T> {
95 pub data: Option<T>,
96 pub code: Option<i64>,
97 pub error: Option<String>,
98}
99
100impl IndodaxClient {
101 pub fn new(signer: Option<Signer>) -> Result<Self, IndodaxError> {
102 let http = Client::builder()
103 .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
104 .timeout(Duration::from_secs(30))
105 .pool_max_idle_per_host(2)
106 .build()
107 .map_err(|e| IndodaxError::Other(format!("Failed to create HTTP client: {}", e)))?;
108
109 Ok(Self {
110 http,
111 signer,
112 rate_limiter: RateLimiter::from_env(),
113 })
114 }
115
116 pub fn signer(&self) -> Option<&Signer> {
117 self.signer.as_ref()
118 }
119
120 pub fn http_client(&self) -> &Client {
121 &self.http
122 }
123
124 pub async fn public_get<T: DeserializeOwned>(
125 &self,
126 path: &str,
127 ) -> Result<T, IndodaxError> {
128 let url = format!("{}{}", PUBLIC_BASE_URL, path);
129 let resp = self.retry_get(&url).await?;
130 self.handle_response(resp).await
131 }
132
133 pub async fn countdown_cancel_all(
134 &self,
135 pair: Option<&str>,
136 countdown_time: u64,
137 ) -> Result<serde_json::Value, IndodaxError> {
138 let signer = self.signer.as_ref().ok_or_else(|| {
139 IndodaxError::Config("API credentials required for countdown cancel all".into())
140 })?;
141
142 let mut body_parts: Vec<String> = vec![
143 format!("countdownTime={}", countdown_time),
144 ];
145 if let Some(p) = pair {
146 body_parts.push(format!("pair={}", p));
147 }
148
149 let body = body_parts.join("&");
150 let (payload, signature) = signer.sign_v1(&body)?;
151
152 let url = format!("{}/countdownCancelAll", PRIVATE_V1_URL);
153 let req = self
154 .http
155 .post(&url)
156 .header("Key", signer.api_key())
157 .header("Sign", &signature)
158 .header("Content-Type", "application/x-www-form-urlencoded")
159 .body(payload);
160 let resp = self.send_with_retry(req).await?;
161 self.handle_v1_response(resp).await
162 }
163
164 pub async fn generate_ws_token(&self) -> Result<String, IndodaxError> {
165 let signer = self.signer.as_ref().ok_or_else(|| {
166 IndodaxError::Config("API credentials required for WebSocket token generation".into())
167 })?;
168
169 let nonce = signer.next_nonce_str();
170 let (_, signature) = signer.sign_v1(&nonce)?;
171
172 let req = self
173 .http
174 .post(WS_TOKEN_URL)
175 .header("Key", signer.api_key())
176 .header("Sign", &signature)
177 .header("Content-Type", "application/x-www-form-urlencoded")
178 .body(format!("nonce={}", nonce));
179 let resp = self.send_with_retry(req).await?;
180
181 let body_text = resp.text().await?;
182 let val: serde_json::Value = serde_json::from_str(&body_text)?;
183
184 val.get("token")
185 .and_then(|t| t.as_str())
186 .map(|t| t.to_string())
187 .or_else(|| val.get("data").and_then(|d| d.get("token")).and_then(|t| t.as_str()).map(|t| t.to_string()))
188 .ok_or_else(|| IndodaxError::WsToken(format!("No token in response: {}", body_text)))
189 }
190
191 pub async fn public_get_v2<T: DeserializeOwned>(
192 &self,
193 path: &str,
194 params: &[(&str, &str)],
195 ) -> Result<T, IndodaxError> {
196 let url = format!("{}{}", PUBLIC_BASE_URL, path);
197 let resp = self.retry_get_with_params(&url, params).await?;
198 self.handle_response(resp).await
199 }
200
201 async fn handle_v1_response<T: DeserializeOwned>(
202 &self,
203 resp: Response,
204 ) -> Result<T, IndodaxError> {
205 let body_text = resp.text().await?;
206 let envelope: IndodaxV1Response<T> = serde_json::from_str(&body_text).map_err(|e| {
207 IndodaxError::Parse(format!(
208 "Failed to parse response: {} (body: {})",
209 e, body_text
210 ))
211 })?;
212
213 if envelope.success == 1 {
214 envelope.return_data.ok_or_else(|| {
215 IndodaxError::Parse("API returned success but no 'return' data".into())
216 })
217 } else {
218 Err(IndodaxError::api(
219 envelope.error.unwrap_or_else(|| "Unknown error".into()),
220 match envelope.error_code.as_deref() {
221 Some("invalid_credentials") => ErrorCategory::Authentication,
222 Some("rate_limit") => ErrorCategory::RateLimit,
223 Some(c) if c.contains("invalid") => ErrorCategory::Validation,
224 _ => ErrorCategory::Unknown,
225 },
226 envelope.error_code,
227 ))
228 }
229 }
230
231 pub async fn private_post_v1<T: DeserializeOwned>(
232 &self,
233 method: &str,
234 params: &HashMap<String, String>,
235 ) -> Result<T, IndodaxError> {
236 let signer = self.signer.as_ref().ok_or_else(|| {
237 IndodaxError::Config("API credentials required for private endpoints".into())
238 })?;
239
240 let mut full_params: BTreeMap<String, String> = params
241 .iter()
242 .map(|(k, v)| (k.clone(), v.clone()))
243 .collect();
244
245 full_params.insert("method".into(), method.to_string());
246 full_params.insert("nonce".into(), signer.next_nonce_str());
247
248 let body = serde_urlencoded_str(&full_params);
249 let (_, signature) = signer.sign_v1(&body)?;
250
251 let resp = self
252 .retry_post(PRIVATE_V1_URL, &body, signer.api_key(), &signature)
253 .await?;
254
255 self.handle_v1_response(resp).await
256 }
257
258 pub async fn private_get_v2<T: DeserializeOwned>(
259 &self,
260 path: &str,
261 params: &HashMap<String, String>,
262 ) -> Result<T, IndodaxError> {
263 let signer = self.signer.as_ref().ok_or_else(|| {
264 IndodaxError::Config("API credentials required for private endpoints".into())
265 })?;
266
267 let mut qs_parts: Vec<String> = params
268 .iter()
269 .map(|(k, v)| format!("{}={}", k, v))
270 .collect();
271 let timestamp = Signer::now_millis();
272 qs_parts.push(format!("timestamp={}", timestamp));
273 qs_parts.push("recvWindow=5000".to_string());
274 qs_parts.sort();
275 let query_string = qs_parts.join("&");
276
277 let signature = signer.sign_v2(&query_string, timestamp)?;
278 let url = format!("{}{}?{}", PRIVATE_V2_BASE, path, query_string);
279
280 let req = self
281 .http
282 .get(&url)
283 .header("X-APIKEY", signer.api_key())
284 .header("Sign", &signature)
285 .header("Accept", "application/json")
286 .header("Content-Type", "application/json");
287 let resp = self.send_with_retry(req).await?;
288
289 let body_text = resp.text().await?;
290 let envelope: IndodaxV2Response<T> = serde_json::from_str(&body_text).map_err(|e| {
291 IndodaxError::Parse(format!(
292 "Failed to parse v2 response: {} (body: {})",
293 e, body_text
294 ))
295 })?;
296
297 if let Some(data) = envelope.data {
298 Ok(data)
299 } else if let Some(error) = envelope.error {
300 Err(IndodaxError::api(error, ErrorCategory::Unknown, None))
301 } else {
302 Ok(serde_json::from_str(&body_text)?)
303 }
304 }
305
306 async fn retry_get(&self, url: &str) -> Result<Response, IndodaxError> {
307 let req = self.http.get(url);
308 self.send_with_retry(req).await
309 }
310
311 async fn retry_get_with_params(
312 &self,
313 url: &str,
314 params: &[(&str, &str)],
315 ) -> Result<Response, IndodaxError> {
316 let req = self.http.get(url).query(params);
317 self.send_with_retry(req).await
318 }
319
320 async fn retry_post(
321 &self,
322 url: &str,
323 body: &str,
324 api_key: &str,
325 signature: &str,
326 ) -> Result<Response, IndodaxError> {
327 let req = self
328 .http
329 .post(url)
330 .header("Key", api_key)
331 .header("Sign", signature)
332 .header("Content-Type", "application/x-www-form-urlencoded")
333 .body(body.to_string());
334 self.send_with_retry(req).await
335 }
336
337 async fn send_with_retry(
338 &self,
339 builder: RequestBuilder,
340 ) -> Result<Response, IndodaxError> {
341 self.rate_limiter.acquire().await;
342 let mut last_err = None;
343
344 for attempt in 0..=MAX_RETRIES {
345 if attempt > 0 {
346 tokio::time::sleep(Duration::from_millis(500 * 2u64.pow(attempt - 1))).await;
347 }
348
349 let req = builder
350 .try_clone()
351 .ok_or_else(|| IndodaxError::Other("Failed to clone request".into()))?;
352
353 match req.send().await {
354 Ok(resp) => {
355 let status = resp.status();
356 if status.is_success() {
357 return Ok(resp);
358 }
359
360 if status == StatusCode::TOO_MANY_REQUESTS {
361 last_err = Some(IndodaxError::api(
362 format!("Rate limited (HTTP {})", status.as_u16()),
363 ErrorCategory::RateLimit,
364 None,
365 ));
366 continue;
367 }
368
369 if status.is_server_error() {
370 last_err = Some(IndodaxError::api(
371 format!("Server error (HTTP {})", status.as_u16()),
372 ErrorCategory::Server,
373 None,
374 ));
375 continue;
376 }
377
378 last_err = Some(IndodaxError::api(
379 format!("HTTP {}", status.as_u16()),
380 ErrorCategory::Unknown,
381 None,
382 ));
383 break;
384 }
385 Err(e) => {
386 if e.is_timeout() || e.is_connect() {
387 last_err = Some(IndodaxError::Http(e));
388 continue;
389 }
390 return Err(IndodaxError::Http(e));
391 }
392 }
393 }
394
395 Err(last_err.unwrap_or_else(|| {
396 IndodaxError::Other("Max retries exceeded".into())
397 }))
398 }
399
400 async fn handle_response<T: DeserializeOwned>(
401 &self,
402 resp: Response,
403 ) -> Result<T, IndodaxError> {
404 let body_text = resp.text().await?;
405 serde_json::from_str(&body_text).map_err(|e| {
406 IndodaxError::Parse(format!(
407 "Failed to parse response: {} (body: {})",
408 e, body_text
409 ))
410 })
411 }
412}
413
414fn serde_urlencoded_str(params: &BTreeMap<String, String>) -> String {
415 params
416 .iter()
417 .map(|(k, v)| {
418 format!(
419 "{}={}",
420 url::form_urlencoded::byte_serialize(k.as_bytes()).collect::<String>(),
421 url::form_urlencoded::byte_serialize(v.as_bytes()).collect::<String>()
422 )
423 })
424 .collect::<Vec<_>>()
425 .join("&")
426}
427
428#[cfg(test)]
429mod tests {
430 use super::*;
431 use crate::auth::Signer;
432
433 #[test]
434 fn test_indodax_client_new_with_signer() {
435 let signer = Signer::new("key", "secret");
436 let client = IndodaxClient::new(Some(signer)).unwrap();
437 assert!(client.signer().is_some());
438 }
439
440 #[test]
441 fn test_indodax_client_new_without_signer() {
442 let client = IndodaxClient::new(None).unwrap();
443 assert!(client.signer().is_none());
444 }
445
446 #[test]
447 fn test_indodax_client_signer() {
448 let signer = Signer::new("mykey", "mysecret");
449 let client = IndodaxClient::new(Some(signer)).unwrap();
450 let s = client.signer().unwrap();
451 assert_eq!(s.api_key(), "mykey");
452 }
453
454 #[test]
455 fn test_indodax_v1_response_success() {
456 let json = serde_json::json!({
457 "success": 1,
458 "return": {"balance": {"btc": "1.0"}},
459 "error": null,
460 "error_code": null
461 });
462 let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
463 assert_eq!(resp.success, 1);
464 assert!(resp.return_data.is_some());
465 assert!(resp.error.is_none());
466 }
467
468 #[test]
469 fn test_indodax_v1_response_failure() {
470 let json = serde_json::json!({
471 "success": 0,
472 "return": null,
473 "error": "Invalid credentials",
474 "error_code": "invalid_credentials"
475 });
476 let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
477 assert_eq!(resp.success, 0);
478 assert!(resp.return_data.is_none());
479 assert!(resp.error.is_some());
480 assert!(resp.error_code.is_some());
481 }
482
483 #[test]
484 fn test_indodax_v2_response_success() {
485 let json = serde_json::json!({
486 "data": {"name": "test"},
487 "code": null,
488 "error": null
489 });
490 let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
491 assert!(resp.data.is_some());
492 assert!(resp.error.is_none());
493 }
494
495 #[test]
496 fn test_indodax_v2_response_error() {
497 let json = serde_json::json!({
498 "data": null,
499 "code": 400,
500 "error": "Bad request"
501 });
502 let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
503 assert!(resp.data.is_none());
504 assert!(resp.error.is_some());
505 assert!(resp.code.is_some());
506 }
507
508 #[test]
509 fn test_serde_urlencoded_str_single() {
510 let mut params = std::collections::BTreeMap::new();
511 params.insert("method".into(), "getInfo".into());
512 params.insert("nonce".into(), "12345".into());
513
514 let result = serde_urlencoded_str(¶ms);
515 assert!(result.contains("method=getInfo"));
516 assert!(result.contains("nonce=12345"));
517 }
518
519 #[test]
520 fn test_serde_urlencoded_str_empty() {
521 let params = std::collections::BTreeMap::new();
522 let result = serde_urlencoded_str(¶ms);
523 assert_eq!(result, "");
524 }
525
526 #[test]
527 fn test_serde_urlencoded_str_special_chars() {
528 let mut params = std::collections::BTreeMap::new();
529 params.insert("key with space".into(), "value&more".into());
530
531 let result = serde_urlencoded_str(¶ms);
532 assert!(result.contains("%20") || result.contains("+"));
534 }
535
536 #[test]
537 fn test_public_base_url() {
538 assert!(PUBLIC_BASE_URL.contains("indodax.com"));
539 }
540
541 #[test]
542 fn test_private_v1_url() {
543 assert!(PRIVATE_V1_URL.contains("indodax.com/tapi"));
544 }
545
546 #[test]
547 fn test_private_v2_base() {
548 assert!(PRIVATE_V2_BASE.contains("tapi.btcapi.net"));
549 }
550
551 #[test]
552 fn test_max_retries_constant() {
553 assert_eq!(MAX_RETRIES, 3);
554 }
555
556 #[test]
557 fn test_indodax_v1_response_debug() {
558 let resp: IndodaxV1Response<serde_json::Value> = IndodaxV1Response {
559 success: 1,
560 return_data: Some(serde_json::json!({})),
561 error: None,
562 error_code: None,
563 };
564 let debug_str = format!("{:?}", resp);
565 assert!(debug_str.contains("success"));
566 }
567
568 #[test]
569 fn test_indodax_v2_response_debug() {
570 let resp: IndodaxV2Response<serde_json::Value> = IndodaxV2Response {
571 data: Some(serde_json::json!({})),
572 code: None,
573 error: None,
574 };
575 let debug_str = format!("{:?}", resp);
576 assert!(debug_str.contains("data"));
577 }
578
579 #[test]
580 fn test_rate_limiter_from_env_default() {
581 let rl = RateLimiter::from_env();
583 assert!(rl.capacity > 0);
586 assert!(rl.refill_per_sec > 0);
587 }
588
589 #[tokio::test]
590 async fn test_rate_limiter_acquire_single() {
591 let rl = RateLimiter::new(5, 5);
592 rl.acquire().await;
593 let state = rl.state.lock().await;
594 assert_eq!(state.tokens, 4);
595 }
596
597 #[tokio::test]
598 async fn test_rate_limiter_token_exhaustion_refills() {
599 let rl = RateLimiter::new(3, 10);
600 for _ in 0..3 {
601 rl.acquire().await;
602 }
603 {
604 let state = rl.state.lock().await;
605 assert_eq!(state.tokens, 0);
606 }
607
608 {
609 let mut state = rl.state.lock().await;
610 state.last_refill = Instant::now() - Duration::from_secs(1);
611 }
612
613 rl.acquire().await;
614 let state = rl.state.lock().await;
615 assert_eq!(state.tokens, 2);
616 }
617
618 #[tokio::test]
619 async fn test_rate_limiter_refill_capped_at_capacity() {
620 let rl = RateLimiter::new(5, 100);
621 for _ in 0..5 {
622 rl.acquire().await;
623 }
624 {
625 let state = rl.state.lock().await;
626 assert_eq!(state.tokens, 0);
627 }
628
629 {
630 let mut state = rl.state.lock().await;
631 state.last_refill = Instant::now() - Duration::from_secs(10);
632 }
633
634 rl.acquire().await;
635 let state = rl.state.lock().await;
636 assert_eq!(state.tokens, 4);
637 }
638
639 #[test]
640 fn test_rate_limiter_new_custom() {
641 let rl = RateLimiter::new(25, 25);
642 assert_eq!(rl.capacity, 25);
643 assert_eq!(rl.refill_per_sec, 25);
644 }
645}