use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::Stream;
use log::{error, warn};
use time::OffsetDateTime;
use crate::client::ClientRequestBuilders;
use crate::contracts::Contract;
use crate::messages::IncomingMessages;
use crate::protocol::{check_version, Features};
use crate::subscriptions::common::SubscriptionItem;
use crate::subscriptions::r#async::Subscription;
use crate::transport::{AsyncInternalSubscription, AsyncMessageBus};
use crate::{Client, Error, MAX_RETRIES};
use super::common::tick::{classify, TickAction};
use super::common::{self, decoders, encoders};
use super::{BarSize, Duration, HistogramEntry, HistoricalBarUpdate, HistoricalData, Schedule, TickDecoder, WhatToShow};
use crate::market_data::TradingHours;
impl Client {
pub async fn head_timestamp(&self, contract: &Contract, what_to_show: WhatToShow, trading_hours: TradingHours) -> Result<OffsetDateTime, Error> {
check_version(self.server_version(), Features::HEAD_TIMESTAMP)?;
let builder = self.request();
let request = encoders::encode_request_head_timestamp(builder.request_id(), contract, what_to_show, trading_hours.use_rth())?;
let mut subscription = builder.send_raw(request).await?;
match subscription.next().await {
Some(Ok(message)) if message.message_type() == IncomingMessages::HeadTimestamp => Ok(decoders::decode_head_timestamp(&message)?),
Some(Ok(message)) => Err(Error::unexpected_response(&message)),
Some(Err(e)) => Err(e),
None => {
Box::pin(self.head_timestamp(contract, what_to_show, trading_hours)).await
}
}
}
pub fn historical_data<'a>(&'a self, contract: &'a Contract, bar_size: BarSize) -> super::HistoricalDataBuilder<'a, Self> {
super::HistoricalDataBuilder::new(self, contract, bar_size)
}
pub fn historical_schedules<'a>(&'a self, contract: &'a Contract, duration: Duration) -> super::HistoricalScheduleBuilder<'a, Self> {
super::HistoricalScheduleBuilder::new(self, contract, duration)
}
pub fn historical_ticks<'a>(&'a self, contract: &'a Contract, number_of_ticks: i32) -> super::HistoricalTicksBuilder<'a, Self> {
super::HistoricalTicksBuilder::new(self, contract, number_of_ticks)
}
pub async fn cancel_historical_ticks(&self, request_id: i32) -> Result<(), Error> {
check_version(self.server_version(), Features::CANCEL_CONTRACT_DATA)?;
let message = encoders::encode_cancel_historical_ticks(request_id)?;
self.send_message(message).await?;
Ok(())
}
pub async fn histogram_data(&self, contract: &Contract, trading_hours: TradingHours, period: BarSize) -> Result<Vec<HistogramEntry>, Error> {
check_version(self.server_version(), Features::HISTOGRAM)?;
loop {
let builder = self.request();
let request = encoders::encode_request_histogram_data(builder.request_id(), contract, trading_hours.use_rth(), period)?;
let mut subscription = builder.send_raw(request).await?;
match subscription.next().await {
Some(Ok(message)) => return decoders::decode_histogram_data(&message),
Some(Err(e)) => return Err(e),
None => continue, }
}
}
}
pub(crate) async fn historical_data(
client: &Client,
contract: &Contract,
end_date: Option<OffsetDateTime>,
duration: Duration,
bar_size: BarSize,
what_to_show: WhatToShow,
trading_hours: TradingHours,
) -> Result<HistoricalData, Error> {
common::validate_historical_data(client.server_version(), contract, end_date, Some(what_to_show))?;
for _ in 0..MAX_RETRIES {
let builder = client.request();
let request = encoders::encode_request_historical_data(
builder.request_id(),
contract,
end_date,
duration,
bar_size,
Some(what_to_show),
trading_hours.use_rth(),
false,
&Vec::<crate::contracts::TagValue>::default(),
)?;
let mut subscription = builder.send_raw(request).await?;
match subscription.next().await {
Some(Ok(message)) if message.message_type() == IncomingMessages::HistoricalData => {
let mut data = decoders::decode_historical_data(&message)?;
if let Some(Ok(end_msg)) = subscription.next().await {
let (start, end) = decoders::decode_historical_data_end(&end_msg)?;
data.start = start;
data.end = end;
}
return Ok(data);
}
Some(Ok(message)) if message.message_type() == IncomingMessages::Error => return Err(Error::from(message)),
Some(Ok(message)) => return Err(Error::unexpected_response(&message)),
Some(Err(e)) => return Err(e),
None => continue, }
}
Err(Error::ConnectionReset)
}
/// Opens a subscription of [`HistoricalBarUpdate`] items for `contract`.
///
/// The request is encoded without an end date and with the keep-up-to-date flag set
/// to `true`, then handed to the request builder's typed `send`.
///
/// # Errors
/// Fails if the contract carries a trading class or a positive contract id and the
/// server version does not support `Features::TRADING_CLASS`, or if encoding or
/// sending the request fails.
pub(crate) async fn historical_data_stream(
    client: &Client,
    contract: &Contract,
    duration: Duration,
    bar_size: BarSize,
    what_to_show: WhatToShow,
    trading_hours: TradingHours,
) -> Result<Subscription<HistoricalBarUpdate>, Error> {
    // The version gate only applies when one of these contract fields is populated.
    let needs_trading_class = contract.contract_id > 0 || !contract.trading_class.is_empty();
    if needs_trading_class {
        check_version(client.server_version(), Features::TRADING_CLASS)?;
    }

    let builder = client.request();
    let request_id = builder.request_id();
    let chart_options = Vec::<crate::contracts::TagValue>::default();

    let request = encoders::encode_request_historical_data(
        request_id,
        contract,
        None, // end_date: not specified for this request
        duration,
        bar_size,
        Some(what_to_show),
        trading_hours.use_rth(),
        true, // keep_up_to_date
        &chart_options,
    )?;

    builder.send::<HistoricalBarUpdate>(request).await
}
/// Sends a historical ticks request and wraps the response stream in a [`TickSubscription`].
///
/// The returned subscription is given the request id and a clone of the client's
/// message bus handle, which it uses to cancel the request.
///
/// # Errors
/// Fails if the server version does not support `Features::HISTORICAL_TICKS`, or if
/// encoding or sending the request fails.
// The argument list mirrors the fields of the encoded request.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn historical_ticks<T: TickDecoder<T> + Send>(
    client: &Client,
    contract: &Contract,
    start: Option<OffsetDateTime>,
    end: Option<OffsetDateTime>,
    number_of_ticks: i32,
    what_to_show: WhatToShow,
    trading_hours: TradingHours,
    ignore_size: bool,
) -> Result<TickSubscription<T>, Error> {
    check_version(client.server_version(), Features::HISTORICAL_TICKS)?;

    let builder = client.request();
    // Read the id once: it is needed for the request and for later cancellation.
    let request_id = builder.request_id();

    let request = encoders::encode_request_historical_ticks(
        request_id,
        contract,
        start,
        end,
        number_of_ticks,
        what_to_show,
        trading_hours.use_rth(),
        ignore_size,
    )?;

    let messages = builder.send_raw(request).await?;
    let message_bus = Arc::clone(&client.message_bus);
    Ok(TickSubscription::new(messages, request_id, message_bus))
}
pub(crate) async fn historical_schedule(
client: &Client,
contract: &Contract,
end_date: Option<OffsetDateTime>,
duration: Duration,
) -> Result<Schedule, Error> {
common::validate_historical_data(client.server_version(), contract, end_date, Some(WhatToShow::Schedule))?;
loop {
let builder = client.request();
let request = encoders::encode_request_historical_data(
builder.request_id(),
contract,
end_date,
duration,
BarSize::Day,
Some(WhatToShow::Schedule),
true,
false,
&Vec::<crate::contracts::TagValue>::default(),
)?;
let mut subscription = builder.send_raw(request).await?;
match subscription.next().await {
Some(Ok(message)) if message.message_type() == IncomingMessages::HistoricalSchedule => {
return decoders::decode_historical_schedule(&message)
}
Some(Ok(message)) => return Err(Error::unexpected_response(&message)),
Some(Err(e)) => return Err(e),
None => continue, }
}
}
/// Stream of historical ticks of type `T` for a single request.
///
/// Incoming messages are classified into batches of ticks, notices, errors or an
/// end-of-stream marker; ticks are buffered and handed out one at a time.
/// Dropping the subscription before it is done sends a cancel for the request
/// (see the `Drop` impl).
#[must_use = "TickSubscription must be polled (.next().await or .filter_data()) to receive ticks; dropping it cancels the request"]
pub struct TickSubscription<T: TickDecoder<T> + Send> {
/// Set from the `done` flag of the most recent tick batch; once true and the
/// buffer is empty the stream yields `None`, and `Drop` skips the cancel.
done: bool,
/// Set when an end-of-stream marker or an error was classified; the stream
/// yields `None` afterwards.
stream_ended: bool,
/// Underlying message subscription whose `stream` is polled for routed items.
messages: AsyncInternalSubscription,
/// Ticks already decoded but not yet returned to the caller.
buffer: VecDeque<T>,
/// Id of the request, used when encoding the cancel message.
request_id: i32,
/// Message bus handle used to send the cancel.
message_bus: Arc<dyn AsyncMessageBus>,
/// Guards against sending the cancel more than once (`cancel()` and `Drop` both swap it).
cancelled: AtomicBool,
}
impl<T: TickDecoder<T> + Send> TickSubscription<T> {
fn new(messages: AsyncInternalSubscription, request_id: i32, message_bus: Arc<dyn AsyncMessageBus>) -> Self {
Self {
done: false,
stream_ended: false,
messages,
buffer: VecDeque::new(),
request_id,
message_bus,
cancelled: AtomicBool::new(false),
}
}
pub async fn cancel(&self) {
if self.cancelled.swap(true, Ordering::Relaxed) {
return;
}
match encoders::encode_cancel_historical_ticks(self.request_id) {
Ok(message) => {
if let Err(e) = self.message_bus.cancel_subscription(self.request_id, message).await {
warn!("error cancelling historical ticks subscription: {e}");
}
}
Err(e) => error!("error encoding cancel historical ticks: {e}"),
}
}
}
impl<T: TickDecoder<T> + Send + Unpin> Stream for TickSubscription<T> {
    type Item = Result<SubscriptionItem<T>, Error>;

    /// Yields buffered ticks first, then pulls and classifies further messages.
    ///
    /// * A batch refills the buffer and records whether it was the final one.
    /// * A notice is forwarded as `SubscriptionItem::Notice`.
    /// * An error is forwarded once and ends the stream.
    /// * An end-of-stream marker, or termination of the underlying stream, ends
    ///   the stream.
    ///
    /// Once ended, every later poll returns `Ready(None)` without touching the
    /// underlying stream again.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            // Hand out already-decoded ticks before asking for more messages.
            if let Some(tick) = this.buffer.pop_front() {
                return Poll::Ready(Some(Ok(SubscriptionItem::Data(tick))));
            }

            if this.done || this.stream_ended {
                return Poll::Ready(None);
            }

            let routed = match Pin::new(&mut this.messages.stream).poll_next(cx) {
                Poll::Ready(Some(Ok(item))) => item,
                // Presumably a "receiver lagged" notification from the underlying
                // channel (going by the binding name); skip it and poll again.
                Poll::Ready(Some(Err(_lagged))) => continue,
                Poll::Ready(None) => {
                    // Remember termination: a finished stream must not be polled
                    // again, so later polls short-circuit on `stream_ended`.
                    this.stream_ended = true;
                    return Poll::Ready(None);
                }
                Poll::Pending => return Poll::Pending,
            };

            match classify::<T>(routed) {
                TickAction::Batch(ticks, done) => {
                    this.buffer.extend(ticks);
                    this.done = done;
                }
                TickAction::Skip => {}
                TickAction::Notice(notice) => return Poll::Ready(Some(Ok(SubscriptionItem::Notice(notice)))),
                TickAction::EndOfStream => {
                    this.stream_ended = true;
                    return Poll::Ready(None);
                }
                TickAction::Error(e) => {
                    this.stream_ended = true;
                    return Poll::Ready(Some(Err(e)));
                }
            }
        }
    }
}
impl<T: TickDecoder<T> + Send> Drop for TickSubscription<T> {
fn drop(&mut self) {
if self.done || self.cancelled.swap(true, Ordering::Relaxed) {
return;
}
let request_id = self.request_id;
let message_bus = self.message_bus.clone();
if let Ok(message) = encoders::encode_cancel_historical_ticks(request_id) {
tokio::spawn(async move {
if let Err(e) = message_bus.cancel_subscription(request_id, message).await {
warn!("error sending cancel historical ticks in drop: {e}");
}
});
}
}
}
// Unit tests for this module live in `async_tests.rs` (located via the `#[path]`
// attribute) and are compiled only for test builds.
#[cfg(test)]
#[path = "async_tests.rs"]
mod tests;