ibapi 3.0.0

A Rust implementation of the Interactive Brokers TWS API, providing a reliable and user friendly interface for TWS and IB Gateway. Designed with a focus on simplicity and performance.
Documentation
use log::debug;

use crate::client::ClientRequestBuilders;
use crate::contracts::{Contract, TagValue};
use crate::messages::OutgoingMessages;
use crate::protocol::{check_version, Features};
use crate::subscriptions::Subscription;
use crate::{server_versions, Client, Error};

use super::common::{decoders, encoders};
use super::{Bar, DepthMarketDataDescription, MarketDepthBuilder, MarketDepths, RealtimeBarsBuilder, TickByTickBuilder, TickTypes, WhatToShow};
use crate::market_data::{SmartDepth, TradingHours};
use crate::subscriptions::StreamDecoder;

impl Client {
    /// Switches market data type returned from request_market_data requests to Live, Frozen, Delayed, or FrozenDelayed.
    ///
    /// # Arguments
    /// * `market_data_type` - Type of market data to retrieve.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ibapi::prelude::*;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let client = Client::connect("127.0.0.1:4002", 100).await.expect("connection failed");
    ///
    ///     let market_data_type = MarketDataType::Realtime;
    ///     client.switch_market_data_type(market_data_type).await.expect("request failed");
    ///     println!("market data switched: {market_data_type:?}");
    /// }
    /// ```
    pub async fn switch_market_data_type(&self, market_data_type: crate::market_data::MarketDataType) -> Result<(), Error> {
        self.check_server_version(server_versions::REQ_MARKET_DATA_TYPE, "It does not support market data type requests.")?;

        let message = crate::market_data::encoders::encode_request_market_data_type(market_data_type)?;
        self.send_message(message).await
    }

    /// Returns a builder for a real-time 5-second bar subscription.
    ///
    /// Defaults to `WhatToShow::Trades` and `TradingHours::Regular`. See
    /// [`RealtimeBarsBuilder`] for the chained methods.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ibapi::prelude::*;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let client = Client::connect("127.0.0.1:4002", 100).await.expect("connection failed");
    ///     let contract = Contract::stock("TSLA").build();
    ///
    ///     let mut subscription = client
    ///         .realtime_bars(&contract)
    ///         .what_to_show(RealtimeWhatToShow::Trades)
    ///         .trading_hours(TradingHours::Extended)
    ///         .subscribe()
    ///         .await
    ///         .expect("realtime bars request failed");
    ///
    ///     while let Some(item) = subscription.next().await {
    ///         match item {
    ///             Ok(SubscriptionItem::Data(bar)) => println!("{bar:?}"),
    ///             Ok(SubscriptionItem::Notice(n)) => eprintln!("notice: {n}"),
    ///             Err(e) => { eprintln!("error: {e}"); break; }
    ///         }
    ///     }
    /// }
    /// ```
    pub fn realtime_bars<'a>(&'a self, contract: &'a Contract) -> RealtimeBarsBuilder<'a, Self> {
        RealtimeBarsBuilder::new(self, contract)
    }

    pub(crate) async fn subscribe_realtime_bars(
        &self,
        contract: &Contract,
        what_to_show: &WhatToShow,
        trading_hours: TradingHours,
        options: &[TagValue],
    ) -> Result<Subscription<Bar>, Error> {
        let builder = self.request();
        let request = encoders::encode_request_realtime_bars(builder.request_id(), contract, what_to_show, trading_hours.use_rth(), options)?;

        builder.send::<Bar>(request).await
    }

    /// Returns a builder for a tick-by-tick real-time subscription.
    ///
    /// Pick the tick stream with the terminal — `.last()` / `.all_last()` /
    /// `.bid_ask(IgnoreSize)` / `.mid_point()`. See [`TickByTickBuilder`].
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ibapi::prelude::*;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let client = Client::connect("127.0.0.1:4002", 100).await.expect("connection failed");
    ///     let contract = Contract::stock("AAPL").build();
    ///
    ///     let mut quotes = client
    ///         .tick_by_tick(&contract, 10)
    ///         .bid_ask(IgnoreSize::No)
    ///         .await
    ///         .expect("tick-by-tick bid/ask request failed");
    ///
    ///     while let Some(item) = quotes.next().await {
    ///         match item {
    ///             Ok(SubscriptionItem::Data(q)) => println!("{q:?}"),
    ///             Ok(SubscriptionItem::Notice(n)) => eprintln!("notice: {n}"),
    ///             Err(e) => { eprintln!("error: {e}"); break; }
    ///         }
    ///     }
    /// }
    /// ```
    pub fn tick_by_tick<'a>(&'a self, contract: &'a Contract, number_of_ticks: i32) -> TickByTickBuilder<'a, Self> {
        TickByTickBuilder::new(self, contract, number_of_ticks)
    }

    /// Returns a builder for a level-2 market-depth (order book) subscription.
    ///
    /// Defaults to `SmartDepth::No`. See [`MarketDepthBuilder`] for the chained
    /// methods.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ibapi::prelude::*;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let client = Client::connect("127.0.0.1:4002", 100).await.expect("connection failed");
    ///     let contract = Contract::stock("AAPL").build();
    ///
    ///     let mut subscription = client
    ///         .market_depth(&contract, 5)
    ///         .smart_depth(SmartDepth::Yes)
    ///         .subscribe()
    ///         .await
    ///         .expect("market depth request failed");
    ///
    ///     while let Some(item) = subscription.next().await {
    ///         match item {
    ///             Ok(SubscriptionItem::Data(row)) => println!("{row:?}"),
    ///             Ok(SubscriptionItem::Notice(n)) => eprintln!("notice: {n}"),
    ///             Err(e) => { eprintln!("error: {e}"); break; }
    ///         }
    ///     }
    /// }
    /// ```
    pub fn market_depth<'a>(&'a self, contract: &'a Contract, number_of_rows: i32) -> MarketDepthBuilder<'a, Self> {
        MarketDepthBuilder::new(self, contract, number_of_rows)
    }

    /// Requests venues for which market data is returned to market_depth (those with market makers)
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use ibapi::prelude::*;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let client = Client::connect("127.0.0.1:4002", 100).await.expect("connection failed");
    ///     let exchanges = client.market_depth_exchanges().await.expect("error requesting market depth exchanges");
    ///     for exchange in &exchanges {
    ///         println!("{exchange:?}");
    ///     }
    /// }
    /// ```
    pub async fn market_depth_exchanges(&self) -> Result<Vec<DepthMarketDataDescription>, Error> {
        check_version(self.server_version(), Features::REQ_MKT_DEPTH_EXCHANGES)?;

        loop {
            let request = encoders::encode_request_market_depth_exchanges()?;
            let mut subscription = self.shared_request(OutgoingMessages::RequestMktDepthExchanges).send_raw(request).await?;
            let response = subscription.next().await;

            match response {
                Some(Ok(message)) => return decoders::decode_market_depth_exchanges(&message),
                Some(Err(e)) => return Err(e),
                None => {
                    debug!("connection reset. retrying market_depth_exchanges");
                    continue;
                }
            }
        }
    }

    /// Requests real time market data (low-level).
    pub(crate) async fn subscribe_market_data(
        &self,
        contract: &Contract,
        generic_ticks: &[&str],
        snapshot: bool,
        regulatory_snapshot: bool,
    ) -> Result<Subscription<TickTypes>, Error> {
        let builder = self.request();
        let request = encoders::encode_request_market_data(builder.request_id(), contract, generic_ticks, snapshot, regulatory_snapshot)?;

        builder.send::<TickTypes>(request).await
    }
}

/// Validates that server supports the given request.
pub(super) fn validate_tick_by_tick_request(client: &Client, _contract: &Contract, number_of_ticks: i32, ignore_size: bool) -> Result<(), Error> {
    check_version(client.server_version(), Features::TICK_BY_TICK)?;

    if number_of_ticks != 0 || ignore_size {
        check_version(client.server_version(), Features::TICK_BY_TICK_IGNORE_SIZE)?;
    }

    Ok(())
}

pub(crate) async fn market_depth(
    client: &Client,
    contract: &Contract,
    number_of_rows: i32,
    smart_depth: SmartDepth,
) -> Result<Subscription<MarketDepths>, Error> {
    let is_smart_depth = matches!(smart_depth, SmartDepth::Yes);
    if is_smart_depth {
        check_version(client.server_version(), Features::SMART_DEPTH)?;
    }
    if !contract.primary_exchange.is_empty() {
        check_version(client.server_version(), Features::MKT_DEPTH_PRIM_EXCHANGE)?;
    }

    let builder = client.request();
    let request = encoders::encode_request_market_depth(builder.request_id(), contract, number_of_rows, is_smart_depth)?;
    builder
        .send_with_context::<MarketDepths>(request, client.decoder_context().with_smart_depth(is_smart_depth))
        .await
}

pub(crate) async fn tick_by_tick<T: StreamDecoder<T> + Send + 'static>(
    client: &Client,
    contract: &Contract,
    tick_type: &str,
    number_of_ticks: i32,
    ignore_size: bool,
) -> Result<Subscription<T>, Error> {
    validate_tick_by_tick_request(client, contract, number_of_ticks, ignore_size)?;

    let builder = client.request();
    let request = encoders::encode_tick_by_tick(builder.request_id(), contract, tick_type, number_of_ticks, ignore_size)?;
    builder.send::<T>(request).await
}

#[cfg(test)]
#[path = "async_tests.rs"]
mod tests;