1use crate::auth::Signer;
2use crate::errors::{ErrorCategory, IndodaxError};
3use reqwest::{Client, RequestBuilder, Response, StatusCode};
4use serde::de::DeserializeOwned;
5use std::collections::{HashMap, BTreeMap};
6use tokio::sync::Mutex;
7
8use std::time::{Duration, Instant};
9
10const PUBLIC_BASE_URL: &str = "https://indodax.com";
11const PRIVATE_V1_URL: &str = "https://indodax.com/tapi";
12const PRIVATE_V2_BASE: &str = "https://tapi.btcapi.net";
13const WS_TOKEN_URL: &str = "https://indodax.com/api/private_ws/v1/generate_token";
14const MAX_RETRIES: u32 = 3;
15
16#[derive(Debug)]
17struct RateLimiterState {
18 tokens: u64,
19 last_refill: Instant,
20}
21
22#[derive(Debug)]
24struct RateLimiter {
25 capacity: u64,
26 refill_per_sec: u64,
27 state: Mutex<RateLimiterState>,
28}
29
30impl RateLimiter {
31 fn new(capacity: u64, refill_per_sec: u64) -> Self {
32 Self {
33 capacity,
34 refill_per_sec,
35 state: Mutex::new(RateLimiterState {
36 tokens: capacity,
37 last_refill: Instant::now(),
38 }),
39 }
40 }
41
42 fn from_env() -> Self {
43 let rps = std::env::var("INDODAX_RATE_LIMIT")
44 .ok()
45 .and_then(|v| v.parse::<u64>().ok())
46 .unwrap_or(5)
47 .max(1);
48 Self::new(rps, rps)
49 }
50
51 async fn acquire(&self) {
52 loop {
53 let mut state = self.state.lock().await;
54 let elapsed = state.last_refill.elapsed();
55 if elapsed >= Duration::from_secs(1) {
56 let secs = elapsed.as_secs();
57 let capped = secs.min(60); let add = self.refill_per_sec * capped;
59 state.tokens = state.tokens.saturating_add(add).min(self.capacity);
60 state.last_refill += Duration::from_secs(capped);
61 }
62 if state.tokens > 0 {
63 state.tokens -= 1;
64 return;
65 }
66 let elapsed_ms = elapsed.as_millis().min(u128::from(u64::MAX)) as u64;
67 let wait = if elapsed_ms < 1000 {
68 Duration::from_millis(1000 - elapsed_ms)
69 } else {
70 Duration::from_millis(50)
71 };
72 drop(state);
73 tokio::time::sleep(wait.max(Duration::from_millis(10))).await;
74 }
75 }
76}
77
78#[derive(Debug)]
79pub struct IndodaxClient {
80 http: Client,
81 signer: Option<Signer>,
82 rate_limiter: RateLimiter,
83 ws_token: Option<String>,
84}
85
86#[derive(Debug, serde::Deserialize)]
87pub struct IndodaxV1Response<T> {
88 pub success: i32,
89 #[serde(rename = "return")]
90 pub return_data: Option<T>,
91 pub error: Option<String>,
92 pub error_code: Option<String>,
93}
94
95#[derive(Debug, serde::Deserialize)]
96pub struct IndodaxV2Response<T> {
97 pub data: Option<T>,
98 pub code: Option<i64>,
99 pub error: Option<String>,
100}
101
102impl IndodaxClient {
103 pub fn new(signer: Option<Signer>) -> Result<Self, IndodaxError> {
104 let http = Client::builder()
105 .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
106 .timeout(Duration::from_secs(30))
107 .pool_max_idle_per_host(2)
108 .build()
109 .map_err(|e| IndodaxError::Other(format!("Failed to create HTTP client: {}", e)))?;
110
111 Ok(Self {
112 http,
113 signer,
114 rate_limiter: RateLimiter::from_env(),
115 ws_token: None,
116 })
117 }
118
119 pub fn with_ws_token(mut self, token: Option<String>) -> Self {
120 self.ws_token = token;
121 self
122 }
123
124 pub fn signer(&self) -> Option<&Signer> {
125 self.signer.as_ref()
126 }
127
128 pub fn ws_token(&self) -> Option<&str> {
129 self.ws_token.as_deref()
130 }
131
132 pub fn http_client(&self) -> &Client {
133 &self.http
134 }
135
136 pub async fn public_get<T: DeserializeOwned>(
137 &self,
138 path: &str,
139 ) -> Result<T, IndodaxError> {
140 let url = format!("{}{}", PUBLIC_BASE_URL, path);
141 let resp = self.retry_get(&url).await?;
142 self.handle_response(resp).await
143 }
144
145 pub async fn countdown_cancel_all(
146 &self,
147 pair: Option<&str>,
148 countdown_time: u64,
149 ) -> Result<serde_json::Value, IndodaxError> {
150 let signer = self.signer.as_ref().ok_or_else(|| {
151 IndodaxError::Config("API credentials required for countdown cancel all".into())
152 })?;
153
154 let mut body_parts: Vec<String> = vec![
155 format!("countdownTime={}", countdown_time),
156 ];
157 if let Some(p) = pair {
158 body_parts.push(format!("pair={}", p));
159 }
160
161 let body = body_parts.join("&");
162 let (payload, signature) = signer.sign_v1(&body)?;
163
164 let url = format!("{}/countdownCancelAll", PRIVATE_V1_URL);
165 let req = self
166 .http
167 .post(&url)
168 .header("Key", signer.api_key())
169 .header("Sign", &signature)
170 .header("Content-Type", "application/x-www-form-urlencoded")
171 .body(payload);
172 let resp = self.send_with_retry(req).await?;
173 self.handle_v1_response(resp).await
174 }
175
176 pub async fn generate_ws_token(&self) -> Result<(String, String), IndodaxError> {
177 let signer = self.signer.as_ref().ok_or_else(|| {
178 IndodaxError::Config("API credentials required for WebSocket token generation".into())
179 })?;
180
181 let nonce = signer.next_nonce_str();
182 let (_, signature) = signer.sign_v1(&nonce)?;
183
184 let req = self
185 .http
186 .post(WS_TOKEN_URL)
187 .header("Key", signer.api_key())
188 .header("Sign", &signature)
189 .header("Content-Type", "application/x-www-form-urlencoded")
190 .body(format!("nonce={}", nonce));
191 let resp = self.send_with_retry(req).await?;
192
193 let body_text = resp.text().await?;
194 let val: serde_json::Value = serde_json::from_str(&body_text)?;
195
196 let token = val.get("token")
197 .and_then(|t| t.as_str())
198 .or_else(|| val.get("data").and_then(|d| d.get("token")).and_then(|t| t.as_str()))
199 .or_else(|| val.get("return").and_then(|r| r.get("connToken")).and_then(|t| t.as_str()))
200 .map(|t| t.to_string());
201
202 let channel = val.get("channel")
203 .and_then(|c| c.as_str())
204 .or_else(|| val.get("data").and_then(|d| d.get("channel")).and_then(|c| c.as_str()))
205 .or_else(|| val.get("return").and_then(|r| r.get("channel")).and_then(|c| c.as_str()))
206 .map(|c| c.to_string());
207
208 match (token, channel) {
209 (Some(t), Some(c)) => Ok((t, c)),
210 (Some(t), None) => Ok((t, "private:orders".to_string())), _ => Err(IndodaxError::WsToken(format!("No token or channel in response: {}", body_text))),
212 }
213 }
214
215 pub async fn public_get_v2<T: DeserializeOwned>(
216 &self,
217 path: &str,
218 params: &[(&str, &str)],
219 ) -> Result<T, IndodaxError> {
220 let url = format!("{}{}", PUBLIC_BASE_URL, path);
221 let resp = self.retry_get_with_params(&url, params).await?;
222 self.handle_response(resp).await
223 }
224
225 async fn handle_v1_response<T: DeserializeOwned>(
226 &self,
227 resp: Response,
228 ) -> Result<T, IndodaxError> {
229 let body_text = resp.text().await?;
230 let envelope: IndodaxV1Response<T> = serde_json::from_str(&body_text).map_err(|e| {
231 IndodaxError::Parse(format!(
232 "Failed to parse response: {} (body: {})",
233 e, body_text
234 ))
235 })?;
236
237 if envelope.success == 1 {
238 envelope.return_data.ok_or_else(|| {
239 IndodaxError::Parse("API returned success but no 'return' data".into())
240 })
241 } else {
242 Err(IndodaxError::api(
243 envelope.error.unwrap_or_else(|| "Unknown error".into()),
244 match envelope.error_code.as_deref() {
245 Some("invalid_credentials") => ErrorCategory::Authentication,
246 Some("rate_limit") => ErrorCategory::RateLimit,
247 Some(c) if c.contains("invalid") => ErrorCategory::Validation,
248 _ => ErrorCategory::Unknown,
249 },
250 envelope.error_code,
251 ))
252 }
253 }
254
255 pub async fn private_post_v1<T: DeserializeOwned>(
256 &self,
257 method: &str,
258 params: &HashMap<String, String>,
259 ) -> Result<T, IndodaxError> {
260 let signer = self.signer.as_ref().ok_or_else(|| {
261 IndodaxError::Config("API credentials required for private endpoints".into())
262 })?;
263
264 let mut full_params: BTreeMap<String, String> = params
265 .iter()
266 .map(|(k, v)| (k.clone(), v.clone()))
267 .collect();
268
269 full_params.insert("method".into(), method.to_string());
270 full_params.insert("nonce".into(), signer.next_nonce_str());
271
272 let body = serde_urlencoded_str(&full_params);
273 let (_, signature) = signer.sign_v1(&body)?;
274
275 let resp = self
276 .retry_post(PRIVATE_V1_URL, &body, signer.api_key(), &signature)
277 .await?;
278
279 self.handle_v1_response(resp).await
280 }
281
282 pub async fn private_get_v2<T: DeserializeOwned>(
283 &self,
284 path: &str,
285 params: &HashMap<String, String>,
286 ) -> Result<T, IndodaxError> {
287 let signer = self.signer.as_ref().ok_or_else(|| {
288 IndodaxError::Config("API credentials required for private endpoints".into())
289 })?;
290
291 let mut qs_parts: Vec<String> = params
292 .iter()
293 .map(|(k, v)| format!("{}={}", k, v))
294 .collect();
295 let timestamp = crate::commands::helpers::now_millis();
296 qs_parts.push(format!("timestamp={}", timestamp));
297 qs_parts.push("recvWindow=5000".to_string());
298 qs_parts.sort();
299 let query_string = qs_parts.join("&");
300
301 let signature = signer.sign_v2(&query_string)?;
302 let url = format!("{}{}?{}", PRIVATE_V2_BASE, path, query_string);
303
304 let req = self
305 .http
306 .get(&url)
307 .header("X-APIKEY", signer.api_key())
308 .header("Sign", &signature)
309 .header("Accept", "application/json")
310 .header("Content-Type", "application/json");
311 let resp = self.send_with_retry(req).await?;
312
313 let body_text = resp.text().await?;
314 let envelope: IndodaxV2Response<T> = serde_json::from_str(&body_text).map_err(|e| {
315 IndodaxError::Parse(format!(
316 "Failed to parse v2 response: {} (body: {})",
317 e, body_text
318 ))
319 })?;
320
321 if let Some(data) = envelope.data {
322 Ok(data)
323 } else if let Some(error) = envelope.error {
324 Err(IndodaxError::api(error, ErrorCategory::Unknown, None))
325 } else {
326 Ok(serde_json::from_str(&body_text)?)
327 }
328 }
329
330 async fn retry_get(&self, url: &str) -> Result<Response, IndodaxError> {
331 let req = self.http.get(url);
332 self.send_with_retry(req).await
333 }
334
335 async fn retry_get_with_params(
336 &self,
337 url: &str,
338 params: &[(&str, &str)],
339 ) -> Result<Response, IndodaxError> {
340 let req = self.http.get(url).query(params);
341 self.send_with_retry(req).await
342 }
343
344 async fn retry_post(
345 &self,
346 url: &str,
347 body: &str,
348 api_key: &str,
349 signature: &str,
350 ) -> Result<Response, IndodaxError> {
351 let req = self
352 .http
353 .post(url)
354 .header("Key", api_key)
355 .header("Sign", signature)
356 .header("Content-Type", "application/x-www-form-urlencoded")
357 .body(body.to_string());
358 self.send_with_retry(req).await
359 }
360
361 async fn send_with_retry(
362 &self,
363 builder: RequestBuilder,
364 ) -> Result<Response, IndodaxError> {
365 self.rate_limiter.acquire().await;
366 let mut last_err = None;
367 let mut total_retries = 0u32;
368 let mut backoff_count = 0u32;
369
370 while total_retries <= MAX_RETRIES {
371 if backoff_count > 0 {
372 tokio::time::sleep(Duration::from_millis(500 * 2u64.pow(backoff_count - 1))).await;
373 }
374
375 let req = builder
376 .try_clone()
377 .ok_or_else(|| IndodaxError::Other("Failed to clone request".into()))?;
378
379 match req.send().await {
380 Ok(resp) => {
381 let status = resp.status();
382 if status.is_success() {
383 return Ok(resp);
384 }
385
386 if status == StatusCode::TOO_MANY_REQUESTS {
387 let retry_after = resp.headers()
388 .get("Retry-After")
389 .and_then(|v| v.to_str().ok())
390 .and_then(|s| s.parse::<u64>().ok())
391 .map(Duration::from_secs);
392 if let Some(delay) = retry_after {
393 tokio::time::sleep(delay).await;
394 backoff_count = 0;
395 } else {
396 backoff_count += 1;
397 }
398 total_retries += 1;
399 last_err = Some(IndodaxError::api(
400 format!("Rate limited (HTTP {})", status.as_u16()),
401 ErrorCategory::RateLimit,
402 None,
403 ));
404 continue;
405 }
406
407 if status.is_server_error() {
408 total_retries += 1;
409 backoff_count += 1;
410 last_err = Some(IndodaxError::api(
411 format!("Server error (HTTP {})", status.as_u16()),
412 ErrorCategory::Server,
413 None,
414 ));
415 continue;
416 }
417
418 last_err = Some(IndodaxError::api(
419 format!("HTTP {}", status.as_u16()),
420 ErrorCategory::Unknown,
421 None,
422 ));
423 break;
424 }
425 Err(e) => {
426 total_retries += 1;
427 backoff_count += 1;
428 if e.is_timeout() || e.is_connect() {
429 last_err = Some(IndodaxError::Http(e));
430 continue;
431 }
432 return Err(IndodaxError::Http(e));
433 }
434 }
435 }
436
437 Err(last_err.unwrap_or_else(|| {
438 IndodaxError::Other("Max retries exceeded".into())
439 }))
440 }
441
442 async fn handle_response<T: DeserializeOwned>(
443 &self,
444 resp: Response,
445 ) -> Result<T, IndodaxError> {
446 let body_text = resp.text().await?;
447 serde_json::from_str(&body_text).map_err(|e| {
448 IndodaxError::Parse(format!(
449 "Failed to parse response: {} (body: {})",
450 e, body_text
451 ))
452 })
453 }
454}
455
456fn serde_urlencoded_str(params: &BTreeMap<String, String>) -> String {
457 params
458 .iter()
459 .map(|(k, v)| {
460 format!(
461 "{}={}",
462 url::form_urlencoded::byte_serialize(k.as_bytes()).collect::<String>(),
463 url::form_urlencoded::byte_serialize(v.as_bytes()).collect::<String>()
464 )
465 })
466 .collect::<Vec<_>>()
467 .join("&")
468}
469
470#[cfg(test)]
471mod tests {
472 use super::*;
473 use crate::auth::Signer;
474
475 #[test]
476 fn test_indodax_client_new_with_signer() {
477 let signer = Signer::new("key", "secret");
478 let client = IndodaxClient::new(Some(signer)).unwrap();
479 assert!(client.signer().is_some());
480 }
481
482 #[test]
483 fn test_indodax_client_new_without_signer() {
484 let client = IndodaxClient::new(None).unwrap();
485 assert!(client.signer().is_none());
486 }
487
488 #[test]
489 fn test_indodax_client_signer() {
490 let signer = Signer::new("mykey", "mysecret");
491 let client = IndodaxClient::new(Some(signer)).unwrap();
492 let s = client.signer().unwrap();
493 assert_eq!(s.api_key(), "mykey");
494 }
495
496 #[test]
497 fn test_indodax_v1_response_success() {
498 let json = serde_json::json!({
499 "success": 1,
500 "return": {"balance": {"btc": "1.0"}},
501 "error": null,
502 "error_code": null
503 });
504 let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
505 assert_eq!(resp.success, 1);
506 assert!(resp.return_data.is_some());
507 assert!(resp.error.is_none());
508 }
509
510 #[test]
511 fn test_indodax_v1_response_failure() {
512 let json = serde_json::json!({
513 "success": 0,
514 "return": null,
515 "error": "Invalid credentials",
516 "error_code": "invalid_credentials"
517 });
518 let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
519 assert_eq!(resp.success, 0);
520 assert!(resp.return_data.is_none());
521 assert!(resp.error.is_some());
522 assert!(resp.error_code.is_some());
523 }
524
525 #[test]
526 fn test_indodax_v2_response_success() {
527 let json = serde_json::json!({
528 "data": {"name": "test"},
529 "code": null,
530 "error": null
531 });
532 let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
533 assert!(resp.data.is_some());
534 assert!(resp.error.is_none());
535 }
536
537 #[test]
538 fn test_indodax_v2_response_error() {
539 let json = serde_json::json!({
540 "data": null,
541 "code": 400,
542 "error": "Bad request"
543 });
544 let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
545 assert!(resp.data.is_none());
546 assert!(resp.error.is_some());
547 assert!(resp.code.is_some());
548 }
549
550 #[test]
551 fn test_serde_urlencoded_str_single() {
552 let mut params = std::collections::BTreeMap::new();
553 params.insert("method".into(), "getInfo".into());
554 params.insert("nonce".into(), "12345".into());
555
556 let result = serde_urlencoded_str(¶ms);
557 assert!(result.contains("method=getInfo"));
558 assert!(result.contains("nonce=12345"));
559 }
560
561 #[test]
562 fn test_serde_urlencoded_str_empty() {
563 let params = std::collections::BTreeMap::new();
564 let result = serde_urlencoded_str(¶ms);
565 assert_eq!(result, "");
566 }
567
568 #[test]
569 fn test_serde_urlencoded_str_special_chars() {
570 let mut params = std::collections::BTreeMap::new();
571 params.insert("key with space".into(), "value&more".into());
572
573 let result = serde_urlencoded_str(¶ms);
574 assert!(result.contains("%20") || result.contains("+"));
576 }
577
578 #[test]
579 fn test_public_base_url() {
580 assert!(PUBLIC_BASE_URL.contains("indodax.com"));
581 }
582
583 #[test]
584 fn test_private_v1_url() {
585 assert!(PRIVATE_V1_URL.contains("indodax.com/tapi"));
586 }
587
588 #[test]
589 fn test_private_v2_base() {
590 assert!(PRIVATE_V2_BASE.contains("tapi.btcapi.net"));
591 }
592
593 #[test]
594 fn test_max_retries_constant() {
595 assert_eq!(MAX_RETRIES, 3);
596 }
597
598 #[test]
599 fn test_indodax_v1_response_debug() {
600 let resp: IndodaxV1Response<serde_json::Value> = IndodaxV1Response {
601 success: 1,
602 return_data: Some(serde_json::json!({})),
603 error: None,
604 error_code: None,
605 };
606 let debug_str = format!("{:?}", resp);
607 assert!(debug_str.contains("success"));
608 }
609
610 #[test]
611 fn test_indodax_v2_response_debug() {
612 let resp: IndodaxV2Response<serde_json::Value> = IndodaxV2Response {
613 data: Some(serde_json::json!({})),
614 code: None,
615 error: None,
616 };
617 let debug_str = format!("{:?}", resp);
618 assert!(debug_str.contains("data"));
619 }
620
621 #[test]
622 fn test_rate_limiter_from_env_default() {
623 let rl = RateLimiter::from_env();
625 assert!(rl.capacity > 0);
628 assert!(rl.refill_per_sec > 0);
629 }
630
631 #[tokio::test]
632 async fn test_rate_limiter_acquire_single() {
633 let rl = RateLimiter::new(5, 5);
634 rl.acquire().await;
635 let state = rl.state.lock().await;
636 assert_eq!(state.tokens, 4);
637 }
638
639 #[tokio::test]
640 async fn test_rate_limiter_token_exhaustion_refills() {
641 let rl = RateLimiter::new(3, 10);
642 for _ in 0..3 {
643 rl.acquire().await;
644 }
645 {
646 let state = rl.state.lock().await;
647 assert_eq!(state.tokens, 0);
648 }
649
650 {
651 let mut state = rl.state.lock().await;
652 state.last_refill = Instant::now() - Duration::from_secs(1);
653 }
654
655 rl.acquire().await;
656 let state = rl.state.lock().await;
657 assert_eq!(state.tokens, 2);
658 }
659
660 #[tokio::test]
661 async fn test_rate_limiter_refill_capped_at_capacity() {
662 let rl = RateLimiter::new(5, 100);
663 for _ in 0..5 {
664 rl.acquire().await;
665 }
666 {
667 let state = rl.state.lock().await;
668 assert_eq!(state.tokens, 0);
669 }
670
671 {
672 let mut state = rl.state.lock().await;
673 state.last_refill = Instant::now() - Duration::from_secs(10);
674 }
675
676 rl.acquire().await;
677 let state = rl.state.lock().await;
678 assert_eq!(state.tokens, 4);
679 }
680
681 #[test]
682 fn test_rate_limiter_new_custom() {
683 let rl = RateLimiter::new(25, 25);
684 assert_eq!(rl.capacity, 25);
685 assert_eq!(rl.refill_per_sec, 25);
686 }
687}