1use anyhow::{anyhow, Context, Result};
49use reqwest::header::{HeaderMap, AUTHORIZATION, USER_AGENT};
50use serde::de::DeserializeOwned;
51use serde_json::Value as JsonValue;
52use std::collections::HashMap;
53use std::sync::{atomic::AtomicU64, Arc};
54use std::time::Duration;
55
56use crate::models::common::{KiteError, KiteResult};
58
59use std::sync::Mutex;
61use std::time::{Duration as StdDuration, SystemTime};
62
63#[cfg(all(feature = "wasm", target_arch = "wasm32"))]
65use web_sys::console;
66
67pub mod auth;
69pub mod endpoints;
70pub mod gtt;
71pub mod market_data;
72pub mod mutual_funds;
73pub mod orders;
74pub mod portfolio;
75pub mod rate_limiter;
76pub mod utils;
77
78pub use endpoints::{Endpoint, HttpMethod, KiteEndpoint, RateLimitCategory};
80pub use rate_limiter::{CategoryStats, RateLimiter, RateLimiterStats};
81pub use utils::{RequestHandler, URL};
82
/// Retry policy applied to outgoing HTTP requests.
///
/// A request is sent once and then re-sent up to `max_retries` more times
/// while the failure is classified as retryable.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Number of additional attempts made after the first one fails.
    pub max_retries: u32,
    /// Pause before the first retry; also the fixed pause between retries
    /// when `exponential_backoff` is `false`.
    pub base_delay: Duration,
    /// Upper bound on the pause when `exponential_backoff` is `true`.
    pub max_delay: Duration,
    /// When `true`, the pause doubles with every retry (capped at
    /// `max_delay`); when `false`, every pause equals `base_delay`.
    pub exponential_backoff: bool,
}
91
92impl Default for RetryConfig {
93 fn default() -> Self {
94 Self {
95 max_retries: 3,
96 base_delay: Duration::from_millis(200),
97 max_delay: Duration::from_secs(5),
98 exponential_backoff: true,
99 }
100 }
101}
102
/// Settings for client-side response caching.
#[derive(Debug, Clone)]
pub struct CacheConfig {
    /// Whether the instruments payload should be cached.
    // NOTE(review): not read anywhere in this file; presumably consulted by
    // the market-data module -- confirm.
    pub enable_instruments_cache: bool,
    /// Lifetime of a cached entry, in minutes.
    pub cache_ttl_minutes: u64,
    /// Intended upper bound on the cache size.
    // NOTE(review): not enforced in this file and the unit is unspecified.
    pub max_cache_size: usize,
}
110
111impl Default for CacheConfig {
112 fn default() -> Self {
113 Self {
114 enable_instruments_cache: true,
115 cache_ttl_minutes: 60, max_cache_size: 1000,
117 }
118 }
119}
120
121#[derive(Debug)]
123pub(crate) struct ResponseCache {
124 instruments_cache: Option<(JsonValue, SystemTime)>,
125 ttl_minutes: u64,
126}
127
128impl ResponseCache {
129 fn new(ttl_minutes: u64) -> Self {
130 Self {
131 instruments_cache: None,
132 ttl_minutes,
133 }
134 }
135
136 fn get_instruments(&self) -> Option<JsonValue> {
137 if let Some((data, timestamp)) = &self.instruments_cache {
138 let elapsed = timestamp.elapsed().ok()?;
139 if elapsed < StdDuration::from_secs(self.ttl_minutes * 60) {
140 return Some(data.clone());
141 }
142 }
143 None
144 }
145
146 fn set_instruments(&mut self, data: JsonValue) {
147 self.instruments_cache = Some((data, SystemTime::now()));
148 }
149}
150
151#[derive(Debug, Clone)]
153pub struct KiteConnectConfig {
154 pub base_url: String,
155 pub timeout: u64,
156 pub retry_config: RetryConfig,
157 pub cache_config: Option<CacheConfig>,
158 pub max_idle_connections: usize,
159 pub idle_timeout: u64,
160 pub enable_rate_limiting: bool,
161}
162
163impl Default for KiteConnectConfig {
164 fn default() -> Self {
165 Self {
166 base_url: "https://api.kite.trade".to_string(),
167 timeout: 30,
168 retry_config: RetryConfig::default(),
169 cache_config: Some(CacheConfig::default()),
170 max_idle_connections: 10,
171 idle_timeout: 30,
172 enable_rate_limiting: true,
173 }
174 }
175}
176
177#[derive(Clone, Debug)]
230pub struct KiteConnect {
231 pub(crate) api_key: String,
233 pub(crate) access_token: String,
235 pub(crate) root: String,
237 #[allow(dead_code)]
239 pub(crate) timeout: u64,
240 pub(crate) session_expiry_hook: Option<fn() -> ()>,
242 pub(crate) client: reqwest::Client,
244
245 pub(crate) retry_config: RetryConfig,
248 pub(crate) cache_config: Option<CacheConfig>,
250 pub(crate) request_counter: Arc<AtomicU64>,
252 pub(crate) response_cache: Arc<Mutex<Option<ResponseCache>>>,
254 pub(crate) rate_limiter: rate_limiter::RateLimiter,
256}
257
258impl Default for KiteConnect {
259 fn default() -> Self {
260 KiteConnect {
261 api_key: "<API-KEY>".to_string(),
262 access_token: "<ACCESS-TOKEN>".to_string(),
263 root: URL.to_string(),
264 timeout: 30,
265 session_expiry_hook: None,
266 client: reqwest::Client::new(),
267 retry_config: RetryConfig::default(),
268 cache_config: Some(CacheConfig::default()),
269 request_counter: Arc::new(AtomicU64::new(0)),
270 response_cache: Arc::new(Mutex::new(None)),
271 rate_limiter: rate_limiter::RateLimiter::new(true),
272 }
273 }
274}
275
276impl KiteConnect {
277 pub(crate) fn build_url(&self, path: &str, param: Option<Vec<(&str, &str)>>) -> reqwest::Url {
279 let url: &str = &format!("{}/{}", self.root, &path[1..]);
280 let mut url = reqwest::Url::parse(url).unwrap();
281
282 if let Some(data) = param {
283 url.query_pairs_mut().extend_pairs(data.iter());
284 }
285 url
286 }
287
288 pub fn new(api_key: &str, access_token: &str) -> Self {
307 Self {
308 api_key: api_key.to_string(),
309 access_token: access_token.to_string(),
310 root: URL.to_string(),
311 timeout: 30,
312 session_expiry_hook: None,
313 client: reqwest::Client::new(),
314 retry_config: RetryConfig::default(),
315 cache_config: Some(CacheConfig::default()),
316 request_counter: Arc::new(AtomicU64::new(0)),
317 response_cache: Arc::new(Mutex::new(None)),
318 rate_limiter: rate_limiter::RateLimiter::new(true),
319 }
320 }
321
322 pub fn new_with_config(api_key: &str, config: KiteConnectConfig) -> Self {
348 let client = reqwest::Client::builder()
349 .timeout(Duration::from_secs(config.timeout))
350 .pool_max_idle_per_host(config.max_idle_connections)
351 .pool_idle_timeout(Duration::from_secs(config.idle_timeout))
352 .user_agent(format!("kiteconnect-rust/{}", env!("CARGO_PKG_VERSION")))
353 .build()
354 .expect("Failed to create HTTP client");
355
356 Self {
357 api_key: api_key.to_string(),
358 access_token: String::new(),
359 root: config.base_url,
360 timeout: config.timeout,
361 session_expiry_hook: None,
362 client,
363 retry_config: config.retry_config,
364 cache_config: config.cache_config.clone(),
365 request_counter: Arc::new(AtomicU64::new(0)),
366 response_cache: Arc::new(Mutex::new(
367 config
368 .cache_config
369 .as_ref()
370 .map(|c| ResponseCache::new(c.cache_ttl_minutes)),
371 )),
372 rate_limiter: rate_limiter::RateLimiter::new(config.enable_rate_limiting),
373 }
374 }
375
376 pub(crate) async fn raise_or_return_json(&self, resp: reqwest::Response) -> Result<JsonValue> {
378 if resp.status().is_success() {
379 let jsn: JsonValue = resp.json().await.with_context(|| "Serialization failed")?;
380 Ok(jsn)
381 } else {
382 let status_code = resp.status().as_u16();
383 let status = status_code.to_string();
384 let error_text = resp.text().await?;
385
386 if let Ok(error_json) = serde_json::from_str::<JsonValue>(&error_text) {
388 let message = error_json["message"]
389 .as_str()
390 .unwrap_or(&error_text)
391 .to_string();
392 let error_type = error_json["error_type"].as_str().map(|s| s.to_string());
393
394 let kite_error =
395 KiteError::from_api_response(status_code, status, message, error_type);
396 Err(anyhow::Error::new(kite_error))
397 } else {
398 let kite_error =
399 KiteError::from_api_response(status_code, status, error_text, None);
400 Err(anyhow::Error::new(kite_error))
401 }
402 }
403 }
404
405 pub(crate) async fn send_request_with_retry(
407 &self,
408 url: reqwest::Url,
409 method: &str,
410 data: Option<HashMap<&str, &str>>,
411 ) -> KiteResult<reqwest::Response> {
412 let mut last_error = None;
413
414 for attempt in 0..=self.retry_config.max_retries {
415 self.request_counter
417 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
418
419 match self.send_request(url.clone(), method, data.clone()).await {
420 Ok(response) => {
421 if response.status().is_server_error() || response.status() == 429 {
423 let status = response.status().as_u16().to_string();
424 let error_text = response
425 .text()
426 .await
427 .unwrap_or_else(|_| "Unknown server error".to_string());
428
429 let error = KiteError::Api {
430 status,
431 message: error_text,
432 error_type: Some("ServerError".to_string()),
433 };
434
435 if attempt < self.retry_config.max_retries && self.should_retry(&error) {
436 last_error = Some(error);
437 let delay = self.calculate_retry_delay(attempt);
438
439 #[cfg(feature = "debug")]
440 log::debug!(
441 "Request failed, retrying in {:?}. Attempt {}/{}",
442 delay,
443 attempt + 1,
444 self.retry_config.max_retries
445 );
446
447 tokio::time::sleep(delay).await;
448 continue;
449 } else {
450 return Err(error);
451 }
452 }
453
454 return Ok(response);
455 }
456 Err(e) => {
457 let kite_error = KiteError::Legacy(e);
458
459 if attempt < self.retry_config.max_retries && self.should_retry(&kite_error) {
460 last_error = Some(kite_error);
461 let delay = self.calculate_retry_delay(attempt);
462
463 #[cfg(feature = "debug")]
464 log::debug!(
465 "Request failed, retrying in {:?}. Attempt {}/{}",
466 delay,
467 attempt + 1,
468 self.retry_config.max_retries
469 );
470
471 tokio::time::sleep(delay).await;
472 continue;
473 } else {
474 return Err(kite_error);
475 }
476 }
477 }
478 }
479
480 Err(last_error
482 .unwrap_or_else(|| KiteError::General("All retry attempts failed".to_string())))
483 }
484
485 pub(crate) async fn raise_or_return_json_typed(
487 &self,
488 resp: reqwest::Response,
489 ) -> KiteResult<JsonValue> {
490 if resp.status().is_success() {
491 resp.json().await.map_err(KiteError::Http)
492 } else {
493 let status_code = resp.status().as_u16();
494 let status = status_code.to_string();
495 let error_text = resp.text().await.map_err(KiteError::Http)?;
496
497 if let Ok(error_json) = serde_json::from_str::<JsonValue>(&error_text) {
499 let message = error_json["message"]
500 .as_str()
501 .unwrap_or(&error_text)
502 .to_string();
503 let error_type = error_json["error_type"].as_str().map(|s| s.to_string());
504
505 Err(KiteError::from_api_response(
506 status_code,
507 status,
508 message,
509 error_type,
510 ))
511 } else {
512 Err(KiteError::from_api_response(
513 status_code,
514 status,
515 error_text,
516 None,
517 ))
518 }
519 }
520 }
521
522 pub fn set_session_expiry_hook(&mut self, method: fn() -> ()) {
544 self.session_expiry_hook = Some(method);
545 }
546
547 pub fn session_expiry_hook(&self) -> Option<fn() -> ()> {
555 self.session_expiry_hook
556 }
557
558 pub fn set_access_token(&mut self, access_token: &str) {
576 self.access_token = access_token.to_string();
577 }
578
579 pub fn access_token(&self) -> &str {
581 &self.access_token
582 }
583
584 fn parse_response<T: DeserializeOwned>(&self, response: JsonValue) -> KiteResult<T> {
589 serde_json::from_value(response).map_err(KiteError::Json)
590 }
591
592 fn should_retry(&self, error: &KiteError) -> bool {
594 error.is_retryable()
595 }
596
597 fn calculate_retry_delay(&self, attempt: u32) -> Duration {
599 if self.retry_config.exponential_backoff {
600 let delay = self.retry_config.base_delay * 2_u32.pow(attempt);
601 std::cmp::min(delay, self.retry_config.max_delay)
602 } else {
603 self.retry_config.base_delay
604 }
605 }
606
607 pub fn request_count(&self) -> u64 {
609 self.request_counter
610 .load(std::sync::atomic::Ordering::Relaxed)
611 }
612
613 pub async fn rate_limiter_stats(&self) -> rate_limiter::RateLimiterStats {
615 self.rate_limiter.get_stats().await
616 }
617
618 pub fn set_rate_limiting_enabled(&mut self, enabled: bool) {
620 self.rate_limiter.set_enabled(enabled);
621 }
622
623 pub fn is_rate_limiting_enabled(&self) -> bool {
625 self.rate_limiter.is_enabled()
626 }
627
628 pub async fn can_request_immediately(&self, endpoint: &KiteEndpoint) -> bool {
630 self.rate_limiter.can_request_immediately(endpoint).await
631 }
632
633 pub async fn get_delay_for_request(&self, endpoint: &KiteEndpoint) -> std::time::Duration {
635 self.rate_limiter.get_delay_for_request(endpoint).await
636 }
637
638 pub async fn wait_for_request(&self, endpoint: &KiteEndpoint) {
640 self.rate_limiter.wait_for_request(endpoint).await
641 }
642
643 async fn send_request_with_rate_limiting_and_retry(
645 &self,
646 endpoint: KiteEndpoint,
647 path_segments: &[&str],
648 query_params: Option<Vec<(&str, &str)>>,
649 data: Option<HashMap<&str, &str>>,
650 ) -> KiteResult<reqwest::Response> {
651 self.rate_limiter.wait_for_request(&endpoint).await;
653
654 let config = endpoint.config();
656 let full_path = if path_segments.is_empty() {
657 config.path.to_string()
658 } else {
659 format!("{}/{}", config.path, path_segments.join("/"))
660 };
661
662 let url = self.build_url(&full_path, query_params);
663
664 self.send_request_with_retry(url, config.method.as_str(), data)
666 .await
667 }
668
669 pub(crate) async fn send_json_with_rate_limiting_and_retry<T: serde::Serialize + ?Sized>(
671 &self,
672 endpoint: KiteEndpoint,
673 path_segments: &[&str],
674 query_params: Option<Vec<(&str, &str)>>,
675 json_body: &T,
676 ) -> KiteResult<reqwest::Response> {
677 self.rate_limiter.wait_for_request(&endpoint).await;
679
680 let config = endpoint.config();
682 let full_path = if path_segments.is_empty() {
683 config.path.to_string()
684 } else {
685 format!("{}/{}", config.path, path_segments.join("/"))
686 };
687
688 let url = self.build_url(&full_path, query_params);
689
690 use reqwest::header::{HeaderMap, AUTHORIZATION, USER_AGENT, CONTENT_TYPE};
692 let mut headers = HeaderMap::new();
693 headers.insert("XKiteVersion", "3".parse().unwrap());
694 headers.insert(
695 AUTHORIZATION,
696 format!("token {}:{}", self.api_key, self.access_token)
697 .parse()
698 .unwrap(),
699 );
700 headers.insert(USER_AGENT, "Rust".parse().unwrap());
701 headers.insert(CONTENT_TYPE, "application/json".parse().unwrap());
702
703 let mut last_error = None;
705 for attempt in 0..=self.retry_config.max_retries {
706 let resp_res = self
707 .client
708 .post(url.clone())
709 .headers(headers.clone())
710 .json(json_body)
711 .send()
712 .await;
713
714 match resp_res {
715 Ok(resp) => {
716 if resp.status().is_server_error() || resp.status() == 429 {
717 let status = resp.status().as_u16().to_string();
718 let error_text = resp.text().await.unwrap_or_default();
719 let error = KiteError::Api {
720 status,
721 message: error_text,
722 error_type: Some("ServerError".to_string()),
723 };
724 if attempt < self.retry_config.max_retries && self.should_retry(&error) {
725 last_error = Some(error);
726 let delay = self.calculate_retry_delay(attempt);
727 tokio::time::sleep(delay).await;
728 continue;
729 } else {
730 return Err(error);
731 }
732 }
733 return Ok(resp);
734 }
735 Err(e) => {
736 let ke = KiteError::Http(e);
737 if attempt < self.retry_config.max_retries && self.should_retry(&ke) {
738 last_error = Some(ke);
739 let delay = self.calculate_retry_delay(attempt);
740 tokio::time::sleep(delay).await;
741 continue;
742 } else {
743 return Err(ke);
744 }
745 }
746 }
747 }
748
749 Err(last_error.unwrap_or_else(|| KiteError::General("All retry attempts failed".into())))
750 }
751}
752
753impl RequestHandler for KiteConnect {
755 async fn send_request(
756 &self,
757 url: reqwest::Url,
758 method: &str,
759 data: Option<HashMap<&str, &str>>,
760 ) -> Result<reqwest::Response> {
761 #[cfg(feature = "debug")]
762 log::debug!("Sending {} request to: {}", method, url);
763
764 #[cfg(all(feature = "debug", feature = "wasm", target_arch = "wasm32"))]
765 console::log_1(&format!("KiteConnect: {} {}", method, url).into());
766
767 let mut headers = HeaderMap::new();
768 headers.insert("XKiteVersion", "3".parse().unwrap());
769 headers.insert(
770 AUTHORIZATION,
771 format!("token {}:{}", self.api_key, self.access_token)
772 .parse()
773 .unwrap(),
774 );
775 headers.insert(USER_AGENT, "Rust".parse().unwrap());
776
777 let response = match method {
778 "GET" => self.client.get(url).headers(headers).send().await?,
779 "POST" => {
780 self.client
781 .post(url)
782 .headers(headers)
783 .form(&data)
784 .send()
785 .await?
786 }
787 "DELETE" => {
788 self.client
789 .delete(url)
790 .headers(headers)
791 .json(&data)
792 .send()
793 .await?
794 }
795 "PUT" => {
796 self.client
797 .put(url)
798 .headers(headers)
799 .form(&data)
800 .send()
801 .await?
802 }
803 _ => return Err(anyhow!("Unknown method!")),
804 };
805
806 #[cfg(feature = "debug")]
807 log::debug!("Response status: {}", response.status());
808
809 Ok(response)
810 }
811}
812
#[cfg(test)]
mod tests {
    use super::*;

    /// `build_url` joins the root with the path and appends query pairs.
    #[tokio::test]
    async fn test_build_url() {
        let client = KiteConnect::new("key", "token");

        let plain = client.build_url("/my-holdings", None);
        let expected_plain = format!("{}/my-holdings", URL);
        assert_eq!(plain.as_str(), expected_plain.as_str());

        let query: Vec<(&str, &str)> = vec![("one", "1")];
        let with_query = client.build_url("/my-holdings", Some(query));
        let expected_query = format!("{}/my-holdings?one=1", URL);
        assert_eq!(with_query.as_str(), expected_query.as_str());
    }

    /// The access token given to `new` can be read back and replaced.
    #[tokio::test]
    async fn test_set_access_token() {
        let mut client = KiteConnect::new("key", "token");
        assert_eq!(client.access_token(), "token");

        client.set_access_token("my_token");
        assert_eq!(client.access_token(), "my_token");
    }

    /// The session-expiry hook is absent until one is registered.
    #[tokio::test]
    async fn test_session_expiry_hook() {
        fn mock_hook() {
            println!("Session expired");
        }

        let mut client = KiteConnect::new("key", "token");
        assert!(client.session_expiry_hook().is_none());

        client.set_session_expiry_hook(mock_hook);
        assert!(client.session_expiry_hook().is_some());
    }

    /// The login URL embeds the API key.
    #[tokio::test]
    async fn test_login_url() {
        let client = KiteConnect::new("key", "token");
        let login_url = client.login_url();
        assert_eq!(
            login_url,
            "https://kite.trade/connect/login?api_key=key&v3"
        );
    }
}