use log::debug;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::client::{DataStream, ResponseContext, Subscription};
use crate::contracts::tick_types::TickType;
use crate::contracts::{Contract, OptionComputation};
use crate::messages::{self, IncomingMessages, Notice, OutgoingMessages, RequestMessage, ResponseMessage};
use crate::orders::TagValue;
use crate::server_versions;
use crate::ToField;
use crate::{Client, Error};
mod decoders;
pub(crate) mod encoders;
#[cfg(test)]
mod tests;
#[derive(Clone, Debug, Copy, Serialize, Deserialize, PartialEq)]
pub enum BarSize {
Sec5,
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct BidAsk {
pub time: OffsetDateTime,
pub bid_price: f64,
pub ask_price: f64,
pub bid_size: f64,
pub ask_size: f64,
pub bid_ask_attribute: BidAskAttribute,
}
impl DataStream<BidAsk> for BidAsk {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::TickByTick];
fn decode(_client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::TickByTick => decoders::decode_bid_ask_tick(message),
IncomingMessages::Error => Err(Error::from(message.clone())),
_ => Err(Error::UnexpectedResponse(message.clone())),
}
}
fn cancel_message(_server_version: i32, request_id: Option<i32>, _context: &ResponseContext) -> Result<RequestMessage, Error> {
let request_id = request_id.expect("Request ID required to encode cancel realtime bars");
encoders::encode_cancel_tick_by_tick(request_id)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct BidAskAttribute {
pub bid_past_low: bool,
pub ask_past_high: bool,
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct MidPoint {
pub time: OffsetDateTime,
pub mid_point: f64,
}
impl DataStream<MidPoint> for MidPoint {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::TickByTick];
fn decode(_client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::TickByTick => decoders::decode_mid_point_tick(message),
IncomingMessages::Error => Err(Error::from(message.clone())),
_ => Err(Error::UnexpectedResponse(message.clone())),
}
}
fn cancel_message(_server_version: i32, request_id: Option<i32>, _context: &ResponseContext) -> Result<RequestMessage, Error> {
let request_id = request_id.expect("Request ID required to encode cancel mid point ticks");
encoders::encode_cancel_tick_by_tick(request_id)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Bar {
pub date: OffsetDateTime,
pub open: f64,
pub high: f64,
pub low: f64,
pub close: f64,
pub volume: f64,
pub wap: f64,
pub count: i32,
}
impl DataStream<Bar> for Bar {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::RealTimeBars];
fn decode(_client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
decoders::decode_realtime_bar(message)
}
fn cancel_message(_server_version: i32, request_id: Option<i32>, _context: &ResponseContext) -> Result<RequestMessage, Error> {
let request_id = request_id.expect("Request ID required to encode cancel realtime bars");
encoders::encode_cancel_realtime_bars(request_id)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct Trade {
pub tick_type: String,
pub time: OffsetDateTime,
pub price: f64,
pub size: f64,
pub trade_attribute: TradeAttribute,
pub exchange: String,
pub special_conditions: String,
}
impl DataStream<Trade> for Trade {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::TickByTick];
fn decode(_client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::TickByTick => decoders::decode_trade_tick(message),
IncomingMessages::Error => Err(Error::from(message.clone())),
_ => Err(Error::UnexpectedResponse(message.clone())),
}
}
fn cancel_message(_server_version: i32, request_id: Option<i32>, _context: &ResponseContext) -> Result<RequestMessage, Error> {
let request_id = request_id.expect("Request ID required to encode cancel realtime bars");
encoders::encode_cancel_tick_by_tick(request_id)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct TradeAttribute {
pub past_limit: bool,
pub unreported: bool,
}
#[derive(Clone, Debug, Copy)]
pub enum WhatToShow {
Trades,
MidPoint,
Bid,
Ask,
}
impl std::fmt::Display for WhatToShow {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Trades => write!(f, "TRADES"),
Self::MidPoint => write!(f, "MIDPOINT"),
Self::Bid => write!(f, "BID"),
Self::Ask => write!(f, "ASK"),
}
}
}
impl ToField for WhatToShow {
fn to_field(&self) -> String {
self.to_string()
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum MarketDepths {
MarketDepth(MarketDepth),
MarketDepthL2(MarketDepthL2),
Notice(Notice),
}
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct MarketDepth {
pub position: i32,
pub operation: i32,
pub side: i32,
pub price: f64,
pub size: f64,
}
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct MarketDepthL2 {
pub position: i32,
pub market_maker: String,
pub operation: i32,
pub side: i32,
pub price: f64,
pub size: f64,
pub smart_depth: bool,
}
impl DataStream<MarketDepths> for MarketDepths {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[IncomingMessages::MarketDepth, IncomingMessages::MarketDepthL2, IncomingMessages::Error];
fn decode(client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::MarketDepth => Ok(MarketDepths::MarketDepth(decoders::decode_market_depth(message)?)),
IncomingMessages::MarketDepthL2 => Ok(MarketDepths::MarketDepthL2(decoders::decode_market_depth_l2(
client.server_version,
message,
)?)),
IncomingMessages::Error => {
let code = message.peek_int(messages::CODE_INDEX).unwrap();
if (2100..2200).contains(&code) {
Ok(MarketDepths::Notice(Notice::from(message)))
} else {
Err(Error::from(message.clone()))
}
}
_ => Err(Error::UnexpectedResponse(message.clone())),
}
}
fn cancel_message(server_version: i32, request_id: Option<i32>, context: &ResponseContext) -> Result<RequestMessage, Error> {
let request_id = request_id.expect("Request ID required to encode cancel realtime bars");
encoders::encode_cancel_market_depth(server_version, request_id, context.is_smart_depth)
}
}
#[derive(Debug, Default)]
pub struct DepthMarketDataDescription {
pub exchange_name: String,
pub security_type: String,
pub listing_exchange: String,
pub service_data_type: String,
pub aggregated_group: Option<String>,
}
#[derive(Debug)]
pub enum TickTypes {
Price(TickPrice),
Size(TickSize),
String(TickString),
EFP(TickEFP),
Generic(TickGeneric),
OptionComputation(OptionComputation),
SnapshotEnd,
Notice(Notice),
RequestParameters(TickRequestParameters),
PriceSize(TickPriceSize),
}
impl DataStream<TickTypes> for TickTypes {
const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[
IncomingMessages::TickPrice,
IncomingMessages::TickSize,
IncomingMessages::TickString,
IncomingMessages::TickEFP,
IncomingMessages::TickGeneric,
IncomingMessages::TickOptionComputation,
IncomingMessages::TickSnapshotEnd,
IncomingMessages::Error,
IncomingMessages::TickReqParams,
];
fn decode(client: &Client, message: &mut ResponseMessage) -> Result<Self, Error> {
match message.message_type() {
IncomingMessages::TickPrice => Ok(decoders::decode_tick_price(client.server_version, message)?),
IncomingMessages::TickSize => Ok(TickTypes::Size(decoders::decode_tick_size(message)?)),
IncomingMessages::TickString => Ok(TickTypes::String(decoders::decode_tick_string(message)?)),
IncomingMessages::TickEFP => Ok(TickTypes::EFP(decoders::decode_tick_efp(message)?)),
IncomingMessages::TickGeneric => Ok(TickTypes::Generic(decoders::decode_tick_generic(message)?)),
IncomingMessages::TickOptionComputation => Ok(TickTypes::OptionComputation(decoders::decode_tick_option_computation(
client.server_version,
message,
)?)),
IncomingMessages::TickReqParams => Ok(TickTypes::RequestParameters(decoders::decode_tick_request_parameters(message)?)),
IncomingMessages::TickSnapshotEnd => Ok(TickTypes::SnapshotEnd),
IncomingMessages::Error => Ok(TickTypes::Notice(Notice::from(message))),
_ => Err(Error::NotImplemented),
}
}
fn cancel_message(_server_version: i32, request_id: Option<i32>, _context: &ResponseContext) -> Result<RequestMessage, Error> {
let request_id = request_id.expect("Request ID required to encode cancel realtime bars");
encoders::encode_cancel_market_data(request_id)
}
fn is_snapshot_end(&self) -> bool {
matches!(self, TickTypes::SnapshotEnd)
}
}
#[derive(Debug, Default)]
pub struct TickPrice {
pub tick_type: TickType,
pub price: f64,
pub attributes: TickAttribute,
}
#[derive(Debug, PartialEq, Default)]
pub struct TickAttribute {
pub can_auto_execute: bool,
pub past_limit: bool,
pub pre_open: bool,
}
#[derive(Debug, Default)]
pub struct TickSize {
pub tick_type: TickType,
pub size: f64,
}
#[derive(Debug, Default)]
pub struct TickPriceSize {
pub price_tick_type: TickType,
pub price: f64,
pub attributes: TickAttribute,
pub size_tick_type: TickType,
pub size: f64,
}
#[derive(Debug, Default)]
pub struct TickString {
pub tick_type: TickType,
pub value: String,
}
#[derive(Debug, Default)]
pub struct TickEFP {
pub tick_type: TickType,
pub basis_points: f64,
pub formatted_basis_points: String,
pub implied_futures_price: f64,
pub hold_days: i32,
pub future_last_trade_date: String,
pub dividend_impact: f64,
pub dividends_to_last_trade_date: f64,
}
#[derive(Debug, Default)]
pub struct TickGeneric {
pub tick_type: TickType,
pub value: f64,
}
#[derive(Debug, Default)]
pub struct TickRequestParameters {
pub min_tick: f64,
pub bbo_exchange: String,
pub snapshot_permissions: i32,
}
pub(crate) fn realtime_bars<'a>(
client: &'a Client,
contract: &Contract,
bar_size: &BarSize,
what_to_show: &WhatToShow,
use_rth: bool,
options: Vec<TagValue>,
) -> Result<Subscription<'a, Bar>, Error> {
let request_id = client.next_request_id();
let request = encoders::encode_request_realtime_bars(client.server_version(), request_id, contract, bar_size, what_to_show, use_rth, options)?;
let subscription = client.send_request(request_id, request)?;
Ok(Subscription::new(client, subscription, ResponseContext::default()))
}
pub(crate) fn tick_by_tick_all_last<'a>(
client: &'a Client,
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, Trade>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;
let server_version = client.server_version();
let request_id = client.next_request_id();
let request = encoders::encode_tick_by_tick(server_version, request_id, contract, "AllLast", number_of_ticks, ignore_size)?;
let subscription = client.send_request(request_id, request)?;
Ok(Subscription::new(client, subscription, ResponseContext::default()))
}
fn validate_tick_by_tick_request(client: &Client, _contract: &Contract, number_of_ticks: i32, ignore_size: bool) -> Result<(), Error> {
client.check_server_version(server_versions::TICK_BY_TICK, "It does not support tick-by-tick requests.")?;
if number_of_ticks != 0 || ignore_size {
client.check_server_version(
server_versions::TICK_BY_TICK_IGNORE_SIZE,
"It does not support ignore_size and number_of_ticks parameters in tick-by-tick requests.",
)?;
}
Ok(())
}
pub(crate) fn tick_by_tick_last<'a>(
client: &'a Client,
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, Trade>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;
let server_version = client.server_version();
let request_id = client.next_request_id();
let request = encoders::encode_tick_by_tick(server_version, request_id, contract, "Last", number_of_ticks, ignore_size)?;
let subscription = client.send_request(request_id, request)?;
Ok(Subscription::new(client, subscription, ResponseContext::default()))
}
pub(crate) fn tick_by_tick_bid_ask<'a>(
client: &'a Client,
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, BidAsk>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;
let server_version = client.server_version();
let request_id = client.next_request_id();
let request = encoders::encode_tick_by_tick(server_version, request_id, contract, "BidAsk", number_of_ticks, ignore_size)?;
let subscription = client.send_request(request_id, request)?;
Ok(Subscription::new(client, subscription, ResponseContext::default()))
}
pub(crate) fn tick_by_tick_midpoint<'a>(
client: &'a Client,
contract: &Contract,
number_of_ticks: i32,
ignore_size: bool,
) -> Result<Subscription<'a, MidPoint>, Error> {
validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;
let server_version = client.server_version();
let request_id = client.next_request_id();
let request = encoders::encode_tick_by_tick(server_version, request_id, contract, "MidPoint", number_of_ticks, ignore_size)?;
let subscription = client.send_request(request_id, request)?;
Ok(Subscription::new(client, subscription, ResponseContext::default()))
}
pub(crate) fn market_depth<'a>(
client: &'a Client,
contract: &Contract,
number_of_rows: i32,
is_smart_depth: bool,
) -> Result<Subscription<'a, MarketDepths>, Error> {
if is_smart_depth {
client.check_server_version(server_versions::SMART_DEPTH, "It does not support SMART depth request.")?;
}
if !contract.primary_exchange.is_empty() {
client.check_server_version(
server_versions::MKT_DEPTH_PRIM_EXCHANGE,
"It does not support primary_exchange parameter in request_market_depth",
)?;
}
let request_id = client.next_request_id();
let request = encoders::encode_request_market_depth(client.server_version, request_id, contract, number_of_rows, is_smart_depth)?;
let subscription = client.send_request(request_id, request)?;
Ok(Subscription::new(
client,
subscription,
ResponseContext {
is_smart_depth,
..Default::default()
},
))
}
pub fn market_depth_exchanges(client: &Client) -> Result<Vec<DepthMarketDataDescription>, Error> {
client.check_server_version(
server_versions::REQ_MKT_DEPTH_EXCHANGES,
"It does not support market depth exchanges requests.",
)?;
loop {
let request = encoders::encode_request_market_depth_exchanges()?;
let subscription = client.send_shared_request(OutgoingMessages::RequestMktDepthExchanges, request)?;
let response = subscription.next();
match response {
Some(Ok(mut message)) => return decoders::decode_market_depth_exchanges(client.server_version(), &mut message),
Some(Err(Error::ConnectionReset)) => {
debug!("connection reset. retrying market_depth_exchanges");
continue;
}
Some(Err(e)) => return Err(e),
None => return Ok(Vec::new()),
}
}
}
pub fn market_data<'a>(
client: &'a Client,
contract: &Contract,
generic_ticks: &[&str],
snapshot: bool,
regulatory_snapshot: bool,
) -> Result<Subscription<'a, TickTypes>, Error> {
let request_id = client.next_request_id();
let request = encoders::encode_request_market_data(
client.server_version(),
request_id,
contract,
generic_ticks,
snapshot,
regulatory_snapshot,
)?;
let subscription = client.send_request(request_id, request)?;
Ok(Subscription::new(client, subscription, ResponseContext::default()))
}