Skip to main content

indodax_cli/
client.rs

1use crate::auth::Signer;
2use crate::errors::{ErrorCategory, IndodaxError};
3use reqwest::{Client, RequestBuilder, Response, StatusCode};
4use serde::de::DeserializeOwned;
5use std::collections::{HashMap, BTreeMap};
6use tokio::sync::Mutex;
7
8use std::time::{Duration, Instant};
9
/// Base URL that public (unsigned) request paths are appended to.
const PUBLIC_BASE_URL: &str = "https://indodax.com";
/// Endpoint that signed v1 private requests are posted to.
const PRIVATE_V1_URL: &str = "https://indodax.com/tapi";
/// Base URL that signed v2 private request paths are appended to.
const PRIVATE_V2_BASE: &str = "https://tapi.btcapi.net";
/// Endpoint used to request a private WebSocket token.
const WS_TOKEN_URL: &str = "https://indodax.com/api/private_ws/v1/generate_token";
/// Number of retries after the first attempt, i.e. at most `MAX_RETRIES + 1` sends per request.
const MAX_RETRIES: u32 = 3;
15
/// Mutable bookkeeping of the token bucket, kept behind the mutex in `RateLimiter`.
#[derive(Debug)]
struct RateLimiterState {
    /// Tokens currently available; every successful `acquire` consumes one.
    tokens: u64,
    /// Instant from which elapsed whole seconds are converted into new tokens.
    last_refill: Instant,
}
21
/// Token-bucket rate limiter for proactive 429 avoidance.
///
/// The bucket is refilled in whole-second steps and never holds more than
/// `capacity` tokens.
#[derive(Debug)]
struct RateLimiter {
    /// Maximum number of tokens the bucket can hold.
    capacity: u64,
    /// Tokens added for every full second that has elapsed.
    refill_per_sec: u64,
    /// Current token count and refill reference time (async mutex).
    state: Mutex<RateLimiterState>,
}
29
30impl RateLimiter {
31    fn new(capacity: u64, refill_per_sec: u64) -> Self {
32        Self {
33            capacity,
34            refill_per_sec,
35            state: Mutex::new(RateLimiterState {
36                tokens: capacity,
37                last_refill: Instant::now(),
38            }),
39        }
40    }
41
42    fn from_env() -> Self {
43        let rps = std::env::var("INDODAX_RATE_LIMIT")
44            .ok()
45            .and_then(|v| v.parse::<u64>().ok())
46            .unwrap_or(5)
47            .max(1);
48        Self::new(rps, rps)
49    }
50
51    async fn acquire(&self) {
52        loop {
53            let mut state = self.state.lock().await;
54            let elapsed = state.last_refill.elapsed();
55            if elapsed >= Duration::from_secs(1) {
56                let secs = elapsed.as_secs();
57                let add = self.refill_per_sec * secs;
58                state.tokens = state.tokens.saturating_add(add).min(self.capacity);
59                state.last_refill += Duration::from_secs(secs);
60            }
61            if state.tokens > 0 {
62                state.tokens -= 1;
63                return;
64            }
65            let elapsed_ms = elapsed.as_millis().min(u128::from(u64::MAX)) as u64;
66            let wait = if elapsed_ms < 1000 {
67                Duration::from_millis(1000 - elapsed_ms)
68            } else {
69                Duration::from_millis(50)
70            };
71            drop(state);
72            tokio::time::sleep(wait).await;
73        }
74    }
75}
76
/// HTTP client for the Indodax public and private APIs.
#[derive(Debug)]
pub struct IndodaxClient {
    /// Underlying `reqwest` client used for every request.
    http: Client,
    /// Credentials used to sign private requests; `None` restricts the
    /// client to endpoints that do not need signing.
    signer: Option<Signer>,
    /// Client-side rate limiter consulted before requests are sent.
    rate_limiter: RateLimiter,
}
83
/// Envelope wrapped around v1 private API responses.
#[derive(Debug, serde::Deserialize)]
pub struct IndodaxV1Response<T> {
    /// Status flag; `handle_v1_response` treats exactly `1` as success.
    pub success: i32,
    /// Payload, read from the JSON key `return`.
    #[serde(rename = "return")]
    pub return_data: Option<T>,
    /// Error message, if the response carried one.
    pub error: Option<String>,
    /// Error code string, if the response carried one.
    pub error_code: Option<String>,
}
92
/// Envelope wrapped around v2 private API responses.
#[derive(Debug, serde::Deserialize)]
pub struct IndodaxV2Response<T> {
    /// Payload, if the response carried one.
    pub data: Option<T>,
    /// Numeric code, if the response carried one.
    pub code: Option<i64>,
    /// Error message, if the response carried one.
    pub error: Option<String>,
}
99
100impl IndodaxClient {
101    pub fn new(signer: Option<Signer>) -> Result<Self, IndodaxError> {
102        let http = Client::builder()
103            .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
104            .timeout(Duration::from_secs(30))
105            .pool_max_idle_per_host(2)
106            .build()
107            .map_err(|e| IndodaxError::Other(format!("Failed to create HTTP client: {}", e)))?;
108
109        Ok(Self {
110            http,
111            signer,
112            rate_limiter: RateLimiter::from_env(),
113        })
114    }
115
116    pub fn signer(&self) -> Option<&Signer> {
117        self.signer.as_ref()
118    }
119
120    pub fn http_client(&self) -> &Client {
121        &self.http
122    }
123
124    pub async fn public_get<T: DeserializeOwned>(
125        &self,
126        path: &str,
127    ) -> Result<T, IndodaxError> {
128        let url = format!("{}{}", PUBLIC_BASE_URL, path);
129        let resp = self.retry_get(&url).await?;
130        self.handle_response(resp).await
131    }
132
133    pub async fn countdown_cancel_all(
134        &self,
135        pair: Option<&str>,
136        countdown_time: u64,
137    ) -> Result<serde_json::Value, IndodaxError> {
138        let signer = self.signer.as_ref().ok_or_else(|| {
139            IndodaxError::Config("API credentials required for countdown cancel all".into())
140        })?;
141
142        let mut body_parts: Vec<String> = vec![
143            format!("countdownTime={}", countdown_time),
144        ];
145        if let Some(p) = pair {
146            body_parts.push(format!("pair={}", p));
147        }
148
149        let body = body_parts.join("&");
150        let (payload, signature) = signer.sign_v1(&body)?;
151
152        let url = format!("{}/countdownCancelAll", PRIVATE_V1_URL);
153        let req = self
154            .http
155            .post(&url)
156            .header("Key", signer.api_key())
157            .header("Sign", &signature)
158            .header("Content-Type", "application/x-www-form-urlencoded")
159            .body(payload);
160        let resp = self.send_with_retry(req).await?;
161        self.handle_v1_response(resp).await
162    }
163
164    pub async fn generate_ws_token(&self) -> Result<String, IndodaxError> {
165        let signer = self.signer.as_ref().ok_or_else(|| {
166            IndodaxError::Config("API credentials required for WebSocket token generation".into())
167        })?;
168
169        let nonce = signer.next_nonce_str();
170        let (_, signature) = signer.sign_v1(&nonce)?;
171
172        let req = self
173            .http
174            .post(WS_TOKEN_URL)
175            .header("Key", signer.api_key())
176            .header("Sign", &signature)
177            .header("Content-Type", "application/x-www-form-urlencoded")
178            .body(format!("nonce={}", nonce));
179        let resp = self.send_with_retry(req).await?;
180
181        let body_text = resp.text().await?;
182        let val: serde_json::Value = serde_json::from_str(&body_text)?;
183
184        val.get("token")
185            .and_then(|t| t.as_str())
186            .map(|t| t.to_string())
187            .or_else(|| val.get("data").and_then(|d| d.get("token")).and_then(|t| t.as_str()).map(|t| t.to_string()))
188            .ok_or_else(|| IndodaxError::WsToken(format!("No token in response: {}", body_text)))
189    }
190
191    pub async fn public_get_v2<T: DeserializeOwned>(
192        &self,
193        path: &str,
194        params: &[(&str, &str)],
195    ) -> Result<T, IndodaxError> {
196        let url = format!("{}{}", PUBLIC_BASE_URL, path);
197        let resp = self.retry_get_with_params(&url, params).await?;
198        self.handle_response(resp).await
199    }
200
201    async fn handle_v1_response<T: DeserializeOwned>(
202        &self,
203        resp: Response,
204    ) -> Result<T, IndodaxError> {
205        let body_text = resp.text().await?;
206        let envelope: IndodaxV1Response<T> = serde_json::from_str(&body_text).map_err(|e| {
207            IndodaxError::Parse(format!(
208                "Failed to parse response: {} (body: {})",
209                e, body_text
210            ))
211        })?;
212
213        if envelope.success == 1 {
214            envelope.return_data.ok_or_else(|| {
215                IndodaxError::Parse("API returned success but no 'return' data".into())
216            })
217        } else {
218            Err(IndodaxError::api(
219                envelope.error.unwrap_or_else(|| "Unknown error".into()),
220                match envelope.error_code.as_deref() {
221                    Some("invalid_credentials") => ErrorCategory::Authentication,
222                    Some("rate_limit") => ErrorCategory::RateLimit,
223                    Some(c) if c.contains("invalid") => ErrorCategory::Validation,
224                    _ => ErrorCategory::Unknown,
225                },
226                envelope.error_code,
227            ))
228        }
229    }
230
231    pub async fn private_post_v1<T: DeserializeOwned>(
232        &self,
233        method: &str,
234        params: &HashMap<String, String>,
235    ) -> Result<T, IndodaxError> {
236        let signer = self.signer.as_ref().ok_or_else(|| {
237            IndodaxError::Config("API credentials required for private endpoints".into())
238        })?;
239
240        let mut full_params: BTreeMap<String, String> = params
241            .iter()
242            .map(|(k, v)| (k.clone(), v.clone()))
243            .collect();
244        
245        full_params.insert("method".into(), method.to_string());
246        full_params.insert("nonce".into(), signer.next_nonce_str());
247
248        let body = serde_urlencoded_str(&full_params);
249        let (_, signature) = signer.sign_v1(&body)?;
250
251        let resp = self
252            .retry_post(PRIVATE_V1_URL, &body, signer.api_key(), &signature)
253            .await?;
254
255        self.handle_v1_response(resp).await
256    }
257
258    pub async fn private_get_v2<T: DeserializeOwned>(
259        &self,
260        path: &str,
261        params: &HashMap<String, String>,
262    ) -> Result<T, IndodaxError> {
263        let signer = self.signer.as_ref().ok_or_else(|| {
264            IndodaxError::Config("API credentials required for private endpoints".into())
265        })?;
266
267        let mut qs_parts: Vec<String> = params
268            .iter()
269            .map(|(k, v)| format!("{}={}", k, v))
270            .collect();
271        let timestamp = Signer::now_millis();
272        qs_parts.push(format!("timestamp={}", timestamp));
273        qs_parts.push("recvWindow=5000".to_string());
274        qs_parts.sort();
275        let query_string = qs_parts.join("&");
276
277        let signature = signer.sign_v2(&query_string, timestamp)?;
278        let url = format!("{}{}?{}", PRIVATE_V2_BASE, path, query_string);
279
280        let req = self
281            .http
282            .get(&url)
283            .header("X-APIKEY", signer.api_key())
284            .header("Sign", &signature)
285            .header("Accept", "application/json")
286            .header("Content-Type", "application/json");
287        let resp = self.send_with_retry(req).await?;
288
289        let body_text = resp.text().await?;
290        let envelope: IndodaxV2Response<T> = serde_json::from_str(&body_text).map_err(|e| {
291            IndodaxError::Parse(format!(
292                "Failed to parse v2 response: {} (body: {})",
293                e, body_text
294            ))
295        })?;
296
297        if let Some(data) = envelope.data {
298            Ok(data)
299        } else if let Some(error) = envelope.error {
300            Err(IndodaxError::api(error, ErrorCategory::Unknown, None))
301        } else {
302            Ok(serde_json::from_str(&body_text)?)
303        }
304    }
305
306    async fn retry_get(&self, url: &str) -> Result<Response, IndodaxError> {
307        let req = self.http.get(url);
308        self.send_with_retry(req).await
309    }
310
311    async fn retry_get_with_params(
312        &self,
313        url: &str,
314        params: &[(&str, &str)],
315    ) -> Result<Response, IndodaxError> {
316        let req = self.http.get(url).query(params);
317        self.send_with_retry(req).await
318    }
319
320    async fn retry_post(
321        &self,
322        url: &str,
323        body: &str,
324        api_key: &str,
325        signature: &str,
326    ) -> Result<Response, IndodaxError> {
327        let req = self
328            .http
329            .post(url)
330            .header("Key", api_key)
331            .header("Sign", signature)
332            .header("Content-Type", "application/x-www-form-urlencoded")
333            .body(body.to_string());
334        self.send_with_retry(req).await
335    }
336
337    async fn send_with_retry(
338        &self,
339        builder: RequestBuilder,
340    ) -> Result<Response, IndodaxError> {
341        self.rate_limiter.acquire().await;
342        let mut last_err = None;
343
344        for attempt in 0..=MAX_RETRIES {
345            if attempt > 0 {
346                tokio::time::sleep(Duration::from_millis(500 * 2u64.pow(attempt - 1))).await;
347            }
348
349            let req = builder
350                .try_clone()
351                .ok_or_else(|| IndodaxError::Other("Failed to clone request".into()))?;
352
353            match req.send().await {
354                Ok(resp) => {
355                    let status = resp.status();
356                    if status.is_success() {
357                        return Ok(resp);
358                    }
359
360                    if status == StatusCode::TOO_MANY_REQUESTS {
361                        last_err = Some(IndodaxError::api(
362                            format!("Rate limited (HTTP {})", status.as_u16()),
363                            ErrorCategory::RateLimit,
364                            None,
365                        ));
366                        continue;
367                    }
368
369                    if status.is_server_error() {
370                        last_err = Some(IndodaxError::api(
371                            format!("Server error (HTTP {})", status.as_u16()),
372                            ErrorCategory::Server,
373                            None,
374                        ));
375                        continue;
376                    }
377
378                    last_err = Some(IndodaxError::api(
379                        format!("HTTP {}", status.as_u16()),
380                        ErrorCategory::Unknown,
381                        None,
382                    ));
383                    break;
384                }
385                Err(e) => {
386                    if e.is_timeout() || e.is_connect() {
387                        last_err = Some(IndodaxError::Http(e));
388                        continue;
389                    }
390                    return Err(IndodaxError::Http(e));
391                }
392            }
393        }
394
395        Err(last_err.unwrap_or_else(|| {
396            IndodaxError::Other("Max retries exceeded".into())
397        }))
398    }
399
400    async fn handle_response<T: DeserializeOwned>(
401        &self,
402        resp: Response,
403    ) -> Result<T, IndodaxError> {
404        let body_text = resp.text().await?;
405        serde_json::from_str(&body_text).map_err(|e| {
406            IndodaxError::Parse(format!(
407                "Failed to parse response: {} (body: {})",
408                e, body_text
409            ))
410        })
411    }
412}
413
414fn serde_urlencoded_str(params: &BTreeMap<String, String>) -> String {
415    params
416        .iter()
417        .map(|(k, v)| {
418            format!(
419                "{}={}",
420                url::form_urlencoded::byte_serialize(k.as_bytes()).collect::<String>(),
421                url::form_urlencoded::byte_serialize(v.as_bytes()).collect::<String>()
422            )
423        })
424        .collect::<Vec<_>>()
425        .join("&")
426}
427
#[cfg(test)]
mod tests {
    //! Unit tests for the client constructor, response envelopes, URL
    //! encoding helper, constants and the token-bucket rate limiter.

    use super::*;
    use crate::auth::Signer;

    #[test]
    fn test_indodax_client_new_with_signer() {
        let signer = Signer::new("key", "secret");
        let client = IndodaxClient::new(Some(signer)).unwrap();
        assert!(client.signer().is_some());
    }

    #[test]
    fn test_indodax_client_new_without_signer() {
        let client = IndodaxClient::new(None).unwrap();
        assert!(client.signer().is_none());
    }

    #[test]
    fn test_indodax_client_signer() {
        let signer = Signer::new("mykey", "mysecret");
        let client = IndodaxClient::new(Some(signer)).unwrap();
        let s = client.signer().unwrap();
        assert_eq!(s.api_key(), "mykey");
    }

    #[test]
    fn test_indodax_v1_response_success() {
        let json = serde_json::json!({
            "success": 1,
            "return": {"balance": {"btc": "1.0"}},
            "error": null,
            "error_code": null
        });
        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert_eq!(resp.success, 1);
        assert!(resp.return_data.is_some());
        assert!(resp.error.is_none());
    }

    #[test]
    fn test_indodax_v1_response_failure() {
        let json = serde_json::json!({
            "success": 0,
            "return": null,
            "error": "Invalid credentials",
            "error_code": "invalid_credentials"
        });
        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert_eq!(resp.success, 0);
        assert!(resp.return_data.is_none());
        assert!(resp.error.is_some());
        assert!(resp.error_code.is_some());
    }

    #[test]
    fn test_indodax_v2_response_success() {
        let json = serde_json::json!({
            "data": {"name": "test"},
            "code": null,
            "error": null
        });
        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert!(resp.data.is_some());
        assert!(resp.error.is_none());
    }

    #[test]
    fn test_indodax_v2_response_error() {
        let json = serde_json::json!({
            "data": null,
            "code": 400,
            "error": "Bad request"
        });
        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert!(resp.data.is_none());
        assert!(resp.error.is_some());
        assert!(resp.code.is_some());
    }

    #[test]
    fn test_serde_urlencoded_str_single() {
        let mut params = std::collections::BTreeMap::new();
        params.insert("method".into(), "getInfo".into());
        params.insert("nonce".into(), "12345".into());

        // Pairs come out in key order, joined with `&`.
        let result = serde_urlencoded_str(&params);
        assert_eq!(result, "method=getInfo&nonce=12345");
    }

    #[test]
    fn test_serde_urlencoded_str_empty() {
        let params = std::collections::BTreeMap::new();
        let result = serde_urlencoded_str(&params);
        assert_eq!(result, "");
    }

    #[test]
    fn test_serde_urlencoded_str_special_chars() {
        let mut params = std::collections::BTreeMap::new();
        params.insert("key with space".into(), "value&more".into());

        // Form encoding turns spaces into `+` and percent-encodes `&`, so
        // the value cannot be mistaken for a pair separator.
        let result = serde_urlencoded_str(&params);
        assert_eq!(result, "key+with+space=value%26more");
    }

    #[test]
    fn test_public_base_url() {
        assert!(PUBLIC_BASE_URL.contains("indodax.com"));
    }

    #[test]
    fn test_private_v1_url() {
        assert!(PRIVATE_V1_URL.contains("indodax.com/tapi"));
    }

    #[test]
    fn test_private_v2_base() {
        assert!(PRIVATE_V2_BASE.contains("tapi.btcapi.net"));
    }

    #[test]
    fn test_max_retries_constant() {
        assert_eq!(MAX_RETRIES, 3);
    }

    #[test]
    fn test_indodax_v1_response_debug() {
        let resp: IndodaxV1Response<serde_json::Value> = IndodaxV1Response {
            success: 1,
            return_data: Some(serde_json::json!({})),
            error: None,
            error_code: None,
        };
        let debug_str = format!("{:?}", resp);
        assert!(debug_str.contains("success"));
    }

    #[test]
    fn test_indodax_v2_response_debug() {
        let resp: IndodaxV2Response<serde_json::Value> = IndodaxV2Response {
            data: Some(serde_json::json!({})),
            code: None,
            error: None,
        };
        let debug_str = format!("{:?}", resp);
        assert!(debug_str.contains("data"));
    }

    #[test]
    fn test_rate_limiter_from_env_default() {
        // Without INDODAX_RATE_LIMIT the limiter defaults to 5 requests per
        // second. The variable may be set in the environment running the
        // tests, so only the invariants that hold either way are asserted.
        let rl = RateLimiter::from_env();
        assert!(rl.capacity > 0);
        assert!(rl.refill_per_sec > 0);
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_single() {
        let rl = RateLimiter::new(5, 5);
        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 4);
    }

    #[tokio::test]
    async fn test_rate_limiter_token_exhaustion_refills() {
        let rl = RateLimiter::new(3, 10);
        for _ in 0..3 {
            rl.acquire().await;
        }
        {
            let state = rl.state.lock().await;
            assert_eq!(state.tokens, 0);
        }

        // Pretend the last refill happened a second ago so the next acquire
        // refills immediately instead of sleeping.
        {
            let mut state = rl.state.lock().await;
            state.last_refill = Instant::now() - Duration::from_secs(1);
        }

        // Refill is capped at capacity (3); one token is then consumed.
        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 2);
    }

    #[tokio::test]
    async fn test_rate_limiter_refill_capped_at_capacity() {
        let rl = RateLimiter::new(5, 100);
        for _ in 0..5 {
            rl.acquire().await;
        }
        {
            let state = rl.state.lock().await;
            assert_eq!(state.tokens, 0);
        }

        // Ten seconds at 100 tokens/s would add 1000 tokens without the cap.
        {
            let mut state = rl.state.lock().await;
            state.last_refill = Instant::now() - Duration::from_secs(10);
        }

        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 4);
    }

    #[test]
    fn test_rate_limiter_new_custom() {
        let rl = RateLimiter::new(25, 25);
        assert_eq!(rl.capacity, 25);
        assert_eq!(rl.refill_per_sec, 25);
    }
}