Skip to main content

indodax_cli/
client.rs

1use crate::auth::Signer;
2use crate::errors::{ErrorCategory, IndodaxError};
3use reqwest::{Client, RequestBuilder, Response, StatusCode};
4use serde::de::DeserializeOwned;
5use std::collections::{HashMap, BTreeMap};
6use tokio::sync::Mutex;
7
8use std::time::{Duration, Instant};
9
10const PUBLIC_BASE_URL: &str = "https://indodax.com";
11const PRIVATE_V1_URL: &str = "https://indodax.com/tapi";
12const PRIVATE_V2_BASE: &str = "https://tapi.btcapi.net";
13const WS_TOKEN_URL: &str = "https://indodax.com/api/private_ws/v1/generate_token";
14const MAX_RETRIES: u32 = 3;
15
/// Mutable bookkeeping for [`RateLimiter`], stored behind its mutex.
#[derive(Debug)]
struct RateLimiterState {
    /// Tokens currently available; `RateLimiter::acquire` consumes one per call.
    tokens: u64,
    /// Reference instant from which elapsed time is measured when refilling.
    last_refill: Instant,
}
21
/// Token-bucket rate limiter for proactive 429 avoidance.
///
/// The bucket is created full (see `new`) and is topped up in whole-second
/// steps whenever `acquire` is called.
#[derive(Debug)]
struct RateLimiter {
    /// Upper bound on the number of tokens the bucket may hold.
    capacity: u64,
    /// Tokens credited for each whole second elapsed since the last refill.
    refill_per_sec: u64,
    /// Token count and refill timestamp, guarded by an async (tokio) mutex.
    state: Mutex<RateLimiterState>,
}
29
30impl RateLimiter {
31    fn new(capacity: u64, refill_per_sec: u64) -> Self {
32        Self {
33            capacity,
34            refill_per_sec,
35            state: Mutex::new(RateLimiterState {
36                tokens: capacity,
37                last_refill: Instant::now(),
38            }),
39        }
40    }
41
42    fn from_env() -> Self {
43        let rps = std::env::var("INDODAX_RATE_LIMIT")
44            .ok()
45            .and_then(|v| v.parse::<u64>().ok())
46            .unwrap_or(5)
47            .max(1);
48        Self::new(rps, rps)
49    }
50
51    async fn acquire(&self) {
52        loop {
53            let mut state = self.state.lock().await;
54            let elapsed = state.last_refill.elapsed();
55            if elapsed >= Duration::from_secs(1) {
56                let secs = elapsed.as_secs();
57                let capped = secs.min(60); // cap to 60s to guard against clock jumps
58                let add = self.refill_per_sec * capped;
59                state.tokens = state.tokens.saturating_add(add).min(self.capacity);
60                state.last_refill += Duration::from_secs(capped);
61            }
62            if state.tokens > 0 {
63                state.tokens -= 1;
64                return;
65            }
66            let elapsed_ms = elapsed.as_millis().min(u128::from(u64::MAX)) as u64;
67            let wait = if elapsed_ms < 1000 {
68                Duration::from_millis(1000 - elapsed_ms)
69            } else {
70                Duration::from_millis(50)
71            };
72            drop(state);
73            tokio::time::sleep(wait.max(Duration::from_millis(10))).await;
74        }
75    }
76}
77
/// HTTP client for the Indodax public and private REST endpoints.
#[derive(Debug)]
pub struct IndodaxClient {
    /// Underlying `reqwest` client used for every request.
    http: Client,
    /// Request signer; `None` means only unauthenticated calls are possible.
    signer: Option<Signer>,
    /// Token bucket consulted before each `send_with_retry` call.
    rate_limiter: RateLimiter,
    /// Optional WebSocket token supplied via `with_ws_token`.
    ws_token: Option<String>,
}
85
/// Response envelope used when decoding private v1 (`/tapi`) replies.
#[derive(Debug, serde::Deserialize)]
pub struct IndodaxV1Response<T> {
    /// Status flag; `handle_v1_response` treats the value `1` as success.
    pub success: i32,
    /// Payload, read from the JSON key `return`.
    #[serde(rename = "return")]
    pub return_data: Option<T>,
    /// Error message, if the response carried one.
    pub error: Option<String>,
    /// Error code string, if the response carried one.
    pub error_code: Option<String>,
}
94
/// Response envelope used when decoding private v2 replies.
#[derive(Debug, serde::Deserialize)]
pub struct IndodaxV2Response<T> {
    /// Payload, read from the JSON key `data`.
    pub data: Option<T>,
    /// Numeric code, if the response carried one.
    pub code: Option<i64>,
    /// Error message, if the response carried one.
    pub error: Option<String>,
}
101
102impl IndodaxClient {
103    pub fn new(signer: Option<Signer>) -> Result<Self, IndodaxError> {
104        let http = Client::builder()
105            .user_agent(format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")))
106            .timeout(Duration::from_secs(30))
107            .pool_max_idle_per_host(2)
108            .build()
109            .map_err(|e| IndodaxError::Other(format!("Failed to create HTTP client: {}", e)))?;
110
111        Ok(Self {
112            http,
113            signer,
114            rate_limiter: RateLimiter::from_env(),
115            ws_token: None,
116        })
117    }
118
119    pub fn with_ws_token(mut self, token: Option<String>) -> Self {
120        self.ws_token = token;
121        self
122    }
123
    /// Returns the configured signer, or `None` if the client was created
    /// without one.
    pub fn signer(&self) -> Option<&Signer> {
        self.signer.as_ref()
    }
127
    /// Returns the stored WebSocket token, if one has been set.
    pub fn ws_token(&self) -> Option<&str> {
        self.ws_token.as_deref()
    }
131
    /// Returns a reference to the underlying `reqwest` client.
    pub fn http_client(&self) -> &Client {
        &self.http
    }
135
136    pub async fn public_get<T: DeserializeOwned>(
137        &self,
138        path: &str,
139    ) -> Result<T, IndodaxError> {
140        let url = format!("{}{}", PUBLIC_BASE_URL, path);
141        let resp = self.retry_get(&url).await?;
142        self.handle_response(resp).await
143    }
144
145    pub async fn countdown_cancel_all(
146        &self,
147        pair: Option<&str>,
148        countdown_time: u64,
149    ) -> Result<serde_json::Value, IndodaxError> {
150        let signer = self.signer.as_ref().ok_or_else(|| {
151            IndodaxError::Config("API credentials required for countdown cancel all".into())
152        })?;
153
154        let mut body_parts: Vec<String> = vec![
155            format!("countdownTime={}", countdown_time),
156        ];
157        if let Some(p) = pair {
158            body_parts.push(format!("pair={}", p));
159        }
160
161        let body = body_parts.join("&");
162        let (payload, signature) = signer.sign_v1(&body)?;
163
164        let url = format!("{}/countdownCancelAll", PRIVATE_V1_URL);
165        let req = self
166            .http
167            .post(&url)
168            .header("Key", signer.api_key())
169            .header("Sign", &signature)
170            .header("Content-Type", "application/x-www-form-urlencoded")
171            .body(payload);
172        let resp = self.send_with_retry(req).await?;
173        self.handle_v1_response(resp).await
174    }
175
176    pub async fn generate_ws_token(&self) -> Result<(String, String), IndodaxError> {
177        let signer = self.signer.as_ref().ok_or_else(|| {
178            IndodaxError::Config("API credentials required for WebSocket token generation".into())
179        })?;
180
181        let nonce = signer.next_nonce_str();
182        let (_, signature) = signer.sign_v1(&nonce)?;
183
184        let req = self
185            .http
186            .post(WS_TOKEN_URL)
187            .header("Key", signer.api_key())
188            .header("Sign", &signature)
189            .header("Content-Type", "application/x-www-form-urlencoded")
190            .body(format!("nonce={}", nonce));
191        let resp = self.send_with_retry(req).await?;
192
193        let body_text = resp.text().await?;
194        let val: serde_json::Value = serde_json::from_str(&body_text)?;
195
196        let token = val.get("token")
197            .and_then(|t| t.as_str())
198            .or_else(|| val.get("data").and_then(|d| d.get("token")).and_then(|t| t.as_str()))
199            .or_else(|| val.get("return").and_then(|r| r.get("connToken")).and_then(|t| t.as_str()))
200            .map(|t| t.to_string());
201
202        let channel = val.get("channel")
203            .and_then(|c| c.as_str())
204            .or_else(|| val.get("data").and_then(|d| d.get("channel")).and_then(|c| c.as_str()))
205            .or_else(|| val.get("return").and_then(|r| r.get("channel")).and_then(|c| c.as_str()))
206            .map(|c| c.to_string());
207
208        match (token, channel) {
209            (Some(t), Some(c)) => Ok((t, c)),
210            (Some(t), None) => Ok((t, "private:orders".to_string())), // Fallback channel
211            _ => Err(IndodaxError::WsToken(format!("No token or channel in response: {}", body_text))),
212        }
213    }
214
215    pub async fn public_get_v2<T: DeserializeOwned>(
216        &self,
217        path: &str,
218        params: &[(&str, &str)],
219    ) -> Result<T, IndodaxError> {
220        let url = format!("{}{}", PUBLIC_BASE_URL, path);
221        let resp = self.retry_get_with_params(&url, params).await?;
222        self.handle_response(resp).await
223    }
224
225    async fn handle_v1_response<T: DeserializeOwned>(
226        &self,
227        resp: Response,
228    ) -> Result<T, IndodaxError> {
229        let body_text = resp.text().await?;
230        let envelope: IndodaxV1Response<T> = serde_json::from_str(&body_text).map_err(|e| {
231            IndodaxError::Parse(format!(
232                "Failed to parse response: {} (body: {})",
233                e, body_text
234            ))
235        })?;
236
237        if envelope.success == 1 {
238            envelope.return_data.ok_or_else(|| {
239                IndodaxError::Parse("API returned success but no 'return' data".into())
240            })
241        } else {
242            Err(IndodaxError::api(
243                envelope.error.unwrap_or_else(|| "Unknown error".into()),
244                match envelope.error_code.as_deref() {
245                    Some("invalid_credentials") => ErrorCategory::Authentication,
246                    Some("rate_limit") => ErrorCategory::RateLimit,
247                    Some(c) if c.contains("invalid") => ErrorCategory::Validation,
248                    _ => ErrorCategory::Unknown,
249                },
250                envelope.error_code,
251            ))
252        }
253    }
254
255    pub async fn private_post_v1<T: DeserializeOwned>(
256        &self,
257        method: &str,
258        params: &HashMap<String, String>,
259    ) -> Result<T, IndodaxError> {
260        let signer = self.signer.as_ref().ok_or_else(|| {
261            IndodaxError::Config("API credentials required for private endpoints".into())
262        })?;
263
264        let mut full_params: BTreeMap<String, String> = params
265            .iter()
266            .map(|(k, v)| (k.clone(), v.clone()))
267            .collect();
268        
269        full_params.insert("method".into(), method.to_string());
270        full_params.insert("nonce".into(), signer.next_nonce_str());
271
272        let body = serde_urlencoded_str(&full_params);
273        let (_, signature) = signer.sign_v1(&body)?;
274
275        let resp = self
276            .retry_post(PRIVATE_V1_URL, &body, signer.api_key(), &signature)
277            .await?;
278
279        self.handle_v1_response(resp).await
280    }
281
282    pub async fn private_get_v2<T: DeserializeOwned>(
283        &self,
284        path: &str,
285        params: &HashMap<String, String>,
286    ) -> Result<T, IndodaxError> {
287        let signer = self.signer.as_ref().ok_or_else(|| {
288            IndodaxError::Config("API credentials required for private endpoints".into())
289        })?;
290
291        let mut qs_parts: Vec<String> = params
292            .iter()
293            .map(|(k, v)| format!("{}={}", k, v))
294            .collect();
295        let timestamp = crate::commands::helpers::now_millis();
296        qs_parts.push(format!("timestamp={}", timestamp));
297        qs_parts.push("recvWindow=5000".to_string());
298        qs_parts.sort();
299        let query_string = qs_parts.join("&");
300
301        let signature = signer.sign_v2(&query_string)?;
302        let url = format!("{}{}?{}", PRIVATE_V2_BASE, path, query_string);
303
304        let req = self
305            .http
306            .get(&url)
307            .header("X-APIKEY", signer.api_key())
308            .header("Sign", &signature)
309            .header("Accept", "application/json")
310            .header("Content-Type", "application/json");
311        let resp = self.send_with_retry(req).await?;
312
313        let body_text = resp.text().await?;
314        let envelope: IndodaxV2Response<T> = serde_json::from_str(&body_text).map_err(|e| {
315            IndodaxError::Parse(format!(
316                "Failed to parse v2 response: {} (body: {})",
317                e, body_text
318            ))
319        })?;
320
321        if let Some(data) = envelope.data {
322            Ok(data)
323        } else if let Some(error) = envelope.error {
324            Err(IndodaxError::api(error, ErrorCategory::Unknown, None))
325        } else {
326            Ok(serde_json::from_str(&body_text)?)
327        }
328    }
329
330    async fn retry_get(&self, url: &str) -> Result<Response, IndodaxError> {
331        let req = self.http.get(url);
332        self.send_with_retry(req).await
333    }
334
335    async fn retry_get_with_params(
336        &self,
337        url: &str,
338        params: &[(&str, &str)],
339    ) -> Result<Response, IndodaxError> {
340        let req = self.http.get(url).query(params);
341        self.send_with_retry(req).await
342    }
343
344    async fn retry_post(
345        &self,
346        url: &str,
347        body: &str,
348        api_key: &str,
349        signature: &str,
350    ) -> Result<Response, IndodaxError> {
351        let req = self
352            .http
353            .post(url)
354            .header("Key", api_key)
355            .header("Sign", signature)
356            .header("Content-Type", "application/x-www-form-urlencoded")
357            .body(body.to_string());
358        self.send_with_retry(req).await
359    }
360
    /// Sends `builder`, retrying on rate limiting, server errors, timeouts
    /// and connection failures.
    ///
    /// One rate-limiter token is acquired up front; the retries that follow
    /// do not acquire further tokens. Up to `MAX_RETRIES + 1` attempts are
    /// made. Before an attempt with a non-zero `backoff_count`, the task
    /// sleeps `500ms * 2^(backoff_count - 1)`.
    ///
    /// # Errors
    ///
    /// - `IndodaxError::Other` if the request cannot be cloned for sending.
    /// - `IndodaxError::Http` immediately for transport errors that are
    ///   neither timeouts nor connection errors.
    /// - An API error for non-success statuses that are not 429 or 5xx
    ///   (these are not retried).
    /// - The last recorded error once the attempts are exhausted.
    // NOTE(review): each attempt resends a clone of the same built request, so
    // signed requests reuse the same nonce/signature on retry — confirm the
    // server accepts that, and that retrying non-idempotent POSTs is intended.
    async fn send_with_retry(
        &self,
        builder: RequestBuilder,
    ) -> Result<Response, IndodaxError> {
        self.rate_limiter.acquire().await;
        let mut last_err = None;
        // Counts failed attempts of any retryable kind; bounds the loop.
        let mut total_retries = 0u32;
        // Drives the exponential delay; reset when the server dictates the wait.
        let mut backoff_count = 0u32;

        while total_retries <= MAX_RETRIES {
            if backoff_count > 0 {
                tokio::time::sleep(Duration::from_millis(500 * 2u64.pow(backoff_count - 1))).await;
            }

            // The builder is kept intact so it can be cloned again on retry.
            let req = builder
                .try_clone()
                .ok_or_else(|| IndodaxError::Other("Failed to clone request".into()))?;

            match req.send().await {
                Ok(resp) => {
                    let status = resp.status();
                    if status.is_success() {
                        return Ok(resp);
                    }

                    if status == StatusCode::TOO_MANY_REQUESTS {
                        // Only an integer number of seconds is understood here;
                        // any other Retry-After value falls back to backoff.
                        let retry_after = resp.headers()
                            .get("Retry-After")
                            .and_then(|v| v.to_str().ok())
                            .and_then(|s| s.parse::<u64>().ok())
                            .map(Duration::from_secs);
                        if let Some(delay) = retry_after {
                            // NOTE(review): the server-provided delay is not
                            // capped — consider an upper bound.
                            tokio::time::sleep(delay).await;
                            backoff_count = 0;
                        } else {
                            backoff_count += 1;
                        }
                        total_retries += 1;
                        last_err = Some(IndodaxError::api(
                            format!("Rate limited (HTTP {})", status.as_u16()),
                            ErrorCategory::RateLimit,
                            None,
                        ));
                        continue;
                    }

                    if status.is_server_error() {
                        total_retries += 1;
                        backoff_count += 1;
                        last_err = Some(IndodaxError::api(
                            format!("Server error (HTTP {})", status.as_u16()),
                            ErrorCategory::Server,
                            None,
                        ));
                        continue;
                    }

                    // Any other status is treated as final: record and stop.
                    // The response body is not read on this path.
                    last_err = Some(IndodaxError::api(
                        format!("HTTP {}", status.as_u16()),
                        ErrorCategory::Unknown,
                        None,
                    ));
                    break;
                }
                Err(e) => {
                    total_retries += 1;
                    backoff_count += 1;
                    // Only timeouts and connection failures are retried.
                    if e.is_timeout() || e.is_connect() {
                        last_err = Some(IndodaxError::Http(e));
                        continue;
                    }
                    return Err(IndodaxError::Http(e));
                }
            }
        }

        Err(last_err.unwrap_or_else(|| {
            IndodaxError::Other("Max retries exceeded".into())
        }))
    }
441
442    async fn handle_response<T: DeserializeOwned>(
443        &self,
444        resp: Response,
445    ) -> Result<T, IndodaxError> {
446        let body_text = resp.text().await?;
447        serde_json::from_str(&body_text).map_err(|e| {
448            IndodaxError::Parse(format!(
449                "Failed to parse response: {} (body: {})",
450                e, body_text
451            ))
452        })
453    }
454}
455
456fn serde_urlencoded_str(params: &BTreeMap<String, String>) -> String {
457    params
458        .iter()
459        .map(|(k, v)| {
460            format!(
461                "{}={}",
462                url::form_urlencoded::byte_serialize(k.as_bytes()).collect::<String>(),
463                url::form_urlencoded::byte_serialize(v.as_bytes()).collect::<String>()
464            )
465        })
466        .collect::<Vec<_>>()
467        .join("&")
468}
469
// Unit tests for client construction, response envelopes, form encoding,
// URL constants and the token-bucket rate limiter. None of them touch the
// network.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::auth::Signer;

    // --- Client construction and accessors ---

    #[test]
    fn test_indodax_client_new_with_signer() {
        let signer = Signer::new("key", "secret");
        let client = IndodaxClient::new(Some(signer)).unwrap();
        assert!(client.signer().is_some());
    }

    #[test]
    fn test_indodax_client_new_without_signer() {
        let client = IndodaxClient::new(None).unwrap();
        assert!(client.signer().is_none());
    }

    #[test]
    fn test_indodax_client_signer() {
        let signer = Signer::new("mykey", "mysecret");
        let client = IndodaxClient::new(Some(signer)).unwrap();
        let s = client.signer().unwrap();
        assert_eq!(s.api_key(), "mykey");
    }

    // --- Response envelope deserialization ---

    #[test]
    fn test_indodax_v1_response_success() {
        let json = serde_json::json!({
            "success": 1,
            "return": {"balance": {"btc": "1.0"}},
            "error": null,
            "error_code": null
        });
        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert_eq!(resp.success, 1);
        assert!(resp.return_data.is_some());
        assert!(resp.error.is_none());
    }

    #[test]
    fn test_indodax_v1_response_failure() {
        let json = serde_json::json!({
            "success": 0,
            "return": null,
            "error": "Invalid credentials",
            "error_code": "invalid_credentials"
        });
        let resp: IndodaxV1Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert_eq!(resp.success, 0);
        assert!(resp.return_data.is_none());
        assert!(resp.error.is_some());
        assert!(resp.error_code.is_some());
    }

    #[test]
    fn test_indodax_v2_response_success() {
        let json = serde_json::json!({
            "data": {"name": "test"},
            "code": null,
            "error": null
        });
        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert!(resp.data.is_some());
        assert!(resp.error.is_none());
    }

    #[test]
    fn test_indodax_v2_response_error() {
        let json = serde_json::json!({
            "data": null,
            "code": 400,
            "error": "Bad request"
        });
        let resp: IndodaxV2Response<serde_json::Value> = serde_json::from_value(json).unwrap();
        assert!(resp.data.is_none());
        assert!(resp.error.is_some());
        assert!(resp.code.is_some());
    }

    // --- Form encoding ---

    #[test]
    fn test_serde_urlencoded_str_single() {
        let mut params = std::collections::BTreeMap::new();
        params.insert("method".into(), "getInfo".into());
        params.insert("nonce".into(), "12345".into());

        let result = serde_urlencoded_str(&params);
        assert!(result.contains("method=getInfo"));
        assert!(result.contains("nonce=12345"));
    }

    #[test]
    fn test_serde_urlencoded_str_empty() {
        let params = std::collections::BTreeMap::new();
        let result = serde_urlencoded_str(&params);
        assert_eq!(result, "");
    }

    #[test]
    fn test_serde_urlencoded_str_special_chars() {
        let mut params = std::collections::BTreeMap::new();
        params.insert("key with space".into(), "value&more".into());

        let result = serde_urlencoded_str(&params);
        // The space in the key must have been encoded, as either "%20" or "+".
        assert!(result.contains("%20") || result.contains("+"));
    }

    // --- Constants ---

    #[test]
    fn test_public_base_url() {
        assert!(PUBLIC_BASE_URL.contains("indodax.com"));
    }

    #[test]
    fn test_private_v1_url() {
        assert!(PRIVATE_V1_URL.contains("indodax.com/tapi"));
    }

    #[test]
    fn test_private_v2_base() {
        assert!(PRIVATE_V2_BASE.contains("tapi.btcapi.net"));
    }

    #[test]
    fn test_max_retries_constant() {
        assert_eq!(MAX_RETRIES, 3);
    }

    // --- Debug formatting ---

    #[test]
    fn test_indodax_v1_response_debug() {
        let resp: IndodaxV1Response<serde_json::Value> = IndodaxV1Response {
            success: 1,
            return_data: Some(serde_json::json!({})),
            error: None,
            error_code: None,
        };
        let debug_str = format!("{:?}", resp);
        assert!(debug_str.contains("success"));
    }

    #[test]
    fn test_indodax_v2_response_debug() {
        let resp: IndodaxV2Response<serde_json::Value> = IndodaxV2Response {
            data: Some(serde_json::json!({})),
            code: None,
            error: None,
        };
        let debug_str = format!("{:?}", resp);
        assert!(debug_str.contains("data"));
    }

    // --- Rate limiter ---

    #[test]
    fn test_rate_limiter_from_env_default() {
        // Without the env var, `from_env` falls back to 5.
        let rl = RateLimiter::from_env();
        // INDODAX_RATE_LIMIT may be set in the environment running the tests,
        // so only check that the resulting values are positive rather than
        // asserting the exact default.
        assert!(rl.capacity > 0);
        assert!(rl.refill_per_sec > 0);
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_single() {
        let rl = RateLimiter::new(5, 5);
        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 4);
    }

    #[tokio::test]
    async fn test_rate_limiter_token_exhaustion_refills() {
        let rl = RateLimiter::new(3, 10);
        for _ in 0..3 {
            rl.acquire().await;
        }
        {
            let state = rl.state.lock().await;
            assert_eq!(state.tokens, 0);
        }

        // Backdate the refill timestamp so the next acquire sees one full
        // second elapsed without the test having to sleep.
        {
            let mut state = rl.state.lock().await;
            state.last_refill = Instant::now() - Duration::from_secs(1);
        }

        // Refill is clamped to capacity (3), then one token is consumed.
        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 2);
    }

    #[tokio::test]
    async fn test_rate_limiter_refill_capped_at_capacity() {
        let rl = RateLimiter::new(5, 100);
        for _ in 0..5 {
            rl.acquire().await;
        }
        {
            let state = rl.state.lock().await;
            assert_eq!(state.tokens, 0);
        }

        // Ten seconds at 100 tokens/s would add far more than the capacity.
        {
            let mut state = rl.state.lock().await;
            state.last_refill = Instant::now() - Duration::from_secs(10);
        }

        // Refill is clamped to capacity (5), then one token is consumed.
        rl.acquire().await;
        let state = rl.state.lock().await;
        assert_eq!(state.tokens, 4);
    }

    #[test]
    fn test_rate_limiter_new_custom() {
        let rl = RateLimiter::new(25, 25);
        assert_eq!(rl.capacity, 25);
        assert_eq!(rl.refill_per_sec, 25);
    }
}
683        let rl = RateLimiter::new(25, 25);
684        assert_eq!(rl.capacity, 25);
685        assert_eq!(rl.refill_per_sec, 25);
686    }
687}