1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#![feature(async_closure)]
#[macro_use]
extern crate log;
extern crate futures;
extern crate hashbrown;
extern crate cxmr_api_clients;
extern crate cxmr_broker;
extern crate cxmr_currency;
extern crate cxmr_http_client;
use futures::future::join_all;
use hashbrown::HashMap;
use cxmr_api::Account;
use cxmr_api_clients::{fetch_accounts, fetch_markets};
use cxmr_broker::{Broker, BrokerAccount, BrokerExchanges, Error, ExchangeBroker, SharedBroker};
use cxmr_currency::CurrencyPair;
use cxmr_exchanges::{Exchange, MarketOrder};
use cxmr_http_client::HttpsClient;
pub async fn new_accounts(
client: HttpsClient,
accounts: &Vec<Account>,
) -> Result<Vec<BrokerAccount>, Error> {
let accounts = fetch_accounts(client.clone(), accounts).await?;
Ok(accounts
.into_iter()
.map(|(info, client)| BrokerAccount::new(info, client))
.collect::<Vec<BrokerAccount>>())
}
pub async fn new_broker_from_exchanges(
client: HttpsClient,
exchanges: BrokerExchanges,
accounts: Vec<Account>,
) -> Result<SharedBroker, Error> {
let accounts = new_accounts(client.clone(), &accounts).await?;
let mut broker = Broker::new(accounts, exchanges);
let accounts_with_pairs = broker
.accounts()
.into_iter()
.filter_map(|account| Some((account, broker.get(account.exchange())?.trading_pairs())))
.collect::<Vec<_>>();
let requests = accounts_with_pairs
.into_iter()
.map(async move |(account, pairs)| {
let orders = get_account_orders(account.clone(), pairs).await?;
let data_key = account.client().user_data_stream(None).await?;
let account = account.name().to_owned();
Ok((account, orders, data_key))
})
.collect::<Vec<_>>();
let responses = join_all(requests)
.await
.into_iter()
.collect::<Result<Vec<_>, Error>>()?;
for (account, orders, data_key) in responses {
broker.insert_orders(account.clone(), orders);
broker.set_user_data_key(account, data_key);
}
Ok(broker.into_shared())
}
pub async fn new_broker(
client: HttpsClient,
exchanges: &Vec<Exchange>,
accounts: Vec<Account>,
) -> Result<SharedBroker, Error> {
let exchanges = new_exchanges_brokers(client.clone(), exchanges).await?;
new_broker_from_exchanges(client, exchanges, accounts).await
}
pub async fn new_exchanges_brokers(
client: HttpsClient,
exchanges: &Vec<Exchange>,
) -> Result<BrokerExchanges, Error> {
trace!("Fetching markets");
let info = fetch_markets(client, exchanges).await?;
let mut exchanges = HashMap::new();
info.into_iter().for_each(|info| {
exchanges.insert(info.exchange.clone(), ExchangeBroker::new(info));
});
Ok(exchanges)
}
pub async fn get_account_orders(
account: BrokerAccount,
pairs: Vec<CurrencyPair>,
) -> Result<Vec<MarketOrder>, Error> {
let client = account.client();
let requests = pairs.into_iter().map(async move |pair| {
let orders = client.open_orders(pair).await?;
Ok(orders) as Result<_, Error>
});
Ok(join_all(requests)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect())
}