Skip to main content

indodax_cli/
client.rs

1use crate::auth::Signer;
2use crate::errors::{ErrorCategory, IndodaxError};
3use reqwest::{Client, RequestBuilder, Response, StatusCode};
4use serde::de::DeserializeOwned;
5use std::collections::{HashMap, BTreeMap};
6use tokio::sync::Mutex;
7
8use std::time::{Duration, Instant};
9
10const PUBLIC_BASE_URL: &str = "https://indodax.com";
11const PRIVATE_V1_URL: &str = "https://indodax.com/tapi";
12const PRIVATE_V2_BASE: &str = "https://tapi.btcapi.net";
13const WS_TOKEN_URL: &str = "https://indodax.com/api/private_ws/v1/generate_token";
14const MAX_RETRIES: u32 = 3;
15
16#[derive(Debug)]
17struct RateLimiterState {
18    tokens: u64,
19    last_refill: Instant,
20}
21
22/// Token-bucket rate limiter for proactive 429 avoidance.
23#[derive(Debug)]
24struct RateLimiter {
25    capacity: u64,
26    refill_per_sec: u64,
27    state: Mutex<RateLimiterState>,
28}
29
30impl RateLimiter {
31    fn new(capacity: u64, refill_per_sec: u64) -> Self {
32        Self {
33            capacity,
34            refill_per_sec,
35            state: Mutex::new(RateLimiterState {
36                tokens: capacity,
37                last_refill: Instant::now(),
38            }),
39        }
40    }
41
42    fn from_env() -> Self {
43        let rps = std::env::var("INDODAX_RATE_LIMIT")
44            .ok()
45            .and_then(|v| v.parse::<u64>().ok())
46            .unwrap_or(5)
47            .max(1);
48        Self::new(rps, rps)
49    }
50
51    async fn acquire(&self) {
52        loop {
53            let mut state = self.state.lock().await;
54            let elapsed = state.last_refill.elapsed();
55            if elapsed >= Duration::from_secs(1) {
56                let secs = elapsed.as_secs();
57                let add = self.refill_per_sec * secs;
58                state.tokens = state.tokens.saturating_add(add).min(self.capacity);
59                state.last_refill += Duration::from_secs(secs);
60            }
61            if state.tokens > 0 {
62                state.tokens -= 1;
63                return;
64            }
65            let elapsed_ms = elapsed.as_millis().min(u128::from(u64::MAX)) as u64;
66            let wait = if elapsed_ms < 1000 {
67                Duration::from_millis(1000 - elapsed_ms)
68            } else {
69                Duration::from_millis(50)
70            };
71            drop(state);
72            tokio::time::sleep(wait).await;
73        }
74    }
75}
76
77#[derive(Debug)]
78pub struct IndodaxClient {
79    http: Client,
80    signer: Option<Signer>,
81    rate_limiter: RateLimiter,
82}
83
84#[derive(Debug, serde::Deserialize)]
85pub struct IndodaxV1Response<T> {
86    pub success: i32,
87    #[serde(rename = "return")]
88    pub return_data: Option<T>,
89    pub error: Option<String>,
90    pub error_code: Option<String>,
91}
92
93#[derive(Debug, serde::Deserialize)]
94pub struct IndodaxV2Response<T> {
95    pub data: Option<T>,
96    pub code: Option<i64>,
97    pub error: Option<String>,
98}
99
100impl IndodaxClient {
101    pub fn new(signer: Option<Signer>) -> Result<Self, IndodaxError> {
102        let http = Client::builder()
103            .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
104            .timeout(Duration::from_secs(30))
105            .pool_max_idle_per_host(2)
106            .build()
107            .map_err(|e| IndodaxError::Other(format!("Failed to create HTTP client: {}", e)))?;
108
109        Ok(Self {
110            http,
111            signer,
112            rate_limiter: RateLimiter::from_env(),
113        })
114    }
115
116    pub fn signer(&self) -> Option<&Signer> {
117        self.signer.as_ref()
118    }
119
120    pub fn http_client(&self) -> &Client {
121        &self.http
122    }
123
124    pub async fn public_get<T: DeserializeOwned>(
125        &self,
126        path: &str,
127    ) -> Result<T, IndodaxError> {
128        let url = format!("{}{}", PUBLIC_BASE_URL, path);
129        let resp = self.retry_get(&url).await?;
130        self.handle_response(resp).await
131    }
132
133    pub async fn countdown_cancel_all(
134        &self,
135        pair: Option<&str>,
136        countdown_time: u64,
137    ) -> Result<serde_json::Value, IndodaxError> {
138        let signer = self.signer.as_ref().ok_or_else(|| {
139            IndodaxError::Config("API credentials required for countdown cancel all".into())
140        })?;
141
142        let mut body_parts: Vec<String> = vec![
143            format!("countdownTime={}", countdown_time),
144        ];
145        if let Some(p) = pair {
146            body_parts.push(format!("pair={}", p));
147        }
148
149        let body = body_parts.join("&");
150        let (payload, signature) = signer.sign_v1(&body)?;
151
152        let url = format!("{}/countdownCancelAll", PRIVATE_V1_URL);
153        let req = self
154            .http
155            .post(&url)
156            .header("Key", signer.api_key())
157            .header("Sign", &signature)
158            .header("Content-Type", "application/x-www-form-urlencoded")
159            .body(payload);
160        let resp = self.send_with_retry(req).await?;
161
162        let body_text = resp.text().await?;
163        let data: serde_json::Value = serde_json::from_str(&body_text)?;
164
165        if let Some(success) = data.get("success").and_then(|v| v.as_i64()) {
166            if success == 1 {
167                Ok(data)
168            } else {
169                let error_msg = data
170                    .get("error")
171                    .and_then(|v| v.as_str())
172                    .unwrap_or("Unknown error");
173                let error_code = data
174                    .get("error_code")
175                    .and_then(|v| v.as_str())
176                    .map(|s| s.to_string());
177                let category = match error_code.as_deref() {
178                    Some("invalid_credentials") => ErrorCategory::Authentication,
179                    Some("rate_limit") => ErrorCategory::RateLimit,
180                    Some(c) if c.contains("invalid") => ErrorCategory::Validation,
181                    _ => ErrorCategory::Unknown,
182                };
183                Err(IndodaxError::api(error_msg, category, error_code))
184            }
185        } else {
186            Ok(data)
187        }
188    }
189
190    pub async fn generate_ws_token(&self) -> Result<String, IndodaxError> {
191        let signer = self.signer.as_ref().ok_or_else(|| {
192            IndodaxError::Config("API credentials required for WebSocket token generation".into())
193        })?;
194
195        let nonce = signer.next_nonce_str();
196        let (_, signature) = signer.sign_v1(&nonce)?;
197
198        let req = self
199            .http
200            .post(WS_TOKEN_URL)
201            .header("Key", signer.api_key())
202            .header("Sign", &signature)
203            .header("Content-Type", "application/x-www-form-urlencoded")
204            .body(format!("nonce={}", nonce));
205        let resp = self.send_with_retry(req).await?;
206
207        let body_text = resp.text().await?;
208        let val: serde_json::Value = serde_json::from_str(&body_text)?;
209
210        val.get("token")
211            .and_then(|t| t.as_str())
212            .map(|t| t.to_string())
213            .or_else(|| val.get("data").and_then(|d| d.get("token")).and_then(|t| t.as_str()).map(|t| t.to_string()))
214            .ok_or_else(|| IndodaxError::WsToken(format!("No token in response: {}", body_text)))
215    }
216
217    pub async fn public_get_v2<T: DeserializeOwned>(
218        &self,
219        path: &str,
220        params: &[(&str, &str)],
221    ) -> Result<T, IndodaxError> {
222        let url = format!("{}{}", PUBLIC_BASE_URL, path);
223        let resp = self.retry_get_with_params(&url, params).await?;
224        self.handle_response(resp).await
225    }
226
227    pub async fn private_post_v1<T: DeserializeOwned>(
228        &self,
229        method: &str,
230        params: &HashMap<String, String>,
231    ) -> Result<T, IndodaxError> {
232        let signer = self.signer.as_ref().ok_or_else(|| {
233            IndodaxError::Config("API credentials required for private endpoints".into())
234        })?;
235
236        let mut full_params: BTreeMap<String, String> = params
237            .iter()
238            .map(|(k, v)| (k.clone(), v.clone()))
239            .collect();
240        
241        full_params.insert("method".into(), method.to_string());
242        full_params.insert("nonce".into(), signer.next_nonce_str());
243
244        let body = serde_urlencoded_str(&full_params);
245        let (_, signature) = signer.sign_v1(&body)?;
246
247        let resp = self
248            .retry_post(PRIVATE_V1_URL, &body, signer.api_key(), &signature)
249            .await?;
250
251        let body_text = resp.text().await?;
252        let envelope: IndodaxV1Response<T> = serde_json::from_str(&body_text).map_err(|e| {
253            IndodaxError::Parse(format!(
254                "Failed to parse response: {} (body: {})",
255                e, body_text
256            ))
257        })?;
258
259        if envelope.success == 1 {
260            envelope.return_data.ok_or_else(|| {
261                IndodaxError::Parse("API returned success but no 'return' data".into())
262            })
263        } else {
264            Err(IndodaxError::api(
265                envelope.error.unwrap_or_else(|| "Unknown error".into()),
266                match envelope.error_code.as_deref() {
267                    Some("invalid_credentials") => ErrorCategory::Authentication,
268                    Some("rate_limit") => ErrorCategory::RateLimit,
269                    Some(c) if c.contains("invalid") => ErrorCategory::Validation,
270                    _ => ErrorCategory::Unknown,
271                },
272                envelope.error_code,
273            ))
274        }
275    }
276
277    pub async fn private_get_v2<T: DeserializeOwned>(
278        &self,
279        path: &str,
280        params: &HashMap<String, String>,
281    ) -> Result<T, IndodaxError> {
282        let signer = self.signer.as_ref().ok_or_else(|| {
283            IndodaxError::Config("API credentials required for private endpoints".into())
284        })?;
285
286        let mut qs_parts: Vec<String> = params
287            .iter()
288            .map(|(k, v)| format!("{}={}", k, v))
289            .collect();
290        let timestamp = Signer::now_millis();
291        qs_parts.push(format!("timestamp={}", timestamp));
292        qs_parts.push("recvWindow=5000".to_string());
293        qs_parts.sort();
294        let query_string = qs_parts.join("&");
295
296        let signature = signer.sign_v2(&query_string, timestamp)?;
297        let url = format!("{}{}?{}", PRIVATE_V2_BASE, path, query_string);
298
299        let req = self
300            .http
301            .get(&url)
302            .header("X-APIKEY", signer.api_key())
303            .header("Sign", &signature)
304            .header("Accept", "application/json")
305            .header("Content-Type", "application/json");
306        let resp = self.send_with_retry(req).await?;
307
308        let body_text = resp.text().await?;
309        let envelope: IndodaxV2Response<T> = serde_json::from_str(&body_text).map_err(|e| {
310            IndodaxError::Parse(format!(
311                "Failed to parse v2 response: {} (body: {})",
312                e, body_text
313            ))
314        })?;
315
316        if let Some(data) = envelope.data {
317            Ok(data)
318        } else if let Some(error) = envelope.error {
319            Err(IndodaxError::api(error, ErrorCategory::Unknown, None))
320        } else {
321            Ok(serde_json::from_str(&body_text)?)
322        }
323    }
324
325    async fn retry_get(&self, url: &str) -> Result<Response, IndodaxError> {
326        let req = self.http.get(url);
327        self.send_with_retry(req).await
328    }
329
330    async fn retry_get_with_params(
331        &self,
332        url: &str,
333        params: &[(&str, &str)],
334    ) -> Result<Response, IndodaxError> {
335        let req = self.http.get(url).query(params);
336        self.send_with_retry(req).await
337    }
338
339    async fn retry_post(
340        &self,
341        url: &str,
342        body: &str,
343        api_key: &str,
344        signature: &str,
345    ) -> Result<Response, IndodaxError> {
346        let req = self
347            .http
348            .post(url)
349            .header("Key", api_key)
350            .header("Sign", signature)
351            .header("Content-Type", "application/x-www-form-urlencoded")
352            .body(body.to_string());
353        self.send_with_retry(req).await
354    }
355
356    async fn send_with_retry(
357        &self,
358        builder: RequestBuilder,
359    ) -> Result<Response, IndodaxError> {
360        self.rate_limiter.acquire().await;
361        let mut last_err = None;
362
363        for attempt in 0..=MAX_RETRIES {
364            if attempt > 0 {
365                tokio::time::sleep(Duration::from_millis(500 * 2u64.pow(attempt - 1))).await;
366            }
367
368            let req = builder
369                .try_clone()
370                .ok_or_else(|| IndodaxError::Other("Failed to clone request".into()))?;
371
372            match req.send().await {
373                Ok(resp) => {
374                    let status = resp.status();
375                    if status.is_success() {
376                        return Ok(resp);
377                    }
378
379                    if status == StatusCode::TOO_MANY_REQUESTS {
380                        last_err = Some(IndodaxError::api(
381                            format!("Rate limited (HTTP {})", status.as_u16()),
382                            ErrorCategory::RateLimit,
383                            None,
384                        ));
385                        continue;
386                    }
387
388                    if status.is_server_error() {
389                        last_err = Some(IndodaxError::api(
390                            format!("Server error (HTTP {})", status.as_u16()),
391                            ErrorCategory::Server,
392                            None,
393                        ));
394                        continue;
395                    }
396
397                    last_err = Some(IndodaxError::api(
398                        format!("HTTP {}", status.as_u16()),
399                        ErrorCategory::Unknown,
400                        None,
401                    ));
402                    break;
403                }
404                Err(e) => {
405                    if e.is_timeout() || e.is_connect() {
406                        last_err = Some(IndodaxError::Http(e));
407                        continue;
408                    }
409                    return Err(IndodaxError::Http(e));
410                }
411            }
412        }
413
414        Err(last_err.unwrap_or_else(|| {
415            IndodaxError::Other("Max retries exceeded".into())
416        }))
417    }
418
419    async fn handle_response<T: DeserializeOwned>(
420        &self,
421        resp: Response,
422    ) -> Result<T, IndodaxError> {
423        let body_text = resp.text().await?;
424        serde_json::from_str(&body_text).map_err(|e| {
425            IndodaxError::Parse(format!(
426                "Failed to parse response: {} (body: {})",
427                e, body_text
428            ))
429        })
430    }
431}
432
433fn serde_urlencoded_str(params: &BTreeMap<String, String>) -> String {
434    params
435        .iter()
436        .map(|(k, v)| {
437            format!(
438                "{}={}",
439                url::form_urlencoded::byte_serialize(k.as_bytes()).collect::<String>(),
440                url::form_urlencoded::byte_serialize(v.as_bytes()).collect::<String>()
441            )
442        })
443        .collect::<Vec<_>>()
444        .join("&")
445}
446
447#[cfg(test)]
448mod tests {
449    use super::*;
450    use crate::auth::Signer;
451
452    #[test]
453    fn test_indodax_client_new_with_signer() {
454        let signer = Signer::new("key", "secret");
455        let client = IndodaxClient::new(Some(signer)).unwrap();
456        assert!(client.signer().is_some());
457    }
458
459    #[test]
460    fn test_indodax_client_new_without_signer() {
461        let client = IndodaxClient::new(None).unwrap();
462        assert!(client.signer().is_none());
463    }
464
465    #[test]
466    fn test_indodax_client_signer() {
467        let signer = Signer::new("mykey", "mysecret");
468        let client = IndodaxClient::new(Some(signer)).unwrap();
469        let s = client.signer().unwrap();
470        assert_eq!(s.api_key(), "mykey");
471    }
472
473    #[test]
474    fn test_indodax_v1_response_success() {
475        let json = serde_json::json!({
476            "success": 1,
477            "return": {"balance": {"btc": "1.0"}},
478            "error": null,
479            "error_code": null
480        });
481        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
482        assert_eq!(resp.success, 1);
483        assert!(resp.return_data.is_some());
484        assert!(resp.error.is_none());
485    }
486
487    #[test]
488    fn test_indodax_v1_response_failure() {
489        let json = serde_json::json!({
490            "success": 0,
491            "return": null,
492            "error": "Invalid credentials",
493            "error_code": "invalid_credentials"
494        });
495        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
496        assert_eq!(resp.success, 0);
497        assert!(resp.return_data.is_none());
498        assert!(resp.error.is_some());
499        assert!(resp.error_code.is_some());
500    }
501
502    #[test]
503    fn test_indodax_v2_response_success() {
504        let json = serde_json::json!({
505            "data": {"name": "test"},
506            "code": null,
507            "error": null
508        });
509        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
510        assert!(resp.data.is_some());
511        assert!(resp.error.is_none());
512    }
513
514    #[test]
515    fn test_indodax_v2_response_error() {
516        let json = serde_json::json!({
517            "data": null,
518            "code": 400,
519            "error": "Bad request"
520        });
521        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
522        assert!(resp.data.is_none());
523        assert!(resp.error.is_some());
524        assert!(resp.code.is_some());
525    }
526
527    #[test]
528    fn test_serde_urlencoded_str_single() {
529        let mut params = std::collections::BTreeMap::new();
530        params.insert("method".into(), "getInfo".into());
531        params.insert("nonce".into(), "12345".into());
532        
533        let result = serde_urlencoded_str(&params);
534        assert!(result.contains("method=getInfo"));
535        assert!(result.contains("nonce=12345"));
536    }
537
538    #[test]
539    fn test_serde_urlencoded_str_empty() {
540        let params = std::collections::BTreeMap::new();
541        let result = serde_urlencoded_str(&params);
542        assert_eq!(result, "");
543    }
544
545    #[test]
546    fn test_serde_urlencoded_str_special_chars() {
547        let mut params = std::collections::BTreeMap::new();
548        params.insert("key with space".into(), "value&more".into());
549        
550        let result = serde_urlencoded_str(&params);
551        // Should be URL encoded
552        assert!(result.contains("%20") || result.contains("+"));
553    }
554
555    #[test]
556    fn test_public_base_url() {
557        assert!(PUBLIC_BASE_URL.contains("indodax.com"));
558    }
559
560    #[test]
561    fn test_private_v1_url() {
562        assert!(PRIVATE_V1_URL.contains("indodax.com/tapi"));
563    }
564
565    #[test]
566    fn test_private_v2_base() {
567        assert!(PRIVATE_V2_BASE.contains("tapi.btcapi.net"));
568    }
569
570    #[test]
571    fn test_max_retries_constant() {
572        assert_eq!(MAX_RETRIES, 3);
573    }
574
575    #[test]
576    fn test_indodax_v1_response_debug() {
577        let resp: IndodaxV1Response<serde_json::Value> = IndodaxV1Response {
578            success: 1,
579            return_data: Some(serde_json::json!({})),
580            error: None,
581            error_code: None,
582        };
583        let debug_str = format!("{:?}", resp);
584        assert!(debug_str.contains("success"));
585    }
586
587    #[test]
588    fn test_indodax_v2_response_debug() {
589        let resp: IndodaxV2Response<serde_json::Value> = IndodaxV2Response {
590            data: Some(serde_json::json!({})),
591            code: None,
592            error: None,
593        };
594        let debug_str = format!("{:?}", resp);
595        assert!(debug_str.contains("data"));
596    }
597
598    #[test]
599    fn test_rate_limiter_from_env_default() {
600        // Without env var, should default to 10
601        let rl = RateLimiter::from_env();
602        // If INDODAX_RATE_LIMIT is set in environment, test may fail
603        // so we just verify it doesn't panic
604        assert!(rl.capacity > 0);
605        assert!(rl.refill_per_sec > 0);
606    }
607
608    #[tokio::test]
609    async fn test_rate_limiter_acquire_single() {
610        let rl = RateLimiter::new(5, 5);
611        rl.acquire().await;
612        let state = rl.state.lock().await;
613        assert_eq!(state.tokens, 4);
614    }
615
616    #[tokio::test]
617    async fn test_rate_limiter_token_exhaustion_refills() {
618        let rl = RateLimiter::new(3, 10);
619        for _ in 0..3 {
620            rl.acquire().await;
621        }
622        {
623            let state = rl.state.lock().await;
624            assert_eq!(state.tokens, 0);
625        }
626
627        {
628            let mut state = rl.state.lock().await;
629            state.last_refill = Instant::now() - Duration::from_secs(1);
630        }
631
632        rl.acquire().await;
633        let state = rl.state.lock().await;
634        assert_eq!(state.tokens, 2);
635    }
636
637    #[tokio::test]
638    async fn test_rate_limiter_refill_capped_at_capacity() {
639        let rl = RateLimiter::new(5, 100);
640        for _ in 0..5 {
641            rl.acquire().await;
642        }
643        {
644            let state = rl.state.lock().await;
645            assert_eq!(state.tokens, 0);
646        }
647
648        {
649            let mut state = rl.state.lock().await;
650            state.last_refill = Instant::now() - Duration::from_secs(10);
651        }
652
653        rl.acquire().await;
654        let state = rl.state.lock().await;
655        assert_eq!(state.tokens, 4);
656    }
657
658    #[test]
659    fn test_rate_limiter_new_custom() {
660        let rl = RateLimiter::new(25, 25);
661        assert_eq!(rl.capacity, 25);
662        assert_eq!(rl.refill_per_sec, 25);
663    }
664}