1use serde_json::Value as JsonValue;
49use anyhow::{anyhow, Context, Result};
50use std::collections::HashMap;
51use reqwest::header::{HeaderMap, AUTHORIZATION, USER_AGENT};
52use std::sync::{Arc, atomic::AtomicU64};
53use std::time::Duration;
54use serde::de::DeserializeOwned;
55
56use crate::models::common::{KiteError, KiteResult};
58
59use std::sync::Mutex;
61use std::time::{SystemTime, Duration as StdDuration};
62
63#[cfg(all(feature = "wasm", target_arch = "wasm32"))]
65use web_sys::console;
66
67pub mod utils;
69pub mod auth;
70pub mod portfolio;
71pub mod orders;
72pub mod market_data;
73pub mod mutual_funds;
74pub mod gtt;
75pub mod endpoints;
76pub mod rate_limiter;
77
78pub use utils::{RequestHandler, URL};
80pub use endpoints::{KiteEndpoint, HttpMethod, RateLimitCategory, Endpoint};
81pub use rate_limiter::{RateLimiter, RateLimiterStats, CategoryStats};
82
/// Retry policy applied to requests that fail with a retryable error.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Number of retries permitted after the initial attempt.
    pub max_retries: u32,
    /// Delay used for the first retry and as the starting point for backoff.
    pub base_delay: Duration,
    /// Upper bound for the exponentially growing delay.
    pub max_delay: Duration,
    /// `true` doubles the delay on every retry; `false` always waits `base_delay`.
    pub exponential_backoff: bool,
}

impl Default for RetryConfig {
    /// Three retries, starting at 200 ms and capped at 5 s, with exponential backoff.
    fn default() -> Self {
        let base_delay = Duration::from_millis(200);
        let max_delay = Duration::from_secs(5);

        RetryConfig {
            max_retries: 3,
            base_delay,
            max_delay,
            exponential_backoff: true,
        }
    }
}
102
/// Settings for client-side response caching.
#[derive(Clone, Debug)]
pub struct CacheConfig {
    /// Switch for caching the instruments payload.
    // NOTE(review): not read anywhere in this file; presumably checked by the
    // market-data code — confirm.
    pub enable_instruments_cache: bool,
    /// Lifetime of a cached entry, in minutes.
    pub cache_ttl_minutes: u64,
    /// Maximum cache size.
    // NOTE(review): not read anywhere in this file; unit and enforcement are
    // not visible here.
    pub max_cache_size: usize,
}

impl Default for CacheConfig {
    /// Instruments caching enabled, 60-minute TTL, size limit of 1000.
    fn default() -> Self {
        CacheConfig {
            enable_instruments_cache: true,
            cache_ttl_minutes: 60,
            max_cache_size: 1000,
        }
    }
}
120
121#[derive(Debug)]
123struct ResponseCache {
124 instruments_cache: Option<(JsonValue, SystemTime)>,
125 ttl_minutes: u64,
126}
127
128impl ResponseCache {
129 fn new(ttl_minutes: u64) -> Self {
130 Self {
131 instruments_cache: None,
132 ttl_minutes,
133 }
134 }
135
136 fn get_instruments(&self) -> Option<JsonValue> {
137 if let Some((data, timestamp)) = &self.instruments_cache {
138 let elapsed = timestamp.elapsed().ok()?;
139 if elapsed < StdDuration::from_secs(self.ttl_minutes * 60) {
140 return Some(data.clone());
141 }
142 }
143 None
144 }
145
146 fn set_instruments(&mut self, data: JsonValue) {
147 self.instruments_cache = Some((data, SystemTime::now()));
148 }
149}
150
151#[derive(Debug, Clone)]
153pub struct KiteConnectConfig {
154 pub base_url: String,
155 pub timeout: u64,
156 pub retry_config: RetryConfig,
157 pub cache_config: Option<CacheConfig>,
158 pub max_idle_connections: usize,
159 pub idle_timeout: u64,
160 pub enable_rate_limiting: bool,
161}
162
163impl Default for KiteConnectConfig {
164 fn default() -> Self {
165 Self {
166 base_url: "https://api.kite.trade".to_string(),
167 timeout: 30,
168 retry_config: RetryConfig::default(),
169 cache_config: Some(CacheConfig::default()),
170 max_idle_connections: 10,
171 idle_timeout: 30,
172 enable_rate_limiting: true,
173 }
174 }
175}
176
177#[derive(Clone, Debug)]
230pub struct KiteConnect {
231 pub(crate) api_key: String,
233 pub(crate) access_token: String,
235 pub(crate) root: String,
237 pub(crate) timeout: u64,
239 pub(crate) session_expiry_hook: Option<fn() -> ()>,
241 pub(crate) client: reqwest::Client,
243
244 pub(crate) retry_config: RetryConfig,
247 pub(crate) cache_config: Option<CacheConfig>,
249 pub(crate) request_counter: Arc<AtomicU64>,
251 pub(crate) response_cache: Arc<Mutex<Option<ResponseCache>>>,
253 pub(crate) rate_limiter: rate_limiter::RateLimiter,
255}
256
257impl Default for KiteConnect {
258 fn default() -> Self {
259 KiteConnect {
260 api_key: "<API-KEY>".to_string(),
261 access_token: "<ACCESS-TOKEN>".to_string(),
262 root: URL.to_string(),
263 timeout: 30,
264 session_expiry_hook: None,
265 client: reqwest::Client::new(),
266 retry_config: RetryConfig::default(),
267 cache_config: Some(CacheConfig::default()),
268 request_counter: Arc::new(AtomicU64::new(0)),
269 response_cache: Arc::new(Mutex::new(None)),
270 rate_limiter: rate_limiter::RateLimiter::new(true),
271 }
272 }
273}
274
275impl KiteConnect {
276 pub(crate) fn build_url(&self, path: &str, param: Option<Vec<(&str, &str)>>) -> reqwest::Url {
278 let url: &str = &format!("{}/{}", self.root, &path[1..]);
279 let mut url = reqwest::Url::parse(url).unwrap();
280
281 if let Some(data) = param {
282 url.query_pairs_mut().extend_pairs(data.iter());
283 }
284 url
285 }
286
287 pub fn new(api_key: &str, access_token: &str) -> Self {
306 Self {
307 api_key: api_key.to_string(),
308 access_token: access_token.to_string(),
309 root: URL.to_string(),
310 timeout: 30,
311 session_expiry_hook: None,
312 client: reqwest::Client::new(),
313 retry_config: RetryConfig::default(),
314 cache_config: Some(CacheConfig::default()),
315 request_counter: Arc::new(AtomicU64::new(0)),
316 response_cache: Arc::new(Mutex::new(None)),
317 rate_limiter: rate_limiter::RateLimiter::new(true),
318 }
319 }
320
321 pub fn new_with_config(api_key: &str, config: KiteConnectConfig) -> Self {
347 let client = reqwest::Client::builder()
348 .timeout(Duration::from_secs(config.timeout))
349 .pool_max_idle_per_host(config.max_idle_connections)
350 .pool_idle_timeout(Duration::from_secs(config.idle_timeout))
351 .user_agent(&format!("kiteconnect-rust/{}", env!("CARGO_PKG_VERSION")))
352 .build()
353 .expect("Failed to create HTTP client");
354
355 Self {
356 api_key: api_key.to_string(),
357 access_token: String::new(),
358 root: config.base_url,
359 timeout: config.timeout,
360 session_expiry_hook: None,
361 client,
362 retry_config: config.retry_config,
363 cache_config: config.cache_config.clone(),
364 request_counter: Arc::new(AtomicU64::new(0)),
365 response_cache: Arc::new(Mutex::new(
366 config.cache_config.as_ref()
367 .map(|c| ResponseCache::new(c.cache_ttl_minutes))
368 )),
369 rate_limiter: rate_limiter::RateLimiter::new(config.enable_rate_limiting),
370 }
371 }
372
373 pub(crate) async fn raise_or_return_json(&self, resp: reqwest::Response) -> Result<JsonValue> {
375 if resp.status().is_success() {
376 let jsn: JsonValue = resp.json().await.with_context(|| "Serialization failed")?;
377 Ok(jsn)
378 } else {
379 let status_code = resp.status().as_u16();
380 let status = status_code.to_string();
381 let error_text = resp.text().await?;
382
383 if let Ok(error_json) = serde_json::from_str::<JsonValue>(&error_text) {
385 let message = error_json["message"].as_str()
386 .unwrap_or(&error_text)
387 .to_string();
388 let error_type = error_json["error_type"].as_str()
389 .map(|s| s.to_string());
390
391 let kite_error = KiteError::from_api_response(status_code, status, message, error_type);
392 Err(anyhow::Error::new(kite_error))
393 } else {
394 let kite_error = KiteError::from_api_response(status_code, status, error_text, None);
395 Err(anyhow::Error::new(kite_error))
396 }
397 }
398 }
399
400 pub(crate) async fn send_request_with_retry(
402 &self,
403 url: reqwest::Url,
404 method: &str,
405 data: Option<HashMap<&str, &str>>,
406 ) -> KiteResult<reqwest::Response> {
407 let mut last_error = None;
408
409 for attempt in 0..=self.retry_config.max_retries {
410 self.request_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
412
413 match self.send_request(url.clone(), method, data.clone()).await {
414 Ok(response) => {
415 if response.status().is_server_error() || response.status() == 429 {
417 let status = response.status().as_u16().to_string();
418 let error_text = response.text().await
419 .unwrap_or_else(|_| "Unknown server error".to_string());
420
421 let error = KiteError::Api {
422 status,
423 message: error_text,
424 error_type: Some("ServerError".to_string()),
425 };
426
427 if attempt < self.retry_config.max_retries && self.should_retry(&error) {
428 last_error = Some(error);
429 let delay = self.calculate_retry_delay(attempt);
430
431 #[cfg(feature = "debug")]
432 log::debug!("Request failed, retrying in {:?}. Attempt {}/{}",
433 delay, attempt + 1, self.retry_config.max_retries);
434
435 tokio::time::sleep(delay).await;
436 continue;
437 } else {
438 return Err(error);
439 }
440 }
441
442 return Ok(response);
443 }
444 Err(e) => {
445 let kite_error = KiteError::Legacy(e);
446
447 if attempt < self.retry_config.max_retries && self.should_retry(&kite_error) {
448 last_error = Some(kite_error);
449 let delay = self.calculate_retry_delay(attempt);
450
451 #[cfg(feature = "debug")]
452 log::debug!("Request failed, retrying in {:?}. Attempt {}/{}",
453 delay, attempt + 1, self.retry_config.max_retries);
454
455 tokio::time::sleep(delay).await;
456 continue;
457 } else {
458 return Err(kite_error);
459 }
460 }
461 }
462 }
463
464 Err(last_error.unwrap_or_else(|| KiteError::General("All retry attempts failed".to_string())))
466 }
467
468 pub(crate) async fn raise_or_return_json_typed(&self, resp: reqwest::Response) -> KiteResult<JsonValue> {
470 if resp.status().is_success() {
471 resp.json().await.map_err(|e| KiteError::Http(e))
472 } else {
473 let status_code = resp.status().as_u16();
474 let status = status_code.to_string();
475 let error_text = resp.text().await
476 .map_err(KiteError::Http)?;
477
478 if let Ok(error_json) = serde_json::from_str::<JsonValue>(&error_text) {
480 let message = error_json["message"].as_str()
481 .unwrap_or(&error_text)
482 .to_string();
483 let error_type = error_json["error_type"].as_str()
484 .map(|s| s.to_string());
485
486 Err(KiteError::from_api_response(status_code, status, message, error_type))
487 } else {
488 Err(KiteError::from_api_response(status_code, status, error_text, None))
489 }
490 }
491 }
492
493 pub fn set_session_expiry_hook(&mut self, method: fn() -> ()) {
515 self.session_expiry_hook = Some(method);
516 }
517
518 pub fn session_expiry_hook(&self) -> Option<fn() -> ()> {
526 self.session_expiry_hook
527 }
528
529 pub fn set_access_token(&mut self, access_token: &str) {
547 self.access_token = access_token.to_string();
548 }
549
550 pub fn access_token(&self) -> &str {
552 &self.access_token
553 }
554
555 fn parse_response<T: DeserializeOwned>(&self, response: JsonValue) -> KiteResult<T> {
560 serde_json::from_value(response)
561 .map_err(|e| KiteError::Json(e))
562 }
563
564 fn should_retry(&self, error: &KiteError) -> bool {
566 error.is_retryable()
567 }
568
569 fn calculate_retry_delay(&self, attempt: u32) -> Duration {
571 if self.retry_config.exponential_backoff {
572 let delay = self.retry_config.base_delay * 2_u32.pow(attempt);
573 std::cmp::min(delay, self.retry_config.max_delay)
574 } else {
575 self.retry_config.base_delay
576 }
577 }
578
579 pub fn request_count(&self) -> u64 {
581 self.request_counter.load(std::sync::atomic::Ordering::Relaxed)
582 }
583
584 pub async fn rate_limiter_stats(&self) -> rate_limiter::RateLimiterStats {
586 self.rate_limiter.get_stats().await
587 }
588
589 pub fn set_rate_limiting_enabled(&mut self, enabled: bool) {
591 self.rate_limiter.set_enabled(enabled);
592 }
593
594 pub fn is_rate_limiting_enabled(&self) -> bool {
596 self.rate_limiter.is_enabled()
597 }
598
599 pub async fn can_request_immediately(&self, endpoint: &KiteEndpoint) -> bool {
601 self.rate_limiter.can_request_immediately(endpoint).await
602 }
603
604 pub async fn get_delay_for_request(&self, endpoint: &KiteEndpoint) -> std::time::Duration {
606 self.rate_limiter.get_delay_for_request(endpoint).await
607 }
608
609 pub async fn wait_for_request(&self, endpoint: &KiteEndpoint) {
611 self.rate_limiter.wait_for_request(endpoint).await
612 }
613
614 async fn send_request_with_rate_limiting_and_retry(
616 &self,
617 endpoint: KiteEndpoint,
618 path_segments: &[&str],
619 query_params: Option<Vec<(&str, &str)>>,
620 data: Option<HashMap<&str, &str>>,
621 ) -> KiteResult<reqwest::Response> {
622 self.rate_limiter.wait_for_request(&endpoint).await;
624
625 let config = endpoint.config();
627 let full_path = if path_segments.is_empty() {
628 config.path.to_string()
629 } else {
630 format!("{}/{}", config.path, path_segments.join("/"))
631 };
632
633 let url = self.build_url(&full_path, query_params);
634
635 self.send_request_with_retry(url, config.method.as_str(), data).await
637 }
638}
639
640impl RequestHandler for KiteConnect {
642 fn send_request(
643 &self,
644 url: reqwest::Url,
645 method: &str,
646 data: Option<HashMap<&str, &str>>,
647 ) -> impl std::future::Future<Output = Result<reqwest::Response>> + Send {
648 async move {
649 #[cfg(feature = "debug")]
650 log::debug!("Sending {} request to: {}", method, url);
651
652 #[cfg(all(feature = "debug", feature = "wasm", target_arch = "wasm32"))]
653 console::log_1(&format!("KiteConnect: {} {}", method, url).into());
654
655 let mut headers = HeaderMap::new();
656 headers.insert("XKiteVersion", "3".parse().unwrap());
657 headers.insert(
658 AUTHORIZATION,
659 format!("token {}:{}", self.api_key, self.access_token)
660 .parse()
661 .unwrap(),
662 );
663 headers.insert(USER_AGENT, "Rust".parse().unwrap());
664
665 let response = match method {
666 "GET" => self.client.get(url).headers(headers).send().await?,
667 "POST" => self.client.post(url).headers(headers).form(&data).send().await?,
668 "DELETE" => self.client.delete(url).headers(headers).json(&data).send().await?,
669 "PUT" => self.client.put(url).headers(headers).form(&data).send().await?,
670 _ => return Err(anyhow!("Unknown method!")),
671 };
672
673 #[cfg(feature = "debug")]
674 log::debug!("Response status: {}", response.status());
675
676 Ok(response)
677 }
678 }
679}
680
#[cfg(test)]
mod tests {
    use super::*;

    /// `build_url` joins the root and the path and appends query pairs when given.
    #[tokio::test]
    async fn test_build_url() {
        let client = KiteConnect::new("key", "token");

        let plain = client.build_url("/my-holdings", None);
        assert_eq!(plain.as_str(), format!("{}/my-holdings", URL).as_str());

        let query: Vec<(&str, &str)> = vec![("one", "1")];
        let with_query = client.build_url("/my-holdings", Some(query));
        assert_eq!(with_query.as_str(), format!("{}/my-holdings?one=1", URL).as_str());
    }

    /// The access token given at construction can be read back and replaced.
    #[tokio::test]
    async fn test_set_access_token() {
        let mut client = KiteConnect::new("key", "token");
        assert_eq!(client.access_token(), "token");

        client.set_access_token("my_token");
        assert_eq!(client.access_token(), "my_token");
    }

    /// The session-expiry hook is absent by default and present once set.
    #[tokio::test]
    async fn test_session_expiry_hook() {
        fn mock_hook() {
            println!("Session expired");
        }

        let mut client = KiteConnect::new("key", "token");
        assert_eq!(client.session_expiry_hook(), None);

        client.set_session_expiry_hook(mock_hook);
        assert_ne!(client.session_expiry_hook(), None);
    }

    /// The login URL embeds the API key.
    #[tokio::test]
    async fn test_login_url() {
        let client = KiteConnect::new("key", "token");
        assert_eq!(client.login_url(), "https://kite.trade/connect/login?api_key=key&v3");
    }
}