1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//! Broker helpers.
#![feature(async_closure)]

#[macro_use]
extern crate log;

extern crate futures;
extern crate hashbrown;

extern crate cxmr_api_clients;
extern crate cxmr_broker;
extern crate cxmr_currency;
extern crate cxmr_http_client;

use futures::future::join_all;
use hashbrown::HashMap;

use cxmr_api::Account;
use cxmr_api_clients::{fetch_accounts, fetch_markets};
use cxmr_broker::{Broker, BrokerAccount, BrokerExchanges, Error, ExchangeBroker, SharedBroker};
use cxmr_currency::CurrencyPair;
use cxmr_exchanges::{Exchange, MarketOrder};
use cxmr_http_client::HttpsClient;

/// Initializes accounts after fetching latest information.
pub async fn new_accounts(
    client: HttpsClient,
    accounts: &Vec<Account>,
) -> Result<Vec<BrokerAccount>, Error> {
    let accounts = fetch_accounts(client.clone(), accounts).await?;
    Ok(accounts
        .into_iter()
        .map(|(info, client)| BrokerAccount::new(info, client))
        .collect::<Vec<BrokerAccount>>())
}

/// Creates new empty broker.
pub async fn new_broker_from_exchanges(
    client: HttpsClient,
    exchanges: BrokerExchanges,
    accounts: Vec<Account>,
) -> Result<SharedBroker, Error> {
    let accounts = new_accounts(client.clone(), &accounts).await?;
    let mut broker = Broker::new(accounts, exchanges);
    let accounts_with_pairs = broker
        .accounts()
        .into_iter()
        .filter_map(|account| Some((account, broker.get(account.exchange())?.trading_pairs())))
        .collect::<Vec<_>>();
    // make requests
    let requests = accounts_with_pairs
        .into_iter()
        .map(async move |(account, pairs)| {
            let orders = get_account_orders(account.clone(), pairs).await?;
            let data_key = account.client().user_data_stream(None).await?;
            let account = account.name().to_owned();
            Ok((account, orders, data_key))
        })
        .collect::<Vec<_>>();

    let responses = join_all(requests)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, Error>>()?;

    for (account, orders, data_key) in responses {
        broker.insert_orders(account.clone(), orders);
        broker.set_user_data_key(account, data_key);
    }

    Ok(broker.into_shared())
}

/// Creates new empty broker.
pub async fn new_broker(
    client: HttpsClient,
    exchanges: &Vec<Exchange>,
    accounts: Vec<Account>,
) -> Result<SharedBroker, Error> {
    let exchanges = new_exchanges_brokers(client.clone(), exchanges).await?;
    new_broker_from_exchanges(client, exchanges, accounts).await
}

/// Fetches and initializes broker exchanges.
pub async fn new_exchanges_brokers(
    client: HttpsClient,
    exchanges: &Vec<Exchange>,
) -> Result<BrokerExchanges, Error> {
    trace!("Fetching markets");
    let info = fetch_markets(client, exchanges).await?;
    let mut exchanges = HashMap::new();
    info.into_iter().for_each(|info| {
        exchanges.insert(info.exchange.clone(), ExchangeBroker::new(info));
    });
    Ok(exchanges)
}

/// Gets account open orders.
pub async fn get_account_orders(
    account: BrokerAccount,
    pairs: Vec<CurrencyPair>,
) -> Result<Vec<MarketOrder>, Error> {
    let client = account.client();
    let requests = pairs.into_iter().map(async move |pair| {
        let orders = client.open_orders(pair).await?;
        Ok(orders) as Result<_, Error>
    });
    Ok(join_all(requests)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .flatten()
        .collect())
}