1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
use chrono::{DateTime, Utc}; use futures::{stream, StreamExt}; use itertools::Itertools; use std::error::Error; pub mod api; pub mod common; pub mod csv; pub mod request; pub mod streamer; pub mod symbol; pub use crate::{api::*, common::*, request::*}; const MAX_SYMBOL_SUMMARY_BATCH_SIZE: usize = 500; const PARALLEL_REQUESTS: usize = 10; pub async fn accounts() -> Result<Vec<accounts::Item>, Box<dyn Error>> { let response: api::Response<accounts::Response> = request("customers/me/accounts", "").await?.json().await?; Ok(response.data.items) } pub async fn public_watchlists() -> Result<Vec<watchlists::Item>, Box<dyn Error>> { let response: api::Response<watchlists::Response> = request("public-watchlists", "").await?.json().await?; Ok(response.data.items) } pub async fn positions( account: &accounts::Account, ) -> Result<Vec<positions::Item>, Box<dyn Error>> { let url = format!("accounts/{}/positions", account.account_number); let response: api::Response<positions::Response> = request(&url, "").await?.json().await?; Ok(response.data.items) } pub async fn transactions( account: &accounts::Account, start_date: DateTime<Utc>, end_date: DateTime<Utc>, prev_pagination: Option<Pagination>, ) -> Result<Option<(Vec<transactions::Item>, Option<Pagination>)>, Box<dyn Error>> { let page_offset = if let Some(api::Pagination { page_offset, total_pages, .. }) = prev_pagination { if page_offset + 1 >= total_pages { return Ok(None); } page_offset + 1 } else { 0 }; let url = format!("accounts/{}/transactions", account.account_number); let parameters = format!( "start-date={}&end-date={}&page-offset={}", start_date, end_date, page_offset ); let response: api::Response<transactions::Response> = request(&url, ¶meters).await?.json().await?; Ok(Some((response.data.items, response.pagination))) } pub async fn market_metrics( symbols: &[String], ) -> Result<Vec<market_metrics::Item>, Box<dyn Error>> { let mut results = stream::iter(symbols.chunks(MAX_SYMBOL_SUMMARY_BATCH_SIZE).map( |batch| async move { let symbols = batch.iter().cloned().join(","); let url_path = "market-metrics"; let params_string = &format!("symbols={}", symbols); let response = request(url_path, params_string).await?; let json_string = response.text().await?; let result: Result<api::Response<market_metrics::Response>, Box<dyn Error>> = serde_json::from_str(&json_string).map_err(|e| { log::error!( "Error deserializing {}?{}: {:?}", url_path, params_string, e, ); e.into() }); result }, )) .buffered(PARALLEL_REQUESTS) .collect::<Vec<_>>() .await; let mut json = vec![]; for result in results.drain(..) { json.append(&mut result?.data.items); } Ok(json) }