ibapi 2.11.2

A Rust implementation of the Interactive Brokers TWS API, providing a reliable and user friendly interface for TWS and IB Gateway. Designed with a focus on simplicity and performance.
Documentation
//! Asynchronous implementation of scanner functionality

use super::common::{decoders, encoders};
use super::*;
use crate::messages::OutgoingMessages;
#[cfg(not(feature = "sync"))]
use crate::messages::{IncomingMessages, RequestMessage, ResponseMessage};
use crate::orders::TagValue;
use crate::subscriptions::Subscription;
#[cfg(not(feature = "sync"))]
use crate::subscriptions::{DecoderContext, StreamDecoder};
use crate::{server_versions, Client, Error};

#[cfg(not(feature = "sync"))]
impl StreamDecoder<Vec<ScannerData>> for Vec<ScannerData> {
    const RESPONSE_MESSAGE_IDS: &'static [IncomingMessages] = &[IncomingMessages::ScannerData];

    fn decode(_context: &DecoderContext, message: &mut ResponseMessage) -> Result<Vec<ScannerData>, Error> {
        decoders::decode_scanner_message(message)
    }

    fn cancel_message(_server_version: i32, request_id: Option<i32>, _context: Option<&DecoderContext>) -> Result<RequestMessage, Error> {
        let request_id = request_id.expect("Request ID required to encode cancel scanner subscription.");
        encoders::encode_cancel_scanner_subscription(request_id)
    }
}

/// Requests an XML list of scanner parameters valid in TWS.
pub(crate) async fn scanner_parameters(client: &Client) -> Result<String, Error> {
    let request = encoders::encode_scanner_parameters()?;
    let mut subscription = client.send_shared_request(OutgoingMessages::RequestScannerParameters, request).await?;

    match subscription.next().await {
        Some(Ok(message)) => decoders::decode_scanner_parameters(message),
        Some(Err(e)) => Err(e),
        None => Err(Error::UnexpectedEndOfStream),
    }
}

/// Starts a subscription to market scan results based on the provided parameters.
pub(crate) async fn scanner_subscription(
    client: &Client,
    subscription: &ScannerSubscription,
    filter: &Vec<TagValue>,
) -> Result<Subscription<Vec<ScannerData>>, Error> {
    if !filter.is_empty() {
        client.check_server_version(
            server_versions::SCANNER_GENERIC_OPTS,
            "It does not support API scanner subscription generic filter options.",
        )?
    }

    let request_id = client.next_request_id();
    let request = encoders::encode_scanner_subscription(request_id, client.server_version(), subscription, filter)?;
    let internal_subscription = client.send_request(request_id, request).await?;

    Ok(Subscription::new_from_internal::<Vec<ScannerData>>(
        internal_subscription,
        client.message_bus.clone(),
        Some(request_id),
        None,
        None,
        client.decoder_context(),
    ))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::contracts::{Exchange, SecurityType, Symbol};
    use crate::orders::TagValue;
    use crate::server_versions;
    use crate::stubs::MessageBusStub;
    use std::sync::{Arc, RwLock};

    #[tokio::test]
    async fn test_scanner_parameters() {
        let message_bus = Arc::new(MessageBusStub {
            request_messages: RwLock::new(vec![]),
            response_messages: vec![
                "19|2|<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<ScanParameterResponse>\n<InstrumentList>...</InstrumentList>\n</ScanParameterResponse>".to_owned(),
            ],
        });

        let client = Client::stubbed(message_bus.clone(), server_versions::SCANNER_GENERIC_OPTS);

        let result = client.scanner_parameters().await;
        assert!(result.is_ok(), "failed to request scanner parameters: {}", result.err().unwrap());

        let request_messages = message_bus.request_messages.read().unwrap();
        assert_eq!(request_messages[0].encode_simple(), "24|1|");

        let scanner_params = result.unwrap();
        assert!(scanner_params.contains("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"));
        assert!(scanner_params.contains("<ScanParameterResponse>"));
        assert!(scanner_params.contains("<InstrumentList>"));
    }

    #[tokio::test]
    async fn test_scanner_subscription() {
        let message_bus = Arc::new(MessageBusStub {
            request_messages: RwLock::new(vec![]),
            response_messages: vec![
                "20\03\09000\010\00\0670777621\0SVMH\0STK\0\00\0\0SMART\0USD\0SVMH\0NMS\0NMS\0\0\0\0\01\0536918651\0GTI\0STK\0\00\0\0SMART\0USD\0GTI\0NMS\0NMS\0\0\0\0\02\0526726639\0LITM\0STK\0\00\0\0SMART\0USD\0LITM\0SCM\0SCM\0\0\0\0\03\0504716446\0LCID\0STK\0\00\0\0SMART\0USD\0LCID\0NMS\0NMS\0\0\0\0\04\0547605251\0RGTI\0STK\0\00\0\0SMART\0USD\0RGTI\0SCM\0SCM\0\0\0\0\05\0653568762\0AVGR\0STK\0\00\0\0SMART\0USD\0AVGR\0SCM\0SCM\0\0\0\0\06\04815747\0NVDA\0STK\0\00\0\0SMART\0USD\0NVDA\0NMS\0NMS\0\0\0\0\07\0534453483\0HOUR\0STK\0\00\0\0SMART\0USD\0HOUR\0SCM\0SCM\0\0\0\0\08\0631370187\0LAES\0STK\0\00\0\0SMART\0USD\0LAES\0SCM\0SCM\0\0\0\0\09\0689954925\0XTIA\0STK\0\00\0\0SMART\0USD\0XTIA\0SCM\0SCM\0\0\0\0\0".to_owned(),
            ],
        });

        let client = Client::stubbed(message_bus.clone(), server_versions::SCANNER_GENERIC_OPTS);

        let subscription = ScannerSubscription {
            number_of_rows: 10,
            instrument: Some("FUT".to_string()),
            location_code: Some("FUT.US".to_string()),
            scan_code: Some("TOP_PERC_GAIN".to_string()),
            above_price: Some(50.0),
            below_price: Some(100.0),
            above_volume: Some(1000),
            average_option_volume_above: Some(100),
            market_cap_above: Some(1000000.0),
            market_cap_below: Some(10000000.0),
            moody_rating_above: Some("A".to_string()),
            moody_rating_below: Some("AAA".to_string()),
            sp_rating_above: Some("A".to_string()),
            sp_rating_below: Some("AAA".to_string()),
            maturity_date_above: Some("20230101".to_string()),
            maturity_date_below: Some("20231231".to_string()),
            coupon_rate_above: Some(2.0),
            coupon_rate_below: Some(5.0),
            exclude_convertible: true,
            scanner_setting_pairs: Some("Annual,true".to_string()),
            stock_type_filter: Some("CORP".to_string()),
        };

        let filter = vec![
            TagValue {
                tag: "scannerType".to_string(),
                value: "TOP_PERC_GAIN".to_string(),
            },
            TagValue {
                tag: "numberOfRows".to_string(),
                value: "10".to_string(),
            },
        ];

        let result = client.scanner_subscription(&subscription, &filter).await;
        assert!(result.is_ok(), "failed to request scanner subscription: {}", result.err().unwrap());

        // Now verify we can parse the scanner data responses
        let mut subscription = result.unwrap();
        let scanner_data = match subscription.next().await {
            Some(Ok(data)) => data,
            Some(Err(e)) => panic!("Error getting scanner results: {e}"),
            None => panic!("Unexpected end of stream"),
        };

        assert_eq!(scanner_data.len(), 10);

        // Verify first scanner data entry
        let first = &scanner_data[0];
        assert_eq!(first.rank, 0);
        assert_eq!(first.contract_details.contract.symbol, Symbol::from("SVMH"));
        assert_eq!(first.contract_details.contract.security_type, SecurityType::Stock);
        assert_eq!(first.contract_details.contract.exchange, Exchange::from("SMART"));

        // Verify second scanner data entry
        let second = &scanner_data[1];
        assert_eq!(second.rank, 1);
        assert_eq!(second.contract_details.contract.symbol, Symbol::from("GTI"));
        assert_eq!(second.contract_details.contract.security_type, SecurityType::Stock);
        assert_eq!(second.contract_details.contract.exchange, Exchange::from("SMART"));

        // Verify third scanner data entry
        let third = &scanner_data[2];
        assert_eq!(third.rank, 2);
        assert_eq!(third.contract_details.contract.symbol, Symbol::from("LITM"));
        assert_eq!(third.contract_details.contract.security_type, SecurityType::Stock);
        assert_eq!(third.contract_details.contract.exchange, Exchange::from("SMART"));

        // Verify request parameters were encoded correctly
        {
            let request_messages = message_bus.request_messages.read().unwrap();
            let scanner_request = format!(
                "22|9000|10|FUT|FUT.US|TOP_PERC_GAIN|50|100|1000|1000000|10000000|A|AAA|A|AAA|20230101|20231231|2|5|1|100|Annual,true|CORP|scannerType=TOP_PERC_GAIN;numberOfRows=10;||",
            );
            assert_eq!(request_messages[0].encode_simple(), scanner_request);
        } // Lock is released here

        // Explicitly cancel the subscription
        subscription.cancel().await;

        // Verify cancel request was sent
        let request_messages = message_bus.request_messages.read().unwrap();
        assert_eq!(request_messages.len(), 2, "Expected 2 messages (request + cancel)");
        assert_eq!(request_messages[1].encode_simple(), "23|1|9000|");
    }

    #[tokio::test]
    async fn test_scanner_subscription_drop_sends_cancel() {
        let message_bus = Arc::new(MessageBusStub {
            request_messages: RwLock::new(vec![]),
            response_messages: vec!["20\03\09000\01\00\0670777621\0SVMH\0STK\0\00\0\0SMART\0USD\0SVMH\0NMS\0NMS\0\0\0\0\0".to_owned()],
        });

        let client = Client::stubbed(message_bus.clone(), server_versions::SCANNER_GENERIC_OPTS);

        let subscription = ScannerSubscription {
            number_of_rows: 1,
            scan_code: Some("TOP_PERC_GAIN".to_string()),
            ..Default::default()
        };

        let result = client.scanner_subscription(&subscription, &vec![]).await;
        assert!(result.is_ok(), "failed to request scanner subscription: {}", result.err().unwrap());

        let mut subscription = result.unwrap();

        // Read one message to ensure subscription is active
        let _ = subscription.next().await;

        // Verify initial request was sent
        {
            let request_messages = message_bus.request_messages.read().unwrap();
            assert_eq!(request_messages.len(), 1, "Expected 1 request message");
        } // Lock is released here

        // Drop the subscription without calling cancel()
        drop(subscription);

        // Give some time for the async drop task to complete
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        // Verify cancel request was sent by drop
        let request_messages = message_bus.request_messages.read().unwrap();
        assert_eq!(request_messages.len(), 2, "Expected 2 messages (request + cancel from drop)");
        assert_eq!(request_messages[1].encode_simple(), "23|1|9000|");
    }

    #[tokio::test]
    async fn test_scanner_subscription_no_double_cancel() {
        let message_bus = Arc::new(MessageBusStub {
            request_messages: RwLock::new(vec![]),
            response_messages: vec!["20\03\09000\01\00\0670777621\0SVMH\0STK\0\00\0\0SMART\0USD\0SVMH\0NMS\0NMS\0\0\0\0\0".to_owned()],
        });

        let client = Client::stubbed(message_bus.clone(), server_versions::SCANNER_GENERIC_OPTS);

        let subscription = ScannerSubscription {
            number_of_rows: 1,
            scan_code: Some("TOP_PERC_GAIN".to_string()),
            ..Default::default()
        };

        let result = client.scanner_subscription(&subscription, &vec![]).await;
        let subscription = result.unwrap();

        // Explicitly cancel
        subscription.cancel().await;

        {
            let request_messages = message_bus.request_messages.read().unwrap();
            assert_eq!(request_messages.len(), 2, "Expected 2 messages after cancel");
        }

        // Drop the already-cancelled subscription
        drop(subscription);

        // Give some time to ensure no additional cancel is sent
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

        // Verify no additional cancel request was sent
        let request_messages = message_bus.request_messages.read().unwrap();
        assert_eq!(request_messages.len(), 2, "Expected still only 2 messages (no double cancel)");
    }
}