ibapi 3.0.1

A Rust implementation of the Interactive Brokers TWS API, providing a reliable and user friendly interface for TWS and IB Gateway. Designed with a focus on simplicity and performance.
Documentation
use std::{
    collections::HashSet,
    sync::{LazyLock, Mutex, RwLock},
};

#[cfg(feature = "sync")]
use std::sync::Arc;

#[cfg(feature = "sync")]
use crossbeam::channel;

use crate::messages::{OutgoingMessages, ResponseMessage};
use crate::Error;

#[cfg(feature = "sync")]
use crate::transport::{InternalSubscription, MessageBus, SubscriptionBuilder};

#[cfg(feature = "async")]
use {
    crate::transport::{
        r#async::{AsyncInternalSubscription, CleanupSignal},
        AsyncMessageBus,
    },
    async_trait::async_trait,
    tokio::sync::broadcast,
};

#[cfg(feature = "async")]
const TEST_BROADCAST_CAPACITY: usize = 1024;

pub(crate) struct MessageBusStub {
    pub request_messages: RwLock<Vec<Vec<u8>>>,
    pub response_messages: Vec<String>,
    /// Pre-built responses (text or proto, in any order). When non-empty,
    /// supersedes `response_messages` — supports true interleaving for tests
    /// that mix dual-format decoders (e.g. OpenOrder text + ExecutionData proto
    /// in the same `place_order` flow at floor 203).
    pub ordered_responses: Vec<ResponseMessage>,
    // pub next_request_id: i32,
    // pub server_version: i32,
    // pub order_id: i32,
}

// Separate tracking for order update subscriptions to maintain backward compatibility
static ORDER_UPDATE_SUBSCRIPTION_TRACKER: LazyLock<Mutex<HashSet<usize>>> = LazyLock::new(|| Mutex::new(HashSet::new()));

impl Default for MessageBusStub {
    fn default() -> Self {
        Self {
            request_messages: RwLock::new(vec![]),
            response_messages: vec![],
            ordered_responses: vec![],
        }
    }
}

impl Drop for MessageBusStub {
    fn drop(&mut self) {
        // Clean up the subscription tracker to prevent test isolation issues
        let stub_id = self as *const _ as usize;
        ORDER_UPDATE_SUBSCRIPTION_TRACKER.lock().unwrap().remove(&stub_id);
    }
}

impl MessageBusStub {
    pub fn with_responses(response_messages: Vec<String>) -> Self {
        Self {
            request_messages: RwLock::new(vec![]),
            response_messages,
            ordered_responses: vec![],
        }
    }

    /// Construct a stub that plays back a heterogeneous, ordered sequence of
    /// pre-built `ResponseMessage` values. Use this when a test interleaves
    /// text- and proto-framed responses (e.g. `place_order` flow with
    /// dual-format `OpenOrder` text alongside proto-only `ExecutionData`).
    pub fn with_ordered_responses(ordered_responses: Vec<ResponseMessage>) -> Self {
        Self {
            request_messages: RwLock::new(vec![]),
            response_messages: vec![],
            ordered_responses,
        }
    }

    pub fn request_messages(&self) -> Vec<Vec<u8>> {
        self.request_messages.read().unwrap().clone()
    }

    /// Materialise configured responses as `ResponseMessage` instances.
    /// Prefers `ordered_responses` (true interleaving) over the legacy
    /// text-only `response_messages` field; only one is non-empty per test.
    pub(crate) fn response_messages_decoded(&self) -> Vec<ResponseMessage> {
        if !self.ordered_responses.is_empty() {
            return self.ordered_responses.clone();
        }
        self.response_messages
            .iter()
            .map(|m| ResponseMessage::from(&m.replace('|', "\0")))
            .collect()
    }
}

#[cfg(feature = "sync")]
impl MessageBus for MessageBusStub {
    fn send_request(&self, request_id: i32, message: &[u8]) -> Result<InternalSubscription, Error> {
        Ok(mock_request(self, Some(request_id), None, message))
    }

    fn cancel_subscription(&self, request_id: i32, packet: &[u8]) -> Result<(), Error> {
        mock_request(self, Some(request_id), None, packet);
        Ok(())
    }

    fn send_order_request(&self, request_id: i32, message: &[u8]) -> Result<InternalSubscription, Error> {
        Ok(mock_request(self, Some(request_id), None, message))
    }

    fn send_message(&self, message: &[u8]) -> Result<(), Error> {
        self.request_messages.write().unwrap().push(message.to_vec());
        Ok(())
    }

    fn create_order_update_subscription(&self) -> Result<InternalSubscription, Error> {
        // Use pointer address as unique identifier for this stub instance
        let stub_id = self as *const _ as usize;

        let mut tracker = ORDER_UPDATE_SUBSCRIPTION_TRACKER.lock().unwrap();
        if !tracker.insert(stub_id) {
            return Err(Error::AlreadySubscribed);
        }
        drop(tracker); // Release lock early

        let (sender, receiver) = channel::unbounded();
        let (signaler, _) = channel::unbounded();

        // Send any pre-configured response messages
        for message in self.response_messages_decoded() {
            sender.send(message.into()).unwrap();
        }

        let subscription = SubscriptionBuilder::new().receiver(receiver).signaler(signaler).build();

        Ok(subscription)
    }

    fn cancel_order_subscription(&self, request_id: i32, packet: &[u8]) -> Result<(), Error> {
        mock_request(self, Some(request_id), None, packet);

        let stub_id = self as *const _ as usize;
        ORDER_UPDATE_SUBSCRIPTION_TRACKER.lock().unwrap().remove(&stub_id);

        Ok(())
    }

    fn send_shared_request(&self, message_type: OutgoingMessages, message: &[u8]) -> Result<InternalSubscription, Error> {
        Ok(mock_request(self, None, Some(message_type), message))
    }

    fn cancel_shared_subscription(&self, message_type: OutgoingMessages, packet: &[u8]) -> Result<(), Error> {
        mock_request(self, None, Some(message_type), packet);
        Ok(())
    }

    fn notice_subscribe(&self) -> crate::subscriptions::notice_stream::sync_impl::NoticeStream {
        // No global notices delivered through the stub; hand back an empty,
        // already-closed channel so callers see end-of-stream cleanly.
        let (_sender, receiver) = channel::unbounded();
        crate::subscriptions::notice_stream::sync_impl::NoticeStream::new(receiver)
    }

    fn ensure_shutdown(&self) {}

    fn is_connected(&self) -> bool {
        true // Stub always returns connected
    }

    // fn process_messages(&mut self, _server_version: i32) -> Result<(), Error> {
    //     Ok(())
    // }
}

#[cfg(feature = "sync")]
fn mock_request(stub: &MessageBusStub, request_id: Option<i32>, message_type: Option<OutgoingMessages>, message: &[u8]) -> InternalSubscription {
    stub.request_messages.write().unwrap().push(message.to_vec());

    let (sender, receiver) = channel::unbounded();
    let (s1, _r1) = channel::unbounded();

    for message in stub.response_messages_decoded() {
        sender.send(message.into()).unwrap();
    }

    let mut subscription = SubscriptionBuilder::new().signaler(s1);
    if let Some(request_id) = request_id {
        subscription = subscription.receiver(receiver).request_id(request_id);
    } else if let Some(message_type) = message_type {
        subscription = subscription.shared_receiver(Arc::new(receiver)).message_type(message_type);
    }

    subscription.build()
}

#[cfg(feature = "async")]
#[async_trait]
impl AsyncMessageBus for MessageBusStub {
    async fn send_request(&self, _request_id: i32, message: Vec<u8>) -> Result<AsyncInternalSubscription, Error> {
        self.request_messages.write().unwrap().push(message);

        let (sender, receiver) = broadcast::channel(TEST_BROADCAST_CAPACITY);
        // Send pre-configured response messages
        for message in self.response_messages_decoded() {
            sender.send(message.into()).unwrap();
        }

        Ok(AsyncInternalSubscription::new(receiver))
    }

    async fn send_order_request(&self, _order_id: i32, message: Vec<u8>) -> Result<AsyncInternalSubscription, Error> {
        self.request_messages.write().unwrap().push(message);

        let (sender, receiver) = broadcast::channel(TEST_BROADCAST_CAPACITY);
        // Send pre-configured response messages
        for message in self.response_messages_decoded() {
            sender.send(message.into()).unwrap();
        }

        Ok(AsyncInternalSubscription::new(receiver))
    }

    async fn send_shared_request(&self, _message_type: OutgoingMessages, message: Vec<u8>) -> Result<AsyncInternalSubscription, Error> {
        self.request_messages.write().unwrap().push(message);

        let (sender, receiver) = broadcast::channel(TEST_BROADCAST_CAPACITY);
        // Send pre-configured response messages
        for message in self.response_messages_decoded() {
            sender.send(message.into()).unwrap();
        }

        Ok(AsyncInternalSubscription::new(receiver))
    }

    async fn send_message(&self, message: Vec<u8>) -> Result<(), Error> {
        self.request_messages.write().unwrap().push(message);
        Ok(())
    }

    async fn cancel_subscription(&self, _request_id: i32, message: Vec<u8>) -> Result<(), Error> {
        self.request_messages.write().unwrap().push(message);
        Ok(())
    }

    async fn cancel_order_subscription(&self, _order_id: i32, _message: Vec<u8>) -> Result<(), Error> {
        Ok(())
    }

    async fn create_order_update_subscription(&self) -> Result<AsyncInternalSubscription, Error> {
        let stub_id = self as *const _ as usize;
        let mut tracker = ORDER_UPDATE_SUBSCRIPTION_TRACKER.lock().unwrap();
        if !tracker.insert(stub_id) {
            return Err(Error::AlreadySubscribed);
        }
        drop(tracker);

        let (sender, receiver) = broadcast::channel(TEST_BROADCAST_CAPACITY);

        // Send pre-configured response messages
        for message in self.response_messages_decoded() {
            sender.send(message.into()).unwrap();
        }

        let (cleanup_sender, mut cleanup_receiver) = tokio::sync::mpsc::unbounded_channel();
        tokio::spawn(async move {
            while let Some(signal) = cleanup_receiver.recv().await {
                if matches!(signal, CleanupSignal::OrderUpdateStream) {
                    ORDER_UPDATE_SUBSCRIPTION_TRACKER.lock().unwrap().remove(&stub_id);
                    break;
                }
            }
        });

        Ok(AsyncInternalSubscription::with_cleanup(
            receiver,
            cleanup_sender,
            CleanupSignal::OrderUpdateStream,
        ))
    }

    fn notice_subscribe(&self) -> crate::subscriptions::notice_stream::async_impl::NoticeStream {
        // No global notices delivered through the stub; the broadcast channel
        // closes immediately so callers see end-of-stream cleanly.
        let (_sender, receiver) = broadcast::channel(1);
        crate::subscriptions::notice_stream::async_impl::NoticeStream::new(receiver)
    }

    async fn ensure_shutdown(&self) {
        // No-op for test stub
    }

    fn request_shutdown_sync(&self) {
        // No-op for test stub
    }

    fn is_connected(&self) -> bool {
        true // Stub always returns connected
    }
}

#[cfg(test)]
#[path = "stubs_tests.rs"]
mod tests;