#![feature(try_trait)]
extern crate actix;
extern crate env_logger;
extern crate signalr_rs;
#[macro_use]
extern crate serde;
use actix::{Arbiter, System};
use base64;
use futures::io;
use libflate::deflate::Decoder;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use serde_derive;
use serde_json;
use serde_json::{Map, Value};
use signalr_rs::hub::client::{HubClient, HubClientError, HubClientHandler, HubQuery, RestartPolicy};
use std::io::Read;
struct BittrexHandler {
hub: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct FillEntry {
#[serde(alias = "F")]
FillType: String,
#[serde(alias = "I")]
Id: i32,
#[serde(alias = "OT")]
OrderType: String,
#[serde(alias = "P")]
Price: f32,
#[serde(alias = "Q")]
Quantity: f32,
#[serde(alias = "T")]
TimeStamp: i64,
#[serde(alias = "U")]
Uuid: String,
#[serde(alias = "t")]
Total: f32,
}
#[derive(Debug, Serialize, Deserialize)]
struct OrderPair {
#[serde(alias = "Q")]
Q: f32,
#[serde(alias = "R")]
R: f32,
}
#[derive(Debug, Serialize, Deserialize)]
struct ExchangeState {
#[serde(alias = "M")]
MarketName: String,
#[serde(alias = "N")]
Nonce: i32,
#[serde(alias = "Z")]
Buys: Vec<OrderPair>,
#[serde(alias = "S")]
Sells: Vec<OrderPair>,
#[serde(alias = "f")]
Fills: Vec<FillEntry>,
}
#[derive(Debug, Serialize, Deserialize)]
struct Order {
#[serde(alias = "U")]
Uuid: String,
#[serde(alias = "OU")]
OrderUuid: String,
#[serde(alias = "I")]
Id: i64,
#[serde(alias = "E")]
Exchange: String,
#[serde(alias = "OT")]
OrderType: String,
#[serde(alias = "Q")]
Quantity: f32,
#[serde(alias = "q")]
QuantityRemaining: f32,
#[serde(alias = "X")]
Limit: f32,
#[serde(alias = "n")]
CommissionPaid: f32,
#[serde(alias = "P")]
Price: f32,
#[serde(alias = "PU")]
PricePerUnit: f32,
#[serde(alias = "Y")]
Opened: i64,
#[serde(alias = "C")]
Closed: i64,
#[serde(alias = "i")]
IsOpen: bool,
#[serde(alias = "CI")]
CancelInitiated: bool,
#[serde(alias = "K")]
ImmediateOrCancel: bool,
#[serde(alias = "k")]
IsConditional: bool,
#[serde(alias = "J")]
Condition: String,
#[serde(alias = "j")]
ConditionTarget: f32,
#[serde(alias = "u")]
Updated: i64,
}
#[derive(Debug, Serialize, Deserialize)]
struct OrderDelta {
#[serde(alias = "w")]
AccountUuid: String,
#[serde(alias = "N")]
Nonce: i32,
#[serde(alias = "TY")]
Type: i32,
#[serde(alias = "o")]
Order: Order,
}
#[derive(Debug, Serialize, Deserialize)]
enum TradeType {
ADD,
REMOVE,
UPDATE,
}
#[derive(Debug, Serialize, Deserialize)]
struct OrderLog {
#[serde(alias = "TY")]
Type: TradeType,
#[serde(alias = "R")]
Rate: f32,
#[serde(alias = "Q")]
Quantity: f32,
}
#[derive(Debug, Serialize, Deserialize)]
struct Fill {
#[serde(alias = "FI")]
FillId: i32,
#[serde(alias = "OT")]
OrderType: String,
#[serde(alias = "R")]
Rate: f32,
#[serde(alias = "Q")]
Quantity: f32,
#[serde(alias = "T")]
TimeStamp: i64,
}
#[derive(Debug, Serialize, Deserialize)]
struct MarketDelta {
#[serde(alias = "M")]
MarketName: String,
#[serde(alias = "N")]
Nonce: i32,
#[serde(alias = "Z")]
Buys: Vec<OrderLog>,
#[serde(alias = "S")]
Sells: Vec<OrderLog>,
#[serde(alias = "f")]
Fills: Vec<Fill>,
}
#[derive(Debug, Serialize, Deserialize)]
struct SummaryDelta {
#[serde(alias = "M")]
MarketName: String,
#[serde(alias = "H")]
High: f32,
#[serde(alias = "L")]
Low: f32,
#[serde(alias = "V")]
Volume: f32,
#[serde(alias = "l")]
Last: f32,
#[serde(alias = "m")]
BaseVolume: f32,
#[serde(alias = "T")]
TimeStamp: i64,
#[serde(alias = "B")]
Bid: f32,
#[serde(alias = "A")]
Ask: f32,
#[serde(alias = "G")]
OpenBuyOrders: i32,
#[serde(alias = "g")]
OpenSellOrders: i32,
#[serde(alias = "PD")]
PrevDay: f32,
#[serde(alias = "x")]
Created: i64,
}
#[derive(Debug, Serialize, Deserialize)]
struct SummaryDeltaResponse {
#[serde(alias = "N")]
Nonce: i32,
#[serde(alias = "d")]
Delta: SummaryDelta,
}
impl BittrexHandler {
fn deflate<T>(binary: &String) -> Result<T, HubClientError>
where
T: DeserializeOwned,
{
let decoded = base64::decode(binary)?;
let mut decoder = Decoder::new(&decoded[..]);
let mut decoded_data: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decoded_data);
let v: &[u8] = &decoded_data;
serde_json::from_slice::<T>(v).map_err(|e| HubClientError::ParseError(e))
}
fn deflate_array<T>(a: &Value) -> Result<T, HubClientError>
where
T: DeserializeOwned,
{
let data: Vec<String> = serde_json::from_value(a.clone())?;
let binary = data.first().ok_or(HubClientError::MissingData)?;
BittrexHandler::deflate::<T>(binary)
}
fn deflate_string<T>(a: &Value) -> Result<T, HubClientError>
where
T: DeserializeOwned,
{
let binary: String = serde_json::from_value(a.clone())?;
BittrexHandler::deflate::<T>(&binary)
}
}
impl HubClientHandler for BittrexHandler {
fn connected(&self) {}
fn error(&self, id: Option<&str>, msg: &Value) {}
fn handle(&mut self, method: &str, message: &Value) {
match method {
"uE" => {
let delta = BittrexHandler::deflate_array::<MarketDelta>(message);
println!("Market Delta : {:?}", delta)
}
"uS" => {
let delta = BittrexHandler::deflate_array::<SummaryDeltaResponse>(message);
println!("Summary Delta : {:?}", delta)
}
s if s.starts_with("QE") => {
let delta = BittrexHandler::deflate_string::<ExchangeState>(message).unwrap();
let r = serde_json::to_string(&delta);
println!("Exchange State message : {:?}", r)
}
_ => println!(
"Unknown message : method {:?} message {:?}",
method, message
),
}
}
}
fn main() -> io::Result<()> {
env_logger::init();
let sys = System::new("websocket-client");
Arbiter::spawn(async {
let hub = "c2";
let handler = Box::new(BittrexHandler {
hub: hub.to_string(),
});
let client = HubClient::new(hub, "https://socket.bittrex.com/signalr/", 100, RestartPolicy::Always, handler).await;
match client {
Ok(addr) => {
addr.do_send(HubQuery::new(hub.to_string(), "SubscribeToExchangeDeltas".to_string(), vec!["BTC-NEO"], "1".to_string()));
addr.do_send(HubQuery::new(
hub.to_string(),
"QueryExchangeState".to_string(),
vec!["BTC-NEO"],
"QE2".to_string(),
));
}
Err(e) => {
println!("Hub client error : {:?}", e);
System::current().stop();
}
}
});
sys.run().unwrap();
Ok(())
}