use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use log::{error, warn};
use time::OffsetDateTime;
use crate::client::blocking::ClientRequestBuilders;
use crate::contracts::Contract;
use crate::messages::IncomingMessages;
use crate::protocol::{check_version, Features};
use crate::subscriptions::common::{RoutedItem, SubscriptionItem};
use crate::subscriptions::sync::{FilterData, Subscription, SubscriptionItemIterExt};
use crate::transport::{InternalSubscription, MessageBus};
use crate::{client::sync::Client, Error, MAX_RETRIES};
use super::common::tick::{classify, TickAction};
use super::common::{self, decoders, encoders};
use super::{BarSize, Duration, HistogramEntry, HistoricalBarUpdate, HistoricalData, Schedule, TickDecoder, WhatToShow};
use crate::market_data::TradingHours;
impl Client {
pub fn head_timestamp(&self, contract: &Contract, what_to_show: WhatToShow, trading_hours: TradingHours) -> Result<OffsetDateTime, Error> {
check_version(self.server_version(), Features::HEAD_TIMESTAMP)?;
let builder = self.request();
let request = encoders::encode_request_head_timestamp(builder.request_id(), contract, what_to_show, trading_hours.use_rth())?;
let subscription = builder.send_raw(request)?;
match subscription.next() {
Some(Ok(message)) if message.message_type() == IncomingMessages::HeadTimestamp => Ok(decoders::decode_head_timestamp(&message)?),
Some(Ok(message)) => Err(Error::unexpected_response(&message)),
Some(Err(Error::ConnectionReset)) => self.head_timestamp(contract, what_to_show, trading_hours),
Some(Err(e)) => Err(e),
None => Err(Error::UnexpectedEndOfStream),
}
}
pub fn historical_data<'a>(&'a self, contract: &'a Contract, bar_size: BarSize) -> super::HistoricalDataBuilder<'a, Self> {
super::HistoricalDataBuilder::new(self, contract, bar_size)
}
pub fn historical_schedules<'a>(&'a self, contract: &'a Contract, duration: Duration) -> super::HistoricalScheduleBuilder<'a, Self> {
super::HistoricalScheduleBuilder::new(self, contract, duration)
}
pub fn historical_ticks<'a>(&'a self, contract: &'a Contract, number_of_ticks: i32) -> super::HistoricalTicksBuilder<'a, Self> {
super::HistoricalTicksBuilder::new(self, contract, number_of_ticks)
}
pub fn cancel_historical_ticks(&self, request_id: i32) -> Result<(), Error> {
check_version(self.server_version(), Features::CANCEL_CONTRACT_DATA)?;
let message = encoders::encode_cancel_historical_ticks(request_id)?;
self.send_message(message)?;
Ok(())
}
pub fn histogram_data(&self, contract: &Contract, trading_hours: TradingHours, period: BarSize) -> Result<Vec<HistogramEntry>, Error> {
check_version(self.server_version(), Features::HISTOGRAM)?;
loop {
let builder = self.request();
let request = encoders::encode_request_histogram_data(builder.request_id(), contract, trading_hours.use_rth(), period)?;
let subscription = builder.send_raw(request)?;
match subscription.next() {
Some(Ok(message)) => return decoders::decode_histogram_data(&message),
Some(Err(Error::ConnectionReset)) => continue,
Some(Err(e)) => return Err(e),
None => return Ok(Vec::new()),
}
}
}
}
pub(crate) fn historical_data(
client: &Client,
contract: &Contract,
end_date: Option<OffsetDateTime>,
duration: Duration,
bar_size: BarSize,
what_to_show: WhatToShow,
trading_hours: TradingHours,
) -> Result<HistoricalData, Error> {
common::validate_historical_data(client.server_version(), contract, end_date, Some(what_to_show))?;
for _ in 0..MAX_RETRIES {
let builder = client.request();
let request = encoders::encode_request_historical_data(
builder.request_id(),
contract,
end_date,
duration,
bar_size,
Some(what_to_show),
trading_hours.use_rth(),
false,
&Vec::<crate::contracts::TagValue>::default(),
)?;
let subscription = builder.send_raw(request)?;
match subscription.next() {
Some(Ok(message)) if message.message_type() == IncomingMessages::HistoricalData => {
let mut data = decoders::decode_historical_data(&message)?;
if let Some(Ok(end_msg)) = subscription.next() {
let (start, end) = decoders::decode_historical_data_end(&end_msg)?;
data.start = start;
data.end = end;
}
return Ok(data);
}
Some(Ok(message)) if message.message_type() == IncomingMessages::Error => return Err(Error::from(message)),
Some(Ok(message)) => return Err(Error::unexpected_response(&message)),
Some(Err(Error::ConnectionReset)) => {}
Some(Err(e)) => return Err(e),
None => return Err(Error::UnexpectedEndOfStream),
}
}
Err(Error::ConnectionReset)
}
/// Sends a historical data request with the keep-up-to-date flag set and returns the
/// resulting subscription of [`HistoricalBarUpdate`] items.
///
/// No end date and no chart options are sent. When the contract carries a trading class
/// or a positive contract id, the server must support `Features::TRADING_CLASS`.
///
/// # Errors
/// Returns an error when the version check, encoding or sending fails.
pub(crate) fn historical_data_stream(
    client: &Client,
    contract: &Contract,
    duration: Duration,
    bar_size: BarSize,
    what_to_show: WhatToShow,
    trading_hours: TradingHours,
) -> Result<Subscription<HistoricalBarUpdate>, Error> {
    let needs_trading_class = contract.contract_id > 0 || !contract.trading_class.is_empty();
    if needs_trading_class {
        check_version(client.server_version(), Features::TRADING_CLASS)?;
    }

    let builder = client.request();
    let request_id = builder.request_id();
    let chart_options = Vec::<crate::contracts::TagValue>::default();

    let request = encoders::encode_request_historical_data(
        request_id,
        contract,
        None, // end date: not sent for a streaming request
        duration,
        bar_size,
        Some(what_to_show),
        trading_hours.use_rth(),
        true, // keep_up_to_date
        &chart_options,
    )?;

    builder.send::<HistoricalBarUpdate>(request)
}
/// Sends a historical ticks request and wraps the raw subscription in a
/// [`TickSubscription`] that decodes ticks of type `T`.
///
/// The returned subscription keeps the request id and a handle to the client's message
/// bus so it can cancel the request later.
///
/// # Errors
/// Returns an error when the server does not support `Features::HISTORICAL_TICKS`, or
/// when encoding or sending the request fails.
// The argument list mirrors the fields of the wire request, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
pub(crate) fn historical_ticks<T: TickDecoder<T>>(
    client: &Client,
    contract: &Contract,
    start: Option<OffsetDateTime>,
    end: Option<OffsetDateTime>,
    number_of_ticks: i32,
    what_to_show: WhatToShow,
    trading_hours: TradingHours,
    ignore_size: bool,
) -> Result<TickSubscription<T>, Error> {
    check_version(client.server_version(), Features::HISTORICAL_TICKS)?;

    let builder = client.request();
    let encoded = encoders::encode_request_historical_ticks(
        builder.request_id(),
        contract,
        start,
        end,
        number_of_ticks,
        what_to_show,
        trading_hours.use_rth(),
        ignore_size,
    )?;

    // Capture the id before `send_raw` consumes the builder.
    let request_id = builder.request_id();
    let messages = builder.send_raw(encoded)?;
    let message_bus = Arc::clone(&client.message_bus);

    Ok(TickSubscription::new(messages, request_id, message_bus))
}
pub(crate) fn historical_schedule(
client: &Client,
contract: &Contract,
end_date: Option<OffsetDateTime>,
duration: Duration,
) -> Result<Schedule, Error> {
common::validate_historical_data(client.server_version(), contract, end_date, Some(WhatToShow::Schedule))?;
loop {
let builder = client.request();
let request = encoders::encode_request_historical_data(
builder.request_id(),
contract,
end_date,
duration,
BarSize::Day,
Some(WhatToShow::Schedule),
true,
false,
&Vec::<crate::contracts::TagValue>::default(),
)?;
let subscription = builder.send_raw(request)?;
match subscription.next() {
Some(Ok(message)) if message.message_type() == IncomingMessages::HistoricalSchedule => {
return decoders::decode_historical_schedule(&message)
}
Some(Ok(message)) => return Err(Error::unexpected_response(&message)),
Some(Err(Error::ConnectionReset)) => {}
Some(Err(e)) => return Err(e),
None => return Err(Error::UnexpectedEndOfStream),
}
}
}
/// Subscription to a historical ticks request that decodes incoming batches into ticks
/// of type `T` and hands them out one at a time.
///
/// All state is behind atomics or a mutex, so the polling methods take `&self`.
/// Dropping the subscription cancels the request unless the final batch was received.
#[must_use = "TickSubscription must be polled (.next() or .iter_data()) to receive ticks; dropping it cancels the request"]
pub struct TickSubscription<T: TickDecoder<T>> {
// Set from the `done` flag of each decoded batch; once true and the buffer is empty,
// polling returns `None`, and `Drop` skips the cancel.
done: AtomicBool,
// Set when the stream ended or an error was classified; polling then returns `None`.
stream_ended: AtomicBool,
// Underlying subscription that routed items are pulled from.
messages: InternalSubscription,
// Decoded ticks that have not been handed to the caller yet.
buffer: Mutex<VecDeque<T>>,
// Request id used when encoding and sending the cancel message.
request_id: i32,
// Message bus the cancel message is sent through.
message_bus: Arc<dyn MessageBus>,
// Guards `cancel()` so the cancel is attempted at most once.
cancelled: AtomicBool,
}
impl<T: TickDecoder<T>> TickSubscription<T> {
fn new(messages: InternalSubscription, request_id: i32, message_bus: Arc<dyn MessageBus>) -> Self {
Self {
done: false.into(),
stream_ended: AtomicBool::new(false),
messages,
buffer: Mutex::new(VecDeque::new()),
request_id,
message_bus,
cancelled: AtomicBool::new(false),
}
}
pub fn cancel(&self) {
if self.cancelled.swap(true, Ordering::Relaxed) {
return;
}
match encoders::encode_cancel_historical_ticks(self.request_id) {
Ok(message) => {
if let Err(e) = self.message_bus.cancel_subscription(self.request_id, &message) {
warn!("error cancelling historical ticks subscription: {e}");
}
self.messages.cancel();
}
Err(e) => error!("error encoding cancel historical ticks: {e}"),
}
}
pub fn iter(&self) -> TickSubscriptionIter<'_, T> {
TickSubscriptionIter { subscription: self }
}
pub fn try_iter(&self) -> TickSubscriptionTryIter<'_, T> {
TickSubscriptionTryIter { subscription: self }
}
pub fn timeout_iter(&self, duration: std::time::Duration) -> TickSubscriptionTimeoutIter<'_, T> {
TickSubscriptionTimeoutIter {
subscription: self,
timeout: duration,
}
}
pub fn iter_data(&self) -> FilterData<TickSubscriptionIter<'_, T>> {
self.iter().filter_data()
}
pub fn try_iter_data(&self) -> FilterData<TickSubscriptionTryIter<'_, T>> {
self.try_iter().filter_data()
}
pub fn timeout_iter_data(&self, duration: std::time::Duration) -> FilterData<TickSubscriptionTimeoutIter<'_, T>> {
self.timeout_iter(duration).filter_data()
}
pub fn next(&self) -> Option<Result<SubscriptionItem<T>, Error>> {
self.next_helper(|| self.messages.next_routed())
}
pub fn try_next(&self) -> Option<Result<SubscriptionItem<T>, Error>> {
self.next_helper(|| self.messages.try_next_routed())
}
pub fn next_timeout(&self, duration: std::time::Duration) -> Option<Result<SubscriptionItem<T>, Error>> {
self.next_helper(|| self.messages.next_timeout_routed(duration))
}
pub fn next_data(&self) -> Option<Result<T, Error>> {
self.iter_data().next()
}
fn next_helper<F>(&self, next_routed: F) -> Option<Result<SubscriptionItem<T>, Error>>
where
F: Fn() -> Option<RoutedItem>,
{
loop {
if let Some(tick) = self.next_buffered() {
return Some(Ok(SubscriptionItem::Data(tick)));
}
if self.done.load(Ordering::Relaxed) || self.stream_ended.load(Ordering::Relaxed) {
return None;
}
let item = next_routed()?;
match classify::<T>(item) {
TickAction::Batch(ticks, done) => {
self.buffer.lock().unwrap().extend(ticks);
self.done.store(done, Ordering::Relaxed);
}
TickAction::Skip => {}
TickAction::Notice(notice) => return Some(Ok(SubscriptionItem::Notice(notice))),
TickAction::EndOfStream => {
self.stream_ended.store(true, Ordering::Relaxed);
return None;
}
TickAction::Error(e) => {
self.stream_ended.store(true, Ordering::Relaxed);
return Some(Err(e));
}
}
}
}
fn next_buffered(&self) -> Option<T> {
let mut buffer = self.buffer.lock().unwrap();
buffer.pop_front()
}
}
/// Cancels the request on drop unless the final batch has already been received.
impl<T: TickDecoder<T>> Drop for TickSubscription<T> {
    fn drop(&mut self) {
        let finished = self.done.load(Ordering::Relaxed);
        if finished {
            return;
        }
        self.cancel();
    }
}
/// Borrowing iterator over a [`TickSubscription`]; each step calls `TickSubscription::next`.
pub struct TickSubscriptionIter<'a, T: TickDecoder<T>> {
// Subscription being polled.
subscription: &'a TickSubscription<T>,
}
impl<T: TickDecoder<T>> Iterator for TickSubscriptionIter<'_, T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
impl<'a, T: TickDecoder<T>> IntoIterator for &'a TickSubscription<T> {
type Item = Result<SubscriptionItem<T>, Error>;
type IntoIter = TickSubscriptionIter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// Owning iterator over a [`TickSubscription`]; each step calls `TickSubscription::next`.
pub struct TickSubscriptionOwnedIter<T: TickDecoder<T>> {
// Subscription owned by this iterator; it is dropped together with the iterator.
subscription: TickSubscription<T>,
}
impl<T: TickDecoder<T>> Iterator for TickSubscriptionOwnedIter<T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next()
}
}
/// Lets `for item in subscription` consume the subscription.
impl<T: TickDecoder<T>> IntoIterator for TickSubscription<T> {
    type Item = Result<SubscriptionItem<T>, Error>;
    type IntoIter = TickSubscriptionOwnedIter<T>;

    fn into_iter(self) -> Self::IntoIter {
        let subscription = self;
        TickSubscriptionOwnedIter { subscription }
    }
}
/// Borrowing iterator over a [`TickSubscription`]; each step calls `TickSubscription::try_next`.
pub struct TickSubscriptionTryIter<'a, T: TickDecoder<T>> {
// Subscription being polled.
subscription: &'a TickSubscription<T>,
}
impl<T: TickDecoder<T>> Iterator for TickSubscriptionTryIter<'_, T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.try_next()
}
}
/// Borrowing iterator over a [`TickSubscription`]; each step calls
/// `TickSubscription::next_timeout` with the stored timeout.
pub struct TickSubscriptionTimeoutIter<'a, T: TickDecoder<T>> {
// Subscription being polled.
subscription: &'a TickSubscription<T>,
// Timeout passed to `next_timeout` on every step.
timeout: std::time::Duration,
}
impl<T: TickDecoder<T>> Iterator for TickSubscriptionTimeoutIter<'_, T> {
type Item = Result<SubscriptionItem<T>, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.subscription.next_timeout(self.timeout)
}
}
#[cfg(test)]
#[path = "sync_tests.rs"]
mod tests;