Skip to main content

indodax_cli/
client.rs

1use crate::auth::Signer;
2use crate::errors::{ErrorCategory, IndodaxError};
3use reqwest::{Client, RequestBuilder, Response, StatusCode};
4use serde::de::DeserializeOwned;
5use std::collections::{HashMap, BTreeMap};
6use tokio::sync::Mutex;
7
8use std::time::{Duration, Instant};
9
10const PUBLIC_BASE_URL: &str = "https://indodax.com";
11const PRIVATE_V1_URL: &str = "https://indodax.com/tapi";
12const PRIVATE_V2_BASE: &str = "https://tapi.btcapi.net";
13const WS_TOKEN_URL: &str = "https://indodax.com/api/private_ws/v1/generate_token";
14const MAX_RETRIES: u32 = 3;
15
16#[derive(Debug)]
17struct RateLimiterState {
18    tokens: u64,
19    last_refill: Instant,
20}
21
22/// Token-bucket rate limiter for proactive 429 avoidance.
23#[derive(Debug)]
24struct RateLimiter {
25    capacity: u64,
26    refill_per_sec: u64,
27    state: Mutex<RateLimiterState>,
28}
29
30impl RateLimiter {
31    fn new(capacity: u64, refill_per_sec: u64) -> Self {
32        Self {
33            capacity,
34            refill_per_sec,
35            state: Mutex::new(RateLimiterState {
36                tokens: capacity,
37                last_refill: Instant::now(),
38            }),
39        }
40    }
41
42    fn from_env() -> Self {
43        let rps = std::env::var("INDODAX_RATE_LIMIT")
44            .ok()
45            .and_then(|v| v.parse::<u64>().ok())
46            .unwrap_or(5)
47            .max(1);
48        Self::new(rps, rps)
49    }
50
51    async fn acquire(&self) {
52        loop {
53            let mut state = self.state.lock().await;
54            let elapsed = state.last_refill.elapsed();
55            if elapsed >= Duration::from_secs(1) {
56                let secs = elapsed.as_secs();
57                let add = self.refill_per_sec * secs;
58                state.tokens = state.tokens.saturating_add(add).min(self.capacity);
59                state.last_refill += Duration::from_secs(secs);
60            }
61            if state.tokens > 0 {
62                state.tokens -= 1;
63                return;
64            }
65            let elapsed_ms = elapsed.as_millis().min(u128::from(u64::MAX)) as u64;
66            let wait = if elapsed_ms < 1000 {
67                Duration::from_millis(1000 - elapsed_ms)
68            } else {
69                Duration::from_millis(50)
70            };
71            drop(state);
72            tokio::time::sleep(wait).await;
73        }
74    }
75}
76
77#[derive(Debug)]
78pub struct IndodaxClient {
79    http: Client,
80    signer: Option<Signer>,
81    rate_limiter: RateLimiter,
82    ws_token: Option<String>,
83}
84
85#[derive(Debug, serde::Deserialize)]
86pub struct IndodaxV1Response<T> {
87    pub success: i32,
88    #[serde(rename = "return")]
89    pub return_data: Option<T>,
90    pub error: Option<String>,
91    pub error_code: Option<String>,
92}
93
94#[derive(Debug, serde::Deserialize)]
95pub struct IndodaxV2Response<T> {
96    pub data: Option<T>,
97    pub code: Option<i64>,
98    pub error: Option<String>,
99}
100
101impl IndodaxClient {
102    pub fn new(signer: Option<Signer>) -> Result<Self, IndodaxError> {
103        let http = Client::builder()
104            .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
105            .timeout(Duration::from_secs(30))
106            .pool_max_idle_per_host(2)
107            .build()
108            .map_err(|e| IndodaxError::Other(format!("Failed to create HTTP client: {}", e)))?;
109
110        Ok(Self {
111            http,
112            signer,
113            rate_limiter: RateLimiter::from_env(),
114            ws_token: None,
115        })
116    }
117
118    pub fn with_ws_token(mut self, token: Option<String>) -> Self {
119        self.ws_token = token;
120        self
121    }
122
123    pub fn signer(&self) -> Option<&Signer> {
124        self.signer.as_ref()
125    }
126
127    pub fn ws_token(&self) -> Option<&str> {
128        self.ws_token.as_deref()
129    }
130
131    pub fn http_client(&self) -> &Client {
132        &self.http
133    }
134
135    pub async fn public_get<T: DeserializeOwned>(
136        &self,
137        path: &str,
138    ) -> Result<T, IndodaxError> {
139        let url = format!("{}{}", PUBLIC_BASE_URL, path);
140        let resp = self.retry_get(&url).await?;
141        self.handle_response(resp).await
142    }
143
144    pub async fn countdown_cancel_all(
145        &self,
146        pair: Option<&str>,
147        countdown_time: u64,
148    ) -> Result<serde_json::Value, IndodaxError> {
149        let signer = self.signer.as_ref().ok_or_else(|| {
150            IndodaxError::Config("API credentials required for countdown cancel all".into())
151        })?;
152
153        let mut body_parts: Vec<String> = vec![
154            format!("countdownTime={}", countdown_time),
155        ];
156        if let Some(p) = pair {
157            body_parts.push(format!("pair={}", p));
158        }
159
160        let body = body_parts.join("&");
161        let (payload, signature) = signer.sign_v1(&body)?;
162
163        let url = format!("{}/countdownCancelAll", PRIVATE_V1_URL);
164        let req = self
165            .http
166            .post(&url)
167            .header("Key", signer.api_key())
168            .header("Sign", &signature)
169            .header("Content-Type", "application/x-www-form-urlencoded")
170            .body(payload);
171        let resp = self.send_with_retry(req).await?;
172        self.handle_v1_response(resp).await
173    }
174
175    pub async fn generate_ws_token(&self) -> Result<(String, String), IndodaxError> {
176        let signer = self.signer.as_ref().ok_or_else(|| {
177            IndodaxError::Config("API credentials required for WebSocket token generation".into())
178        })?;
179
180        let nonce = signer.next_nonce_str();
181        let (_, signature) = signer.sign_v1(&nonce)?;
182
183        let req = self
184            .http
185            .post(WS_TOKEN_URL)
186            .header("Key", signer.api_key())
187            .header("Sign", &signature)
188            .header("Content-Type", "application/x-www-form-urlencoded")
189            .body(format!("nonce={}", nonce));
190        let resp = self.send_with_retry(req).await?;
191
192        let body_text = resp.text().await?;
193        let val: serde_json::Value = serde_json::from_str(&body_text)?;
194
195        let token = val.get("token")
196            .and_then(|t| t.as_str())
197            .or_else(|| val.get("data").and_then(|d| d.get("token")).and_then(|t| t.as_str()))
198            .or_else(|| val.get("return").and_then(|r| r.get("connToken")).and_then(|t| t.as_str()))
199            .map(|t| t.to_string());
200
201        let channel = val.get("channel")
202            .and_then(|c| c.as_str())
203            .or_else(|| val.get("data").and_then(|d| d.get("channel")).and_then(|c| c.as_str()))
204            .or_else(|| val.get("return").and_then(|r| r.get("channel")).and_then(|c| c.as_str()))
205            .map(|c| c.to_string());
206
207        match (token, channel) {
208            (Some(t), Some(c)) => Ok((t, c)),
209            (Some(t), None) => Ok((t, "private:orders".to_string())), // Fallback channel
210            _ => Err(IndodaxError::WsToken(format!("No token or channel in response: {}", body_text))),
211        }
212    }
213
214    pub async fn public_get_v2<T: DeserializeOwned>(
215        &self,
216        path: &str,
217        params: &[(&str, &str)],
218    ) -> Result<T, IndodaxError> {
219        let url = format!("{}{}", PUBLIC_BASE_URL, path);
220        let resp = self.retry_get_with_params(&url, params).await?;
221        self.handle_response(resp).await
222    }
223
224    async fn handle_v1_response<T: DeserializeOwned>(
225        &self,
226        resp: Response,
227    ) -> Result<T, IndodaxError> {
228        let body_text = resp.text().await?;
229        let envelope: IndodaxV1Response<T> = serde_json::from_str(&body_text).map_err(|e| {
230            IndodaxError::Parse(format!(
231                "Failed to parse response: {} (body: {})",
232                e, body_text
233            ))
234        })?;
235
236        if envelope.success == 1 {
237            envelope.return_data.ok_or_else(|| {
238                IndodaxError::Parse("API returned success but no 'return' data".into())
239            })
240        } else {
241            Err(IndodaxError::api(
242                envelope.error.unwrap_or_else(|| "Unknown error".into()),
243                match envelope.error_code.as_deref() {
244                    Some("invalid_credentials") => ErrorCategory::Authentication,
245                    Some("rate_limit") => ErrorCategory::RateLimit,
246                    Some(c) if c.contains("invalid") => ErrorCategory::Validation,
247                    _ => ErrorCategory::Unknown,
248                },
249                envelope.error_code,
250            ))
251        }
252    }
253
254    pub async fn private_post_v1<T: DeserializeOwned>(
255        &self,
256        method: &str,
257        params: &HashMap<String, String>,
258    ) -> Result<T, IndodaxError> {
259        let signer = self.signer.as_ref().ok_or_else(|| {
260            IndodaxError::Config("API credentials required for private endpoints".into())
261        })?;
262
263        let mut full_params: BTreeMap<String, String> = params
264            .iter()
265            .map(|(k, v)| (k.clone(), v.clone()))
266            .collect();
267        
268        full_params.insert("method".into(), method.to_string());
269        full_params.insert("nonce".into(), signer.next_nonce_str());
270
271        let body = serde_urlencoded_str(&full_params);
272        let (_, signature) = signer.sign_v1(&body)?;
273
274        let resp = self
275            .retry_post(PRIVATE_V1_URL, &body, signer.api_key(), &signature)
276            .await?;
277
278        self.handle_v1_response(resp).await
279    }
280
281    pub async fn private_get_v2<T: DeserializeOwned>(
282        &self,
283        path: &str,
284        params: &HashMap<String, String>,
285    ) -> Result<T, IndodaxError> {
286        let signer = self.signer.as_ref().ok_or_else(|| {
287            IndodaxError::Config("API credentials required for private endpoints".into())
288        })?;
289
290        let mut qs_parts: Vec<String> = params
291            .iter()
292            .map(|(k, v)| format!("{}={}", k, v))
293            .collect();
294        let timestamp = Signer::now_millis();
295        qs_parts.push(format!("timestamp={}", timestamp));
296        qs_parts.push("recvWindow=5000".to_string());
297        qs_parts.sort();
298        let query_string = qs_parts.join("&");
299
300        let signature = signer.sign_v2(&query_string, timestamp)?;
301        let url = format!("{}{}?{}", PRIVATE_V2_BASE, path, query_string);
302
303        let req = self
304            .http
305            .get(&url)
306            .header("X-APIKEY", signer.api_key())
307            .header("Sign", &signature)
308            .header("Accept", "application/json")
309            .header("Content-Type", "application/json");
310        let resp = self.send_with_retry(req).await?;
311
312        let body_text = resp.text().await?;
313        let envelope: IndodaxV2Response<T> = serde_json::from_str(&body_text).map_err(|e| {
314            IndodaxError::Parse(format!(
315                "Failed to parse v2 response: {} (body: {})",
316                e, body_text
317            ))
318        })?;
319
320        if let Some(data) = envelope.data {
321            Ok(data)
322        } else if let Some(error) = envelope.error {
323            Err(IndodaxError::api(error, ErrorCategory::Unknown, None))
324        } else {
325            Ok(serde_json::from_str(&body_text)?)
326        }
327    }
328
329    async fn retry_get(&self, url: &str) -> Result<Response, IndodaxError> {
330        let req = self.http.get(url);
331        self.send_with_retry(req).await
332    }
333
334    async fn retry_get_with_params(
335        &self,
336        url: &str,
337        params: &[(&str, &str)],
338    ) -> Result<Response, IndodaxError> {
339        let req = self.http.get(url).query(params);
340        self.send_with_retry(req).await
341    }
342
343    async fn retry_post(
344        &self,
345        url: &str,
346        body: &str,
347        api_key: &str,
348        signature: &str,
349    ) -> Result<Response, IndodaxError> {
350        let req = self
351            .http
352            .post(url)
353            .header("Key", api_key)
354            .header("Sign", signature)
355            .header("Content-Type", "application/x-www-form-urlencoded")
356            .body(body.to_string());
357        self.send_with_retry(req).await
358    }
359
360    async fn send_with_retry(
361        &self,
362        builder: RequestBuilder,
363    ) -> Result<Response, IndodaxError> {
364        self.rate_limiter.acquire().await;
365        let mut last_err = None;
366
367        for attempt in 0..=MAX_RETRIES {
368            if attempt > 0 {
369                tokio::time::sleep(Duration::from_millis(500 * 2u64.pow(attempt - 1))).await;
370            }
371
372            let req = builder
373                .try_clone()
374                .ok_or_else(|| IndodaxError::Other("Failed to clone request".into()))?;
375
376            match req.send().await {
377                Ok(resp) => {
378                    let status = resp.status();
379                    if status.is_success() {
380                        return Ok(resp);
381                    }
382
383                    if status == StatusCode::TOO_MANY_REQUESTS {
384                        last_err = Some(IndodaxError::api(
385                            format!("Rate limited (HTTP {})", status.as_u16()),
386                            ErrorCategory::RateLimit,
387                            None,
388                        ));
389                        continue;
390                    }
391
392                    if status.is_server_error() {
393                        last_err = Some(IndodaxError::api(
394                            format!("Server error (HTTP {})", status.as_u16()),
395                            ErrorCategory::Server,
396                            None,
397                        ));
398                        continue;
399                    }
400
401                    last_err = Some(IndodaxError::api(
402                        format!("HTTP {}", status.as_u16()),
403                        ErrorCategory::Unknown,
404                        None,
405                    ));
406                    break;
407                }
408                Err(e) => {
409                    if e.is_timeout() || e.is_connect() {
410                        last_err = Some(IndodaxError::Http(e));
411                        continue;
412                    }
413                    return Err(IndodaxError::Http(e));
414                }
415            }
416        }
417
418        Err(last_err.unwrap_or_else(|| {
419            IndodaxError::Other("Max retries exceeded".into())
420        }))
421    }
422
423    async fn handle_response<T: DeserializeOwned>(
424        &self,
425        resp: Response,
426    ) -> Result<T, IndodaxError> {
427        let body_text = resp.text().await?;
428        serde_json::from_str(&body_text).map_err(|e| {
429            IndodaxError::Parse(format!(
430                "Failed to parse response: {} (body: {})",
431                e, body_text
432            ))
433        })
434    }
435}
436
437fn serde_urlencoded_str(params: &BTreeMap<String, String>) -> String {
438    params
439        .iter()
440        .map(|(k, v)| {
441            format!(
442                "{}={}",
443                url::form_urlencoded::byte_serialize(k.as_bytes()).collect::<String>(),
444                url::form_urlencoded::byte_serialize(v.as_bytes()).collect::<String>()
445            )
446        })
447        .collect::<Vec<_>>()
448        .join("&")
449}
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454    use crate::auth::Signer;
455
456    #[test]
457    fn test_indodax_client_new_with_signer() {
458        let signer = Signer::new("key", "secret");
459        let client = IndodaxClient::new(Some(signer)).unwrap();
460        assert!(client.signer().is_some());
461    }
462
463    #[test]
464    fn test_indodax_client_new_without_signer() {
465        let client = IndodaxClient::new(None).unwrap();
466        assert!(client.signer().is_none());
467    }
468
469    #[test]
470    fn test_indodax_client_signer() {
471        let signer = Signer::new("mykey", "mysecret");
472        let client = IndodaxClient::new(Some(signer)).unwrap();
473        let s = client.signer().unwrap();
474        assert_eq!(s.api_key(), "mykey");
475    }
476
477    #[test]
478    fn test_indodax_v1_response_success() {
479        let json = serde_json::json!({
480            "success": 1,
481            "return": {"balance": {"btc": "1.0"}},
482            "error": null,
483            "error_code": null
484        });
485        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
486        assert_eq!(resp.success, 1);
487        assert!(resp.return_data.is_some());
488        assert!(resp.error.is_none());
489    }
490
491    #[test]
492    fn test_indodax_v1_response_failure() {
493        let json = serde_json::json!({
494            "success": 0,
495            "return": null,
496            "error": "Invalid credentials",
497            "error_code": "invalid_credentials"
498        });
499        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
500        assert_eq!(resp.success, 0);
501        assert!(resp.return_data.is_none());
502        assert!(resp.error.is_some());
503        assert!(resp.error_code.is_some());
504    }
505
506    #[test]
507    fn test_indodax_v2_response_success() {
508        let json = serde_json::json!({
509            "data": {"name": "test"},
510            "code": null,
511            "error": null
512        });
513        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
514        assert!(resp.data.is_some());
515        assert!(resp.error.is_none());
516    }
517
518    #[test]
519    fn test_indodax_v2_response_error() {
520        let json = serde_json::json!({
521            "data": null,
522            "code": 400,
523            "error": "Bad request"
524        });
525        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
526        assert!(resp.data.is_none());
527        assert!(resp.error.is_some());
528        assert!(resp.code.is_some());
529    }
530
531    #[test]
532    fn test_serde_urlencoded_str_single() {
533        let mut params = std::collections::BTreeMap::new();
534        params.insert("method".into(), "getInfo".into());
535        params.insert("nonce".into(), "12345".into());
536        
537        let result = serde_urlencoded_str(&params);
538        assert!(result.contains("method=getInfo"));
539        assert!(result.contains("nonce=12345"));
540    }
541
542    #[test]
543    fn test_serde_urlencoded_str_empty() {
544        let params = std::collections::BTreeMap::new();
545        let result = serde_urlencoded_str(&params);
546        assert_eq!(result, "");
547    }
548
549    #[test]
550    fn test_serde_urlencoded_str_special_chars() {
551        let mut params = std::collections::BTreeMap::new();
552        params.insert("key with space".into(), "value&more".into());
553        
554        let result = serde_urlencoded_str(&params);
555        // Should be URL encoded
556        assert!(result.contains("%20") || result.contains("+"));
557    }
558
559    #[test]
560    fn test_public_base_url() {
561        assert!(PUBLIC_BASE_URL.contains("indodax.com"));
562    }
563
564    #[test]
565    fn test_private_v1_url() {
566        assert!(PRIVATE_V1_URL.contains("indodax.com/tapi"));
567    }
568
569    #[test]
570    fn test_private_v2_base() {
571        assert!(PRIVATE_V2_BASE.contains("tapi.btcapi.net"));
572    }
573
574    #[test]
575    fn test_max_retries_constant() {
576        assert_eq!(MAX_RETRIES, 3);
577    }
578
579    #[test]
580    fn test_indodax_v1_response_debug() {
581        let resp: IndodaxV1Response<serde_json::Value> = IndodaxV1Response {
582            success: 1,
583            return_data: Some(serde_json::json!({})),
584            error: None,
585            error_code: None,
586        };
587        let debug_str = format!("{:?}", resp);
588        assert!(debug_str.contains("success"));
589    }
590
591    #[test]
592    fn test_indodax_v2_response_debug() {
593        let resp: IndodaxV2Response<serde_json::Value> = IndodaxV2Response {
594            data: Some(serde_json::json!({})),
595            code: None,
596            error: None,
597        };
598        let debug_str = format!("{:?}", resp);
599        assert!(debug_str.contains("data"));
600    }
601
602    #[test]
603    fn test_rate_limiter_from_env_default() {
604        // Without env var, should default to 10
605        let rl = RateLimiter::from_env();
606        // If INDODAX_RATE_LIMIT is set in environment, test may fail
607        // so we just verify it doesn't panic
608        assert!(rl.capacity > 0);
609        assert!(rl.refill_per_sec > 0);
610    }
611
612    #[tokio::test]
613    async fn test_rate_limiter_acquire_single() {
614        let rl = RateLimiter::new(5, 5);
615        rl.acquire().await;
616        let state = rl.state.lock().await;
617        assert_eq!(state.tokens, 4);
618    }
619
620    #[tokio::test]
621    async fn test_rate_limiter_token_exhaustion_refills() {
622        let rl = RateLimiter::new(3, 10);
623        for _ in 0..3 {
624            rl.acquire().await;
625        }
626        {
627            let state = rl.state.lock().await;
628            assert_eq!(state.tokens, 0);
629        }
630
631        {
632            let mut state = rl.state.lock().await;
633            state.last_refill = Instant::now() - Duration::from_secs(1);
634        }
635
636        rl.acquire().await;
637        let state = rl.state.lock().await;
638        assert_eq!(state.tokens, 2);
639    }
640
641    #[tokio::test]
642    async fn test_rate_limiter_refill_capped_at_capacity() {
643        let rl = RateLimiter::new(5, 100);
644        for _ in 0..5 {
645            rl.acquire().await;
646        }
647        {
648            let state = rl.state.lock().await;
649            assert_eq!(state.tokens, 0);
650        }
651
652        {
653            let mut state = rl.state.lock().await;
654            state.last_refill = Instant::now() - Duration::from_secs(10);
655        }
656
657        rl.acquire().await;
658        let state = rl.state.lock().await;
659        assert_eq!(state.tokens, 4);
660    }
661
662    #[test]
663    fn test_rate_limiter_new_custom() {
664        let rl = RateLimiter::new(25, 25);
665        assert_eq!(rl.capacity, 25);
666        assert_eq!(rl.refill_per_sec, 25);
667    }
668}