1use crate::auth::Signer;
2use crate::errors::{ErrorCategory, IndodaxError};
3use reqwest::{Client, RequestBuilder, Response, StatusCode};
4use serde::de::DeserializeOwned;
5use std::collections::{HashMap, BTreeMap};
6use tokio::sync::Mutex;
7
8use std::time::{Duration, Instant};
9
/// Base URL that public (unauthenticated) request paths are appended to.
const PUBLIC_BASE_URL: &str = "https://indodax.com";
/// Endpoint that receives signed v1 private-API form posts.
const PRIVATE_V1_URL: &str = "https://indodax.com/tapi";
/// Base URL that v2 private-API paths are appended to.
const PRIVATE_V2_BASE: &str = "https://tapi.btcapi.net";
/// Endpoint used to request a private WebSocket token.
const WS_TOKEN_URL: &str = "https://indodax.com/api/private_ws/v1/generate_token";
/// Number of retries after the first attempt (at most `MAX_RETRIES + 1` sends).
const MAX_RETRIES: u32 = 3;
15
/// Mutable bookkeeping of the rate limiter's token bucket.
#[derive(Debug)]
struct RateLimiterState {
    /// Tokens that can currently be spent.
    tokens: u64,
    /// Instant from which elapsed time for the next refill is measured.
    last_refill: Instant,
}
21
22#[derive(Debug)]
24struct RateLimiter {
25 capacity: u64,
26 refill_per_sec: u64,
27 state: Mutex<RateLimiterState>,
28}
29
30impl RateLimiter {
31 fn new(capacity: u64, refill_per_sec: u64) -> Self {
32 Self {
33 capacity,
34 refill_per_sec,
35 state: Mutex::new(RateLimiterState {
36 tokens: capacity,
37 last_refill: Instant::now(),
38 }),
39 }
40 }
41
42 fn from_env() -> Self {
43 let rps = std::env::var("INDODAX_RATE_LIMIT")
44 .ok()
45 .and_then(|v| v.parse::<u64>().ok())
46 .unwrap_or(5)
47 .max(1);
48 Self::new(rps, rps)
49 }
50
51 async fn acquire(&self) {
52 loop {
53 let mut state = self.state.lock().await;
54 let elapsed = state.last_refill.elapsed();
55 if elapsed >= Duration::from_secs(1) {
56 let secs = elapsed.as_secs();
57 let add = self.refill_per_sec * secs;
58 state.tokens = state.tokens.saturating_add(add).min(self.capacity);
59 state.last_refill += Duration::from_secs(secs);
60 }
61 if state.tokens > 0 {
62 state.tokens -= 1;
63 return;
64 }
65 let elapsed_ms = elapsed.as_millis().min(u128::from(u64::MAX)) as u64;
66 let wait = if elapsed_ms < 1000 {
67 Duration::from_millis(1000 - elapsed_ms)
68 } else {
69 Duration::from_millis(50)
70 };
71 drop(state);
72 tokio::time::sleep(wait).await;
73 }
74 }
75}
76
77#[derive(Debug)]
78pub struct IndodaxClient {
79 http: Client,
80 signer: Option<Signer>,
81 rate_limiter: RateLimiter,
82 ws_token: Option<String>,
83}
84
85#[derive(Debug, serde::Deserialize)]
86pub struct IndodaxV1Response<T> {
87 pub success: i32,
88 #[serde(rename = "return")]
89 pub return_data: Option<T>,
90 pub error: Option<String>,
91 pub error_code: Option<String>,
92}
93
94#[derive(Debug, serde::Deserialize)]
95pub struct IndodaxV2Response<T> {
96 pub data: Option<T>,
97 pub code: Option<i64>,
98 pub error: Option<String>,
99}
100
101impl IndodaxClient {
102 pub fn new(signer: Option<Signer>) -> Result<Self, IndodaxError> {
103 let http = Client::builder()
104 .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
105 .timeout(Duration::from_secs(30))
106 .pool_max_idle_per_host(2)
107 .build()
108 .map_err(|e| IndodaxError::Other(format!("Failed to create HTTP client: {}", e)))?;
109
110 Ok(Self {
111 http,
112 signer,
113 rate_limiter: RateLimiter::from_env(),
114 ws_token: None,
115 })
116 }
117
118 pub fn with_ws_token(mut self, token: Option<String>) -> Self {
119 self.ws_token = token;
120 self
121 }
122
123 pub fn signer(&self) -> Option<&Signer> {
124 self.signer.as_ref()
125 }
126
127 pub fn ws_token(&self) -> Option<&str> {
128 self.ws_token.as_deref()
129 }
130
131 pub fn http_client(&self) -> &Client {
132 &self.http
133 }
134
135 pub async fn public_get<T: DeserializeOwned>(
136 &self,
137 path: &str,
138 ) -> Result<T, IndodaxError> {
139 let url = format!("{}{}", PUBLIC_BASE_URL, path);
140 let resp = self.retry_get(&url).await?;
141 self.handle_response(resp).await
142 }
143
144 pub async fn countdown_cancel_all(
145 &self,
146 pair: Option<&str>,
147 countdown_time: u64,
148 ) -> Result<serde_json::Value, IndodaxError> {
149 let signer = self.signer.as_ref().ok_or_else(|| {
150 IndodaxError::Config("API credentials required for countdown cancel all".into())
151 })?;
152
153 let mut body_parts: Vec<String> = vec![
154 format!("countdownTime={}", countdown_time),
155 ];
156 if let Some(p) = pair {
157 body_parts.push(format!("pair={}", p));
158 }
159
160 let body = body_parts.join("&");
161 let (payload, signature) = signer.sign_v1(&body)?;
162
163 let url = format!("{}/countdownCancelAll", PRIVATE_V1_URL);
164 let req = self
165 .http
166 .post(&url)
167 .header("Key", signer.api_key())
168 .header("Sign", &signature)
169 .header("Content-Type", "application/x-www-form-urlencoded")
170 .body(payload);
171 let resp = self.send_with_retry(req).await?;
172 self.handle_v1_response(resp).await
173 }
174
175 pub async fn generate_ws_token(&self) -> Result<(String, String), IndodaxError> {
176 let signer = self.signer.as_ref().ok_or_else(|| {
177 IndodaxError::Config("API credentials required for WebSocket token generation".into())
178 })?;
179
180 let nonce = signer.next_nonce_str();
181 let (_, signature) = signer.sign_v1(&nonce)?;
182
183 let req = self
184 .http
185 .post(WS_TOKEN_URL)
186 .header("Key", signer.api_key())
187 .header("Sign", &signature)
188 .header("Content-Type", "application/x-www-form-urlencoded")
189 .body(format!("nonce={}", nonce));
190 let resp = self.send_with_retry(req).await?;
191
192 let body_text = resp.text().await?;
193 let val: serde_json::Value = serde_json::from_str(&body_text)?;
194
195 let token = val.get("token")
196 .and_then(|t| t.as_str())
197 .or_else(|| val.get("data").and_then(|d| d.get("token")).and_then(|t| t.as_str()))
198 .or_else(|| val.get("return").and_then(|r| r.get("connToken")).and_then(|t| t.as_str()))
199 .map(|t| t.to_string());
200
201 let channel = val.get("channel")
202 .and_then(|c| c.as_str())
203 .or_else(|| val.get("data").and_then(|d| d.get("channel")).and_then(|c| c.as_str()))
204 .or_else(|| val.get("return").and_then(|r| r.get("channel")).and_then(|c| c.as_str()))
205 .map(|c| c.to_string());
206
207 match (token, channel) {
208 (Some(t), Some(c)) => Ok((t, c)),
209 (Some(t), None) => Ok((t, "private:orders".to_string())), _ => Err(IndodaxError::WsToken(format!("No token or channel in response: {}", body_text))),
211 }
212 }
213
214 pub async fn public_get_v2<T: DeserializeOwned>(
215 &self,
216 path: &str,
217 params: &[(&str, &str)],
218 ) -> Result<T, IndodaxError> {
219 let url = format!("{}{}", PUBLIC_BASE_URL, path);
220 let resp = self.retry_get_with_params(&url, params).await?;
221 self.handle_response(resp).await
222 }
223
224 async fn handle_v1_response<T: DeserializeOwned>(
225 &self,
226 resp: Response,
227 ) -> Result<T, IndodaxError> {
228 let body_text = resp.text().await?;
229 let envelope: IndodaxV1Response<T> = serde_json::from_str(&body_text).map_err(|e| {
230 IndodaxError::Parse(format!(
231 "Failed to parse response: {} (body: {})",
232 e, body_text
233 ))
234 })?;
235
236 if envelope.success == 1 {
237 envelope.return_data.ok_or_else(|| {
238 IndodaxError::Parse("API returned success but no 'return' data".into())
239 })
240 } else {
241 Err(IndodaxError::api(
242 envelope.error.unwrap_or_else(|| "Unknown error".into()),
243 match envelope.error_code.as_deref() {
244 Some("invalid_credentials") => ErrorCategory::Authentication,
245 Some("rate_limit") => ErrorCategory::RateLimit,
246 Some(c) if c.contains("invalid") => ErrorCategory::Validation,
247 _ => ErrorCategory::Unknown,
248 },
249 envelope.error_code,
250 ))
251 }
252 }
253
254 pub async fn private_post_v1<T: DeserializeOwned>(
255 &self,
256 method: &str,
257 params: &HashMap<String, String>,
258 ) -> Result<T, IndodaxError> {
259 let signer = self.signer.as_ref().ok_or_else(|| {
260 IndodaxError::Config("API credentials required for private endpoints".into())
261 })?;
262
263 let mut full_params: BTreeMap<String, String> = params
264 .iter()
265 .map(|(k, v)| (k.clone(), v.clone()))
266 .collect();
267
268 full_params.insert("method".into(), method.to_string());
269 full_params.insert("nonce".into(), signer.next_nonce_str());
270
271 let body = serde_urlencoded_str(&full_params);
272 let (_, signature) = signer.sign_v1(&body)?;
273
274 let resp = self
275 .retry_post(PRIVATE_V1_URL, &body, signer.api_key(), &signature)
276 .await?;
277
278 self.handle_v1_response(resp).await
279 }
280
281 pub async fn private_get_v2<T: DeserializeOwned>(
282 &self,
283 path: &str,
284 params: &HashMap<String, String>,
285 ) -> Result<T, IndodaxError> {
286 let signer = self.signer.as_ref().ok_or_else(|| {
287 IndodaxError::Config("API credentials required for private endpoints".into())
288 })?;
289
290 let mut qs_parts: Vec<String> = params
291 .iter()
292 .map(|(k, v)| format!("{}={}", k, v))
293 .collect();
294 let timestamp = Signer::now_millis();
295 qs_parts.push(format!("timestamp={}", timestamp));
296 qs_parts.push("recvWindow=5000".to_string());
297 qs_parts.sort();
298 let query_string = qs_parts.join("&");
299
300 let signature = signer.sign_v2(&query_string, timestamp)?;
301 let url = format!("{}{}?{}", PRIVATE_V2_BASE, path, query_string);
302
303 let req = self
304 .http
305 .get(&url)
306 .header("X-APIKEY", signer.api_key())
307 .header("Sign", &signature)
308 .header("Accept", "application/json")
309 .header("Content-Type", "application/json");
310 let resp = self.send_with_retry(req).await?;
311
312 let body_text = resp.text().await?;
313 let envelope: IndodaxV2Response<T> = serde_json::from_str(&body_text).map_err(|e| {
314 IndodaxError::Parse(format!(
315 "Failed to parse v2 response: {} (body: {})",
316 e, body_text
317 ))
318 })?;
319
320 if let Some(data) = envelope.data {
321 Ok(data)
322 } else if let Some(error) = envelope.error {
323 Err(IndodaxError::api(error, ErrorCategory::Unknown, None))
324 } else {
325 Ok(serde_json::from_str(&body_text)?)
326 }
327 }
328
329 async fn retry_get(&self, url: &str) -> Result<Response, IndodaxError> {
330 let req = self.http.get(url);
331 self.send_with_retry(req).await
332 }
333
334 async fn retry_get_with_params(
335 &self,
336 url: &str,
337 params: &[(&str, &str)],
338 ) -> Result<Response, IndodaxError> {
339 let req = self.http.get(url).query(params);
340 self.send_with_retry(req).await
341 }
342
343 async fn retry_post(
344 &self,
345 url: &str,
346 body: &str,
347 api_key: &str,
348 signature: &str,
349 ) -> Result<Response, IndodaxError> {
350 let req = self
351 .http
352 .post(url)
353 .header("Key", api_key)
354 .header("Sign", signature)
355 .header("Content-Type", "application/x-www-form-urlencoded")
356 .body(body.to_string());
357 self.send_with_retry(req).await
358 }
359
360 async fn send_with_retry(
361 &self,
362 builder: RequestBuilder,
363 ) -> Result<Response, IndodaxError> {
364 self.rate_limiter.acquire().await;
365 let mut last_err = None;
366
367 for attempt in 0..=MAX_RETRIES {
368 if attempt > 0 {
369 tokio::time::sleep(Duration::from_millis(500 * 2u64.pow(attempt - 1))).await;
370 }
371
372 let req = builder
373 .try_clone()
374 .ok_or_else(|| IndodaxError::Other("Failed to clone request".into()))?;
375
376 match req.send().await {
377 Ok(resp) => {
378 let status = resp.status();
379 if status.is_success() {
380 return Ok(resp);
381 }
382
383 if status == StatusCode::TOO_MANY_REQUESTS {
384 last_err = Some(IndodaxError::api(
385 format!("Rate limited (HTTP {})", status.as_u16()),
386 ErrorCategory::RateLimit,
387 None,
388 ));
389 continue;
390 }
391
392 if status.is_server_error() {
393 last_err = Some(IndodaxError::api(
394 format!("Server error (HTTP {})", status.as_u16()),
395 ErrorCategory::Server,
396 None,
397 ));
398 continue;
399 }
400
401 last_err = Some(IndodaxError::api(
402 format!("HTTP {}", status.as_u16()),
403 ErrorCategory::Unknown,
404 None,
405 ));
406 break;
407 }
408 Err(e) => {
409 if e.is_timeout() || e.is_connect() {
410 last_err = Some(IndodaxError::Http(e));
411 continue;
412 }
413 return Err(IndodaxError::Http(e));
414 }
415 }
416 }
417
418 Err(last_err.unwrap_or_else(|| {
419 IndodaxError::Other("Max retries exceeded".into())
420 }))
421 }
422
423 async fn handle_response<T: DeserializeOwned>(
424 &self,
425 resp: Response,
426 ) -> Result<T, IndodaxError> {
427 let body_text = resp.text().await?;
428 serde_json::from_str(&body_text).map_err(|e| {
429 IndodaxError::Parse(format!(
430 "Failed to parse response: {} (body: {})",
431 e, body_text
432 ))
433 })
434 }
435}
436
437fn serde_urlencoded_str(params: &BTreeMap<String, String>) -> String {
438 params
439 .iter()
440 .map(|(k, v)| {
441 format!(
442 "{}={}",
443 url::form_urlencoded::byte_serialize(k.as_bytes()).collect::<String>(),
444 url::form_urlencoded::byte_serialize(v.as_bytes()).collect::<String>()
445 )
446 })
447 .collect::<Vec<_>>()
448 .join("&")
449}
450
/// Unit tests for client construction, response envelopes, form encoding and
/// the rate limiter. None of them performs network I/O.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::auth::Signer;

    #[test]
    fn test_indodax_client_new_with_signer() {
        let signer = Signer::new("key", "secret");
        let client = IndodaxClient::new(Some(signer)).unwrap();
        assert!(client.signer().is_some());
    }

    #[test]
    fn test_indodax_client_new_without_signer() {
        let client = IndodaxClient::new(None).unwrap();
        assert!(client.signer().is_none());
    }

    #[test]
    fn test_indodax_client_signer() {
        let signer = Signer::new("mykey", "mysecret");
        let client = IndodaxClient::new(Some(signer)).unwrap();
        let s = client.signer().unwrap();
        assert_eq!(s.api_key(), "mykey");
    }

    #[test]
    fn test_indodax_v1_response_success() {
        let json = serde_json::json!({
            "success": 1,
            "return": {"balance": {"btc": "1.0"}},
            "error": null,
            "error_code": null
        });
        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert_eq!(resp.success, 1);
        assert!(resp.return_data.is_some());
        assert!(resp.error.is_none());
    }

    #[test]
    fn test_indodax_v1_response_failure() {
        let json = serde_json::json!({
            "success": 0,
            "return": null,
            "error": "Invalid credentials",
            "error_code": "invalid_credentials"
        });
        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert_eq!(resp.success, 0);
        assert!(resp.return_data.is_none());
        assert!(resp.error.is_some());
        assert!(resp.error_code.is_some());
    }

    #[test]
    fn test_indodax_v2_response_success() {
        let json = serde_json::json!({
            "data": {"name": "test"},
            "code": null,
            "error": null
        });
        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert!(resp.data.is_some());
        assert!(resp.error.is_none());
    }

    #[test]
    fn test_indodax_v2_response_error() {
        let json = serde_json::json!({
            "data": null,
            "code": 400,
            "error": "Bad request"
        });
        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert!(resp.data.is_none());
        assert!(resp.error.is_some());
        assert!(resp.code.is_some());
    }

    #[test]
    fn test_serde_urlencoded_str_single() {
        let mut params = std::collections::BTreeMap::new();
        params.insert("method".into(), "getInfo".into());
        params.insert("nonce".into(), "12345".into());

        let result = serde_urlencoded_str(&params);
        // Pairs come out in key order, joined with `&`.
        assert_eq!(result, "method=getInfo&nonce=12345");
    }

    #[test]
    fn test_serde_urlencoded_str_empty() {
        let params = std::collections::BTreeMap::new();
        let result = serde_urlencoded_str(&params);
        assert_eq!(result, "");
    }

    #[test]
    fn test_serde_urlencoded_str_special_chars() {
        let mut params = std::collections::BTreeMap::new();
        params.insert("key with space".into(), "value&more".into());

        let result = serde_urlencoded_str(&params);
        // Form encoding turns spaces into `+` and escapes `&` inside values.
        assert_eq!(result, "key+with+space=value%26more");
    }

    #[test]
    fn test_public_base_url() {
        assert!(PUBLIC_BASE_URL.contains("indodax.com"));
    }

    #[test]
    fn test_private_v1_url() {
        assert!(PRIVATE_V1_URL.contains("indodax.com/tapi"));
    }

    #[test]
    fn test_private_v2_base() {
        assert!(PRIVATE_V2_BASE.contains("tapi.btcapi.net"));
    }

    #[test]
    fn test_max_retries_constant() {
        assert_eq!(MAX_RETRIES, 3);
    }

    #[test]
    fn test_indodax_v1_response_debug() {
        let resp: IndodaxV1Response<serde_json::Value> = IndodaxV1Response {
            success: 1,
            return_data: Some(serde_json::json!({})),
            error: None,
            error_code: None,
        };
        let debug_str = format!("{:?}", resp);
        assert!(debug_str.contains("success"));
    }

    #[test]
    fn test_indodax_v2_response_debug() {
        let resp: IndodaxV2Response<serde_json::Value> = IndodaxV2Response {
            data: Some(serde_json::json!({})),
            code: None,
            error: None,
        };
        let debug_str = format!("{:?}", resp);
        assert!(debug_str.contains("data"));
    }

    #[test]
    fn test_rate_limiter_from_env_default() {
        let rl = RateLimiter::from_env();
        assert!(rl.capacity > 0);
        assert!(rl.refill_per_sec > 0);
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_single() {
        let rl = RateLimiter::new(5, 5);
        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 4);
    }

    #[tokio::test]
    async fn test_rate_limiter_token_exhaustion_refills() {
        let rl = RateLimiter::new(3, 10);
        for _ in 0..3 {
            rl.acquire().await;
        }
        {
            let state = rl.state.lock().await;
            assert_eq!(state.tokens, 0);
        }

        // Pretend the last refill happened a second ago so the next acquire
        // refills immediately instead of sleeping.
        {
            let mut state = rl.state.lock().await;
            state.last_refill = Instant::now() - Duration::from_secs(1);
        }

        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 2);
    }

    #[tokio::test]
    async fn test_rate_limiter_refill_capped_at_capacity() {
        let rl = RateLimiter::new(5, 100);
        for _ in 0..5 {
            rl.acquire().await;
        }
        {
            let state = rl.state.lock().await;
            assert_eq!(state.tokens, 0);
        }

        // Ten seconds at 100 tokens/s would add 1000 tokens; the bucket must
        // still stop at its capacity of 5.
        {
            let mut state = rl.state.lock().await;
            state.last_refill = Instant::now() - Duration::from_secs(10);
        }

        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 4);
    }

    #[test]
    fn test_rate_limiter_new_custom() {
        let rl = RateLimiter::new(25, 25);
        assert_eq!(rl.capacity, 25);
        assert_eq!(rl.refill_per_sec, 25);
    }
}